gateway.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """
  2. gateway code for initiating popen, socket and ssh connections.
  3. (c) 2004-2013, Holger Krekel and others
  4. """
  5. import sys
  6. import os
  7. import inspect
  8. import types
  9. import linecache
  10. import textwrap
  11. import execnet
  12. from execnet.gateway_base import Message
  13. from execnet import gateway_base
  14. importdir = os.path.dirname(os.path.dirname(execnet.__file__))
  15. class Gateway(gateway_base.BaseGateway):
  16. """ Gateway to a local or remote Python Intepreter. """
  17. def __init__(self, io, spec):
  18. super(Gateway, self).__init__(io=io, id=spec.id, _startcount=1)
  19. self.spec = spec
  20. self._initreceive()
  21. @property
  22. def remoteaddress(self):
  23. return self._io.remoteaddress
  24. def __repr__(self):
  25. """ return string representing gateway type and status. """
  26. try:
  27. r = (self.hasreceiver() and 'receive-live' or 'not-receiving')
  28. i = len(self._channelfactory.channels())
  29. except AttributeError:
  30. r = "uninitialized"
  31. i = "no"
  32. return "<{} id={!r} {}, {} model, {} active channels>".format(
  33. self.__class__.__name__,
  34. self.id,
  35. r,
  36. self.execmodel.backend,
  37. i,
  38. )
  39. def exit(self):
  40. """ trigger gateway exit. Defer waiting for finishing
  41. of receiver-thread and subprocess activity to when
  42. group.terminate() is called.
  43. """
  44. self._trace("gateway.exit() called")
  45. if self not in self._group:
  46. self._trace("gateway already unregistered with group")
  47. return
  48. self._group._unregister(self)
  49. try:
  50. self._trace("--> sending GATEWAY_TERMINATE")
  51. self._send(Message.GATEWAY_TERMINATE)
  52. self._trace("--> io.close_write")
  53. self._io.close_write()
  54. except (ValueError, EOFError, IOError):
  55. v = sys.exc_info()[1]
  56. self._trace("io-error: could not send termination sequence")
  57. self._trace(" exception: %r" % v)
  58. def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
  59. """
  60. set the string coercion for this gateway
  61. the default is to try to convert py2 str as py3 str,
  62. but not to try and convert py3 str to py2 str
  63. """
  64. self._strconfig = (py2str_as_py3str, py3str_as_py2str)
  65. data = gateway_base.dumps_internal(self._strconfig)
  66. self._send(Message.RECONFIGURE, data=data)
  67. def _rinfo(self, update=False):
  68. """ return some sys/env information from remote. """
  69. if update or not hasattr(self, '_cache_rinfo'):
  70. ch = self.remote_exec(rinfo_source)
  71. self._cache_rinfo = RInfo(ch.receive())
  72. return self._cache_rinfo
  73. def hasreceiver(self):
  74. """ return True if gateway is able to receive data. """
  75. return self._receivepool.active_count() > 0
  76. def remote_status(self):
  77. """ return information object about remote execution status. """
  78. channel = self.newchannel()
  79. self._send(Message.STATUS, channel.id)
  80. statusdict = channel.receive()
  81. # the other side didn't actually instantiate a channel
  82. # so we just delete the internal id/channel mapping
  83. self._channelfactory._local_close(channel.id)
  84. return RemoteStatus(statusdict)
  85. def remote_exec(self, source, **kwargs):
  86. """ return channel object and connect it to a remote
  87. execution thread where the given ``source`` executes.
  88. * ``source`` is a string: execute source string remotely
  89. with a ``channel`` put into the global namespace.
  90. * ``source`` is a pure function: serialize source and
  91. call function with ``**kwargs``, adding a
  92. ``channel`` object to the keyword arguments.
  93. * ``source`` is a pure module: execute source of module
  94. with a ``channel`` in its global namespace
  95. In all cases the binding ``__name__='__channelexec__'``
  96. will be available in the global namespace of the remotely
  97. executing code.
  98. """
  99. call_name = None
  100. if isinstance(source, types.ModuleType):
  101. linecache.updatecache(inspect.getsourcefile(source))
  102. source = inspect.getsource(source)
  103. elif isinstance(source, types.FunctionType):
  104. call_name = source.__name__
  105. source = _source_of_function(source)
  106. else:
  107. source = textwrap.dedent(str(source))
  108. if call_name is None and kwargs:
  109. raise TypeError("can't pass kwargs to non-function remote_exec")
  110. channel = self.newchannel()
  111. self._send(Message.CHANNEL_EXEC,
  112. channel.id,
  113. gateway_base.dumps_internal((source, call_name, kwargs)))
  114. return channel
  115. def remote_init_threads(self, num=None):
  116. """ DEPRECATED. Is currently a NO-OPERATION already."""
  117. print ("WARNING: remote_init_threads()"
  118. " is a no-operation in execnet-1.2")
  119. class RInfo:
  120. def __init__(self, kwargs):
  121. self.__dict__.update(kwargs)
  122. def __repr__(self):
  123. info = ", ".join(
  124. "%s=%s" % item for item in sorted(self.__dict__.items()))
  125. return "<RInfo %r>" % info
  126. RemoteStatus = RInfo
  127. def rinfo_source(channel):
  128. import sys
  129. import os
  130. channel.send(dict(
  131. executable=sys.executable,
  132. version_info=sys.version_info[:5],
  133. platform=sys.platform,
  134. cwd=os.getcwd(),
  135. pid=os.getpid(),
  136. ))
  137. def _find_non_builtin_globals(source, codeobj):
  138. import ast
  139. try:
  140. import __builtin__
  141. except ImportError:
  142. import builtins as __builtin__
  143. vars = dict.fromkeys(codeobj.co_varnames)
  144. return [
  145. node.id for node in ast.walk(ast.parse(source))
  146. if isinstance(node, ast.Name) and
  147. node.id not in vars and
  148. node.id not in __builtin__.__dict__
  149. ]
  150. def _source_of_function(function):
  151. if function.__name__ == '<lambda>':
  152. raise ValueError("can't evaluate lambda functions'")
  153. # XXX: we dont check before remote instanciation
  154. # if arguments are used propperly
  155. try:
  156. sig = inspect.getfullargspec(function)
  157. except AttributeError:
  158. args = inspect.getargspec(function)[0]
  159. else:
  160. args = sig.args
  161. if args[0] != 'channel':
  162. raise ValueError('expected first function argument to be `channel`')
  163. if gateway_base.ISPY3:
  164. closure = function.__closure__
  165. codeobj = function.__code__
  166. else:
  167. closure = function.func_closure
  168. codeobj = function.func_code
  169. if closure is not None:
  170. raise ValueError("functions with closures can't be passed")
  171. try:
  172. source = inspect.getsource(function)
  173. except IOError:
  174. raise ValueError("can't find source file for %s" % function)
  175. source = textwrap.dedent(source) # just for inner functions
  176. used_globals = _find_non_builtin_globals(source, codeobj)
  177. if used_globals:
  178. raise ValueError(
  179. "the use of non-builtin globals isn't supported",
  180. used_globals,
  181. )
  182. return source