123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- """
- Low-level operating system functions from :mod:`os`.
- Cooperative I/O
- ===============
- This module provides cooperative versions of :func:`os.read` and
- :func:`os.write`. These functions are *not* monkey-patched; you
- must explicitly call them or monkey patch them yourself.
- POSIX functions
- ---------------
- On POSIX, non-blocking IO is available.
- - :func:`nb_read`
- - :func:`nb_write`
- - :func:`make_nonblocking`
- All Platforms
- -------------
- On non-POSIX platforms (e.g., Windows), non-blocking IO is not
- available. On those platforms (and on POSIX), cooperative IO can
- be done with the threadpool.
- - :func:`tp_read`
- - :func:`tp_write`
- Child Processes
- ===============
- The functions :func:`fork` and (on POSIX) :func:`forkpty` and :func:`waitpid` can be used
- to manage child processes.
- .. warning::
- Forking a process that uses greenlets does not eliminate all non-running
- greenlets. Any that were scheduled in the hub of the forking thread in the parent
- remain scheduled in the child; compare this to how normal threads operate. (This behaviour
- may change is a subsequent major release.)
- """
- from __future__ import absolute_import
- import os
- import sys
- from gevent.hub import get_hub, reinit
- from gevent._compat import PY3
- from gevent._util import copy_globals
- import errno
- EAGAIN = getattr(errno, 'EAGAIN', 11)
- try:
- import fcntl
- except ImportError:
- fcntl = None
- __implements__ = ['fork']
- __extensions__ = ['tp_read', 'tp_write']
- _read = os.read
- _write = os.write
- ignored_errors = [EAGAIN, errno.EINTR]
- if fcntl:
- __extensions__ += ['make_nonblocking', 'nb_read', 'nb_write']
- def make_nonblocking(fd):
- """Put the file descriptor *fd* into non-blocking mode if possible.
- :return: A boolean value that evaluates to True if successful."""
- flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
- if not bool(flags & os.O_NONBLOCK):
- fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- return True
- def nb_read(fd, n):
- """Read up to `n` bytes from file descriptor `fd`. Return a string
- containing the bytes read. If end-of-file is reached, an empty string
- is returned.
- The descriptor must be in non-blocking mode.
- """
- hub, event = None, None
- while True:
- try:
- return _read(fd, n)
- except OSError as e:
- if e.errno not in ignored_errors:
- raise
- if not PY3:
- sys.exc_clear()
- if hub is None:
- hub = get_hub()
- event = hub.loop.io(fd, 1)
- hub.wait(event)
- def nb_write(fd, buf):
- """Write bytes from buffer `buf` to file descriptor `fd`. Return the
- number of bytes written.
- The file descriptor must be in non-blocking mode.
- """
- hub, event = None, None
- while True:
- try:
- return _write(fd, buf)
- except OSError as e:
- if e.errno not in ignored_errors:
- raise
- if not PY3:
- sys.exc_clear()
- if hub is None:
- hub = get_hub()
- event = hub.loop.io(fd, 2)
- hub.wait(event)
- def tp_read(fd, n):
- """Read up to *n* bytes from file descriptor *fd*. Return a string
- containing the bytes read. If end-of-file is reached, an empty string
- is returned.
- Reading is done using the threadpool.
- """
- return get_hub().threadpool.apply(_read, (fd, n))
- def tp_write(fd, buf):
- """Write bytes from buffer *buf* to file descriptor *fd*. Return the
- number of bytes written.
- Writing is done using the threadpool.
- """
- return get_hub().threadpool.apply(_write, (fd, buf))
- if hasattr(os, 'fork'):
- # pylint:disable=function-redefined,redefined-outer-name
- _raw_fork = os.fork
- def fork_gevent():
- """
- Forks the process using :func:`os.fork` and prepares the
- child process to continue using gevent before returning.
- .. note::
- The PID returned by this function may not be waitable with
- either the original :func:`os.waitpid` or this module's
- :func:`waitpid` and it may not generate SIGCHLD signals if
- libev child watchers are or ever have been in use. For
- example, the :mod:`gevent.subprocess` module uses libev
- child watchers (which parts of gevent use libev child
- watchers is subject to change at any time). Most
- applications should use :func:`fork_and_watch`, which is
- monkey-patched as the default replacement for
- :func:`os.fork` and implements the ``fork`` function of
- this module by default, unless the environment variable
- ``GEVENT_NOWAITPID`` is defined before this module is
- imported.
- .. versionadded:: 1.1b2
- """
- result = _raw_fork()
- if not result:
- reinit()
- return result
- def fork():
- """
- A wrapper for :func:`fork_gevent` for non-POSIX platforms.
- """
- return fork_gevent()
- if hasattr(os, 'forkpty'):
- _raw_forkpty = os.forkpty
- def forkpty_gevent():
- """
- Forks the process using :func:`os.forkpty` and prepares the
- child process to continue using gevent before returning.
- Returns a tuple (pid, master_fd). The `master_fd` is *not* put into
- non-blocking mode.
- Availability: Some Unix systems.
- .. seealso:: This function has the same limitations as :func:`fork_gevent`.
- .. versionadded:: 1.1b5
- """
- pid, master_fd = _raw_forkpty()
- if not pid:
- reinit()
- return pid, master_fd
- forkpty = forkpty_gevent
- __implements__.append('forkpty')
- __extensions__.append("forkpty_gevent")
- if hasattr(os, 'WNOWAIT') or hasattr(os, 'WNOHANG'):
- # We can only do this on POSIX
- import time
- _waitpid = os.waitpid
- _WNOHANG = os.WNOHANG
- # replaced by the signal module.
- _on_child_hook = lambda: None
- # {pid -> watcher or tuple(pid, rstatus, timestamp)}
- _watched_children = {}
- def _on_child(watcher, callback):
- # XXX: Could handle tracing here by not stopping
- # until the pid is terminated
- watcher.stop()
- _watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
- if callback:
- callback(watcher)
- # dispatch an "event"; used by gevent.signal.signal
- _on_child_hook()
- # now is as good a time as any to reap children
- _reap_children()
- def _reap_children(timeout=60):
- # Remove all the dead children that haven't been waited on
- # for the *timeout* seconds.
- # Some platforms queue delivery of SIGCHLD for all children that die;
- # in that case, a well-behaved application should call waitpid() for each
- # signal.
- # Some platforms (linux) only guarantee one delivery if multiple children
- # die. On that platform, the well-behave application calls waitpid() in a loop
- # until it gets back -1, indicating no more dead children need to be waited for.
- # In either case, waitpid should be called the same number of times as dead children,
- # thus removing all the watchers when a SIGCHLD arrives. The (generous) timeout
- # is to work with applications that neglect to call waitpid and prevent "unlimited"
- # growth.
- # Note that we don't watch for the case of pid wraparound. That is, we fork a new
- # child with the same pid as an existing watcher, but the child is already dead,
- # just not waited on yet.
- now = time.time()
- oldest_allowed = now - timeout
- dead = [pid for pid, val
- in _watched_children.items()
- if isinstance(val, tuple) and val[2] < oldest_allowed]
- for pid in dead:
- del _watched_children[pid]
- def waitpid(pid, options):
- """
- Wait for a child process to finish.
- If the child process was spawned using
- :func:`fork_and_watch`, then this function behaves
- cooperatively. If not, it *may* have race conditions; see
- :func:`fork_gevent` for more information.
- The arguments are as for the underlying
- :func:`os.waitpid`. Some combinations of *options* may not
- be supported cooperatively (as of 1.1 that includes
- WUNTRACED). Using a *pid* of 0 to request waiting on only processes
- from the current process group is not cooperative.
- Availability: POSIX.
- .. versionadded:: 1.1b1
- .. versionchanged:: 1.2a1
- More cases are handled in a cooperative manner.
- """
- # XXX Does not handle tracing children
- # So long as libev's loop doesn't run, it's OK to add
- # child watchers. The SIGCHLD handler only feeds events
- # for the next iteration of the loop to handle. (And the
- # signal handler itself is only called from the next loop
- # iteration.)
- if pid <= 0:
- # magic functions for multiple children.
- if pid == -1:
- # Any child. If we have one that we're watching and that finished,
- # we will use that one. Otherwise, let the OS take care of it.
- for k, v in _watched_children.items():
- if isinstance(v, tuple):
- pid = k
- break
- if pid <= 0:
- # We didn't have one that was ready. If there are
- # no funky options set, and the pid was -1
- # (meaning any process, not 0, which means process
- # group--- libev doesn't know about process
- # groups) then we can use a child watcher of pid 0; otherwise,
- # pass through to the OS.
- if pid == -1 and options == 0:
- hub = get_hub()
- watcher = hub.loop.child(0, False)
- hub.wait(watcher)
- return watcher.rpid, watcher.rstatus
- # There were funky options/pid, so we must go to the OS.
- return _waitpid(pid, options)
- if pid in _watched_children:
- # yes, we're watching it
- if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
- # We're either asked not to block, or it already finished, in which
- # case blocking doesn't matter
- result = _watched_children[pid]
- if isinstance(result, tuple):
- # it finished. libev child watchers
- # are one-shot
- del _watched_children[pid]
- return result[:2]
- # it's not finished
- return (0, 0)
- else:
- # Ok, we need to "block". Do so via a watcher so that we're
- # cooperative. We know it's our child, etc, so this should work.
- watcher = _watched_children[pid]
- # We can't start a watcher that's already started,
- # so we can't reuse the existing watcher.
- new_watcher = watcher.loop.child(pid, False)
- get_hub().wait(new_watcher)
- # Ok, so now the new watcher is done. That means
- # the old watcher's callback (_on_child) should
- # have fired, potentially taking this child out of
- # _watched_children (but that could depend on how
- # many callbacks there were to run, so use the
- # watcher object directly; libev sets all the
- # watchers at the same time).
- return watcher.rpid, watcher.rstatus
- # we're not watching it and it may not even be our child,
- # so we must go to the OS to be sure to get the right semantics (exception)
- return _waitpid(pid, options)
- def fork_and_watch(callback=None, loop=None, ref=False, fork=fork_gevent):
- """
- Fork a child process and start a child watcher for it in the parent process.
- This call cooperates with :func:`waitpid` to enable cooperatively waiting
- for children to finish. When monkey-patching, these functions are patched in as
- :func:`os.fork` and :func:`os.waitpid`, respectively.
- In the child process, this function calls :func:`gevent.hub.reinit` before returning.
- Availability: POSIX.
- :keyword callback: If given, a callable that will be called with the child watcher
- when the child finishes.
- :keyword loop: The loop to start the watcher in. Defaults to the
- loop of the current hub.
- :keyword fork: The fork function. Defaults to :func:`the one defined in this
- module <gevent.os.fork_gevent>` (which automatically calls :func:`gevent.hub.reinit`).
- Pass the builtin :func:`os.fork` function if you do not need to
- initialize gevent in the child process.
- .. versionadded:: 1.1b1
- .. seealso::
- :func:`gevent.monkey.get_original` To access the builtin :func:`os.fork`.
- """
- pid = fork()
- if pid:
- # parent
- loop = loop or get_hub().loop
- watcher = loop.child(pid, ref=ref)
- _watched_children[pid] = watcher
- watcher.start(_on_child, watcher, callback)
- return pid
- __extensions__.append('fork_and_watch')
- __extensions__.append('fork_gevent')
- if 'forkpty' in __implements__:
- def forkpty_and_watch(callback=None, loop=None, ref=False, forkpty=forkpty_gevent):
- """
- Like :func:`fork_and_watch`, except using :func:`forkpty_gevent`.
- Availability: Some Unix systems.
- .. versionadded:: 1.1b5
- """
- result = []
- def _fork():
- pid_and_fd = forkpty()
- result.append(pid_and_fd)
- return pid_and_fd[0]
- fork_and_watch(callback, loop, ref, _fork)
- return result[0]
- __extensions__.append('forkpty_and_watch')
- # Watch children by default
- if not os.getenv('GEVENT_NOWAITPID'):
- # Broken out into separate functions instead of simple name aliases
- # for documentation purposes.
- def fork(*args, **kwargs):
- """
- Forks a child process and starts a child watcher for it in the
- parent process so that ``waitpid`` and SIGCHLD work as expected.
- This implementation of ``fork`` is a wrapper for :func:`fork_and_watch`
- when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
- This is the default and should be used by most applications.
- .. versionchanged:: 1.1b2
- """
- # take any args to match fork_and_watch
- return fork_and_watch(*args, **kwargs)
- if 'forkpty' in __implements__:
- def forkpty(*args, **kwargs):
- """
- Like :func:`fork`, but using :func:`forkpty_gevent`.
- This implementation of ``forkpty`` is a wrapper for :func:`forkpty_and_watch`
- when the environment variable ``GEVENT_NOWAITPID`` is *not* defined.
- This is the default and should be used by most applications.
- .. versionadded:: 1.1b5
- """
- # take any args to match fork_and_watch
- return forkpty_and_watch(*args, **kwargs)
- __implements__.append("waitpid")
- else:
- def fork():
- """
- Forks a child process, initializes gevent in the child,
- but *does not* prepare the parent to wait for the child or receive SIGCHLD.
- This implementation of ``fork`` is a wrapper for :func:`fork_gevent`
- when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
- This is not recommended for most applications.
- """
- return fork_gevent()
- if 'forkpty' in __implements__:
- def forkpty():
- """
- Like :func:`fork`, but using :func:`os.forkpty`
- This implementation of ``forkpty`` is a wrapper for :func:`forkpty_gevent`
- when the environment variable ``GEVENT_NOWAITPID`` *is* defined.
- This is not recommended for most applications.
- .. versionadded:: 1.1b5
- """
- return forkpty_gevent()
- __extensions__.append("waitpid")
- else:
- __implements__.remove('fork')
- __imports__ = copy_globals(os, globals(),
- names_to_ignore=__implements__ + __extensions__,
- dunder_names_to_keep=())
- __all__ = list(set(__implements__ + __extensions__))
|