gateway_io.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. """
  2. execnet io initialization code
  3. creates io instances used for gateway io
  4. """
  5. import os
  6. import sys
  7. import shlex
  8. try:
  9. from execnet.gateway_base import Popen2IO, Message
  10. except ImportError:
  11. from __main__ import Popen2IO, Message
  12. from functools import partial
  13. class Popen2IOMaster(Popen2IO):
  14. def __init__(self, args, execmodel):
  15. self.popen = p = execmodel.PopenPiped(args)
  16. Popen2IO.__init__(self, p.stdin, p.stdout, execmodel=execmodel)
  17. def wait(self):
  18. try:
  19. return self.popen.wait()
  20. except OSError:
  21. pass # subprocess probably dead already
  22. def kill(self):
  23. killpopen(self.popen)
  24. def killpopen(popen):
  25. try:
  26. if hasattr(popen, 'kill'):
  27. popen.kill()
  28. else:
  29. killpid(popen.pid)
  30. except EnvironmentError:
  31. sys.stderr.write("ERROR killing: %s\n" % (sys.exc_info()[1]))
  32. sys.stderr.flush()
  33. def killpid(pid):
  34. if hasattr(os, 'kill'):
  35. os.kill(pid, 15)
  36. elif sys.platform == "win32" or getattr(os, '_name', None) == 'nt':
  37. import ctypes
  38. PROCESS_TERMINATE = 1
  39. handle = ctypes.windll.kernel32.OpenProcess(
  40. PROCESS_TERMINATE, False, pid)
  41. ctypes.windll.kernel32.TerminateProcess(handle, -1)
  42. ctypes.windll.kernel32.CloseHandle(handle)
  43. else:
  44. raise EnvironmentError("no method to kill {}".format(pid))
  45. popen_bootstrapline = "import sys;exec(eval(sys.stdin.readline()))"
  46. def shell_split_path(path):
  47. """
  48. Use shell lexer to split the given path into a list of components,
  49. taking care to handle Windows' '\' correctly.
  50. """
  51. if sys.platform.startswith('win'):
  52. # replace \\ by / otherwise shlex will strip them out
  53. path = path.replace('\\', '/')
  54. return shlex.split(path)
  55. def popen_args(spec):
  56. args = shell_split_path(spec.python) if spec.python else [sys.executable]
  57. args.append('-u')
  58. if spec is not None and spec.dont_write_bytecode:
  59. args.append("-B")
  60. # Slight gymnastics in ordering these arguments because CPython (as of
  61. # 2.7.1) ignores -B if you provide `python -c "something" -B`
  62. args.extend(['-c', popen_bootstrapline])
  63. return args
  64. def ssh_args(spec):
  65. # NOTE: If changing this, you need to sync those changes to vagrant_args
  66. # as well, or, take some time to further refactor the commonalities of
  67. # ssh_args and vagrant_args.
  68. remotepython = spec.python or "python"
  69. args = ["ssh", "-C"]
  70. if spec.ssh_config is not None:
  71. args.extend(['-F', str(spec.ssh_config)])
  72. args.extend(spec.ssh.split())
  73. remotecmd = '{} -c "{}"'.format(remotepython, popen_bootstrapline)
  74. args.append(remotecmd)
  75. return args
  76. def vagrant_ssh_args(spec):
  77. # This is the vagrant-wrapped version of SSH. Unfortunately the
  78. # command lines are incompatible to just channel through ssh_args
  79. # due to ordering/templating issues.
  80. # NOTE: This should be kept in sync with the ssh_args behaviour.
  81. # spec.vagrant is identical to spec.ssh in that they both carry
  82. # the remote host "address".
  83. remotepython = spec.python or 'python'
  84. args = ['vagrant', 'ssh', spec.vagrant_ssh, '--', '-C']
  85. if spec.ssh_config is not None:
  86. args.extend(['-F', str(spec.ssh_config)])
  87. remotecmd = '{} -c "{}"'.format(remotepython, popen_bootstrapline)
  88. args.extend([remotecmd])
  89. return args
  90. def create_io(spec, execmodel):
  91. if spec.popen:
  92. args = popen_args(spec)
  93. return Popen2IOMaster(args, execmodel)
  94. if spec.ssh:
  95. args = ssh_args(spec)
  96. io = Popen2IOMaster(args, execmodel)
  97. io.remoteaddress = spec.ssh
  98. return io
  99. if spec.vagrant_ssh:
  100. args = vagrant_ssh_args(spec)
  101. io = Popen2IOMaster(args, execmodel)
  102. io.remoteaddress = spec.vagrant_ssh
  103. return io
  104. #
  105. # Proxy Gateway handling code
  106. #
  107. # master: proxy initiator
  108. # forwarder: forwards between master and sub
  109. # sub: sub process that is proxied to the initiator
  110. RIO_KILL = 1
  111. RIO_WAIT = 2
  112. RIO_REMOTEADDRESS = 3
  113. RIO_CLOSE_WRITE = 4
  114. class ProxyIO(object):
  115. """ A Proxy IO object allows to instantiate a Gateway
  116. through another "via" gateway. A master:ProxyIO object
  117. provides an IO object effectively connected to the sub
  118. via the forwarder. To achieve this, master:ProxyIO interacts
  119. with forwarder:serve_proxy_io() which itself
  120. instantiates and interacts with the sub.
  121. """
  122. def __init__(self, proxy_channel, execmodel):
  123. # after exchanging the control channel we use proxy_channel
  124. # for messaging IO
  125. self.controlchan = proxy_channel.gateway.newchannel()
  126. proxy_channel.send(self.controlchan)
  127. self.iochan = proxy_channel
  128. self.iochan_file = self.iochan.makefile('r')
  129. self.execmodel = execmodel
  130. def read(self, nbytes):
  131. return self.iochan_file.read(nbytes)
  132. def write(self, data):
  133. return self.iochan.send(data)
  134. def _controll(self, event):
  135. self.controlchan.send(event)
  136. return self.controlchan.receive()
  137. def close_write(self):
  138. self._controll(RIO_CLOSE_WRITE)
  139. def kill(self):
  140. self._controll(RIO_KILL)
  141. def wait(self):
  142. return self._controll(RIO_WAIT)
  143. @property
  144. def remoteaddress(self):
  145. return self._controll(RIO_REMOTEADDRESS)
  146. def __repr__(self):
  147. return '<RemoteIO via {}>'.format(self.iochan.gateway.id)
  148. class PseudoSpec:
  149. def __init__(self, vars):
  150. self.__dict__.update(vars)
  151. def __getattr__(self, name):
  152. return None
  153. def serve_proxy_io(proxy_channelX):
  154. execmodel = proxy_channelX.gateway.execmodel
  155. log = partial(
  156. proxy_channelX.gateway._trace,
  157. "serve_proxy_io:%s" % proxy_channelX.id)
  158. spec = PseudoSpec(proxy_channelX.receive())
  159. # create sub IO object which we will proxy back to our proxy initiator
  160. sub_io = create_io(spec, execmodel)
  161. control_chan = proxy_channelX.receive()
  162. log("got control chan", control_chan)
  163. # read data from master, forward it to the sub
  164. # XXX writing might block, thus blocking the receiver thread
  165. def forward_to_sub(data):
  166. log("forward data to sub, size %s" % len(data))
  167. sub_io.write(data)
  168. proxy_channelX.setcallback(forward_to_sub)
  169. def controll(data):
  170. if data == RIO_WAIT:
  171. control_chan.send(sub_io.wait())
  172. elif data == RIO_KILL:
  173. control_chan.send(sub_io.kill())
  174. elif data == RIO_REMOTEADDRESS:
  175. control_chan.send(sub_io.remoteaddress)
  176. elif data == RIO_CLOSE_WRITE:
  177. control_chan.send(sub_io.close_write())
  178. control_chan.setcallback(controll)
  179. # write data to the master coming from the sub
  180. forward_to_master_file = proxy_channelX.makefile("w")
  181. # read bootstrap byte from sub, send it on to master
  182. log('reading bootstrap byte from sub', spec.id)
  183. initial = sub_io.read(1)
  184. assert initial == '1'.encode('ascii'), initial
  185. log('forwarding bootstrap byte from sub', spec.id)
  186. forward_to_master_file.write(initial)
  187. # enter message forwarding loop
  188. while True:
  189. try:
  190. message = Message.from_io(sub_io)
  191. except EOFError:
  192. log('EOF from sub, terminating proxying loop', spec.id)
  193. break
  194. message.to_io(forward_to_master_file)
  195. # proxy_channelX will be closed from remote_exec's finalization code
  196. if __name__ == "__channelexec__":
  197. serve_proxy_io(channel) # noqa