preforkserver.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions
  6. # are met:
  7. # 1. Redistributions of source code must retain the above copyright
  8. # notice, this list of conditions and the following disclaimer.
  9. # 2. Redistributions in binary form must reproduce the above copyright
  10. # notice, this list of conditions and the following disclaimer in the
  11. # documentation and/or other materials provided with the distribution.
  12. #
  13. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  14. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  15. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  16. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  17. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  18. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  19. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  20. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  21. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  22. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  23. # SUCH DAMAGE.
  24. #
  25. # $Id$
  26. __author__ = 'Allan Saddi <allan@saddi.com>'
  27. __version__ = '$Revision$'
  28. import sys
  29. import os
  30. import socket
  31. import select
  32. import errno
  33. import signal
  34. import random
  35. import time
  36. try:
  37. import fcntl
  38. except ImportError:
  39. def setCloseOnExec(sock):
  40. pass
  41. else:
  42. def setCloseOnExec(sock):
  43. fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
  44. # If running Python < 2.4, require eunuchs module for socket.socketpair().
  45. # See <http://www.inoi.fi/open/trac/eunuchs>.
  46. if not hasattr(socket, 'socketpair'):
  47. try:
  48. import eunuchs.socketpair
  49. except ImportError:
  50. # TODO: Other alternatives? Perhaps using os.pipe()?
  51. raise ImportError, 'Requires eunuchs module for Python < 2.4'
  52. def socketpair():
  53. s1, s2 = eunuchs.socketpair.socketpair()
  54. p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
  55. socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
  56. os.close(s1)
  57. os.close(s2)
  58. return p, c
  59. socket.socketpair = socketpair
  60. class PreforkServer(object):
  61. """
  62. A preforked server model conceptually similar to Apache httpd(2). At
  63. any given time, ensures there are at least minSpare children ready to
  64. process new requests (up to a maximum of maxChildren children total).
  65. If the number of idle children is ever above maxSpare, the extra
  66. children are killed.
  67. If maxRequests is positive, each child will only handle that many
  68. requests in its lifetime before exiting.
  69. jobClass should be a class whose constructor takes at least two
  70. arguments: the client socket and client address. jobArgs, which
  71. must be a list or tuple, is any additional (static) arguments you
  72. wish to pass to the constructor.
  73. jobClass should have a run() method (taking no arguments) that does
  74. the actual work. When run() returns, the request is considered
  75. complete and the child process moves to idle state.
  76. """
  77. def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
  78. maxRequests=0, jobClass=None, jobArgs=()):
  79. self._minSpare = minSpare
  80. self._maxSpare = maxSpare
  81. self._maxChildren = max(maxSpare, maxChildren)
  82. self._maxRequests = maxRequests
  83. self._jobClass = jobClass
  84. self._jobArgs = jobArgs
  85. # Internal state of children. Maps pids to dictionaries with two
  86. # members: 'file' and 'avail'. 'file' is the socket to that
  87. # individidual child and 'avail' is whether or not the child is
  88. # free to process requests.
  89. self._children = {}
  90. def run(self, sock):
  91. """
  92. The main loop. Pass a socket that is ready to accept() client
  93. connections. Return value will be True or False indiciating whether
  94. or not the loop was exited due to SIGHUP.
  95. """
  96. # Set up signal handlers.
  97. self._keepGoing = True
  98. self._hupReceived = False
  99. self._installSignalHandlers()
  100. # Don't want operations on main socket to block.
  101. sock.setblocking(0)
  102. # Set close-on-exec
  103. setCloseOnExec(sock)
  104. # Main loop.
  105. while self._keepGoing:
  106. # Maintain minimum number of children.
  107. while len(self._children) < self._maxSpare:
  108. if not self._spawnChild(sock): break
  109. # Wait on any socket activity from live children.
  110. r = [x['file'] for x in self._children.values()
  111. if x['file'] is not None]
  112. if len(r) == len(self._children):
  113. timeout = None
  114. else:
  115. # There are dead children that need to be reaped, ensure
  116. # that they are by timing out, if necessary.
  117. timeout = 2
  118. try:
  119. r, w, e = select.select(r, [], [], timeout)
  120. except select.error, e:
  121. if e[0] != errno.EINTR:
  122. raise
  123. # Scan child sockets and tend to those that need attention.
  124. for child in r:
  125. # Receive status byte.
  126. try:
  127. state = child.recv(1)
  128. except socket.error, e:
  129. if e[0] in (errno.EAGAIN, errno.EINTR):
  130. # Guess it really didn't need attention?
  131. continue
  132. raise
  133. # Try to match it with a child. (Do we need a reverse map?)
  134. for pid,d in self._children.items():
  135. if child is d['file']:
  136. if state:
  137. # Set availability status accordingly.
  138. self._children[pid]['avail'] = state != '\x00'
  139. else:
  140. # Didn't receive anything. Child is most likely
  141. # dead.
  142. d = self._children[pid]
  143. d['file'].close()
  144. d['file'] = None
  145. d['avail'] = False
  146. # Reap children.
  147. self._reapChildren()
  148. # See who and how many children are available.
  149. availList = filter(lambda x: x[1]['avail'], self._children.items())
  150. avail = len(availList)
  151. if avail < self._minSpare:
  152. # Need to spawn more children.
  153. while avail < self._minSpare and \
  154. len(self._children) < self._maxChildren:
  155. if not self._spawnChild(sock): break
  156. avail += 1
  157. elif avail > self._maxSpare:
  158. # Too many spares, kill off the extras.
  159. pids = [x[0] for x in availList]
  160. pids.sort()
  161. pids = pids[self._maxSpare:]
  162. for pid in pids:
  163. d = self._children[pid]
  164. d['file'].close()
  165. d['file'] = None
  166. d['avail'] = False
  167. # Clean up all child processes.
  168. self._cleanupChildren()
  169. # Restore signal handlers.
  170. self._restoreSignalHandlers()
  171. # Return bool based on whether or not SIGHUP was received.
  172. return self._hupReceived
  173. def _cleanupChildren(self):
  174. """
  175. Closes all child sockets (letting those that are available know
  176. that it's time to exit). Sends SIGINT to those that are currently
  177. processing (and hopes that it finishses ASAP).
  178. Any children remaining after 10 seconds is SIGKILLed.
  179. """
  180. # Let all children know it's time to go.
  181. for pid,d in self._children.items():
  182. if d['file'] is not None:
  183. d['file'].close()
  184. d['file'] = None
  185. if not d['avail']:
  186. # Child is unavailable. SIGINT it.
  187. try:
  188. os.kill(pid, signal.SIGINT)
  189. except OSError, e:
  190. if e[0] != errno.ESRCH:
  191. raise
  192. def alrmHandler(signum, frame):
  193. pass
  194. # Set up alarm to wake us up after 10 seconds.
  195. oldSIGALRM = signal.getsignal(signal.SIGALRM)
  196. signal.signal(signal.SIGALRM, alrmHandler)
  197. signal.alarm(10)
  198. # Wait for all children to die.
  199. while len(self._children):
  200. try:
  201. pid, status = os.wait()
  202. except OSError, e:
  203. if e[0] in (errno.ECHILD, errno.EINTR):
  204. break
  205. if self._children.has_key(pid):
  206. del self._children[pid]
  207. signal.signal(signal.SIGALRM, oldSIGALRM)
  208. # Forcefully kill any remaining children.
  209. for pid in self._children.keys():
  210. try:
  211. os.kill(pid, signal.SIGKILL)
  212. except OSError, e:
  213. if e[0] != errno.ESRCH:
  214. raise
  215. def _reapChildren(self):
  216. """Cleans up self._children whenever children die."""
  217. while True:
  218. try:
  219. pid, status = os.waitpid(-1, os.WNOHANG)
  220. except OSError, e:
  221. if e[0] == errno.ECHILD:
  222. break
  223. raise
  224. if pid <= 0:
  225. break
  226. if self._children.has_key(pid): # Sanity check.
  227. if self._children[pid]['file'] is not None:
  228. self._children[pid]['file'].close()
  229. del self._children[pid]
  230. def _spawnChild(self, sock):
  231. """
  232. Spawn a single child. Returns True if successful, False otherwise.
  233. """
  234. # This socket pair is used for very simple communication between
  235. # the parent and its children.
  236. parent, child = socket.socketpair()
  237. parent.setblocking(0)
  238. setCloseOnExec(parent)
  239. child.setblocking(0)
  240. setCloseOnExec(child)
  241. try:
  242. pid = os.fork()
  243. except OSError, e:
  244. if e[0] in (errno.EAGAIN, errno.ENOMEM):
  245. return False # Can't fork anymore.
  246. raise
  247. if not pid:
  248. # Child
  249. child.close()
  250. # Put child into its own process group.
  251. pid = os.getpid()
  252. os.setpgid(pid, pid)
  253. # Restore signal handlers.
  254. self._restoreSignalHandlers()
  255. # Close copies of child sockets.
  256. for f in [x['file'] for x in self._children.values()
  257. if x['file'] is not None]:
  258. f.close()
  259. self._children = {}
  260. try:
  261. # Enter main loop.
  262. self._child(sock, parent)
  263. except KeyboardInterrupt:
  264. pass
  265. sys.exit(0)
  266. else:
  267. # Parent
  268. parent.close()
  269. d = self._children[pid] = {}
  270. d['file'] = child
  271. d['avail'] = True
  272. return True
  273. def _isClientAllowed(self, addr):
  274. """Override to provide access control."""
  275. return True
  276. def _notifyParent(self, parent, msg):
  277. """Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
  278. while True:
  279. try:
  280. parent.send(msg)
  281. return True
  282. except socket.error, e:
  283. if e[0] == errno.EPIPE:
  284. return False # Parent is gone
  285. if e[0] == errno.EAGAIN:
  286. # Wait for socket change before sending again
  287. select.select([], [parent], [])
  288. else:
  289. raise
  290. def _child(self, sock, parent):
  291. """Main loop for children."""
  292. requestCount = 0
  293. # Re-seed random module
  294. preseed = ''
  295. # urandom only exists in Python >= 2.4
  296. if hasattr(os, 'urandom'):
  297. try:
  298. preseed = os.urandom(16)
  299. except NotImplementedError:
  300. pass
  301. # Have doubts about this. random.seed will just hash the string
  302. random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
  303. del preseed
  304. while True:
  305. # Wait for any activity on the main socket or parent socket.
  306. r, w, e = select.select([sock, parent], [], [])
  307. for f in r:
  308. # If there's any activity on the parent socket, it
  309. # means the parent wants us to die or has died itself.
  310. # Either way, exit.
  311. if f is parent:
  312. return
  313. # Otherwise, there's activity on the main socket...
  314. try:
  315. clientSock, addr = sock.accept()
  316. except socket.error, e:
  317. if e[0] == errno.EAGAIN:
  318. # Or maybe not.
  319. continue
  320. raise
  321. setCloseOnExec(clientSock)
  322. # Check if this client is allowed.
  323. if not self._isClientAllowed(addr):
  324. clientSock.close()
  325. continue
  326. # Notify parent we're no longer available.
  327. self._notifyParent(parent, '\x00')
  328. # Do the job.
  329. self._jobClass(clientSock, addr, *self._jobArgs).run()
  330. # If we've serviced the maximum number of requests, exit.
  331. if self._maxRequests > 0:
  332. requestCount += 1
  333. if requestCount >= self._maxRequests:
  334. break
  335. # Tell parent we're free again.
  336. if not self._notifyParent(parent, '\xff'):
  337. return # Parent is gone.
  338. # Signal handlers
  339. def _hupHandler(self, signum, frame):
  340. self._keepGoing = False
  341. self._hupReceived = True
  342. def _intHandler(self, signum, frame):
  343. self._keepGoing = False
  344. def _chldHandler(self, signum, frame):
  345. # Do nothing (breaks us out of select and allows us to reap children).
  346. pass
  347. def _installSignalHandlers(self):
  348. supportedSignals = [signal.SIGINT, signal.SIGTERM]
  349. if hasattr(signal, 'SIGHUP'):
  350. supportedSignals.append(signal.SIGHUP)
  351. self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
  352. for sig in supportedSignals:
  353. if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
  354. signal.signal(sig, self._hupHandler)
  355. else:
  356. signal.signal(sig, self._intHandler)
  357. def _restoreSignalHandlers(self):
  358. """Restores previous signal handlers."""
  359. for signum,handler in self._oldSIGs:
  360. signal.signal(signum, handler)
  361. if __name__ == '__main__':
  362. class TestJob(object):
  363. def __init__(self, sock, addr):
  364. self._sock = sock
  365. self._addr = addr
  366. def run(self):
  367. print "Client connection opened from %s:%d" % self._addr
  368. self._sock.send('Hello World!\n')
  369. self._sock.setblocking(1)
  370. self._sock.recv(1)
  371. self._sock.close()
  372. print "Client connection closed from %s:%d" % self._addr
  373. sock = socket.socket()
  374. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  375. sock.bind(('', 8080))
  376. sock.listen(socket.SOMAXCONN)
  377. PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)