multiplexserver.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. """
  2. Socket server based on socket multiplexing. Doesn't use threads.
  3. Uses the best available selector (kqueue, poll, select).
  4. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  5. """
  6. from __future__ import print_function
  7. import socket
  8. import time
  9. import sys
  10. import logging
  11. import os
  12. from collections import defaultdict
  13. try:
  14. import selectors
  15. except ImportError:
  16. import selectors34 as selectors
  17. from Pyro4 import socketutil, errors, util
  18. import Pyro4.constants
  19. log = logging.getLogger("Pyro4.multiplexserver")
  20. class SocketServer_Multiplex(object):
  21. """Multiplexed transport server for socket connections (uses select, poll, kqueue, ...)"""
  22. def __init__(self):
  23. self.sock = self.daemon = self.locationStr = None
  24. self.selector = selectors.DefaultSelector()
  25. self.shutting_down = False
  26. def init(self, daemon, host, port, unixsocket=None):
  27. log.info("starting multiplexed socketserver")
  28. log.debug("selector implementation: "+self.selector.__class__.__name__)
  29. self.sock = None
  30. bind_location = unixsocket if unixsocket else (host, port)
  31. self.sock = socketutil.createSocket(bind=bind_location, reuseaddr=Pyro4.config.SOCK_REUSE, timeout=Pyro4.config.COMMTIMEOUT, noinherit=True, nodelay=Pyro4.config.SOCK_NODELAY)
  32. self.daemon = daemon
  33. self._socketaddr = sockaddr = self.sock.getsockname()
  34. if not unixsocket and sockaddr[0].startswith("127."):
  35. if host is None or host.lower() != "localhost" and not host.startswith("127."):
  36. log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host)
  37. if unixsocket:
  38. self.locationStr = "./u:" + unixsocket
  39. else:
  40. host = host or sockaddr[0]
  41. port = port or sockaddr[1]
  42. if ":" in host: # ipv6
  43. self.locationStr = "[%s]:%d" % (host, port)
  44. else:
  45. self.locationStr = "%s:%d" % (host, port)
  46. self.selector.register(self.sock, selectors.EVENT_READ, self)
  47. def __repr__(self):
  48. return "<%s on %s; %d connections>" % (self.__class__.__name__, self.locationStr, len(self.selector.get_map())-1)
  49. def __del__(self):
  50. if self.sock is not None:
  51. self.selector.close()
  52. self.sock.close()
  53. self.sock = None
  54. def events(self, eventsockets):
  55. """handle events that occur on one of the sockets of this server"""
  56. for s in eventsockets:
  57. if self.shutting_down:
  58. return
  59. if s is self.sock:
  60. # server socket, means new connection
  61. conn = self._handleConnection(self.sock)
  62. if conn:
  63. self.selector.register(conn, selectors.EVENT_READ, self)
  64. else:
  65. # must be client socket, means remote call
  66. active = self.handleRequest(s)
  67. if not active:
  68. try:
  69. self.daemon._clientDisconnect(s)
  70. except Exception as x:
  71. log.warning("Error in clientDisconnect: " + str(x))
  72. self.selector.unregister(s)
  73. s.close()
  74. self.daemon._housekeeping()
  75. def _handleConnection(self, sock):
  76. try:
  77. if sock is None:
  78. return
  79. csock, caddr = sock.accept()
  80. log.debug("connected %s", caddr)
  81. if Pyro4.config.COMMTIMEOUT:
  82. csock.settimeout(Pyro4.config.COMMTIMEOUT)
  83. except socket.error:
  84. x = sys.exc_info()[1]
  85. err = getattr(x, "errno", x.args[0])
  86. if err in socketutil.ERRNO_RETRIES:
  87. # just ignore this error for now and continue
  88. log.warning("accept() failed errno=%d, shouldn't happen", err)
  89. return None
  90. if err in socketutil.ERRNO_BADF or err in socketutil.ERRNO_ENOTSOCK:
  91. # our server socket got destroyed
  92. raise errors.ConnectionClosedError("server socket closed")
  93. raise
  94. try:
  95. conn = socketutil.SocketConnection(csock)
  96. if self.daemon._handshake(conn):
  97. return conn
  98. conn.close()
  99. except: # catch all errors, otherwise the event loop could terminate
  100. ex_t, ex_v, ex_tb = sys.exc_info()
  101. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  102. log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb))
  103. try:
  104. csock.shutdown(socket.SHUT_RDWR)
  105. except (OSError, socket.error):
  106. pass
  107. csock.close()
  108. return None
  109. def shutdown(self):
  110. self.shutting_down = True
  111. self.wakeup()
  112. time.sleep(0.05)
  113. self.close()
  114. self.sock = None
  115. def close(self):
  116. self.selector.close()
  117. if self.sock:
  118. sockname = None
  119. try:
  120. sockname = self.sock.getsockname()
  121. except socket.error:
  122. pass
  123. self.sock.close()
  124. if type(sockname) is str:
  125. # it was a Unix domain socket, remove it from the filesystem
  126. if os.path.exists(sockname):
  127. os.remove(sockname)
  128. self.sock = None
  129. @property
  130. def sockets(self):
  131. registrations = self.selector.get_map()
  132. if registrations:
  133. return [sk.fileobj for sk in registrations.values()]
  134. else:
  135. return []
  136. def wakeup(self):
  137. """bit of a hack to trigger a blocking server to get out of the loop, useful at clean shutdowns"""
  138. socketutil.interruptSocket(self._socketaddr)
  139. def handleRequest(self, conn):
  140. """Handles a single connection request event and returns if the connection is still active"""
  141. try:
  142. self.daemon.handleRequest(conn)
  143. return True
  144. except (socket.error, errors.ConnectionClosedError, errors.SecurityError):
  145. # client went away or caused a security error.
  146. # close the connection silently.
  147. try:
  148. peername = conn.sock.getpeername()
  149. log.debug("disconnected %s", peername)
  150. except socket.error:
  151. log.debug("disconnected a client")
  152. return False
  153. except errors.TimeoutError as x:
  154. # for timeout errors we're not really interested in detailed traceback info
  155. log.warning("error during handleRequest: %s" % x)
  156. return False
  157. except:
  158. # other error occurred, close the connection, but also log a warning
  159. ex_t, ex_v, ex_tb = sys.exc_info()
  160. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  161. msg = "error during handleRequest: %s; %s" % (ex_v, "".join(tb))
  162. log.warning(msg)
  163. return False
  164. def loop(self, loopCondition=lambda: True):
  165. log.debug("entering multiplexed requestloop")
  166. while loopCondition():
  167. try:
  168. try:
  169. events = self.selector.select(Pyro4.config.POLLTIMEOUT)
  170. except OSError:
  171. events = []
  172. # get all the socket connection objects that have a READ event
  173. # (the WRITE events are ignored here, they're registered to let timeouts work etc)
  174. events_per_server = defaultdict(list)
  175. for key, mask in events:
  176. if mask & selectors.EVENT_READ:
  177. events_per_server[key.data].append(key.fileobj)
  178. for server, fileobjs in events_per_server.items():
  179. server.events(fileobjs)
  180. if not events_per_server:
  181. self.daemon._housekeeping()
  182. except socket.timeout:
  183. pass # just continue the loop on a timeout
  184. except KeyboardInterrupt:
  185. log.debug("stopping on break signal")
  186. break
  187. def combine_loop(self, server):
  188. for sock in server.sockets:
  189. self.selector.register(sock, selectors.EVENT_READ, server)
  190. server.selector = self.selector