threadedserver.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. # Copyright (c) 2005 Allan Saddi <allan@saddi.com>
  2. # All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions
  6. # are met:
  7. # 1. Redistributions of source code must retain the above copyright
  8. # notice, this list of conditions and the following disclaimer.
  9. # 2. Redistributions in binary form must reproduce the above copyright
  10. # notice, this list of conditions and the following disclaimer in the
  11. # documentation and/or other materials provided with the distribution.
  12. #
  13. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  14. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  15. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  16. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  17. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  18. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  19. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  20. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  21. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  22. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  23. # SUCH DAMAGE.
  24. #
  25. # $Id$
  26. __author__ = 'Allan Saddi <allan@saddi.com>'
  27. __version__ = '$Revision$'
  28. import sys
  29. import socket
  30. import select
  31. import signal
  32. import errno
  33. try:
  34. import fcntl
  35. except ImportError:
  36. def setCloseOnExec(sock):
  37. pass
  38. else:
  39. def setCloseOnExec(sock):
  40. fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
  41. from flup.server.threadpool import ThreadPool
  42. __all__ = ['ThreadedServer']
  43. class ThreadedServer(object):
  44. def __init__(self, jobClass=None, jobArgs=(), **kw):
  45. self._jobClass = jobClass
  46. self._jobArgs = jobArgs
  47. self._threadPool = ThreadPool(**kw)
  48. def run(self, sock, timeout=1.0):
  49. """
  50. The main loop. Pass a socket that is ready to accept() client
  51. connections. Return value will be True or False indiciating whether
  52. or not the loop was exited due to SIGHUP.
  53. """
  54. # Set up signal handlers.
  55. self._keepGoing = True
  56. self._hupReceived = False
  57. # Might need to revisit this?
  58. if not sys.platform.startswith('win'):
  59. self._installSignalHandlers()
  60. # Set close-on-exec
  61. setCloseOnExec(sock)
  62. # Main loop.
  63. while self._keepGoing:
  64. try:
  65. r, w, e = select.select([sock], [], [], timeout)
  66. except select.error, e:
  67. if e[0] == errno.EINTR:
  68. continue
  69. raise
  70. if r:
  71. try:
  72. clientSock, addr = sock.accept()
  73. except socket.error, e:
  74. if e[0] in (errno.EINTR, errno.EAGAIN):
  75. continue
  76. raise
  77. setCloseOnExec(clientSock)
  78. if not self._isClientAllowed(addr):
  79. clientSock.close()
  80. continue
  81. # Hand off to Connection.
  82. conn = self._jobClass(clientSock, addr, *self._jobArgs)
  83. if not self._threadPool.addJob(conn, allowQueuing=False):
  84. # No thread left, immediately close the socket to hopefully
  85. # indicate to the web server that we're at our limit...
  86. # and to prevent having too many opened (and useless)
  87. # files.
  88. clientSock.close()
  89. self._mainloopPeriodic()
  90. # Restore signal handlers.
  91. self._restoreSignalHandlers()
  92. # Return bool based on whether or not SIGHUP was received.
  93. return self._hupReceived
  94. def _mainloopPeriodic(self):
  95. """
  96. Called with just about each iteration of the main loop. Meant to
  97. be overridden.
  98. """
  99. pass
  100. def _exit(self, reload=False):
  101. """
  102. Protected convenience method for subclasses to force an exit. Not
  103. really thread-safe, which is why it isn't public.
  104. """
  105. if self._keepGoing:
  106. self._keepGoing = False
  107. self._hupReceived = reload
  108. def _isClientAllowed(self, addr):
  109. """Override to provide access control."""
  110. return True
  111. # Signal handlers
  112. def _hupHandler(self, signum, frame):
  113. self._hupReceived = True
  114. self._keepGoing = False
  115. def _intHandler(self, signum, frame):
  116. self._keepGoing = False
  117. def _installSignalHandlers(self):
  118. supportedSignals = [signal.SIGINT, signal.SIGTERM]
  119. if hasattr(signal, 'SIGHUP'):
  120. supportedSignals.append(signal.SIGHUP)
  121. self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
  122. for sig in supportedSignals:
  123. if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
  124. signal.signal(sig, self._hupHandler)
  125. else:
  126. signal.signal(sig, self._intHandler)
  127. def _restoreSignalHandlers(self):
  128. for signum,handler in self._oldSIGs:
  129. signal.signal(signum, handler)
  130. if __name__ == '__main__':
  131. class TestJob(object):
  132. def __init__(self, sock, addr):
  133. self._sock = sock
  134. self._addr = addr
  135. def run(self):
  136. print "Client connection opened from %s:%d" % self._addr
  137. self._sock.send('Hello World!\n')
  138. self._sock.setblocking(1)
  139. self._sock.recv(1)
  140. self._sock.close()
  141. print "Client connection closed from %s:%d" % self._addr
  142. sock = socket.socket()
  143. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  144. sock.bind(('', 8080))
  145. sock.listen(socket.SOMAXCONN)
  146. ThreadedServer(maxThreads=10, jobClass=TestJob).run(sock)