threadpoolserver.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. """
  2. Socket server based on a worker thread pool. Doesn't use select.
  3. Uses a single worker thread per client connection.
  4. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  5. """
  6. from __future__ import print_function
  7. import socket
  8. import logging
  9. import sys
  10. import time
  11. import os
  12. import Pyro4.util
  13. from Pyro4 import socketutil, errors
  14. from .threadpool import Pool, NoFreeWorkersError
  15. from ..threadutil import Lock, Thread, Event
  16. log = logging.getLogger("Pyro4.threadpoolserver")
  17. _client_disconnect_lock = Lock()
  18. class ClientConnectionJob(object):
  19. """
  20. Takes care of a single client connection and all requests
  21. that may arrive during its life span.
  22. """
  23. def __init__(self, clientSocket, clientAddr, daemon):
  24. self.csock = socketutil.SocketConnection(clientSocket)
  25. self.caddr = clientAddr
  26. self.daemon = daemon
  27. def __call__(self):
  28. if self.handleConnection():
  29. try:
  30. while True:
  31. try:
  32. self.daemon.handleRequest(self.csock)
  33. except (socket.error, errors.ConnectionClosedError):
  34. # client went away.
  35. log.debug("disconnected %s", self.caddr)
  36. break
  37. except errors.SecurityError:
  38. log.debug("security error on client %s", self.caddr)
  39. break
  40. except errors.TimeoutError as x:
  41. # for timeout errors we're not really interested in detailed traceback info
  42. log.warning("error during handleRequest: %s" % x)
  43. break
  44. except:
  45. # other errors log a warning, break this loop and close the client connection
  46. ex_t, ex_v, ex_tb = sys.exc_info()
  47. tb = Pyro4.util.formatTraceback(ex_t, ex_v, ex_tb)
  48. msg = "error during handleRequest: %s; %s" % (ex_v, "".join(tb))
  49. log.warning(msg)
  50. break
  51. finally:
  52. with _client_disconnect_lock:
  53. try:
  54. self.daemon._clientDisconnect(self.csock)
  55. except Exception as x:
  56. log.warning("Error in clientDisconnect: " + str(x))
  57. self.csock.close()
  58. def handleConnection(self):
  59. # connection handshake
  60. try:
  61. if self.daemon._handshake(self.csock):
  62. return True
  63. self.csock.close()
  64. except:
  65. ex_t, ex_v, ex_tb = sys.exc_info()
  66. tb = Pyro4.util.formatTraceback(ex_t, ex_v, ex_tb)
  67. log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb))
  68. self.csock.close()
  69. return False
  70. def denyConnection(self, reason):
  71. log.warning("client connection was denied: "+reason)
  72. # return failed handshake
  73. self.daemon._handshake(self.csock, denied_reason=reason)
  74. self.csock.close()
  75. class Housekeeper(Thread):
  76. def __init__(self, daemon):
  77. super(Housekeeper, self).__init__(name="housekeeper")
  78. self.pyroDaemon = daemon
  79. self.stop = Event()
  80. self.daemon = True
  81. self.waittime = min(Pyro4.config.POLLTIMEOUT or 0, max(Pyro4.config.COMMTIMEOUT or 0, 5))
  82. def run(self):
  83. while True:
  84. if self.stop.wait(self.waittime):
  85. break
  86. self.pyroDaemon._housekeeping()
  87. class SocketServer_Threadpool(object):
  88. """transport server for socket connections, worker thread pool version."""
  89. def __init__(self):
  90. self.daemon = self.sock = self._socketaddr = self.locationStr = self.pool = None
  91. self.shutting_down = False
  92. self.housekeeper = None
  93. def init(self, daemon, host, port, unixsocket=None):
  94. log.info("starting thread pool socketserver")
  95. self.daemon = daemon
  96. self.sock = None
  97. bind_location = unixsocket if unixsocket else (host, port)
  98. self.sock = socketutil.createSocket(bind=bind_location, reuseaddr=Pyro4.config.SOCK_REUSE, timeout=Pyro4.config.COMMTIMEOUT, noinherit=True, nodelay=Pyro4.config.SOCK_NODELAY)
  99. self._socketaddr = self.sock.getsockname()
  100. if not unixsocket and self._socketaddr[0].startswith("127."):
  101. if host is None or host.lower() != "localhost" and not host.startswith("127."):
  102. log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host)
  103. if unixsocket:
  104. self.locationStr = "./u:" + unixsocket
  105. else:
  106. host = host or self._socketaddr[0]
  107. port = port or self._socketaddr[1]
  108. if ":" in host: # ipv6
  109. self.locationStr = "[%s]:%d" % (host, port)
  110. else:
  111. self.locationStr = "%s:%d" % (host, port)
  112. self.pool = Pool()
  113. self.housekeeper = Housekeeper(daemon)
  114. self.housekeeper.start()
  115. def __del__(self):
  116. if self.sock is not None:
  117. self.sock.close()
  118. self.sock = None
  119. if self.pool is not None:
  120. self.pool.close()
  121. self.pool = None
  122. if self.housekeeper:
  123. self.housekeeper.stop.set()
  124. self.housekeeper.join()
  125. self.housekeeper = None
  126. def __repr__(self):
  127. return "<%s on %s; %d workers>" % (self.__class__.__name__, self.locationStr, self.pool.num_workers())
  128. def loop(self, loopCondition=lambda: True):
  129. log.debug("threadpool server requestloop")
  130. while (self.sock is not None) and not self.shutting_down and loopCondition():
  131. try:
  132. self.events([self.sock])
  133. except socket.error:
  134. x = sys.exc_info()[1]
  135. err = getattr(x, "errno", x.args[0])
  136. if not loopCondition():
  137. # swallow the socket error if loop terminates anyway
  138. # this can occur if we are asked to shutdown, socket can be invalid then
  139. break
  140. if err in socketutil.ERRNO_RETRIES:
  141. continue
  142. else:
  143. raise
  144. except KeyboardInterrupt:
  145. log.debug("stopping on break signal")
  146. break
  147. def combine_loop(self, server):
  148. raise TypeError("You can't use the loop combiner on the threadpool server type")
  149. def events(self, eventsockets):
  150. """used for external event loops: handle events that occur on one of the sockets of this server"""
  151. # we only react on events on our own server socket.
  152. # all other (client) sockets are owned by their individual threads.
  153. assert self.sock in eventsockets
  154. try:
  155. csock, caddr = self.sock.accept()
  156. if self.shutting_down:
  157. csock.close()
  158. return
  159. log.debug("connected %s", caddr)
  160. if Pyro4.config.COMMTIMEOUT:
  161. csock.settimeout(Pyro4.config.COMMTIMEOUT)
  162. job = ClientConnectionJob(csock, caddr, self.daemon)
  163. try:
  164. self.pool.process(job)
  165. except NoFreeWorkersError:
  166. job.denyConnection("no free workers, increase server threadpool size")
  167. except socket.timeout:
  168. pass # just continue the loop on a timeout on accept
  169. def shutdown(self):
  170. self.shutting_down = True
  171. self.wakeup()
  172. time.sleep(0.05)
  173. self.close()
  174. self.sock = None
  175. def close(self):
  176. if self.housekeeper:
  177. self.housekeeper.stop.set()
  178. self.housekeeper.join()
  179. self.housekeeper = None
  180. if self.sock:
  181. sockname = None
  182. try:
  183. sockname = self.sock.getsockname()
  184. except socket.error:
  185. pass
  186. try:
  187. self.sock.close()
  188. if type(sockname) is str:
  189. # it was a Unix domain socket, remove it from the filesystem
  190. if os.path.exists(sockname):
  191. os.remove(sockname)
  192. except Exception:
  193. pass
  194. self.sock = None
  195. self.pool.close()
  196. @property
  197. def sockets(self):
  198. # the server socket is all we care about, all client sockets are running in their own threads
  199. return [self.sock]
  200. @property
  201. def selector(self):
  202. raise TypeError("threadpool server doesn't have multiplexing selector")
  203. def wakeup(self):
  204. socketutil.interruptSocket(self._socketaddr)