process.py 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. # -*- test-case-name: twisted.test.test_process -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. UNIX Process management.
  6. Do NOT use this module directly - use reactor.spawnProcess() instead.
  7. Maintainer: Itamar Shtull-Trauring
  8. """
  9. from __future__ import division, absolute_import, print_function
  10. from twisted.python.runtime import platform
  11. if platform.isWindows():
  12. raise ImportError(("twisted.internet.process does not work on Windows. "
  13. "Use the reactor.spawnProcess() API instead."))
  14. import errno
  15. import gc
  16. import os
  17. import io
  18. import select
  19. import signal
  20. import stat
  21. import sys
  22. import traceback
  23. try:
  24. import pty
  25. except ImportError:
  26. pty = None
  27. try:
  28. import fcntl, termios
  29. except ImportError:
  30. fcntl = None
  31. from zope.interface import implementer
  32. from twisted.python import log, failure
  33. from twisted.python.util import switchUID
  34. from twisted.python.compat import items, range, _PY3
  35. from twisted.internet import fdesc, abstract, error
  36. from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
  37. from twisted.internet._baseprocess import BaseProcess
  38. from twisted.internet.interfaces import IProcessTransport
  39. # Some people were importing this, which is incorrect, just keeping it
  40. # here for backwards compatibility:
  41. ProcessExitedAlready = error.ProcessExitedAlready
  42. reapProcessHandlers = {}
  43. def reapAllProcesses():
  44. """
  45. Reap all registered processes.
  46. """
  47. # Coerce this to a list, as reaping the process changes the dictionary and
  48. # causes a "size changed during iteration" exception
  49. for process in list(reapProcessHandlers.values()):
  50. process.reapProcess()
  51. def registerReapProcessHandler(pid, process):
  52. """
  53. Register a process handler for the given pid, in case L{reapAllProcesses}
  54. is called.
  55. @param pid: the pid of the process.
  56. @param process: a process handler.
  57. """
  58. if pid in reapProcessHandlers:
  59. raise RuntimeError("Try to register an already registered process.")
  60. try:
  61. auxPID, status = os.waitpid(pid, os.WNOHANG)
  62. except:
  63. log.msg('Failed to reap %d:' % pid)
  64. log.err()
  65. auxPID = None
  66. if auxPID:
  67. process.processEnded(status)
  68. else:
  69. # if auxPID is 0, there are children but none have exited
  70. reapProcessHandlers[pid] = process
  71. def unregisterReapProcessHandler(pid, process):
  72. """
  73. Unregister a process handler previously registered with
  74. L{registerReapProcessHandler}.
  75. """
  76. if not (pid in reapProcessHandlers
  77. and reapProcessHandlers[pid] == process):
  78. raise RuntimeError("Try to unregister a process not registered.")
  79. del reapProcessHandlers[pid]
  80. def detectLinuxBrokenPipeBehavior():
  81. """
  82. On some Linux version, write-only pipe are detected as readable. This
  83. function is here to check if this bug is present or not.
  84. See L{ProcessWriter.doRead} for a more detailed explanation.
  85. @return: C{True} if Linux pipe behaviour is broken.
  86. @rtype : L{bool}
  87. """
  88. r, w = os.pipe()
  89. os.write(w, b'a')
  90. reads, writes, exes = select.select([w], [], [], 0)
  91. if reads:
  92. # Linux < 2.6.11 says a write-only pipe is readable.
  93. brokenPipeBehavior = True
  94. else:
  95. brokenPipeBehavior = False
  96. os.close(r)
  97. os.close(w)
  98. return brokenPipeBehavior
  99. brokenLinuxPipeBehavior = detectLinuxBrokenPipeBehavior()
  100. class ProcessWriter(abstract.FileDescriptor):
  101. """
  102. (Internal) Helper class to write into a Process's input pipe.
  103. I am a helper which describes a selectable asynchronous writer to a
  104. process's input pipe, including stdin.
  105. @ivar enableReadHack: A flag which determines how readability on this
  106. write descriptor will be handled. If C{True}, then readability may
  107. indicate the reader for this write descriptor has been closed (ie,
  108. the connection has been lost). If C{False}, then readability events
  109. are ignored.
  110. """
  111. connected = 1
  112. ic = 0
  113. enableReadHack = False
  114. def __init__(self, reactor, proc, name, fileno, forceReadHack=False):
  115. """
  116. Initialize, specifying a Process instance to connect to.
  117. """
  118. abstract.FileDescriptor.__init__(self, reactor)
  119. fdesc.setNonBlocking(fileno)
  120. self.proc = proc
  121. self.name = name
  122. self.fd = fileno
  123. if not stat.S_ISFIFO(os.fstat(self.fileno()).st_mode):
  124. # If the fd is not a pipe, then the read hack is never
  125. # applicable. This case arises when ProcessWriter is used by
  126. # StandardIO and stdout is redirected to a normal file.
  127. self.enableReadHack = False
  128. elif forceReadHack:
  129. self.enableReadHack = True
  130. else:
  131. # Detect if this fd is actually a write-only fd. If it's
  132. # valid to read, don't try to detect closing via read.
  133. # This really only means that we cannot detect a TTY's write
  134. # pipe being closed.
  135. try:
  136. os.read(self.fileno(), 0)
  137. except OSError:
  138. # It's a write-only pipe end, enable hack
  139. self.enableReadHack = True
  140. if self.enableReadHack:
  141. self.startReading()
  142. def fileno(self):
  143. """
  144. Return the fileno() of my process's stdin.
  145. """
  146. return self.fd
  147. def writeSomeData(self, data):
  148. """
  149. Write some data to the open process.
  150. """
  151. rv = fdesc.writeToFD(self.fd, data)
  152. if rv == len(data) and self.enableReadHack:
  153. # If the send buffer is now empty and it is necessary to monitor
  154. # this descriptor for readability to detect close, try detecting
  155. # readability now.
  156. self.startReading()
  157. return rv
  158. def write(self, data):
  159. self.stopReading()
  160. abstract.FileDescriptor.write(self, data)
  161. def doRead(self):
  162. """
  163. The only way a write pipe can become "readable" is at EOF, because the
  164. child has closed it, and we're using a reactor which doesn't
  165. distinguish between readable and closed (such as the select reactor).
  166. Except that's not true on linux < 2.6.11. It has the following
  167. characteristics: write pipe is completely empty => POLLOUT (writable in
  168. select), write pipe is not completely empty => POLLIN (readable in
  169. select), write pipe's reader closed => POLLIN|POLLERR (readable and
  170. writable in select)
  171. That's what this funky code is for. If linux was not broken, this
  172. function could be simply "return CONNECTION_LOST".
  173. BUG: We call select no matter what the reactor.
  174. If the reactor is pollreactor, and the fd is > 1024, this will fail.
  175. (only occurs on broken versions of linux, though).
  176. """
  177. if self.enableReadHack:
  178. if brokenLinuxPipeBehavior:
  179. fd = self.fd
  180. r, w, x = select.select([fd], [fd], [], 0)
  181. if r and w:
  182. return CONNECTION_LOST
  183. else:
  184. return CONNECTION_LOST
  185. else:
  186. self.stopReading()
  187. def connectionLost(self, reason):
  188. """
  189. See abstract.FileDescriptor.connectionLost.
  190. """
  191. # At least on OS X 10.4, exiting while stdout is non-blocking can
  192. # result in data loss. For some reason putting the file descriptor
  193. # back into blocking mode seems to resolve this issue.
  194. fdesc.setBlocking(self.fd)
  195. abstract.FileDescriptor.connectionLost(self, reason)
  196. self.proc.childConnectionLost(self.name, reason)
  197. class ProcessReader(abstract.FileDescriptor):
  198. """
  199. ProcessReader
  200. I am a selectable representation of a process's output pipe, such as
  201. stdout and stderr.
  202. """
  203. connected = True
  204. def __init__(self, reactor, proc, name, fileno):
  205. """
  206. Initialize, specifying a process to connect to.
  207. """
  208. abstract.FileDescriptor.__init__(self, reactor)
  209. fdesc.setNonBlocking(fileno)
  210. self.proc = proc
  211. self.name = name
  212. self.fd = fileno
  213. self.startReading()
  214. def fileno(self):
  215. """
  216. Return the fileno() of my process's stderr.
  217. """
  218. return self.fd
  219. def writeSomeData(self, data):
  220. # the only time this is actually called is after .loseConnection Any
  221. # actual write attempt would fail, so we must avoid that. This hack
  222. # allows us to use .loseConnection on both readers and writers.
  223. assert data == b""
  224. return CONNECTION_LOST
  225. def doRead(self):
  226. """
  227. This is called when the pipe becomes readable.
  228. """
  229. return fdesc.readFromFD(self.fd, self.dataReceived)
  230. def dataReceived(self, data):
  231. self.proc.childDataReceived(self.name, data)
  232. def loseConnection(self):
  233. if self.connected and not self.disconnecting:
  234. self.disconnecting = 1
  235. self.stopReading()
  236. self.reactor.callLater(0, self.connectionLost,
  237. failure.Failure(CONNECTION_DONE))
  238. def connectionLost(self, reason):
  239. """
  240. Close my end of the pipe, signal the Process (which signals the
  241. ProcessProtocol).
  242. """
  243. abstract.FileDescriptor.connectionLost(self, reason)
  244. self.proc.childConnectionLost(self.name, reason)
  245. class _BaseProcess(BaseProcess, object):
  246. """
  247. Base class for Process and PTYProcess.
  248. """
  249. status = None
  250. pid = None
  251. def reapProcess(self):
  252. """
  253. Try to reap a process (without blocking) via waitpid.
  254. This is called when sigchild is caught or a Process object loses its
  255. "connection" (stdout is closed) This ought to result in reaping all
  256. zombie processes, since it will be called twice as often as it needs
  257. to be.
  258. (Unfortunately, this is a slightly experimental approach, since
  259. UNIX has no way to be really sure that your process is going to
  260. go away w/o blocking. I don't want to block.)
  261. """
  262. try:
  263. try:
  264. pid, status = os.waitpid(self.pid, os.WNOHANG)
  265. except OSError as e:
  266. if e.errno == errno.ECHILD:
  267. # no child process
  268. pid = None
  269. else:
  270. raise
  271. except:
  272. log.msg('Failed to reap %d:' % self.pid)
  273. log.err()
  274. pid = None
  275. if pid:
  276. self.processEnded(status)
  277. unregisterReapProcessHandler(pid, self)
  278. def _getReason(self, status):
  279. exitCode = sig = None
  280. if os.WIFEXITED(status):
  281. exitCode = os.WEXITSTATUS(status)
  282. else:
  283. sig = os.WTERMSIG(status)
  284. if exitCode or sig:
  285. return error.ProcessTerminated(exitCode, sig, status)
  286. return error.ProcessDone(status)
  287. def signalProcess(self, signalID):
  288. """
  289. Send the given signal C{signalID} to the process. It'll translate a
  290. few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
  291. representation to its int value, otherwise it'll pass directly the
  292. value provided
  293. @type signalID: C{str} or C{int}
  294. """
  295. if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
  296. signalID = getattr(signal, 'SIG%s' % (signalID,))
  297. if self.pid is None:
  298. raise ProcessExitedAlready()
  299. try:
  300. os.kill(self.pid, signalID)
  301. except OSError as e:
  302. if e.errno == errno.ESRCH:
  303. raise ProcessExitedAlready()
  304. else:
  305. raise
  306. def _resetSignalDisposition(self):
  307. # The Python interpreter ignores some signals, and our child
  308. # process will inherit that behaviour. To have a child process
  309. # that responds to signals normally, we need to reset our
  310. # child process's signal handling (just) after we fork and
  311. # before we execvpe.
  312. for signalnum in range(1, signal.NSIG):
  313. if signal.getsignal(signalnum) == signal.SIG_IGN:
  314. # Reset signal handling to the default
  315. signal.signal(signalnum, signal.SIG_DFL)
  316. def _fork(self, path, uid, gid, executable, args, environment, **kwargs):
  317. """
  318. Fork and then exec sub-process.
  319. @param path: the path where to run the new process.
  320. @type path: L{bytes} or L{unicode}
  321. @param uid: if defined, the uid used to run the new process.
  322. @type uid: L{int}
  323. @param gid: if defined, the gid used to run the new process.
  324. @type gid: L{int}
  325. @param executable: the executable to run in a new process.
  326. @type executable: L{str}
  327. @param args: arguments used to create the new process.
  328. @type args: L{list}.
  329. @param environment: environment used for the new process.
  330. @type environment: L{dict}.
  331. @param kwargs: keyword arguments to L{_setupChild} method.
  332. """
  333. collectorEnabled = gc.isenabled()
  334. gc.disable()
  335. try:
  336. self.pid = os.fork()
  337. except:
  338. # Still in the parent process
  339. if collectorEnabled:
  340. gc.enable()
  341. raise
  342. else:
  343. if self.pid == 0:
  344. # A return value of 0 from fork() indicates that we are now
  345. # executing in the child process.
  346. # Do not put *ANY* code outside the try block. The child
  347. # process must either exec or _exit. If it gets outside this
  348. # block (due to an exception that is not handled here, but
  349. # which might be handled higher up), there will be two copies
  350. # of the parent running in parallel, doing all kinds of damage.
  351. # After each change to this code, review it to make sure there
  352. # are no exit paths.
  353. try:
  354. # Stop debugging. If I am, I don't care anymore.
  355. sys.settrace(None)
  356. self._setupChild(**kwargs)
  357. self._execChild(path, uid, gid, executable, args,
  358. environment)
  359. except:
  360. # If there are errors, try to write something descriptive
  361. # to stderr before exiting.
  362. # The parent's stderr isn't *necessarily* fd 2 anymore, or
  363. # even still available; however, even libc assumes that
  364. # write(2, err) is a useful thing to attempt.
  365. try:
  366. stderr = os.fdopen(2, 'wb')
  367. msg = ("Upon execvpe {0} {1} in environment id {2}"
  368. "\n:").format(executable, str(args),
  369. id(environment))
  370. if _PY3:
  371. # On Python 3, print_exc takes a text stream, but
  372. # on Python 2 it still takes a byte stream. So on
  373. # Python 3 we will wrap up the byte stream returned
  374. # by os.fdopen using TextIOWrapper.
  375. # We hard-code UTF-8 as the encoding here, rather
  376. # than looking at something like
  377. # getfilesystemencoding() or sys.stderr.encoding,
  378. # because we want an encoding that will be able to
  379. # encode the full range of code points. We are
  380. # (most likely) talking to the parent process on
  381. # the other end of this pipe and not the filesystem
  382. # or the original sys.stderr, so there's no point
  383. # in trying to match the encoding of one of those
  384. # objects.
  385. stderr = io.TextIOWrapper(stderr, encoding="utf-8")
  386. stderr.write(msg)
  387. traceback.print_exc(file=stderr)
  388. stderr.flush()
  389. for fd in range(3):
  390. os.close(fd)
  391. except:
  392. # Handle all errors during the error-reporting process
  393. # silently to ensure that the child terminates.
  394. pass
  395. # See comment above about making sure that we reach this line
  396. # of code.
  397. os._exit(1)
  398. # we are now in parent process
  399. if collectorEnabled:
  400. gc.enable()
  401. self.status = -1 # this records the exit status of the child
  402. def _setupChild(self, *args, **kwargs):
  403. """
  404. Setup the child process. Override in subclasses.
  405. """
  406. raise NotImplementedError()
  407. def _execChild(self, path, uid, gid, executable, args, environment):
  408. """
  409. The exec() which is done in the forked child.
  410. """
  411. if path:
  412. os.chdir(path)
  413. if uid is not None or gid is not None:
  414. if uid is None:
  415. uid = os.geteuid()
  416. if gid is None:
  417. gid = os.getegid()
  418. # set the UID before I actually exec the process
  419. os.setuid(0)
  420. os.setgid(0)
  421. switchUID(uid, gid)
  422. os.execvpe(executable, args, environment)
  423. def __repr__(self):
  424. """
  425. String representation of a process.
  426. """
  427. return "<%s pid=%s status=%s>" % (self.__class__.__name__,
  428. self.pid, self.status)
  429. class _FDDetector(object):
  430. """
  431. This class contains the logic necessary to decide which of the available
  432. system techniques should be used to detect the open file descriptors for
  433. the current process. The chosen technique gets monkey-patched into the
  434. _listOpenFDs method of this class so that the detection only needs to occur
  435. once.
  436. @ivar listdir: The implementation of listdir to use. This gets overwritten
  437. by the test cases.
  438. @ivar getpid: The implementation of getpid to use, returns the PID of the
  439. running process.
  440. @ivar openfile: The implementation of open() to use, by default the Python
  441. builtin.
  442. """
  443. # So that we can unit test this
  444. listdir = os.listdir
  445. getpid = os.getpid
  446. openfile = open
  447. def __init__(self):
  448. self._implementations = [
  449. self._procFDImplementation, self._devFDImplementation,
  450. self._fallbackFDImplementation]
  451. def _listOpenFDs(self):
  452. """
  453. Return an iterable of file descriptors which I{may} be open in this
  454. process.
  455. This will try to return the fewest possible descriptors without missing
  456. any.
  457. """
  458. self._listOpenFDs = self._getImplementation()
  459. return self._listOpenFDs()
  460. def _getImplementation(self):
  461. """
  462. Pick a method which gives correct results for C{_listOpenFDs} in this
  463. runtime environment.
  464. This involves a lot of very platform-specific checks, some of which may
  465. be relatively expensive. Therefore the returned method should be saved
  466. and re-used, rather than always calling this method to determine what it
  467. is.
  468. See the implementation for the details of how a method is selected.
  469. """
  470. for impl in self._implementations:
  471. try:
  472. before = impl()
  473. except:
  474. continue
  475. with self.openfile("/dev/null", "r"):
  476. after = impl()
  477. if before != after:
  478. return impl
  479. # If no implementation can detect the newly opened file above, then just
  480. # return the last one. The last one should therefore always be one
  481. # which makes a simple static guess which includes all possible open
  482. # file descriptors, but perhaps also many other values which do not
  483. # correspond to file descriptors. For example, the scheme implemented
  484. # by _fallbackFDImplementation is suitable to be the last entry.
  485. return impl
  486. def _devFDImplementation(self):
  487. """
  488. Simple implementation for systems where /dev/fd actually works.
  489. See: http://www.freebsd.org/cgi/man.cgi?fdescfs
  490. """
  491. dname = "/dev/fd"
  492. result = [int(fd) for fd in self.listdir(dname)]
  493. return result
  494. def _procFDImplementation(self):
  495. """
  496. Simple implementation for systems where /proc/pid/fd exists (we assume
  497. it works).
  498. """
  499. dname = "/proc/%d/fd" % (self.getpid(),)
  500. return [int(fd) for fd in self.listdir(dname)]
  501. def _fallbackFDImplementation(self):
  502. """
  503. Fallback implementation where either the resource module can inform us
  504. about the upper bound of how many FDs to expect, or where we just guess
  505. a constant maximum if there is no resource module.
  506. All possible file descriptors from 0 to that upper bound are returned
  507. with no attempt to exclude invalid file descriptor values.
  508. """
  509. try:
  510. import resource
  511. except ImportError:
  512. maxfds = 1024
  513. else:
  514. # OS-X reports 9223372036854775808. That's a lot of fds to close.
  515. # OS-X should get the /dev/fd implementation instead, so mostly
  516. # this check probably isn't necessary.
  517. maxfds = min(1024, resource.getrlimit(resource.RLIMIT_NOFILE)[1])
  518. return range(maxfds)
  519. detector = _FDDetector()
  520. def _listOpenFDs():
  521. """
  522. Use the global detector object to figure out which FD implementation to
  523. use.
  524. """
  525. return detector._listOpenFDs()
  526. @implementer(IProcessTransport)
  527. class Process(_BaseProcess):
  528. """
  529. An operating-system Process.
  530. This represents an operating-system process with arbitrary input/output
  531. pipes connected to it. Those pipes may represent standard input,
  532. standard output, and standard error, or any other file descriptor.
  533. On UNIX, this is implemented using fork(), exec(), pipe()
  534. and fcntl(). These calls may not exist elsewhere so this
  535. code is not cross-platform. (also, windows can only select
  536. on sockets...)
  537. """
  538. debug = False
  539. debug_child = False
  540. status = -1
  541. pid = None
  542. processWriterFactory = ProcessWriter
  543. processReaderFactory = ProcessReader
  544. def __init__(self,
  545. reactor, executable, args, environment, path, proto,
  546. uid=None, gid=None, childFDs=None):
  547. """
  548. Spawn an operating-system process.
  549. This is where the hard work of disconnecting all currently open
  550. files / forking / executing the new process happens. (This is
  551. executed automatically when a Process is instantiated.)
  552. This will also run the subprocess as a given user ID and group ID, if
  553. specified. (Implementation Note: this doesn't support all the arcane
  554. nuances of setXXuid on UNIX: it will assume that either your effective
  555. or real UID is 0.)
  556. """
  557. if not proto:
  558. assert 'r' not in childFDs.values()
  559. assert 'w' not in childFDs.values()
  560. _BaseProcess.__init__(self, proto)
  561. self.pipes = {}
  562. # keys are childFDs, we can sense them closing
  563. # values are ProcessReader/ProcessWriters
  564. helpers = {}
  565. # keys are childFDs
  566. # values are parentFDs
  567. if childFDs is None:
  568. childFDs = {0: "w", # we write to the child's stdin
  569. 1: "r", # we read from their stdout
  570. 2: "r", # and we read from their stderr
  571. }
  572. debug = self.debug
  573. if debug: print("childFDs", childFDs)
  574. _openedPipes = []
  575. def pipe():
  576. r, w = os.pipe()
  577. _openedPipes.extend([r, w])
  578. return r, w
  579. # fdmap.keys() are filenos of pipes that are used by the child.
  580. fdmap = {} # maps childFD to parentFD
  581. try:
  582. for childFD, target in items(childFDs):
  583. if debug: print("[%d]" % childFD, target)
  584. if target == "r":
  585. # we need a pipe that the parent can read from
  586. readFD, writeFD = pipe()
  587. if debug: print("readFD=%d, writeFD=%d" % (readFD, writeFD))
  588. fdmap[childFD] = writeFD # child writes to this
  589. helpers[childFD] = readFD # parent reads from this
  590. elif target == "w":
  591. # we need a pipe that the parent can write to
  592. readFD, writeFD = pipe()
  593. if debug: print("readFD=%d, writeFD=%d" % (readFD, writeFD))
  594. fdmap[childFD] = readFD # child reads from this
  595. helpers[childFD] = writeFD # parent writes to this
  596. else:
  597. assert type(target) == int, '%r should be an int' % (target,)
  598. fdmap[childFD] = target # parent ignores this
  599. if debug: print("fdmap", fdmap)
  600. if debug: print("helpers", helpers)
  601. # the child only cares about fdmap.values()
  602. self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap)
  603. except:
  604. for pipe in _openedPipes:
  605. os.close(pipe)
  606. raise
  607. # we are the parent process:
  608. self.proto = proto
  609. # arrange for the parent-side pipes to be read and written
  610. for childFD, parentFD in items(helpers):
  611. os.close(fdmap[childFD])
  612. if childFDs[childFD] == "r":
  613. reader = self.processReaderFactory(reactor, self, childFD,
  614. parentFD)
  615. self.pipes[childFD] = reader
  616. if childFDs[childFD] == "w":
  617. writer = self.processWriterFactory(reactor, self, childFD,
  618. parentFD, forceReadHack=True)
  619. self.pipes[childFD] = writer
  620. try:
  621. # the 'transport' is used for some compatibility methods
  622. if self.proto is not None:
  623. self.proto.makeConnection(self)
  624. except:
  625. log.err()
  626. # The reactor might not be running yet. This might call back into
  627. # processEnded synchronously, triggering an application-visible
  628. # callback. That's probably not ideal. The replacement API for
  629. # spawnProcess should improve upon this situation.
  630. registerReapProcessHandler(self.pid, self)
  631. def _setupChild(self, fdmap):
  632. """
  633. fdmap[childFD] = parentFD
  634. The child wants to end up with 'childFD' attached to what used to be
  635. the parent's parentFD. As an example, a bash command run like
  636. 'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}.
  637. 'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}.
  638. This is accomplished in two steps::
  639. 1. close all file descriptors that aren't values of fdmap. This
  640. means 0 .. maxfds (or just the open fds within that range, if
  641. the platform supports '/proc/<pid>/fd').
  642. 2. for each childFD::
  643. - if fdmap[childFD] == childFD, the descriptor is already in
  644. place. Make sure the CLOEXEC flag is not set, then delete
  645. the entry from fdmap.
  646. - if childFD is in fdmap.values(), then the target descriptor
  647. is busy. Use os.dup() to move it elsewhere, update all
  648. fdmap[childFD] items that point to it, then close the
  649. original. Then fall through to the next case.
  650. - now fdmap[childFD] is not in fdmap.values(), and is free.
  651. Use os.dup2() to move it to the right place, then close the
  652. original.
  653. """
  654. debug = self.debug_child
  655. if debug:
  656. errfd = sys.stderr
  657. errfd.write("starting _setupChild\n")
  658. destList = fdmap.values()
  659. for fd in _listOpenFDs():
  660. if fd in destList:
  661. continue
  662. if debug and fd == errfd.fileno():
  663. continue
  664. try:
  665. os.close(fd)
  666. except:
  667. pass
  668. # at this point, the only fds still open are the ones that need to
  669. # be moved to their appropriate positions in the child (the targets
  670. # of fdmap, i.e. fdmap.values() )
  671. if debug: print("fdmap", fdmap, file=errfd)
  672. for child in sorted(fdmap.keys()):
  673. target = fdmap[child]
  674. if target == child:
  675. # fd is already in place
  676. if debug: print("%d already in place" % target, file=errfd)
  677. fdesc._unsetCloseOnExec(child)
  678. else:
  679. if child in fdmap.values():
  680. # we can't replace child-fd yet, as some other mapping
  681. # still needs the fd it wants to target. We must preserve
  682. # that old fd by duping it to a new home.
  683. newtarget = os.dup(child) # give it a safe home
  684. if debug: print("os.dup(%d) -> %d" % (child, newtarget),
  685. file=errfd)
  686. os.close(child) # close the original
  687. for c, p in items(fdmap):
  688. if p == child:
  689. fdmap[c] = newtarget # update all pointers
  690. # now it should be available
  691. if debug: print("os.dup2(%d,%d)" % (target, child), file=errfd)
  692. os.dup2(target, child)
  693. # At this point, the child has everything it needs. We want to close
  694. # everything that isn't going to be used by the child, i.e.
  695. # everything not in fdmap.keys(). The only remaining fds open are
  696. # those in fdmap.values().
  697. # Any given fd may appear in fdmap.values() multiple times, so we
  698. # need to remove duplicates first.
  699. old = []
  700. for fd in fdmap.values():
  701. if not fd in old:
  702. if not fd in fdmap.keys():
  703. old.append(fd)
  704. if debug: print("old", old, file=errfd)
  705. for fd in old:
  706. os.close(fd)
  707. self._resetSignalDisposition()
  708. def writeToChild(self, childFD, data):
  709. self.pipes[childFD].write(data)
  710. def closeChildFD(self, childFD):
  711. # for writer pipes, loseConnection tries to write the remaining data
  712. # out to the pipe before closing it
  713. # if childFD is not in the list of pipes, assume that it is already
  714. # closed
  715. if childFD in self.pipes:
  716. self.pipes[childFD].loseConnection()
  717. def pauseProducing(self):
  718. for p in self.pipes.itervalues():
  719. if isinstance(p, ProcessReader):
  720. p.stopReading()
  721. def resumeProducing(self):
  722. for p in self.pipes.itervalues():
  723. if isinstance(p, ProcessReader):
  724. p.startReading()
  725. # compatibility
  726. def closeStdin(self):
  727. """
  728. Call this to close standard input on this process.
  729. """
  730. self.closeChildFD(0)
  731. def closeStdout(self):
  732. self.closeChildFD(1)
  733. def closeStderr(self):
  734. self.closeChildFD(2)
  735. def loseConnection(self):
  736. self.closeStdin()
  737. self.closeStderr()
  738. self.closeStdout()
  739. def write(self, data):
  740. """
  741. Call this to write to standard input on this process.
  742. NOTE: This will silently lose data if there is no standard input.
  743. """
  744. if 0 in self.pipes:
  745. self.pipes[0].write(data)
  746. def registerProducer(self, producer, streaming):
  747. """
  748. Call this to register producer for standard input.
  749. If there is no standard input producer.stopProducing() will
  750. be called immediately.
  751. """
  752. if 0 in self.pipes:
  753. self.pipes[0].registerProducer(producer, streaming)
  754. else:
  755. producer.stopProducing()
  756. def unregisterProducer(self):
  757. """
  758. Call this to unregister producer for standard input."""
  759. if 0 in self.pipes:
  760. self.pipes[0].unregisterProducer()
  761. def writeSequence(self, seq):
  762. """
  763. Call this to write to standard input on this process.
  764. NOTE: This will silently lose data if there is no standard input.
  765. """
  766. if 0 in self.pipes:
  767. self.pipes[0].writeSequence(seq)
  768. def childDataReceived(self, name, data):
  769. self.proto.childDataReceived(name, data)
  770. def childConnectionLost(self, childFD, reason):
  771. # this is called when one of the helpers (ProcessReader or
  772. # ProcessWriter) notices their pipe has been closed
  773. os.close(self.pipes[childFD].fileno())
  774. del self.pipes[childFD]
  775. try:
  776. self.proto.childConnectionLost(childFD)
  777. except:
  778. log.err()
  779. self.maybeCallProcessEnded()
  780. def maybeCallProcessEnded(self):
  781. # we don't call ProcessProtocol.processEnded until:
  782. # the child has terminated, AND
  783. # all writers have indicated an error status, AND
  784. # all readers have indicated EOF
  785. # This insures that we've gathered all output from the process.
  786. if self.pipes:
  787. return
  788. if not self.lostProcess:
  789. self.reapProcess()
  790. return
  791. _BaseProcess.maybeCallProcessEnded(self)
  792. @implementer(IProcessTransport)
  793. class PTYProcess(abstract.FileDescriptor, _BaseProcess):
  794. """
  795. An operating-system Process that uses PTY support.
  796. """
  797. status = -1
  798. pid = None
  799. def __init__(self, reactor, executable, args, environment, path, proto,
  800. uid=None, gid=None, usePTY=None):
  801. """
  802. Spawn an operating-system process.
  803. This is where the hard work of disconnecting all currently open
  804. files / forking / executing the new process happens. (This is
  805. executed automatically when a Process is instantiated.)
  806. This will also run the subprocess as a given user ID and group ID, if
  807. specified. (Implementation Note: this doesn't support all the arcane
  808. nuances of setXXuid on UNIX: it will assume that either your effective
  809. or real UID is 0.)
  810. """
  811. if pty is None and not isinstance(usePTY, (tuple, list)):
  812. # no pty module and we didn't get a pty to use
  813. raise NotImplementedError(
  814. "cannot use PTYProcess on platforms without the pty module.")
  815. abstract.FileDescriptor.__init__(self, reactor)
  816. _BaseProcess.__init__(self, proto)
  817. if isinstance(usePTY, (tuple, list)):
  818. masterfd, slavefd, _ = usePTY
  819. else:
  820. masterfd, slavefd = pty.openpty()
  821. try:
  822. self._fork(path, uid, gid, executable, args, environment,
  823. masterfd=masterfd, slavefd=slavefd)
  824. except:
  825. if not isinstance(usePTY, (tuple, list)):
  826. os.close(masterfd)
  827. os.close(slavefd)
  828. raise
  829. # we are now in parent process:
  830. os.close(slavefd)
  831. fdesc.setNonBlocking(masterfd)
  832. self.fd = masterfd
  833. self.startReading()
  834. self.connected = 1
  835. self.status = -1
  836. try:
  837. self.proto.makeConnection(self)
  838. except:
  839. log.err()
  840. registerReapProcessHandler(self.pid, self)
  841. def _setupChild(self, masterfd, slavefd):
  842. """
  843. Set up child process after C{fork()} but before C{exec()}.
  844. This involves:
  845. - closing C{masterfd}, since it is not used in the subprocess
  846. - creating a new session with C{os.setsid}
  847. - changing the controlling terminal of the process (and the new
  848. session) to point at C{slavefd}
  849. - duplicating C{slavefd} to standard input, output, and error
  850. - closing all other open file descriptors (according to
  851. L{_listOpenFDs})
  852. - re-setting all signal handlers to C{SIG_DFL}
  853. @param masterfd: The master end of a PTY file descriptors opened with
  854. C{openpty}.
  855. @type masterfd: L{int}
  856. @param slavefd: The slave end of a PTY opened with C{openpty}.
  857. @type slavefd: L{int}
  858. """
  859. os.close(masterfd)
  860. os.setsid()
  861. fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')
  862. for fd in range(3):
  863. if fd != slavefd:
  864. os.close(fd)
  865. os.dup2(slavefd, 0) # stdin
  866. os.dup2(slavefd, 1) # stdout
  867. os.dup2(slavefd, 2) # stderr
  868. for fd in _listOpenFDs():
  869. if fd > 2:
  870. try:
  871. os.close(fd)
  872. except:
  873. pass
  874. self._resetSignalDisposition()
  875. def closeStdin(self):
  876. # PTYs do not have stdin/stdout/stderr. They only have in and out, just
  877. # like sockets. You cannot close one without closing off the entire PTY
  878. pass
  879. def closeStdout(self):
  880. pass
  881. def closeStderr(self):
  882. pass
  883. def doRead(self):
  884. """
  885. Called when my standard output stream is ready for reading.
  886. """
  887. return fdesc.readFromFD(
  888. self.fd,
  889. lambda data: self.proto.childDataReceived(1, data))
  890. def fileno(self):
  891. """
  892. This returns the file number of standard output on this process.
  893. """
  894. return self.fd
  895. def maybeCallProcessEnded(self):
  896. # two things must happen before we call the ProcessProtocol's
  897. # processEnded method. 1: the child process must die and be reaped
  898. # (which calls our own processEnded method). 2: the child must close
  899. # their stdin/stdout/stderr fds, causing the pty to close, causing
  900. # our connectionLost method to be called. #2 can also be triggered
  901. # by calling .loseConnection().
  902. if self.lostProcess == 2:
  903. _BaseProcess.maybeCallProcessEnded(self)
  904. def connectionLost(self, reason):
  905. """
  906. I call this to clean up when one or all of my connections has died.
  907. """
  908. abstract.FileDescriptor.connectionLost(self, reason)
  909. os.close(self.fd)
  910. self.lostProcess += 1
  911. self.maybeCallProcessEnded()
  912. def writeSomeData(self, data):
  913. """
  914. Write some data to the open process.
  915. """
  916. return fdesc.writeToFD(self.fd, data)