gateway_base.py 50 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525
  1. """
  2. base execnet gateway code send to the other side for bootstrapping.
  3. NOTE: aims to be compatible to Python 2.5-3.X, Jython and IronPython
  4. :copyright: 2004-2015
  5. :authors:
  6. - Holger Krekel
  7. - Armin Rigo
  8. - Benjamin Peterson
  9. - Ronny Pfannschmidt
  10. - many others
  11. """
  12. from __future__ import with_statement
  13. import sys
  14. import os
  15. import weakref
  16. import traceback
  17. import struct
  18. # NOTE that we want to avoid try/except style importing
  19. # to avoid setting sys.exc_info() during import
  20. #
  21. ISPY3 = sys.version_info >= (3, 0)
  22. if ISPY3:
  23. from io import BytesIO
  24. exec("def do_exec(co, loc): exec(co, loc)\n"
  25. "def reraise(cls, val, tb): raise val\n")
  26. unicode = str
  27. _long_type = int
  28. from _thread import interrupt_main
  29. SUBPROCESS32 = False
  30. else:
  31. from StringIO import StringIO as BytesIO
  32. exec("def do_exec(co, loc): exec co in loc\n"
  33. "def reraise(cls, val, tb): raise cls, val, tb\n")
  34. bytes = str
  35. _long_type = long
  36. try:
  37. from thread import interrupt_main
  38. except ImportError:
  39. interrupt_main = None
  40. try:
  41. import subprocess32 # NOQA
  42. SUBPROCESS32 = True
  43. except ImportError:
  44. SUBPROCESS32 = False
  45. sys.exc_clear()
  46. # f = open("/tmp/execnet-%s" % os.getpid(), "w")
  47. # def log_extra(*msg):
  48. # f.write(" ".join([str(x) for x in msg]) + "\n")
  49. class EmptySemaphore:
  50. acquire = release = lambda self: None
  51. def get_execmodel(backend):
  52. if hasattr(backend, "backend"):
  53. return backend
  54. if backend == "thread":
  55. importdef = {
  56. 'get_ident': ['thread::get_ident', '_thread::get_ident'],
  57. '_start_new_thread': ['thread::start_new_thread',
  58. '_thread::start_new_thread'],
  59. 'threading': ["threading"],
  60. 'queue': ["queue" if ISPY3 else "Queue"],
  61. 'sleep': ['time::sleep'],
  62. 'subprocess': ['subprocess32' if SUBPROCESS32 else 'subprocess'],
  63. 'socket': ['socket'],
  64. '_fdopen': ['os::fdopen'],
  65. '_lock': ['threading'],
  66. '_event': ['threading'],
  67. }
  68. def exec_start(self, func, args=()):
  69. self._start_new_thread(func, args)
  70. elif backend == "eventlet":
  71. importdef = {
  72. 'get_ident': ['eventlet.green.thread::get_ident'],
  73. '_spawn_n': ['eventlet::spawn_n'],
  74. 'threading': ['eventlet.green.threading'],
  75. 'queue': ["eventlet.queue"],
  76. 'sleep': ['eventlet::sleep'],
  77. 'subprocess': ['eventlet.green.subprocess'],
  78. 'socket': ['eventlet.green.socket'],
  79. '_fdopen': ['eventlet.green.os::fdopen'],
  80. '_lock': ['eventlet.green.threading'],
  81. '_event': ['eventlet.green.threading'],
  82. }
  83. def exec_start(self, func, args=()):
  84. self._spawn_n(func, *args)
  85. elif backend == "gevent":
  86. importdef = {
  87. 'get_ident': ['gevent.thread::get_ident'],
  88. '_spawn_n': ['gevent::spawn'],
  89. 'threading': ['threading'],
  90. 'queue': ["gevent.queue"],
  91. 'sleep': ['gevent::sleep'],
  92. 'subprocess': ['gevent.subprocess'],
  93. 'socket': ['gevent.socket'],
  94. # XXX
  95. '_fdopen': ['gevent.fileobject::FileObjectThread'],
  96. '_lock': ['gevent.lock'],
  97. '_event': ['gevent.event'],
  98. }
  99. def exec_start(self, func, args=()):
  100. self._spawn_n(func, *args)
  101. else:
  102. raise ValueError("unknown execmodel {!r}".format(backend))
  103. class ExecModel:
  104. def __init__(self, name):
  105. self._importdef = importdef
  106. self.backend = name
  107. self._count = 0
  108. def __repr__(self):
  109. return "<ExecModel %r>" % self.backend
  110. def __getattr__(self, name):
  111. locs = self._importdef.get(name)
  112. if locs is None:
  113. raise AttributeError(name)
  114. for loc in locs:
  115. parts = loc.split("::")
  116. loc = parts.pop(0)
  117. try:
  118. mod = __import__(loc, None, None, "__doc__")
  119. except ImportError:
  120. pass
  121. else:
  122. if parts:
  123. mod = getattr(mod, parts[0])
  124. setattr(self, name, mod)
  125. return mod
  126. raise AttributeError(name)
  127. start = exec_start
  128. def fdopen(self, fd, mode, bufsize=1):
  129. return self._fdopen(fd, mode, bufsize)
  130. def WorkerPool(self, hasprimary=False):
  131. return WorkerPool(self, hasprimary=hasprimary)
  132. def Semaphore(self, size=None):
  133. if size is None:
  134. return EmptySemaphore()
  135. return self._lock.Semaphore(size)
  136. def Lock(self):
  137. return self._lock.RLock()
  138. def RLock(self):
  139. return self._lock.RLock()
  140. def Event(self):
  141. return self._event.Event()
  142. def PopenPiped(self, args):
  143. PIPE = self.subprocess.PIPE
  144. return self.subprocess.Popen(args, stdout=PIPE, stdin=PIPE)
  145. return ExecModel(backend)
  146. class Reply(object):
  147. """ reply instances provide access to the result
  148. of a function execution that got dispatched
  149. through WorkerPool.spawn()
  150. """
  151. def __init__(self, task, threadmodel):
  152. self.task = task
  153. self._result_ready = threadmodel.Event()
  154. self.running = True
  155. def get(self, timeout=None):
  156. """ get the result object from an asynchronous function execution.
  157. if the function execution raised an exception,
  158. then calling get() will reraise that exception
  159. including its traceback.
  160. """
  161. self.waitfinish(timeout)
  162. try:
  163. return self._result
  164. except AttributeError:
  165. reraise(*(self._excinfo[:3])) # noqa
  166. def waitfinish(self, timeout=None):
  167. if not self._result_ready.wait(timeout):
  168. raise IOError("timeout waiting for {!r}".format(self.task))
  169. def run(self):
  170. func, args, kwargs = self.task
  171. try:
  172. try:
  173. self._result = func(*args, **kwargs)
  174. except:
  175. # sys may be already None when shutting down the interpreter
  176. if sys is not None:
  177. self._excinfo = sys.exc_info()
  178. finally:
  179. self._result_ready.set()
  180. self.running = False
  181. class WorkerPool(object):
  182. """ A WorkerPool allows to spawn function executions
  183. to threads, returning a reply object on which you
  184. can ask for the result (and get exceptions reraised).
  185. This implementation allows the main thread to integrate
  186. itself into performing function execution through
  187. calling integrate_as_primary_thread() which will return
  188. when the pool received a trigger_shutdown().
  189. """
  190. def __init__(self, execmodel, hasprimary=False):
  191. """ by default allow unlimited number of spawns. """
  192. self.execmodel = execmodel
  193. self._running_lock = self.execmodel.Lock()
  194. self._running = set()
  195. self._shuttingdown = False
  196. self._waitall_events = []
  197. if hasprimary:
  198. if self.execmodel.backend != "thread":
  199. raise ValueError("hasprimary=True requires thread model")
  200. self._primary_thread_task_ready = self.execmodel.Event()
  201. else:
  202. self._primary_thread_task_ready = None
  203. def integrate_as_primary_thread(self):
  204. """ integrate the thread with which we are called as a primary
  205. thread for executing functions triggered with spawn().
  206. """
  207. assert self.execmodel.backend == "thread", self.execmodel
  208. primary_thread_task_ready = self._primary_thread_task_ready
  209. # interacts with code at REF1
  210. while 1:
  211. primary_thread_task_ready.wait()
  212. reply = self._primary_thread_task
  213. if reply is None: # trigger_shutdown() woke us up
  214. break
  215. self._perform_spawn(reply)
  216. # we are concurrent with trigger_shutdown and spawn
  217. with self._running_lock:
  218. if self._shuttingdown:
  219. break
  220. primary_thread_task_ready.clear()
  221. def trigger_shutdown(self):
  222. with self._running_lock:
  223. self._shuttingdown = True
  224. if self._primary_thread_task_ready is not None:
  225. self._primary_thread_task = None
  226. self._primary_thread_task_ready.set()
  227. def active_count(self):
  228. return len(self._running)
  229. def _perform_spawn(self, reply):
  230. reply.run()
  231. with self._running_lock:
  232. self._running.remove(reply)
  233. if not self._running:
  234. while self._waitall_events:
  235. waitall_event = self._waitall_events.pop()
  236. waitall_event.set()
  237. def _try_send_to_primary_thread(self, reply):
  238. # REF1 in 'thread' model we give priority to running in main thread
  239. # note that we should be called with _running_lock hold
  240. primary_thread_task_ready = self._primary_thread_task_ready
  241. if primary_thread_task_ready is not None:
  242. if not primary_thread_task_ready.isSet():
  243. self._primary_thread_task = reply
  244. # wake up primary thread
  245. primary_thread_task_ready.set()
  246. return True
  247. return False
  248. def spawn(self, func, *args, **kwargs):
  249. """ return Reply object for the asynchronous dispatch
  250. of the given func(*args, **kwargs).
  251. """
  252. reply = Reply((func, args, kwargs), self.execmodel)
  253. with self._running_lock:
  254. if self._shuttingdown:
  255. raise ValueError("pool is shutting down")
  256. self._running.add(reply)
  257. if not self._try_send_to_primary_thread(reply):
  258. self.execmodel.start(self._perform_spawn, (reply,))
  259. return reply
  260. def terminate(self, timeout=None):
  261. """ trigger shutdown and wait for completion of all executions. """
  262. self.trigger_shutdown()
  263. return self.waitall(timeout=timeout)
  264. def waitall(self, timeout=None):
  265. """ wait until all active spawns have finished executing. """
  266. with self._running_lock:
  267. if not self._running:
  268. return True
  269. # if a Reply still runs, we let run_and_release
  270. # signal us -- note that we are still holding the
  271. # _running_lock to avoid race conditions
  272. my_waitall_event = self.execmodel.Event()
  273. self._waitall_events.append(my_waitall_event)
  274. return my_waitall_event.wait(timeout=timeout)
  275. sysex = (KeyboardInterrupt, SystemExit)
  276. DEBUG = os.environ.get('EXECNET_DEBUG')
  277. pid = os.getpid()
  278. if DEBUG == '2':
  279. def trace(*msg):
  280. try:
  281. line = " ".join(map(str, msg))
  282. sys.stderr.write("[{}] {}\n".format(pid, line))
  283. sys.stderr.flush()
  284. except Exception:
  285. pass # nothing we can do, likely interpreter-shutdown
  286. elif DEBUG:
  287. import tempfile
  288. import os
  289. fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid)
  290. # sys.stderr.write("execnet-debug at %r" % (fn,))
  291. debugfile = open(fn, 'w')
  292. def trace(*msg):
  293. try:
  294. line = " ".join(map(str, msg))
  295. debugfile.write(line + "\n")
  296. debugfile.flush()
  297. except Exception:
  298. try:
  299. v = sys.exc_info()[1]
  300. sys.stderr.write(
  301. "[{}] exception during tracing: {!r}\n".format(pid, v))
  302. except Exception:
  303. pass # nothing we can do, likely interpreter-shutdown
  304. else:
  305. notrace = trace = lambda *msg: None
  306. class Popen2IO:
  307. error = (IOError, OSError, EOFError)
  308. def __init__(self, outfile, infile, execmodel):
  309. # we need raw byte streams
  310. self.outfile, self.infile = outfile, infile
  311. if sys.platform == "win32":
  312. import msvcrt
  313. try:
  314. msvcrt.setmode(infile.fileno(), os.O_BINARY)
  315. msvcrt.setmode(outfile.fileno(), os.O_BINARY)
  316. except (AttributeError, IOError):
  317. pass
  318. self._read = getattr(infile, "buffer", infile).read
  319. self._write = getattr(outfile, "buffer", outfile).write
  320. self.execmodel = execmodel
  321. def read(self, numbytes):
  322. """Read exactly 'numbytes' bytes from the pipe. """
  323. # a file in non-blocking mode may return less bytes, so we loop
  324. buf = bytes()
  325. while numbytes > len(buf):
  326. data = self._read(numbytes-len(buf))
  327. if not data:
  328. raise EOFError(
  329. "expected %d bytes, got %d" % (numbytes, len(buf)))
  330. buf += data
  331. return buf
  332. def write(self, data):
  333. """write out all data bytes. """
  334. assert isinstance(data, bytes)
  335. self._write(data)
  336. self.outfile.flush()
  337. def close_read(self):
  338. self.infile.close()
  339. def close_write(self):
  340. self.outfile.close()
  341. class Message:
  342. """ encapsulates Messages and their wire protocol. """
  343. _types = []
  344. def __init__(self, msgcode, channelid=0, data=''):
  345. self.msgcode = msgcode
  346. self.channelid = channelid
  347. self.data = data
  348. @staticmethod
  349. def from_io(io):
  350. try:
  351. header = io.read(9) # type 1, channel 4, payload 4
  352. if not header:
  353. raise EOFError("empty read")
  354. except EOFError:
  355. e = sys.exc_info()[1]
  356. raise EOFError('couldnt load message header, ' + e.args[0])
  357. msgtype, channel, payload = struct.unpack('!bii', header)
  358. return Message(msgtype, channel, io.read(payload))
  359. def to_io(self, io):
  360. header = struct.pack('!bii', self.msgcode, self.channelid,
  361. len(self.data))
  362. io.write(header+self.data)
  363. def received(self, gateway):
  364. self._types[self.msgcode](self, gateway)
  365. def __repr__(self):
  366. name = self._types[self.msgcode].__name__.upper()
  367. return "<Message {} channel={} lendata={}>".format(
  368. name, self.channelid, len(self.data))
  369. class GatewayReceivedTerminate(Exception):
  370. """ Receiverthread got termination message. """
  371. def _setupmessages():
  372. def status(message, gateway):
  373. # we use the channelid to send back information
  374. # but don't instantiate a channel object
  375. d = {
  376. 'numchannels': len(gateway._channelfactory._channels),
  377. 'numexecuting': gateway._execpool.active_count(),
  378. 'execmodel': gateway.execmodel.backend,
  379. }
  380. gateway._send(Message.CHANNEL_DATA, message.channelid,
  381. dumps_internal(d))
  382. gateway._send(Message.CHANNEL_CLOSE, message.channelid)
  383. def channel_exec(message, gateway):
  384. channel = gateway._channelfactory.new(message.channelid)
  385. gateway._local_schedulexec(channel=channel, sourcetask=message.data)
  386. def channel_data(message, gateway):
  387. gateway._channelfactory._local_receive(message.channelid, message.data)
  388. def channel_close(message, gateway):
  389. gateway._channelfactory._local_close(message.channelid)
  390. def channel_close_error(message, gateway):
  391. remote_error = RemoteError(loads_internal(message.data))
  392. gateway._channelfactory._local_close(message.channelid, remote_error)
  393. def channel_last_message(message, gateway):
  394. gateway._channelfactory._local_close(message.channelid, sendonly=True)
  395. def gateway_terminate(message, gateway):
  396. raise GatewayReceivedTerminate(gateway)
  397. def reconfigure(message, gateway):
  398. if message.channelid == 0:
  399. target = gateway
  400. else:
  401. target = gateway._channelfactory.new(message.channelid)
  402. target._strconfig = loads_internal(message.data, gateway)
  403. types = [
  404. status, reconfigure, gateway_terminate,
  405. channel_exec, channel_data, channel_close,
  406. channel_close_error, channel_last_message,
  407. ]
  408. for i, handler in enumerate(types):
  409. Message._types.append(handler)
  410. setattr(Message, handler.__name__.upper(), i)
  411. _setupmessages()
  412. def geterrortext(excinfo,
  413. format_exception=traceback.format_exception, sysex=sysex):
  414. try:
  415. l = format_exception(*excinfo)
  416. errortext = "".join(l)
  417. except sysex:
  418. raise
  419. except:
  420. errortext = '{}: {}'.format(excinfo[0].__name__,
  421. excinfo[1])
  422. return errortext
  423. class RemoteError(Exception):
  424. """ Exception containing a stringified error from the other side. """
  425. def __init__(self, formatted):
  426. self.formatted = formatted
  427. Exception.__init__(self)
  428. def __str__(self):
  429. return self.formatted
  430. def __repr__(self):
  431. return "{}: {}".format(self.__class__.__name__, self.formatted)
  432. def warn(self):
  433. if self.formatted != INTERRUPT_TEXT:
  434. # XXX do this better
  435. sys.stderr.write("[%s] Warning: unhandled %r\n"
  436. % (os.getpid(), self,))
  437. class TimeoutError(IOError):
  438. """ Exception indicating that a timeout was reached. """
  439. NO_ENDMARKER_WANTED = object()
  440. class Channel(object):
  441. "Communication channel between two Python Interpreter execution points."
  442. RemoteError = RemoteError
  443. TimeoutError = TimeoutError
  444. _INTERNALWAKEUP = 1000
  445. _executing = False
  446. def __init__(self, gateway, id):
  447. assert isinstance(id, int)
  448. self.gateway = gateway
  449. # XXX: defaults copied from Unserializer
  450. self._strconfig = getattr(gateway, '_strconfig', (True, False))
  451. self.id = id
  452. self._items = self.gateway.execmodel.queue.Queue()
  453. self._closed = False
  454. self._receiveclosed = self.gateway.execmodel.Event()
  455. self._remoteerrors = []
  456. def _trace(self, *msg):
  457. self.gateway._trace(self.id, *msg)
  458. def setcallback(self, callback, endmarker=NO_ENDMARKER_WANTED):
  459. """ set a callback function for receiving items.
  460. All already queued items will immediately trigger the callback.
  461. Afterwards the callback will execute in the receiver thread
  462. for each received data item and calls to ``receive()`` will
  463. raise an error.
  464. If an endmarker is specified the callback will eventually
  465. be called with the endmarker when the channel closes.
  466. """
  467. _callbacks = self.gateway._channelfactory._callbacks
  468. with self.gateway._receivelock:
  469. if self._items is None:
  470. raise IOError("{!r} has callback already registered".format(self))
  471. items = self._items
  472. self._items = None
  473. while 1:
  474. try:
  475. olditem = items.get(block=False)
  476. except self.gateway.execmodel.queue.Empty:
  477. if not (self._closed or self._receiveclosed.isSet()):
  478. _callbacks[self.id] = (
  479. callback,
  480. endmarker,
  481. self._strconfig,
  482. )
  483. break
  484. else:
  485. if olditem is ENDMARKER:
  486. items.put(olditem) # for other receivers
  487. if endmarker is not NO_ENDMARKER_WANTED:
  488. callback(endmarker)
  489. break
  490. else:
  491. callback(olditem)
  492. def __repr__(self):
  493. flag = self.isclosed() and "closed" or "open"
  494. return "<Channel id=%d %s>" % (self.id, flag)
  495. def __del__(self):
  496. if self.gateway is None: # can be None in tests
  497. return
  498. self._trace("channel.__del__")
  499. # no multithreading issues here, because we have the last ref to 'self'
  500. if self._closed:
  501. # state transition "closed" --> "deleted"
  502. for error in self._remoteerrors:
  503. error.warn()
  504. elif self._receiveclosed.isSet():
  505. # state transition "sendonly" --> "deleted"
  506. # the remote channel is already in "deleted" state, nothing to do
  507. pass
  508. else:
  509. # state transition "opened" --> "deleted"
  510. # check if we are in the middle of interpreter shutdown
  511. # in which case the process will go away and we probably
  512. # don't need to try to send a closing or last message
  513. # (and often it won't work anymore to send things out)
  514. if Message is not None:
  515. if self._items is None: # has_callback
  516. msgcode = Message.CHANNEL_LAST_MESSAGE
  517. else:
  518. msgcode = Message.CHANNEL_CLOSE
  519. try:
  520. self.gateway._send(msgcode, self.id)
  521. except (IOError, ValueError): # ignore problems with sending
  522. pass
  523. def _getremoteerror(self):
  524. try:
  525. return self._remoteerrors.pop(0)
  526. except IndexError:
  527. try:
  528. return self.gateway._error
  529. except AttributeError:
  530. pass
  531. return None
  532. #
  533. # public API for channel objects
  534. #
  535. def isclosed(self):
  536. """ return True if the channel is closed. A closed
  537. channel may still hold items.
  538. """
  539. return self._closed
  540. def makefile(self, mode='w', proxyclose=False):
  541. """ return a file-like object.
  542. mode can be 'w' or 'r' for writeable/readable files.
  543. if proxyclose is true file.close() will also close the channel.
  544. """
  545. if mode == "w":
  546. return ChannelFileWrite(channel=self, proxyclose=proxyclose)
  547. elif mode == "r":
  548. return ChannelFileRead(channel=self, proxyclose=proxyclose)
  549. raise ValueError("mode {!r} not availabe".format(mode))
  550. def close(self, error=None):
  551. """ close down this channel with an optional error message.
  552. Note that closing of a channel tied to remote_exec happens
  553. automatically at the end of execution and cannot
  554. be done explicitely.
  555. """
  556. if self._executing:
  557. raise IOError("cannot explicitly close channel within remote_exec")
  558. if self._closed:
  559. self.gateway._trace(self, "ignoring redundant call to close()")
  560. if not self._closed:
  561. # state transition "opened/sendonly" --> "closed"
  562. # threads warning: the channel might be closed under our feet,
  563. # but it's never damaging to send too many CHANNEL_CLOSE messages
  564. # however, if the other side triggered a close already, we
  565. # do not send back a closed message.
  566. if not self._receiveclosed.isSet():
  567. put = self.gateway._send
  568. if error is not None:
  569. put(Message.CHANNEL_CLOSE_ERROR, self.id,
  570. dumps_internal(error))
  571. else:
  572. put(Message.CHANNEL_CLOSE, self.id)
  573. self._trace("sent channel close message")
  574. if isinstance(error, RemoteError):
  575. self._remoteerrors.append(error)
  576. self._closed = True # --> "closed"
  577. self._receiveclosed.set()
  578. queue = self._items
  579. if queue is not None:
  580. queue.put(ENDMARKER)
  581. self.gateway._channelfactory._no_longer_opened(self.id)
  582. def waitclose(self, timeout=None):
  583. """ wait until this channel is closed (or the remote side
  584. otherwise signalled that no more data was being sent).
  585. The channel may still hold receiveable items, but not receive
  586. any more after waitclose() has returned. Exceptions from executing
  587. code on the other side are reraised as local channel.RemoteErrors.
  588. EOFError is raised if the reading-connection was prematurely closed,
  589. which often indicates a dying process.
  590. self.TimeoutError is raised after the specified number of seconds
  591. (default is None, i.e. wait indefinitely).
  592. """
  593. # wait for non-"opened" state
  594. self._receiveclosed.wait(timeout=timeout)
  595. if not self._receiveclosed.isSet():
  596. raise self.TimeoutError("Timeout after %r seconds" % timeout)
  597. error = self._getremoteerror()
  598. if error:
  599. raise error
  600. def send(self, item):
  601. """sends the given item to the other side of the channel,
  602. possibly blocking if the sender queue is full.
  603. The item must be a simple python type and will be
  604. copied to the other side by value. IOError is
  605. raised if the write pipe was prematurely closed.
  606. """
  607. if self.isclosed():
  608. raise IOError("cannot send to {!r}".format(self))
  609. self.gateway._send(Message.CHANNEL_DATA, self.id, dumps_internal(item))
  610. def receive(self, timeout=None):
  611. """receive a data item that was sent from the other side.
  612. timeout: None [default] blocked waiting. A positive number
  613. indicates the number of seconds after which a channel.TimeoutError
  614. exception will be raised if no item was received.
  615. Note that exceptions from the remotely executing code will be
  616. reraised as channel.RemoteError exceptions containing
  617. a textual representation of the remote traceback.
  618. """
  619. itemqueue = self._items
  620. if itemqueue is None:
  621. raise IOError("cannot receive(), channel has receiver callback")
  622. try:
  623. x = itemqueue.get(timeout=timeout)
  624. except self.gateway.execmodel.queue.Empty:
  625. raise self.TimeoutError("no item after %r seconds" % timeout)
  626. if x is ENDMARKER:
  627. itemqueue.put(x) # for other receivers
  628. raise self._getremoteerror() or EOFError()
  629. else:
  630. return x
  631. def __iter__(self):
  632. return self
  633. def next(self):
  634. try:
  635. return self.receive()
  636. except EOFError:
  637. raise StopIteration
  638. __next__ = next
  639. def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False):
  640. """
  641. set the string coercion for this channel
  642. the default is to try to convert py2 str as py3 str,
  643. but not to try and convert py3 str to py2 str
  644. """
  645. self._strconfig = (py2str_as_py3str, py3str_as_py2str)
  646. data = dumps_internal(self._strconfig)
  647. self.gateway._send(Message.RECONFIGURE, self.id, data=data)
  648. ENDMARKER = object()
  649. INTERRUPT_TEXT = "keyboard-interrupted"
  650. class ChannelFactory(object):
  651. def __init__(self, gateway, startcount=1):
  652. self._channels = weakref.WeakValueDictionary()
  653. self._callbacks = {}
  654. self._writelock = gateway.execmodel.Lock()
  655. self.gateway = gateway
  656. self.count = startcount
  657. self.finished = False
  658. self._list = list # needed during interp-shutdown
  659. def new(self, id=None):
  660. """ create a new Channel with 'id' (or create new id if None). """
  661. with self._writelock:
  662. if self.finished:
  663. raise IOError("connexion already closed: {}".format(self.gateway))
  664. if id is None:
  665. id = self.count
  666. self.count += 2
  667. try:
  668. channel = self._channels[id]
  669. except KeyError:
  670. channel = self._channels[id] = Channel(self.gateway, id)
  671. return channel
  672. def channels(self):
  673. return self._list(self._channels.values())
  674. #
  675. # internal methods, called from the receiver thread
  676. #
  677. def _no_longer_opened(self, id):
  678. try:
  679. del self._channels[id]
  680. except KeyError:
  681. pass
  682. try:
  683. callback, endmarker, strconfig = self._callbacks.pop(id)
  684. except KeyError:
  685. pass
  686. else:
  687. if endmarker is not NO_ENDMARKER_WANTED:
  688. callback(endmarker)
  689. def _local_close(self, id, remoteerror=None, sendonly=False):
  690. channel = self._channels.get(id)
  691. if channel is None:
  692. # channel already in "deleted" state
  693. if remoteerror:
  694. remoteerror.warn()
  695. self._no_longer_opened(id)
  696. else:
  697. # state transition to "closed" state
  698. if remoteerror:
  699. channel._remoteerrors.append(remoteerror)
  700. queue = channel._items
  701. if queue is not None:
  702. queue.put(ENDMARKER)
  703. self._no_longer_opened(id)
  704. if not sendonly: # otherwise #--> "sendonly"
  705. channel._closed = True # --> "closed"
  706. channel._receiveclosed.set()
  707. def _local_receive(self, id, data):
  708. # executes in receiver thread
  709. channel = self._channels.get(id)
  710. try:
  711. callback, endmarker, strconfig = self._callbacks[id]
  712. except KeyError:
  713. queue = channel and channel._items
  714. if queue is None:
  715. pass # drop data
  716. else:
  717. item = loads_internal(data, channel)
  718. queue.put(item)
  719. else:
  720. try:
  721. data = loads_internal(data, channel, strconfig)
  722. callback(data) # even if channel may be already closed
  723. except Exception:
  724. excinfo = sys.exc_info()
  725. self.gateway._trace("exception during callback: %s" %
  726. excinfo[1])
  727. errortext = self.gateway._geterrortext(excinfo)
  728. self.gateway._send(Message.CHANNEL_CLOSE_ERROR,
  729. id, dumps_internal(errortext))
  730. self._local_close(id, errortext)
  731. def _finished_receiving(self):
  732. with self._writelock:
  733. self.finished = True
  734. for id in self._list(self._channels):
  735. self._local_close(id, sendonly=True)
  736. for id in self._list(self._callbacks):
  737. self._no_longer_opened(id)
  738. class ChannelFile(object):
  739. def __init__(self, channel, proxyclose=True):
  740. self.channel = channel
  741. self._proxyclose = proxyclose
  742. def isatty(self):
  743. return False
  744. def close(self):
  745. if self._proxyclose:
  746. self.channel.close()
  747. def __repr__(self):
  748. state = self.channel.isclosed() and 'closed' or 'open'
  749. return '<ChannelFile %d %s>' % (self.channel.id, state)
  750. class ChannelFileWrite(ChannelFile):
  751. def write(self, out):
  752. self.channel.send(out)
  753. def flush(self):
  754. pass
  755. class ChannelFileRead(ChannelFile):
  756. def __init__(self, channel, proxyclose=True):
  757. super(ChannelFileRead, self).__init__(channel, proxyclose)
  758. self._buffer = None
  759. def read(self, n):
  760. try:
  761. if self._buffer is None:
  762. self._buffer = self.channel.receive()
  763. while len(self._buffer) < n:
  764. self._buffer += self.channel.receive()
  765. except EOFError:
  766. self.close()
  767. if self._buffer is None:
  768. ret = ""
  769. else:
  770. ret = self._buffer[:n]
  771. self._buffer = self._buffer[n:]
  772. return ret
  773. def readline(self):
  774. if self._buffer is not None:
  775. i = self._buffer.find("\n")
  776. if i != -1:
  777. return self.read(i+1)
  778. line = self.read(len(self._buffer)+1)
  779. else:
  780. line = self.read(1)
  781. while line and line[-1] != "\n":
  782. c = self.read(1)
  783. if not c:
  784. break
  785. line += c
  786. return line
  787. class BaseGateway(object):
  788. exc_info = sys.exc_info
  789. _sysex = sysex
  790. id = "<slave>"
  791. def __init__(self, io, id, _startcount=2):
  792. self.execmodel = io.execmodel
  793. self._io = io
  794. self.id = id
  795. self._strconfig = (Unserializer.py2str_as_py3str,
  796. Unserializer.py3str_as_py2str)
  797. self._channelfactory = ChannelFactory(self, _startcount)
  798. self._receivelock = self.execmodel.RLock()
  799. # globals may be NONE at process-termination
  800. self.__trace = trace
  801. self._geterrortext = geterrortext
  802. self._receivepool = self.execmodel.WorkerPool()
  803. def _trace(self, *msg):
  804. self.__trace(self.id, *msg)
  805. def _initreceive(self):
  806. self._receivepool.spawn(self._thread_receiver)
  807. def _thread_receiver(self):
  808. def log(*msg):
  809. self._trace("[receiver-thread]", *msg)
  810. log("RECEIVERTHREAD: starting to run")
  811. io = self._io
  812. try:
  813. while 1:
  814. msg = Message.from_io(io)
  815. log("received", msg)
  816. with self._receivelock:
  817. msg.received(self)
  818. del msg
  819. except (KeyboardInterrupt, GatewayReceivedTerminate):
  820. pass
  821. except EOFError:
  822. log("EOF without prior gateway termination message")
  823. self._error = self.exc_info()[1]
  824. except Exception:
  825. log(self._geterrortext(self.exc_info()))
  826. log('finishing receiving thread')
  827. # wake up and terminate any execution waiting to receive
  828. self._channelfactory._finished_receiving()
  829. log('terminating execution')
  830. self._terminate_execution()
  831. log('closing read')
  832. self._io.close_read()
  833. log('closing write')
  834. self._io.close_write()
  835. log('terminating our receive pseudo pool')
  836. self._receivepool.trigger_shutdown()
  837. def _terminate_execution(self):
  838. pass
  839. def _send(self, msgcode, channelid=0, data=bytes()):
  840. message = Message(msgcode, channelid, data)
  841. try:
  842. message.to_io(self._io)
  843. self._trace('sent', message)
  844. except (IOError, ValueError):
  845. e = sys.exc_info()[1]
  846. self._trace('failed to send', message, e)
  847. # ValueError might be because the IO is already closed
  848. raise IOError("cannot send (already closed?)")
  849. def _local_schedulexec(self, channel, sourcetask):
  850. channel.close("execution disallowed")
  851. # _____________________________________________________________________
  852. #
  853. # High Level Interface
  854. # _____________________________________________________________________
  855. #
  856. def newchannel(self):
  857. """ return a new independent channel. """
  858. return self._channelfactory.new()
  859. def join(self, timeout=None):
  860. """ Wait for receiverthread to terminate. """
  861. self._trace("waiting for receiver thread to finish")
  862. self._receivepool.waitall()
  863. class SlaveGateway(BaseGateway):
  864. def _local_schedulexec(self, channel, sourcetask):
  865. sourcetask = loads_internal(sourcetask)
  866. self._execpool.spawn(self.executetask, (channel, sourcetask))
  867. def _terminate_execution(self):
  868. # called from receiverthread
  869. self._trace("shutting down execution pool")
  870. self._execpool.trigger_shutdown()
  871. if not self._execpool.waitall(5.0):
  872. self._trace(
  873. "execution ongoing after 5 secs,"" trying interrupt_main")
  874. # We try hard to terminate execution based on the assumption
  875. # that there is only one gateway object running per-process.
  876. if sys.platform != "win32":
  877. self._trace("sending ourselves a SIGINT")
  878. os.kill(os.getpid(), 2) # send ourselves a SIGINT
  879. elif interrupt_main is not None:
  880. self._trace("calling interrupt_main()")
  881. interrupt_main()
  882. if not self._execpool.waitall(10.0):
  883. self._trace("execution did not finish in another 10 secs, "
  884. "calling os._exit()")
  885. os._exit(1)
  886. def serve(self):
  887. def trace(msg):
  888. self._trace("[serve] " + msg)
  889. hasprimary = self.execmodel.backend == "thread"
  890. self._execpool = self.execmodel.WorkerPool(hasprimary=hasprimary)
  891. trace("spawning receiver thread")
  892. self._initreceive()
  893. try:
  894. if hasprimary:
  895. # this will return when we are in shutdown
  896. trace("integrating as primary thread")
  897. self._execpool.integrate_as_primary_thread()
  898. trace("joining receiver thread")
  899. self.join()
  900. except KeyboardInterrupt:
  901. # in the slave we can't really do anything sensible
  902. trace("swallowing keyboardinterrupt, serve finished")
  903. def executetask(self, item):
  904. try:
  905. channel, (source, call_name, kwargs) = item
  906. if not ISPY3 and kwargs:
  907. # some python2 versions do not accept unicode keyword params
  908. # note: Unserializer generally turns py2-str to py3-str objects
  909. newkwargs = {}
  910. for name, value in kwargs.items():
  911. if isinstance(name, unicode):
  912. name = name.encode('ascii')
  913. newkwargs[name] = value
  914. kwargs = newkwargs
  915. loc = {'channel': channel, '__name__': '__channelexec__'}
  916. self._trace("execution starts[%s]: %s" %
  917. (channel.id, repr(source)[:50]))
  918. channel._executing = True
  919. try:
  920. co = compile(source+'\n', '<remote exec>', 'exec')
  921. do_exec(co, loc) # noqa
  922. if call_name:
  923. self._trace('calling %s(**%60r)' % (call_name, kwargs))
  924. function = loc[call_name]
  925. function(channel, **kwargs)
  926. finally:
  927. channel._executing = False
  928. self._trace("execution finished")
  929. except KeyboardInterrupt:
  930. channel.close(INTERRUPT_TEXT)
  931. raise
  932. except:
  933. excinfo = self.exc_info()
  934. if not isinstance(excinfo[1], EOFError):
  935. if not channel.gateway._channelfactory.finished:
  936. self._trace("got exception: {!r}".format(excinfo[1]))
  937. errortext = self._geterrortext(excinfo)
  938. channel.close(errortext)
  939. return
  940. self._trace("ignoring EOFError because receiving finished")
  941. channel.close()
  942. #
  943. # Cross-Python pickling code, tested from test_serializer.py
  944. #
  945. class DataFormatError(Exception):
  946. pass
  947. class DumpError(DataFormatError):
  948. """Error while serializing an object."""
  949. class LoadError(DataFormatError):
  950. """Error while unserializing an object."""
  951. if ISPY3:
  952. def bchr(n):
  953. return bytes([n])
  954. else:
  955. bchr = chr
  956. DUMPFORMAT_VERSION = bchr(1)
  957. FOUR_BYTE_INT_MAX = 2147483647
  958. FLOAT_FORMAT = "!d"
  959. FLOAT_FORMAT_SIZE = struct.calcsize(FLOAT_FORMAT)
  960. COMPLEX_FORMAT = "!dd"
  961. COMPLEX_FORMAT_SIZE = struct.calcsize(COMPLEX_FORMAT)
  962. class _Stop(Exception):
  963. pass
  964. class Unserializer(object):
  965. num2func = {} # is filled after this class definition
  966. py2str_as_py3str = True # True
  967. py3str_as_py2str = False # false means py2 will get unicode
  968. def __init__(self, stream, channel_or_gateway=None, strconfig=None):
  969. gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway)
  970. strconfig = getattr(channel_or_gateway, '_strconfig', strconfig)
  971. if strconfig:
  972. self.py2str_as_py3str, self.py3str_as_py2str = strconfig
  973. self.stream = stream
  974. self.channelfactory = getattr(gateway, '_channelfactory', gateway)
  975. def load(self, versioned=False):
  976. if versioned:
  977. ver = self.stream.read(1)
  978. if ver != DUMPFORMAT_VERSION:
  979. raise LoadError("wrong dumpformat version %r" % ver)
  980. self.stack = []
  981. try:
  982. while True:
  983. opcode = self.stream.read(1)
  984. if not opcode:
  985. raise EOFError
  986. try:
  987. loader = self.num2func[opcode]
  988. except KeyError:
  989. raise LoadError(
  990. "unkown opcode %r - "
  991. "wire protocol corruption?" % (opcode,))
  992. loader(self)
  993. except _Stop:
  994. if len(self.stack) != 1:
  995. raise LoadError("internal unserialization error")
  996. return self.stack.pop(0)
  997. else:
  998. raise LoadError("didn't get STOP")
  999. def load_none(self):
  1000. self.stack.append(None)
  1001. def load_true(self):
  1002. self.stack.append(True)
  1003. def load_false(self):
  1004. self.stack.append(False)
  1005. def load_int(self):
  1006. i = self._read_int4()
  1007. self.stack.append(i)
  1008. def load_longint(self):
  1009. s = self._read_byte_string()
  1010. self.stack.append(int(s))
  1011. if ISPY3:
  1012. load_long = load_int
  1013. load_longlong = load_longint
  1014. else:
  1015. def load_long(self):
  1016. i = self._read_int4()
  1017. self.stack.append(long(i))
  1018. def load_longlong(self):
  1019. l = self._read_byte_string()
  1020. self.stack.append(long(l))
  1021. def load_float(self):
  1022. binary = self.stream.read(FLOAT_FORMAT_SIZE)
  1023. self.stack.append(struct.unpack(FLOAT_FORMAT, binary)[0])
  1024. def load_complex(self):
  1025. binary = self.stream.read(COMPLEX_FORMAT_SIZE)
  1026. self.stack.append(complex(*struct.unpack(COMPLEX_FORMAT, binary)))
  1027. def _read_int4(self):
  1028. return struct.unpack("!i", self.stream.read(4))[0]
  1029. def _read_byte_string(self):
  1030. length = self._read_int4()
  1031. as_bytes = self.stream.read(length)
  1032. return as_bytes
  1033. def load_py3string(self):
  1034. as_bytes = self._read_byte_string()
  1035. if not ISPY3 and self.py3str_as_py2str:
  1036. # XXX Should we try to decode into latin-1?
  1037. self.stack.append(as_bytes)
  1038. else:
  1039. self.stack.append(as_bytes.decode("utf-8"))
  1040. def load_py2string(self):
  1041. as_bytes = self._read_byte_string()
  1042. if ISPY3 and self.py2str_as_py3str:
  1043. s = as_bytes.decode("latin-1")
  1044. else:
  1045. s = as_bytes
  1046. self.stack.append(s)
  1047. def load_bytes(self):
  1048. s = self._read_byte_string()
  1049. self.stack.append(s)
  1050. def load_unicode(self):
  1051. self.stack.append(self._read_byte_string().decode("utf-8"))
  1052. def load_newlist(self):
  1053. length = self._read_int4()
  1054. self.stack.append([None] * length)
  1055. def load_setitem(self):
  1056. if len(self.stack) < 3:
  1057. raise LoadError("not enough items for setitem")
  1058. value = self.stack.pop()
  1059. key = self.stack.pop()
  1060. self.stack[-1][key] = value
  1061. def load_newdict(self):
  1062. self.stack.append({})
  1063. def _load_collection(self, type_):
  1064. length = self._read_int4()
  1065. if length:
  1066. res = type_(self.stack[-length:])
  1067. del self.stack[-length:]
  1068. self.stack.append(res)
  1069. else:
  1070. self.stack.append(type_())
  1071. def load_buildtuple(self):
  1072. self._load_collection(tuple)
  1073. def load_set(self):
  1074. self._load_collection(set)
  1075. def load_frozenset(self):
  1076. self._load_collection(frozenset)
  1077. def load_stop(self):
  1078. raise _Stop
  1079. def load_channel(self):
  1080. id = self._read_int4()
  1081. newchannel = self.channelfactory.new(id)
  1082. self.stack.append(newchannel)
  1083. # automatically build opcodes and byte-encoding
  1084. class opcode:
  1085. """ container for name -> num mappings. """
  1086. def _buildopcodes():
  1087. l = []
  1088. later_added = {
  1089. 'COMPLEX': 1,
  1090. }
  1091. for name, func in Unserializer.__dict__.items():
  1092. if name.startswith("load_"):
  1093. opname = name[5:].upper()
  1094. l.append((opname, func))
  1095. l.sort(key=lambda x: (later_added.get(x[0], 0), x[0]))
  1096. for i, (opname, func) in enumerate(l):
  1097. assert i < 26, "xxx"
  1098. i = bchr(64+i)
  1099. Unserializer.num2func[i] = func
  1100. setattr(opcode, opname, i)
  1101. _buildopcodes()
  1102. def dumps(obj):
  1103. """ return a serialized bytestring of the given obj.
  1104. The obj and all contained objects must be of a builtin
  1105. python type (so nested dicts, sets, etc. are all ok but
  1106. not user-level instances).
  1107. """
  1108. return _Serializer().save(obj, versioned=True)
  1109. def dump(byteio, obj):
  1110. """ write a serialized bytestring of the given obj to the given stream. """
  1111. _Serializer(write=byteio.write).save(obj, versioned=True)
  1112. def loads(bytestring, py2str_as_py3str=False, py3str_as_py2str=False):
  1113. """ return the object as deserialized from the given bytestring.
  1114. py2str_as_py3str: if true then string (str) objects previously
  1115. dumped on Python2 will be loaded as Python3
  1116. strings which really are text objects.
  1117. py3str_as_py2str: if true then string (str) objects previously
  1118. dumped on Python3 will be loaded as Python2
  1119. strings instead of unicode objects.
  1120. if the bytestring was dumped with an incompatible protocol
  1121. version or if the bytestring is corrupted, the
  1122. ``execnet.DataFormatError`` will be raised.
  1123. """
  1124. io = BytesIO(bytestring)
  1125. return load(io,
  1126. py2str_as_py3str=py2str_as_py3str,
  1127. py3str_as_py2str=py3str_as_py2str)
  1128. def load(io, py2str_as_py3str=False, py3str_as_py2str=False):
  1129. """ derserialize an object form the specified stream.
  1130. Behaviour and parameters are otherwise the same as with ``loads``
  1131. """
  1132. strconfig = (py2str_as_py3str, py3str_as_py2str)
  1133. return Unserializer(io, strconfig=strconfig).load(versioned=True)
  1134. def loads_internal(bytestring, channelfactory=None, strconfig=None):
  1135. io = BytesIO(bytestring)
  1136. return Unserializer(io, channelfactory, strconfig).load()
  1137. def dumps_internal(obj):
  1138. return _Serializer().save(obj)
  1139. class _Serializer(object):
  1140. _dispatch = {}
  1141. def __init__(self, write=None):
  1142. if write is None:
  1143. self._streamlist = []
  1144. write = self._streamlist.append
  1145. self._write = write
  1146. def save(self, obj, versioned=False):
  1147. # calling here is not re-entrant but multiple instances
  1148. # may write to the same stream because of the common platform
  1149. # atomic-write guaruantee (concurrent writes each happen atomicly)
  1150. if versioned:
  1151. self._write(DUMPFORMAT_VERSION)
  1152. self._save(obj)
  1153. self._write(opcode.STOP)
  1154. try:
  1155. streamlist = self._streamlist
  1156. except AttributeError:
  1157. return None
  1158. return type(streamlist[0])().join(streamlist)
  1159. def _save(self, obj):
  1160. tp = type(obj)
  1161. try:
  1162. dispatch = self._dispatch[tp]
  1163. except KeyError:
  1164. methodname = 'save_' + tp.__name__
  1165. meth = getattr(self.__class__, methodname, None)
  1166. if meth is None:
  1167. raise DumpError("can't serialize {}".format(tp))
  1168. dispatch = self._dispatch[tp] = meth
  1169. dispatch(self, obj)
  1170. def save_NoneType(self, non):
  1171. self._write(opcode.NONE)
  1172. def save_bool(self, boolean):
  1173. if boolean:
  1174. self._write(opcode.TRUE)
  1175. else:
  1176. self._write(opcode.FALSE)
  1177. def save_bytes(self, bytes_):
  1178. self._write(opcode.BYTES)
  1179. self._write_byte_sequence(bytes_)
  1180. if ISPY3:
  1181. def save_str(self, s):
  1182. self._write(opcode.PY3STRING)
  1183. self._write_unicode_string(s)
  1184. else:
  1185. def save_str(self, s):
  1186. self._write(opcode.PY2STRING)
  1187. self._write_byte_sequence(s)
  1188. def save_unicode(self, s):
  1189. self._write(opcode.UNICODE)
  1190. self._write_unicode_string(s)
  1191. def _write_unicode_string(self, s):
  1192. try:
  1193. as_bytes = s.encode("utf-8")
  1194. except UnicodeEncodeError:
  1195. raise DumpError("strings must be utf-8 encodable")
  1196. self._write_byte_sequence(as_bytes)
  1197. def _write_byte_sequence(self, bytes_):
  1198. self._write_int4(len(bytes_), "string is too long")
  1199. self._write(bytes_)
  1200. def _save_integral(self, i, short_op, long_op):
  1201. if i <= FOUR_BYTE_INT_MAX:
  1202. self._write(short_op)
  1203. self._write_int4(i)
  1204. else:
  1205. self._write(long_op)
  1206. self._write_byte_sequence(str(i).rstrip("L").encode("ascii"))
  1207. def save_int(self, i):
  1208. self._save_integral(i, opcode.INT, opcode.LONGINT)
  1209. def save_long(self, l):
  1210. self._save_integral(l, opcode.LONG, opcode.LONGLONG)
  1211. def save_float(self, flt):
  1212. self._write(opcode.FLOAT)
  1213. self._write(struct.pack(FLOAT_FORMAT, flt))
  1214. def save_complex(self, cpx):
  1215. self._write(opcode.COMPLEX)
  1216. self._write(struct.pack(COMPLEX_FORMAT, cpx.real, cpx.imag))
  1217. def _write_int4(self, i, error="int must be less than %i" %
  1218. (FOUR_BYTE_INT_MAX,)):
  1219. if i > FOUR_BYTE_INT_MAX:
  1220. raise DumpError(error)
  1221. self._write(struct.pack("!i", i))
  1222. def save_list(self, L):
  1223. self._write(opcode.NEWLIST)
  1224. self._write_int4(len(L), "list is too long")
  1225. for i, item in enumerate(L):
  1226. self._write_setitem(i, item)
  1227. def _write_setitem(self, key, value):
  1228. self._save(key)
  1229. self._save(value)
  1230. self._write(opcode.SETITEM)
  1231. def save_dict(self, d):
  1232. self._write(opcode.NEWDICT)
  1233. for key, value in d.items():
  1234. self._write_setitem(key, value)
  1235. def save_tuple(self, tup):
  1236. for item in tup:
  1237. self._save(item)
  1238. self._write(opcode.BUILDTUPLE)
  1239. self._write_int4(len(tup), "tuple is too long")
  1240. def _write_set(self, s, op):
  1241. for item in s:
  1242. self._save(item)
  1243. self._write(op)
  1244. self._write_int4(len(s), "set is too long")
  1245. def save_set(self, s):
  1246. self._write_set(s, opcode.SET)
  1247. def save_frozenset(self, s):
  1248. self._write_set(s, opcode.FROZENSET)
  1249. def save_Channel(self, channel):
  1250. self._write(opcode.CHANNEL)
  1251. self._write_int4(channel.id)
  1252. def init_popen_io(execmodel):
  1253. if not hasattr(os, 'dup'): # jython
  1254. io = Popen2IO(sys.stdout, sys.stdin, execmodel)
  1255. import tempfile
  1256. sys.stdin = tempfile.TemporaryFile('r')
  1257. sys.stdout = tempfile.TemporaryFile('w')
  1258. else:
  1259. try:
  1260. devnull = os.devnull
  1261. except AttributeError:
  1262. if os.name == 'nt':
  1263. devnull = 'NUL'
  1264. else:
  1265. devnull = '/dev/null'
  1266. # stdin
  1267. stdin = execmodel.fdopen(os.dup(0), 'r', 1)
  1268. fd = os.open(devnull, os.O_RDONLY)
  1269. os.dup2(fd, 0)
  1270. os.close(fd)
  1271. # stdout
  1272. stdout = execmodel.fdopen(os.dup(1), 'w', 1)
  1273. fd = os.open(devnull, os.O_WRONLY)
  1274. os.dup2(fd, 1)
  1275. # stderr for win32
  1276. if os.name == 'nt':
  1277. sys.stderr = execmodel.fdopen(os.dup(2), 'w', 1)
  1278. os.dup2(fd, 2)
  1279. os.close(fd)
  1280. io = Popen2IO(stdout, stdin, execmodel)
  1281. sys.stdin = execmodel.fdopen(0, 'r', 1)
  1282. sys.stdout = execmodel.fdopen(1, 'w', 1)
  1283. return io
  1284. def serve(io, id):
  1285. trace("creating slavegateway on {!r}".format(io))
  1286. SlaveGateway(io=io, id=id, _startcount=2).serve()