gateway_socket.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from execnet.gateway_bootstrap import HostNotFound
  2. import sys
  3. try:
  4. bytes
  5. except NameError:
  6. bytes = str
  7. class SocketIO:
  8. def __init__(self, sock, execmodel):
  9. self.sock = sock
  10. self.execmodel = execmodel
  11. socket = execmodel.socket
  12. try:
  13. # IPTOS_LOWDELAY
  14. sock.setsockopt(socket.SOL_IP, socket.IP_TOS, 0x10)
  15. sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
  16. except (AttributeError, socket.error):
  17. sys.stderr.write("WARNING: cannot set socketoption")
  18. def read(self, numbytes):
  19. "Read exactly 'bytes' bytes from the socket."
  20. buf = bytes()
  21. while len(buf) < numbytes:
  22. t = self.sock.recv(numbytes - len(buf))
  23. if not t:
  24. raise EOFError
  25. buf += t
  26. return buf
  27. def write(self, data):
  28. self.sock.sendall(data)
  29. def close_read(self):
  30. try:
  31. self.sock.shutdown(0)
  32. except self.execmodel.socket.error:
  33. pass
  34. def close_write(self):
  35. try:
  36. self.sock.shutdown(1)
  37. except self.execmodel.socket.error:
  38. pass
  39. def wait(self):
  40. pass
  41. def kill(self):
  42. pass
  43. def start_via(gateway, hostport=None):
  44. """ return a host, port tuple,
  45. after instanciating a socketserver on the given gateway
  46. """
  47. if hostport is None:
  48. host, port = ('localhost', 0)
  49. else:
  50. host, port = hostport
  51. from execnet.script import socketserver
  52. # execute the above socketserverbootstrap on the other side
  53. channel = gateway.remote_exec(socketserver)
  54. channel.send((host, port))
  55. (realhost, realport) = channel.receive()
  56. # self._trace("new_remote received"
  57. # "port=%r, hostname = %r" %(realport, hostname))
  58. if not realhost or realhost == "0.0.0.0":
  59. realhost = "localhost"
  60. return realhost, realport
  61. def create_io(spec, group, execmodel):
  62. assert not spec.python, (
  63. "socket: specifying python executables not yet supported")
  64. gateway_id = spec.installvia
  65. if gateway_id:
  66. host, port = start_via(group[gateway_id])
  67. else:
  68. host, port = spec.socket.split(":")
  69. port = int(port)
  70. socket = execmodel.socket
  71. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  72. io = SocketIO(sock, execmodel)
  73. io.remoteaddress = '%s:%d' % (host, port)
  74. try:
  75. sock.connect((host, port))
  76. except execmodel.socket.gaierror:
  77. raise HostNotFound(str(sys.exc_info()[1]))
  78. return io