123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions
- # are met:
- # 1. Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # 2. Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- #
- # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- # SUCH DAMAGE.
- #
- # $Id$
- __author__ = 'Allan Saddi <allan@saddi.com>'
- __version__ = '$Revision$'
- import sys
- import os
- import socket
- import select
- import errno
- import signal
- import random
- import time
- try:
- import fcntl
- except ImportError:
- def setCloseOnExec(sock):
- pass
- else:
- def setCloseOnExec(sock):
- fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
- # If running Python < 2.4, require eunuchs module for socket.socketpair().
- # See <http://www.inoi.fi/open/trac/eunuchs>.
- if not hasattr(socket, 'socketpair'):
- try:
- import eunuchs.socketpair
- except ImportError:
- # TODO: Other alternatives? Perhaps using os.pipe()?
- raise ImportError, 'Requires eunuchs module for Python < 2.4'
- def socketpair():
- s1, s2 = eunuchs.socketpair.socketpair()
- p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
- socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
- os.close(s1)
- os.close(s2)
- return p, c
- socket.socketpair = socketpair
- class PreforkServer(object):
- """
- A preforked server model conceptually similar to Apache httpd(2). At
- any given time, ensures there are at least minSpare children ready to
- process new requests (up to a maximum of maxChildren children total).
- If the number of idle children is ever above maxSpare, the extra
- children are killed.
- If maxRequests is positive, each child will only handle that many
- requests in its lifetime before exiting.
-
- jobClass should be a class whose constructor takes at least two
- arguments: the client socket and client address. jobArgs, which
- must be a list or tuple, is any additional (static) arguments you
- wish to pass to the constructor.
- jobClass should have a run() method (taking no arguments) that does
- the actual work. When run() returns, the request is considered
- complete and the child process moves to idle state.
- """
- def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
- maxRequests=0, jobClass=None, jobArgs=()):
- self._minSpare = minSpare
- self._maxSpare = maxSpare
- self._maxChildren = max(maxSpare, maxChildren)
- self._maxRequests = maxRequests
- self._jobClass = jobClass
- self._jobArgs = jobArgs
- # Internal state of children. Maps pids to dictionaries with two
- # members: 'file' and 'avail'. 'file' is the socket to that
- # individidual child and 'avail' is whether or not the child is
- # free to process requests.
- self._children = {}
- def run(self, sock):
- """
- The main loop. Pass a socket that is ready to accept() client
- connections. Return value will be True or False indiciating whether
- or not the loop was exited due to SIGHUP.
- """
- # Set up signal handlers.
- self._keepGoing = True
- self._hupReceived = False
- self._installSignalHandlers()
- # Don't want operations on main socket to block.
- sock.setblocking(0)
- # Set close-on-exec
- setCloseOnExec(sock)
-
- # Main loop.
- while self._keepGoing:
- # Maintain minimum number of children.
- while len(self._children) < self._maxSpare:
- if not self._spawnChild(sock): break
- # Wait on any socket activity from live children.
- r = [x['file'] for x in self._children.values()
- if x['file'] is not None]
- if len(r) == len(self._children):
- timeout = None
- else:
- # There are dead children that need to be reaped, ensure
- # that they are by timing out, if necessary.
- timeout = 2
- try:
- r, w, e = select.select(r, [], [], timeout)
- except select.error, e:
- if e[0] != errno.EINTR:
- raise
- # Scan child sockets and tend to those that need attention.
- for child in r:
- # Receive status byte.
- try:
- state = child.recv(1)
- except socket.error, e:
- if e[0] in (errno.EAGAIN, errno.EINTR):
- # Guess it really didn't need attention?
- continue
- raise
- # Try to match it with a child. (Do we need a reverse map?)
- for pid,d in self._children.items():
- if child is d['file']:
- if state:
- # Set availability status accordingly.
- self._children[pid]['avail'] = state != '\x00'
- else:
- # Didn't receive anything. Child is most likely
- # dead.
- d = self._children[pid]
- d['file'].close()
- d['file'] = None
- d['avail'] = False
- # Reap children.
- self._reapChildren()
- # See who and how many children are available.
- availList = filter(lambda x: x[1]['avail'], self._children.items())
- avail = len(availList)
- if avail < self._minSpare:
- # Need to spawn more children.
- while avail < self._minSpare and \
- len(self._children) < self._maxChildren:
- if not self._spawnChild(sock): break
- avail += 1
- elif avail > self._maxSpare:
- # Too many spares, kill off the extras.
- pids = [x[0] for x in availList]
- pids.sort()
- pids = pids[self._maxSpare:]
- for pid in pids:
- d = self._children[pid]
- d['file'].close()
- d['file'] = None
- d['avail'] = False
- # Clean up all child processes.
- self._cleanupChildren()
- # Restore signal handlers.
- self._restoreSignalHandlers()
- # Return bool based on whether or not SIGHUP was received.
- return self._hupReceived
- def _cleanupChildren(self):
- """
- Closes all child sockets (letting those that are available know
- that it's time to exit). Sends SIGINT to those that are currently
- processing (and hopes that it finishses ASAP).
- Any children remaining after 10 seconds is SIGKILLed.
- """
- # Let all children know it's time to go.
- for pid,d in self._children.items():
- if d['file'] is not None:
- d['file'].close()
- d['file'] = None
- if not d['avail']:
- # Child is unavailable. SIGINT it.
- try:
- os.kill(pid, signal.SIGINT)
- except OSError, e:
- if e[0] != errno.ESRCH:
- raise
- def alrmHandler(signum, frame):
- pass
- # Set up alarm to wake us up after 10 seconds.
- oldSIGALRM = signal.getsignal(signal.SIGALRM)
- signal.signal(signal.SIGALRM, alrmHandler)
- signal.alarm(10)
- # Wait for all children to die.
- while len(self._children):
- try:
- pid, status = os.wait()
- except OSError, e:
- if e[0] in (errno.ECHILD, errno.EINTR):
- break
- if self._children.has_key(pid):
- del self._children[pid]
- signal.signal(signal.SIGALRM, oldSIGALRM)
- # Forcefully kill any remaining children.
- for pid in self._children.keys():
- try:
- os.kill(pid, signal.SIGKILL)
- except OSError, e:
- if e[0] != errno.ESRCH:
- raise
- def _reapChildren(self):
- """Cleans up self._children whenever children die."""
- while True:
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- except OSError, e:
- if e[0] == errno.ECHILD:
- break
- raise
- if pid <= 0:
- break
- if self._children.has_key(pid): # Sanity check.
- if self._children[pid]['file'] is not None:
- self._children[pid]['file'].close()
- del self._children[pid]
- def _spawnChild(self, sock):
- """
- Spawn a single child. Returns True if successful, False otherwise.
- """
- # This socket pair is used for very simple communication between
- # the parent and its children.
- parent, child = socket.socketpair()
- parent.setblocking(0)
- setCloseOnExec(parent)
- child.setblocking(0)
- setCloseOnExec(child)
- try:
- pid = os.fork()
- except OSError, e:
- if e[0] in (errno.EAGAIN, errno.ENOMEM):
- return False # Can't fork anymore.
- raise
- if not pid:
- # Child
- child.close()
- # Put child into its own process group.
- pid = os.getpid()
- os.setpgid(pid, pid)
- # Restore signal handlers.
- self._restoreSignalHandlers()
- # Close copies of child sockets.
- for f in [x['file'] for x in self._children.values()
- if x['file'] is not None]:
- f.close()
- self._children = {}
- try:
- # Enter main loop.
- self._child(sock, parent)
- except KeyboardInterrupt:
- pass
- sys.exit(0)
- else:
- # Parent
- parent.close()
- d = self._children[pid] = {}
- d['file'] = child
- d['avail'] = True
- return True
- def _isClientAllowed(self, addr):
- """Override to provide access control."""
- return True
- def _notifyParent(self, parent, msg):
- """Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
- while True:
- try:
- parent.send(msg)
- return True
- except socket.error, e:
- if e[0] == errno.EPIPE:
- return False # Parent is gone
- if e[0] == errno.EAGAIN:
- # Wait for socket change before sending again
- select.select([], [parent], [])
- else:
- raise
-
- def _child(self, sock, parent):
- """Main loop for children."""
- requestCount = 0
- # Re-seed random module
- preseed = ''
- # urandom only exists in Python >= 2.4
- if hasattr(os, 'urandom'):
- try:
- preseed = os.urandom(16)
- except NotImplementedError:
- pass
- # Have doubts about this. random.seed will just hash the string
- random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
- del preseed
- while True:
- # Wait for any activity on the main socket or parent socket.
- r, w, e = select.select([sock, parent], [], [])
- for f in r:
- # If there's any activity on the parent socket, it
- # means the parent wants us to die or has died itself.
- # Either way, exit.
- if f is parent:
- return
- # Otherwise, there's activity on the main socket...
- try:
- clientSock, addr = sock.accept()
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- # Or maybe not.
- continue
- raise
- setCloseOnExec(clientSock)
-
- # Check if this client is allowed.
- if not self._isClientAllowed(addr):
- clientSock.close()
- continue
- # Notify parent we're no longer available.
- self._notifyParent(parent, '\x00')
- # Do the job.
- self._jobClass(clientSock, addr, *self._jobArgs).run()
- # If we've serviced the maximum number of requests, exit.
- if self._maxRequests > 0:
- requestCount += 1
- if requestCount >= self._maxRequests:
- break
-
- # Tell parent we're free again.
- if not self._notifyParent(parent, '\xff'):
- return # Parent is gone.
- # Signal handlers
- def _hupHandler(self, signum, frame):
- self._keepGoing = False
- self._hupReceived = True
- def _intHandler(self, signum, frame):
- self._keepGoing = False
- def _chldHandler(self, signum, frame):
- # Do nothing (breaks us out of select and allows us to reap children).
- pass
- def _installSignalHandlers(self):
- supportedSignals = [signal.SIGINT, signal.SIGTERM]
- if hasattr(signal, 'SIGHUP'):
- supportedSignals.append(signal.SIGHUP)
- self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
- for sig in supportedSignals:
- if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
- signal.signal(sig, self._hupHandler)
- else:
- signal.signal(sig, self._intHandler)
- def _restoreSignalHandlers(self):
- """Restores previous signal handlers."""
- for signum,handler in self._oldSIGs:
- signal.signal(signum, handler)
- if __name__ == '__main__':
- class TestJob(object):
- def __init__(self, sock, addr):
- self._sock = sock
- self._addr = addr
- def run(self):
- print "Client connection opened from %s:%d" % self._addr
- self._sock.send('Hello World!\n')
- self._sock.setblocking(1)
- self._sock.recv(1)
- self._sock.close()
- print "Client connection closed from %s:%d" % self._addr
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('', 8080))
- sock.listen(socket.SOMAXCONN)
- PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)
|