12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132 |
- # pylint:disable=too-many-lines, protected-access, redefined-outer-name, not-callable,
- # pylint:disable=no-member
- from __future__ import absolute_import, print_function
- import sys
- import os
- import traceback
- import signal as signalmodule
- # pylint:disable=undefined-all-variable
- __all__ = [
- 'get_version',
- 'get_header_version',
- 'supported_backends',
- 'recommended_backends',
- 'embeddable_backends',
- 'time',
- 'loop',
- ]
- import gevent.libev._corecffi as _corecffi # pylint:disable=no-name-in-module
- ffi = _corecffi.ffi # pylint:disable=no-member
- libev = _corecffi.lib # pylint:disable=no-member
# Bind the "virtual fd" helpers.  When the compiled module does not provide
# them, fall back to an identity function so callers can use the same three
# names unconditionally.
if not hasattr(libev, 'vfd_open'):
    vfd_open = vfd_free = vfd_get = lambda fd: fd
else:
    # Must be on windows
    assert sys.platform.startswith("win"), "vfd functions only needed on windows"
    vfd_open = libev.vfd_open
    vfd_free = libev.vfd_free
    vfd_get = libev.vfd_get
- #####
- ## NOTE on Windows:
- # The C implementation does several things specially for Windows;
- # a possibly incomplete list is:
- #
- # - the loop runs a periodic signal checker;
- # - the io watcher constructor is different and it has a destructor;
- # - the child watcher is not defined
- #
- # The CFFI implementation does none of these things, and so
- # is possibly NOT FUNCTIONALLY CORRECT on Win32
- #####
- #####
- ## Note on CFFI objects, callbacks and the lifecycle of watcher objects
- #
- # Each subclass of `watcher` allocates a C structure of the
- # appropriate type e.g., struct gevent_ev_io and holds this pointer in
- # its `_gwatcher` attribute. When that watcher instance is garbage
- # collected, then the C structure is also freed. The C structure is
- # passed to libev from the watcher's start() method and then to the
- # appropriate C callback function, e.g., _gevent_ev_io_callback, which
- # passes it back to python's _python_callback where we need the
- # watcher instance. Therefore, as long as that callback is active (the
- # watcher is started), the watcher instance must not be allowed to get
- # GC'd---any access at the C level or even the FFI level to the freed
- # memory could crash the process.
- #
- # However, the typical idiom calls for writing something like this:
- # loop.io(fd, python_cb).start()
- # thus forgetting the newly created watcher subclass and allowing it to be immediately
- # GC'd. To combat this, when the watcher is started, it places itself into the loop's
- # `_keepaliveset`, and it only removes itself when the watcher's `stop()` method is called.
- # Often, this is the *only* reference keeping the watcher object, and hence its C structure,
- # alive.
- #
- # This is slightly complicated by the fact that the python-level
- # callback, called from the C callback, could choose to manually stop
- # the watcher. When we return to the C level callback, we now have an
- # invalid pointer, and attempting to pass it back to Python (e.g., to
- # handle an error) could crash. Hence, _python_callback,
- # _gevent_io_callback, and _python_handle_error cooperate to make sure
- # that the watcher instance stays in the loop's `_keepaliveset` while
- # the C code could be running---and if it gets removed, to not call back
- # to Python again.
- # See also https://github.com/gevent/gevent/issues/676
- ####
@ffi.callback("int(void* handle, int revents)")
def _python_callback(handle, revents):
    """
    Run the Python-level callback of the watcher identified by *handle*.

    :param handle: The CFFI handle (created with ``ffi.new_handle``) of the
        Python watcher object.
    :param int revents: The libev event mask that fired. It replaces a
        leading ``GEVENT_CORE_EVENTS`` marker in the watcher's args.

    Returns an integer having one of three values:

    - -1
        An exception occurred during the callback and you must call
        :func:`_python_handle_error` to deal with it. The Python watcher
        object will have the exception tuple saved in ``_exc_info``.
    - 0
        Everything went according to plan. You should check to see if the libev
        watcher is still active, and call :func:`_python_stop` if so. This will
        clean up the memory.
    - 1
        Everything went according to plan, but the watcher has already
        been stopped. Its memory may no longer be valid. This value is also
        returned when the handle could not be turned back into a watcher
        at all, since there is then nothing that can safely be touched.
    """
    try:
        # Even dereferencing the handle needs to be inside the try/except;
        # if we don't return normally (e.g., a signal) then we wind up going
        # to the 'onerror' handler, which
        # is not what we want; that can permanently wedge the loop depending
        # on which callback was executing
        the_watcher = ffi.from_handle(handle)
        args = the_watcher.args
        if args is None:
            # Legacy behaviour from corecext: convert None into ()
            # See test__core_watcher.py
            args = _NOARGS
        if args and args[0] == GEVENT_CORE_EVENTS:
            args = (revents, ) + args[1:]
        the_watcher.callback(*args)
    except:  # pylint:disable=bare-except
        # Capture the original exception first; the recovery code below
        # may itself raise and (on Python 2) clobber sys.exc_info().
        exc_info = sys.exc_info()
        try:
            the_watcher
        except NameError:
            # The exception arrived before ``the_watcher`` was bound
            # (e.g., a signal, or a bad handle).  Try once more so the
            # error can be attached to the watcher.
            try:
                the_watcher = ffi.from_handle(handle)
            except:  # pylint:disable=bare-except
                # Nothing to attach the error to; report it and tell the
                # C level not to touch the watcher again.
                try:
                    traceback.print_exception(*exc_info)
                except:  # pylint:disable=bare-except
                    pass  # Nothing we can do here
                return 1
        the_watcher._exc_info = exc_info
        # Depending on when the exception happened, the watcher
        # may or may not have been stopped. We need to make sure its
        # memory stays valid so we can stop it at the ev level if needed.
        the_watcher.loop._keepaliveset.add(the_watcher)
        return -1
    else:
        if the_watcher in the_watcher.loop._keepaliveset:
            # It didn't stop itself
            return 0
        return 1  # It stopped itself
libev.python_callback = _python_callback
@ffi.callback("void(void* handle, int revents)")
def _python_handle_error(handle, revents):
    """
    Report the exception that :func:`_python_callback` saved on a watcher.

    The exception tuple stored in the watcher's ``_exc_info`` attribute is
    removed and passed to the owning loop's ``handle_error``.  If *revents*
    contains ``EV_READ`` or ``EV_WRITE`` the watcher is stopped afterwards.
    No exception is allowed to propagate back to the C caller.

    :param handle: The CFFI handle of the Python watcher object.
    :param int revents: The libev event mask the failed callback was run for.
    """
    try:
        watcher = ffi.from_handle(handle)
    except:  # pylint:disable=bare-except
        # Without a watcher there is no loop to report to and nothing to
        # stop; previously this left ``watcher`` unbound for the cleanup code.
        try:
            traceback.print_exc()
        except:  # pylint:disable=bare-except
            pass  # Nothing we can do here
        return

    try:
        exc_info = watcher._exc_info
        del watcher._exc_info
        watcher.loop.handle_error(watcher, *exc_info)
    finally:
        # XXX Since we're here on an error condition, and we
        # made sure that the watcher object was put in loop._keepaliveset,
        # what about not stopping the watcher? Looks like a possible
        # memory leak?
        if revents & (libev.EV_READ | libev.EV_WRITE):
            try:
                watcher.stop()
            except:  # pylint:disable=bare-except
                watcher.loop.handle_error(watcher, *sys.exc_info())
        # Deliberately swallow anything still in flight: we are being
        # called from C and there is nowhere for it to go.
        return  # pylint:disable=lost-exception
libev.python_handle_error = _python_handle_error
@ffi.callback("void(void* handle)")
def _python_stop(handle):
    """Stop the Python watcher object that *handle* refers to."""
    ffi.from_handle(handle).stop()
libev.python_stop = _python_stop
# Event masks, re-exported from libev.
UNDEF = libev.EV_UNDEF
NONE = libev.EV_NONE
READ = libev.EV_READ
WRITE = libev.EV_WRITE
TIMER = libev.EV_TIMER
PERIODIC = libev.EV_PERIODIC
SIGNAL = libev.EV_SIGNAL
CHILD = libev.EV_CHILD
STAT = libev.EV_STAT
IDLE = libev.EV_IDLE
PREPARE = libev.EV_PREPARE
CHECK = libev.EV_CHECK
EMBED = libev.EV_EMBED
FORK = libev.EV_FORK
CLEANUP = libev.EV_CLEANUP
ASYNC = libev.EV_ASYNC
CUSTOM = libev.EV_CUSTOM
ERROR = libev.EV_ERROR

# Convenience mask: readable or writable.
READWRITE = libev.EV_READ | libev.EV_WRITE

# Watcher priority bounds.
MINPRI = libev.EV_MINPRI
MAXPRI = libev.EV_MAXPRI

# Backend selectors.
BACKEND_PORT = libev.EVBACKEND_PORT
BACKEND_KQUEUE = libev.EVBACKEND_KQUEUE
BACKEND_EPOLL = libev.EVBACKEND_EPOLL
BACKEND_POLL = libev.EVBACKEND_POLL
BACKEND_SELECT = libev.EVBACKEND_SELECT

# Loop construction flags.
FORKCHECK = libev.EVFLAG_FORKCHECK
NOINOTIFY = libev.EVFLAG_NOINOTIFY
SIGNALFD = libev.EVFLAG_SIGNALFD
NOSIGMASK = libev.EVFLAG_NOSIGMASK
class _EVENTSType(object):
    """Type of the marker object asking for the fired event mask as the first callback argument."""

    def __repr__(self):
        return 'gevent.core.EVENTS'


# One shared marker instance, published under both names.
GEVENT_CORE_EVENTS = _EVENTSType()
EVENTS = GEVENT_CORE_EVENTS
def get_version():
    """Return the version reported by the libev library functions, e.g. ``'libev-4.22'``."""
    major = libev.ev_version_major()
    minor = libev.ev_version_minor()
    return 'libev-%d.%02d' % (major, minor)
def get_header_version():
    """Return the version given by the ``EV_VERSION_*`` constants, e.g. ``'libev-4.22'``."""
    major = libev.EV_VERSION_MAJOR
    minor = libev.EV_VERSION_MINOR
    return 'libev-%d.%02d' % (major, minor)
# (bit, name) pairs for backends and loop flags.
_flags = [
    (libev.EVBACKEND_PORT, 'port'),
    (libev.EVBACKEND_KQUEUE, 'kqueue'),
    (libev.EVBACKEND_EPOLL, 'epoll'),
    (libev.EVBACKEND_POLL, 'poll'),
    (libev.EVBACKEND_SELECT, 'select'),
    (libev.EVFLAG_NOENV, 'noenv'),
    (libev.EVFLAG_FORKCHECK, 'forkcheck'),
    (libev.EVFLAG_SIGNALFD, 'signalfd'),
    (libev.EVFLAG_NOSIGMASK, 'nosigmask'),
]

# Reverse mapping: name -> bit.
_flags_str2int = {}
for _flag_bit, _flag_name in _flags:
    _flags_str2int[_flag_name] = _flag_bit
del _flag_bit, _flag_name

# (bit, name) pairs for event masks, used when formatting them.
_events = [
    (libev.EV_READ, 'READ'),
    (libev.EV_WRITE, 'WRITE'),
    (libev.EV__IOFDSET, '_IOFDSET'),
    (libev.EV_PERIODIC, 'PERIODIC'),
    (libev.EV_SIGNAL, 'SIGNAL'),
    (libev.EV_CHILD, 'CHILD'),
    (libev.EV_STAT, 'STAT'),
    (libev.EV_IDLE, 'IDLE'),
    (libev.EV_PREPARE, 'PREPARE'),
    (libev.EV_CHECK, 'CHECK'),
    (libev.EV_EMBED, 'EMBED'),
    (libev.EV_FORK, 'FORK'),
    (libev.EV_CLEANUP, 'CLEANUP'),
    (libev.EV_ASYNC, 'ASYNC'),
    (libev.EV_CUSTOM, 'CUSTOM'),
    (libev.EV_ERROR, 'ERROR'),
]
def _flags_to_list(flags):
    """
    Break the bit mask *flags* into a list of names from ``_flags``.

    Bits that match no known name are appended, as one integer, at the end.
    """
    names = []
    remaining = flags
    for bit, name in _flags:
        if remaining & bit:
            names.append(name)
        remaining &= ~bit
        if not remaining:
            break
    if remaining:
        names.append(remaining)
    return names
# Tuples suitable for isinstance() checks on both Python 2 and Python 3.
if sys.version_info[0] < 3:
    import __builtin__  # pylint:disable=import-error
    basestring = (__builtin__.basestring,)
    integer_types = (int, __builtin__.long)
else:
    basestring = (bytes, str)
    integer_types = (int,)
def _flags_to_int(flags):
    """
    Convert a user-supplied backend/flag specification to a libev bit mask.

    :param flags: One of: a false value (meaning no flags); an integer,
        returned unchanged; a comma-separated string of names such as
        ``"epoll,noenv"`` (text or bytes); or an iterable of such names.
        Names are matched case-insensitively and surrounding whitespace
        and empty items are ignored.
    :return: The integer bit mask.
    :raises ValueError: If a name is not a known backend or flag.
    """
    # Note, that order does not matter, libev has its own predefined order
    if not flags:
        return 0
    if isinstance(flags, integer_types):
        return flags
    if isinstance(flags, bytes) and not isinstance(flags, str):
        # Python 3 only (on Python 2 ``bytes is str``): bytes pass the
        # ``basestring`` check below, but splitting them on a text ','
        # raises TypeError, so convert to text first.
        flags = flags.decode('ascii')
    result = 0
    try:
        if isinstance(flags, basestring):
            flags = flags.split(',')
        for value in flags:
            value = value.strip().lower()
            if value:
                result |= _flags_str2int[value]
    except KeyError as ex:
        raise ValueError('Invalid backend or flag: %s\nPossible values: %s' % (ex, ', '.join(sorted(_flags_str2int.keys()))))
    return result
def _str_hex(flag):
    """Format *flag* for display: integers as hexadecimal, anything else via ``str``."""
    formatter = hex if isinstance(flag, integer_types) else str
    return formatter(flag)
def _check_flags(flags):
    """
    Validate the backend portion of the integer mask *flags*.

    Returns None when no backend bits are set or the request is acceptable.

    :raises ValueError: If the backend bits name no known backend, or none
        of the requested backends is supported by this libev.
    """
    backend_bits = flags & libev.EVBACKEND_MASK
    if not backend_bits:
        return
    if not backend_bits & libev.EVBACKEND_ALL:
        raise ValueError('Invalid value for backend: 0x%x' % backend_bits)
    if backend_bits & libev.ev_supported_backends():
        return
    described = [_str_hex(item) for item in _flags_to_list(backend_bits)]
    raise ValueError('Unsupported backend: %s' % '|'.join(described))
def _events_to_str(events):
    """
    Render the event mask *events* as ``'|'``-joined names from ``_events``.

    Bits without a known name are appended as a single hexadecimal number.
    """
    parts = []
    remaining = events
    for bit, name in _events:
        if remaining & bit:
            parts.append(name)
            remaining &= ~bit
        if not remaining:
            break
    if remaining:
        parts.append(hex(remaining))
    return '|'.join(parts)
def supported_backends():
    """Return the names of the backends reported by ``ev_supported_backends``."""
    mask = libev.ev_supported_backends()
    return _flags_to_list(mask)
def recommended_backends():
    """Return the names of the backends reported by ``ev_recommended_backends``."""
    mask = libev.ev_recommended_backends()
    return _flags_to_list(mask)
def embeddable_backends():
    """Return the names of the backends reported by ``ev_embeddable_backends``."""
    mask = libev.ev_embeddable_backends()
    return _flags_to_list(mask)
def time():
    """Return the value of libev's ``ev_time()``."""
    now = libev.ev_time()
    return now
# Becomes True once the default loop has been destroyed; later loops are
# then created as non-default loops unless the caller says otherwise.
_default_loop_destroyed = False


def _loop_callback(*args, **kwargs):
    """Create a CFFI callback; a thin pass-through to ``ffi.callback``."""
    cffi_callback = ffi.callback(*args, **kwargs)
    return cffi_callback
class loop(object):
    """
    Python wrapper around a libev ``struct ev_loop``.

    Besides exposing the libev loop operations and factory methods for the
    watcher classes of this module, the loop keeps a list of one-shot
    callbacks (see :meth:`run_callback`) that is drained from a prepare
    watcher, and a ``_keepaliveset`` of started watchers so that they are
    not garbage collected while libev may still call them.
    """
    # pylint:disable=too-many-public-methods

    # Optional object with a ``handle_error`` method (or a plain callable)
    # that receives errors; when None, _default_handle_error is used.
    error_handler = None

    def __init__(self, flags=None, default=None):
        """
        :param flags: Backend/flag specification; anything accepted by
            ``_flags_to_int`` (integer mask, comma-separated string,
            iterable of names, or a false value).
        :param default: Whether to use the libev default loop. None means
            "yes, unless the default loop was already destroyed".
        :raises ValueError: For invalid or unsupported backends.
        :raises SystemError: If libev fails to create the loop.
        """
        self._in_callback = False
        self._callbacks = []

        # self._check is a watcher that runs in each iteration of the
        # mainloop, just after the blocking call
        self._check = ffi.new("struct ev_check *")
        self._check_callback_ffi = _loop_callback("void(*)(struct ev_loop *, void*, int)",
                                                  self._check_callback,
                                                  onerror=self._check_callback_handle_error)
        libev.ev_check_init(self._check, self._check_callback_ffi)

        # self._prepare is a watcher that runs in each iteration of the mainloop,
        # just before the blocking call
        self._prepare = ffi.new("struct ev_prepare *")
        self._prepare_callback_ffi = _loop_callback("void(*)(struct ev_loop *, void*, int)",
                                                    self._run_callbacks,
                                                    onerror=self._check_callback_handle_error)
        libev.ev_prepare_init(self._prepare, self._prepare_callback_ffi)

        # A timer we start and stop on demand. If we have callbacks,
        # too many to run in one iteration of _run_callbacks, we turn this
        # on so as to have the next iteration of the run loop return to us
        # as quickly as possible.
        # TODO: There may be a more efficient way to do this using ev_timer_again;
        # see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev)
        self._timer0 = ffi.new("struct ev_timer *")
        libev.ev_timer_init(self._timer0, libev.gevent_noop, 0.0, 0.0)

        # TODO: We may be able to do something nicer and use the existing python_callback
        # combined with onerror and the class check/timer/prepare to simplify things
        # and unify our handling

        c_flags = _flags_to_int(flags)
        _check_flags(c_flags)
        c_flags |= libev.EVFLAG_NOENV
        c_flags |= libev.EVFLAG_FORKCHECK

        if default is None:
            default = True
            if _default_loop_destroyed:
                default = False

        if default:
            self._ptr = libev.gevent_ev_default_loop(c_flags)
            if not self._ptr:
                raise SystemError("ev_default_loop(%s) failed" % (c_flags, ))
        else:
            self._ptr = libev.ev_loop_new(c_flags)
            if not self._ptr:
                raise SystemError("ev_loop_new(%s) failed" % (c_flags, ))

        # The module global is looked up through globals() because a bare
        # ``__SYSERR_CALLBACK`` inside a class body would be name-mangled.
        if default or globals()["__SYSERR_CALLBACK"] is None:
            set_syserr_cb(self._handle_syserr)

        # The auxiliary watchers must not keep the loop running by
        # themselves, hence one unref for each of them.
        libev.ev_prepare_start(self._ptr, self._prepare)
        self.unref()
        libev.ev_check_start(self._ptr, self._check)
        self.unref()

        self._keepaliveset = set()

    def _check_callback_handle_error(self, t, v, tb):
        """``onerror`` hook of the internal CFFI callbacks: forward to :meth:`handle_error`."""
        # None as the context argument causes the exception to be raised
        # in the main greenlet.
        self.handle_error(None, t, v, tb)

    def _check_callback(self, *args):
        """Callback of the internal check watcher; intentionally does nothing."""
        # If we have the onerror callback, this is a no-op; all the real
        # work to rethrow the exception is done by the onerror callback
        pass

    def _run_callbacks(self, _evloop, _, _revents):
        """
        Callback of the internal prepare watcher: run queued callbacks.

        At most about 1000 callbacks are run per invocation. If any are
        still queued afterwards, the zero-delay timer is started so that
        the loop comes back here as soon as possible.
        """
        count = 1000
        libev.ev_timer_stop(self._ptr, self._timer0)
        while self._callbacks and count > 0:
            callbacks = self._callbacks
            self._callbacks = []
            for cb in callbacks:
                # Balances the ref() taken in run_callback().
                self.unref()
                callback = cb.callback
                args = cb.args
                if callback is None or args is None:
                    # it's been stopped
                    continue

                cb.callback = None

                try:
                    callback(*args)
                except:  # pylint:disable=bare-except
                    # If we allow an exception to escape this method (while we are running the ev callback),
                    # then CFFI will print the error and libev will continue executing.
                    # There are two problems with this. The first is that the code after
                    # the loop won't run. The second is that any remaining callbacks scheduled
                    # for this loop iteration will be silently dropped; they won't run, but they'll
                    # also not be *stopped* (which is not a huge deal unless you're looking for
                    # consistency or checking the boolean/pending status; the loop doesn't keep
                    # a reference to them like it does to watchers...*UNLESS* the callback itself had
                    # a reference to a watcher; then I don't know what would happen, it depends on
                    # the state of the watcher---a leak or crash is not totally inconceivable).
                    # The Cython implementation in core.ppyx uses gevent_call from callbacks.c
                    # to run the callback, which uses gevent_handle_error to handle any errors the
                    # Python callback raises...it unconditionally simply prints any error raised
                    # by loop.handle_error and clears it, so callback handling continues.
                    # We take a similar approach (but are extra careful about printing)
                    try:
                        self.handle_error(cb, *sys.exc_info())
                    except:  # pylint:disable=bare-except
                        try:
                            print("Exception while handling another error", file=sys.stderr)
                            traceback.print_exc()
                        except:  # pylint:disable=bare-except
                            pass  # Nothing we can do here
                finally:
                    # NOTE: this must be reset here, because cb.args is used as a flag in
                    # the callback class so that bool(cb) of a callback that has been run
                    # becomes False
                    cb.args = None
                    count -= 1

        if self._callbacks:
            libev.ev_timer_start(self._ptr, self._timer0)

    def _stop_aux_watchers(self):
        """Stop the internal prepare/check watchers, restoring the refs dropped in ``__init__``."""
        if libev.ev_is_active(self._prepare):
            self.ref()
            libev.ev_prepare_stop(self._ptr, self._prepare)
        if libev.ev_is_active(self._check):
            self.ref()
            libev.ev_check_stop(self._ptr, self._check)

    def destroy(self):
        """Destroy the underlying libev loop; does nothing if already destroyed."""
        global _default_loop_destroyed
        if self._ptr:
            self._stop_aux_watchers()
            # See __init__ for why globals() is used here.
            if globals()["__SYSERR_CALLBACK"] == self._handle_syserr:
                set_syserr_cb(None)
            if libev.ev_is_default_loop(self._ptr):
                _default_loop_destroyed = True
            libev.ev_loop_destroy(self._ptr)
            self._ptr = ffi.NULL

    @property
    def ptr(self):
        """The CFFI pointer to the libev loop (``ffi.NULL`` after :meth:`destroy`)."""
        return self._ptr

    @property
    def WatcherType(self):
        """The base class of the watchers created by this loop."""
        return watcher

    @property
    def MAXPRI(self):
        """The value of libev's ``EV_MAXPRI``."""
        return libev.EV_MAXPRI

    @property
    def MINPRI(self):
        """The value of libev's ``EV_MINPRI``."""
        return libev.EV_MINPRI

    def _handle_syserr(self, message, errno):
        """
        Report a system error as a :exc:`SystemError` through :meth:`handle_error`.

        Failures while formatting the message are printed and otherwise
        ignored, so an error is always reported.
        """
        try:
            errno = os.strerror(errno)
        except:  # pylint:disable=bare-except
            traceback.print_exc()
        try:
            message = '%s: %s' % (message, errno)
        except:  # pylint:disable=bare-except
            traceback.print_exc()
        self.handle_error(None, SystemError, SystemError(message), None)

    def handle_error(self, context, type, value, tb):
        """
        Dispatch an error to ``error_handler`` if set, else to the default handler.

        :param context: The object the error is associated with, or None.
        :param type: The exception class.
        :param value: The exception instance.
        :param tb: The traceback, or None.
        """
        error_handler = self.error_handler
        if error_handler is not None:
            # we do want to do getattr every time so that setting Hub.handle_error property just works
            handle_error = getattr(error_handler, 'handle_error', error_handler)
            handle_error(context, type, value, tb)
        else:
            self._default_handle_error(context, type, value, tb)

    def _default_handle_error(self, context, type, value, tb):  # pylint:disable=unused-argument
        """Print the exception and break out of the innermost ``ev_run``."""
        # note: Hub sets its own error handler so this is not used by gevent
        # this is here to make core.loop usable without the rest of gevent
        traceback.print_exception(type, value, tb)
        libev.ev_break(self._ptr, libev.EVBREAK_ONE)

    def run(self, nowait=False, once=False):
        """
        Run the loop via ``ev_run``.

        :param nowait: If true, pass ``EVRUN_NOWAIT``.
        :param once: If true, pass ``EVRUN_ONCE``.
        """
        flags = 0
        if nowait:
            flags |= libev.EVRUN_NOWAIT
        if once:
            flags |= libev.EVRUN_ONCE

        libev.ev_run(self._ptr, flags)

    def reinit(self):
        """Call ``ev_loop_fork`` on the loop."""
        libev.ev_loop_fork(self._ptr)

    def ref(self):
        """Call ``ev_ref`` on the loop."""
        libev.ev_ref(self._ptr)

    def unref(self):
        """Call ``ev_unref`` on the loop."""
        libev.ev_unref(self._ptr)

    def break_(self, how=libev.EVBREAK_ONE):
        """Call ``ev_break`` on the loop with *how*."""
        libev.ev_break(self._ptr, how)

    def verify(self):
        """Call ``ev_verify`` on the loop."""
        libev.ev_verify(self._ptr)

    def now(self):
        """Return the result of ``ev_now`` for this loop."""
        return libev.ev_now(self._ptr)

    def update(self):
        """Call ``ev_now_update`` on the loop."""
        libev.ev_now_update(self._ptr)

    def __repr__(self):
        return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self._format())

    @property
    def default(self):
        """True if this is the libev default loop."""
        return bool(libev.ev_is_default_loop(self._ptr))

    @property
    def iteration(self):
        """The result of ``ev_iteration`` for this loop."""
        return libev.ev_iteration(self._ptr)

    @property
    def depth(self):
        """The result of ``ev_depth`` for this loop."""
        return libev.ev_depth(self._ptr)

    @property
    def backend_int(self):
        """The backend in use, as the integer returned by ``ev_backend``."""
        return libev.ev_backend(self._ptr)

    @property
    def backend(self):
        """The backend in use, as a name when known, otherwise as the integer."""
        backend = libev.ev_backend(self._ptr)
        for key, value in _flags:
            if key == backend:
                return value
        return backend

    @property
    def pendingcnt(self):
        """The result of ``ev_pending_count`` for this loop."""
        return libev.ev_pending_count(self._ptr)

    # Watcher factories.  Inside the methods the bare names refer to the
    # module-level watcher classes, not to these methods.

    def io(self, fd, events, ref=True, priority=None):
        """Create an :class:`io` watcher on this loop."""
        return io(self, fd, events, ref, priority)

    def timer(self, after, repeat=0.0, ref=True, priority=None):
        """Create a :class:`timer` watcher on this loop."""
        return timer(self, after, repeat, ref, priority)

    def signal(self, signum, ref=True, priority=None):
        """Create a :class:`signal` watcher on this loop."""
        return signal(self, signum, ref, priority)

    def idle(self, ref=True, priority=None):
        """Create an :class:`idle` watcher on this loop."""
        return idle(self, ref, priority)

    def prepare(self, ref=True, priority=None):
        """Create a :class:`prepare` watcher on this loop."""
        return prepare(self, ref, priority)

    def check(self, ref=True, priority=None):
        """Create a :class:`check` watcher on this loop."""
        return check(self, ref, priority)

    def fork(self, ref=True, priority=None):
        """Create a :class:`fork` watcher on this loop."""
        return fork(self, ref, priority)

    def async_(self, ref=True, priority=None):
        """
        Create an async watcher on this loop.

        Also available under its historical name ``async`` (assigned right
        after the class body), which can no longer be written literally
        because ``async`` is a reserved word as of Python 3.7.
        """
        # Looked up by string for the same reason.
        return globals()['async'](self, ref, priority)

    if sys.platform != "win32":

        def child(self, pid, trace=0, ref=True):
            """Create a :class:`child` watcher on this loop."""
            return child(self, pid, trace, ref)

        def install_sigchld(self):
            """Call ``gevent_install_sigchld_handler``."""
            libev.gevent_install_sigchld_handler()

        def reset_sigchld(self):
            """Call ``gevent_reset_sigchld_handler``."""
            libev.gevent_reset_sigchld_handler()

    def stat(self, path, interval=0.0, ref=True, priority=None):
        """Create a :class:`stat` watcher on this loop."""
        return stat(self, path, interval, ref, priority)

    def callback(self, priority=None):
        # NOTE(review): this passes (loop, priority) to the module-level
        # ``callback`` class, whose constructor parameters are
        # (callback, args) -- kept as-is; confirm what is intended.
        return callback(self, priority)

    def run_callback(self, func, *args):
        """
        Queue ``func(*args)`` to be run from the loop's prepare watcher.

        :return: The :class:`callback` object; call its ``stop()`` to cancel.
        """
        cb = callback(func, args)
        self._callbacks.append(cb)
        # Keep the loop alive until the callback has been processed;
        # _run_callbacks() drops this reference again.
        self.ref()

        return cb

    def _format(self):
        """Return the descriptive part of ``repr(self)``."""
        if not self._ptr:
            return 'destroyed'
        msg = self.backend
        if self.default:
            msg += ' default'
        msg += ' pending=%s' % self.pendingcnt
        msg += self._format_details()
        return msg

    def _format_details(self):
        """Return the ``ref=``/``fileno=`` suffix used by :meth:`_format`."""
        msg = ''
        fileno = self.fileno()
        try:
            activecnt = self.activecnt
        except AttributeError:
            activecnt = None
        if activecnt is not None:
            msg += ' ref=' + repr(activecnt)
        if fileno is not None:
            msg += ' fileno=' + repr(fileno)
        #if sigfd is not None and sigfd != -1:
        #    msg += ' sigfd=' + repr(sigfd)
        return msg

    def fileno(self):
        """Return the loop's ``backend_fd`` if it is non-negative, else None."""
        if self._ptr:
            fd = self._ptr.backend_fd
            if fd >= 0:
                return fd
        return None

    @property
    def activecnt(self):
        """
        The loop's ``activecnt`` field.

        :raises ValueError: If the loop has been destroyed.
        """
        if not self._ptr:
            raise ValueError('operation on destroyed loop')
        return self._ptr.activecnt


# Backwards compatibility: keep the factory reachable as ``loop.async`` (via
# getattr on Python 3.7+, where ``async`` is a keyword).
setattr(loop, 'async', loop.__dict__['async_'])
# Shared empty tuple: when *args is captured but empty we store this
# constant instead of keeping the freshly created tuple around.
_NOARGS = ()


class callback(object):
    """
    A one-shot function call queued on a loop.

    ``callback`` is the function and ``args`` its positional arguments.
    Both are set to None by :meth:`stop`.
    """

    __slots__ = ('callback', 'args')

    def __init__(self, callback, args):
        self.callback = callback
        self.args = args if args else _NOARGS

    def stop(self):
        """Cancel the call by clearing both the function and its arguments."""
        self.args = None
        self.callback = None

    # Truthiness and ``pending`` differ on purpose.  bool() answers "do we
    # still need this object?" and stays true while the function is pending
    # or currently running.  ``pending`` mirrors libev watchers: it is
    # cleared before the function is actually run.

    def __nonzero__(self):
        # NOTE: This depends on loop._run_callbacks setting the args property
        # to None.
        return self.args is not None
    __bool__ = __nonzero__

    @property
    def pending(self):
        """True while the function has not been cleared."""
        return self.callback is not None

    def _format(self):
        return ''

    def __repr__(self):
        pieces = ["<%s at 0x%x" % (self.__class__.__name__, id(self))]
        func = self.callback
        arguments = self.args
        if self.pending:
            pieces.append(" pending")
        if func is not None:
            pieces.append(" callback=%r" % (func, ))
        if arguments is not None:
            pieces.append(" args=%r" % (arguments, ))
        if func is None and arguments is None:
            pieces.append(" stopped")
        pieces.append(">")
        return "".join(pieces)
class watcher(object):
    """
    Base class of the Python wrappers around libev watcher structs.

    ``_flags`` is a small bit field:

    - 1: set by :meth:`feed` (stands in for the C implementation's INCREF);
    - 2: ``loop.unref()`` was called for this watcher and is outstanding;
    - 4: the watcher's ``ref`` property is False.
    """

    # A string identifying the type of libev object we watch, e.g., 'ev_io'
    # This should be a class attribute.
    _watcher_type = None
    # A class attribute that is the callback on the libev object that init's the C struct,
    # e.g., libev.ev_io_init. If None, will be set by _init_subclasses.
    _watcher_init = None
    # A class attribute that is the callback on the libev object that starts the C watcher,
    # e.g., libev.ev_io_start. If None, will be set by _init_subclasses.
    _watcher_start = None
    # A class attribute that is the callback on the libev object that stops the C watcher,
    # e.g., libev.ev_io_stop. If None, will be set by _init_subclasses.
    _watcher_stop = None
    # A cffi ctype object identifying the struct pointer we create.
    # This is a class attribute set based on the _watcher_type
    _watcher_struct_pointer_type = None
    # The attribute of the libev object identifying the custom
    # callback function for this type of watcher. This is a class
    # attribute set based on the _watcher_type in _init_subclasses.
    _watcher_callback = None

    def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
        """
        :param _loop: The owning loop object.
        :param ref: Initial value of the ``ref`` property.
        :param priority: If not None, passed to ``ev_set_priority``.
        :param tuple args: Extra arguments for the libev init function.
        """
        self.loop = _loop
        self._flags = 0 if ref else 4
        self._args = None
        self._callback = None

        # The handle is what travels through C and back to _python_callback.
        self._handle = ffi.new_handle(self)
        self._watcher = ffi.new(self._watcher_struct_pointer_type)
        self._watcher.data = self._handle

        if priority is not None:
            libev.ev_set_priority(self._watcher, priority)
        self._watcher_init(self._watcher, self._watcher_callback, *args)

    @classmethod
    def _init_subclasses(cls):
        """Fill in the libev-derived class attributes of every direct subclass."""
        for klass in cls.__subclasses__():  # pylint:disable=no-member
            type_name = klass._watcher_type
            klass._watcher_struct_pointer_type = ffi.typeof('struct ' + type_name + '*')
            klass._watcher_callback = ffi.addressof(libev, '_gevent_generic_callback')
            for suffix in ('start', 'stop', 'init'):
                attr_name = '_watcher' + '_' + suffix
                if getattr(klass, attr_name) is not None:
                    # The subclass supplied its own function.
                    continue
                setattr(klass, attr_name, getattr(libev, type_name + '_' + suffix))

    # this is not needed, since we keep alive the watcher while it's started
    #def __del__(self):
    #    self._watcher_stop(self.loop._ptr, self._watcher)

    def __repr__(self):
        text = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), self._format())
        if self.pending:
            text += " pending"
        if self.callback is not None:
            text += " callback=%r" % (self.callback, )
        if self.args is not None:
            text += " args=%r" % (self.args, )
        if self.callback is None and self.args is None:
            text += " stopped"
        text += " handle=%s" % (self._watcher.data)
        return text + ">"

    def _format(self):
        """Extra text for ``repr``; subclasses may override."""
        return ''

    def _libev_unref(self):
        """Unref the loop once if ``ref`` is False and no unref is outstanding."""
        if self._flags & 6 == 4:
            self.loop.unref()
            self._flags |= 2

    def _get_ref(self):
        return not self._flags & 4

    def _set_ref(self, value):
        if value:
            if not self._flags & 4:
                return  # ref is already True
            if self._flags & 2:  # ev_unref was called, undo
                self.loop.ref()
            self._flags &= ~6  # do not want unref, no outstanding unref
            return

        if self._flags & 4:
            return  # ref is already False
        self._flags |= 4
        if not self._flags & 2 and libev.ev_is_active(self._watcher):
            self.loop.unref()
            self._flags |= 2

    ref = property(_get_ref, _set_ref)

    def _get_callback(self):
        return self._callback

    def _set_callback(self, cb):
        if cb is not None and not callable(cb):
            raise TypeError("Expected callable, not %r" % (cb, ))
        self._callback = cb

    callback = property(_get_callback, _set_callback)

    def _get_args(self):
        return self._args

    def _set_args(self, args):
        if args is not None and not isinstance(args, tuple):
            raise TypeError("args must be a tuple or None")
        self._args = args

    args = property(_get_args, _set_args)

    def start(self, callback, *args):
        """
        Start the watcher; ``callback(*args)`` runs when it fires.

        :raises TypeError: If *callback* is None or not callable.
        """
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        self.args = args or _NOARGS
        self._libev_unref()
        # Keep ourselves alive for as long as libev may call us.
        self.loop._keepaliveset.add(self)
        self._watcher_start(self.loop._ptr, self._watcher)

    def stop(self):
        """Stop the watcher and clear its callback and arguments."""
        if self._flags & 2:
            # Give back the reference dropped on our behalf.
            self.loop.ref()
            self._flags &= ~2
        self._watcher_stop(self.loop._ptr, self._watcher)
        self.loop._keepaliveset.discard(self)
        self._callback = None
        self.args = None

    def _get_priority(self):
        return libev.ev_priority(self._watcher)

    def _set_priority(self, priority):
        if libev.ev_is_active(self._watcher):
            raise AttributeError("Cannot set priority of an active watcher")
        libev.ev_set_priority(self._watcher, priority)

    priority = property(_get_priority, _set_priority)

    def feed(self, revents, callback, *args):
        """Set the callback and arguments, then call ``ev_feed_event`` with *revents*."""
        self.callback = callback
        self.args = args or _NOARGS
        if self._flags & 6 == 4:
            self.loop.unref()
            self._flags |= 2
        libev.ev_feed_event(self.loop._ptr, self._watcher, revents)
        if not self._flags & 1:
            # Py_INCREF(<PyObjectPtr>self)
            self._flags |= 1

    @property
    def active(self):
        """True if ``ev_is_active`` reports the watcher as active."""
        return bool(libev.ev_is_active(self._watcher))

    @property
    def pending(self):
        """True if ``ev_is_pending`` reports the watcher as pending."""
        return bool(libev.ev_is_pending(self._watcher))
class io(watcher):
    """Watcher for events on a file descriptor (libev ``ev_io``)."""

    _watcher_type = 'ev_io'

    def __init__(self, loop, fd, events, ref=True, priority=None):
        """
        :raises ValueError: If *fd* is negative or *events* contains bits
            other than ``EV__IOFDSET``, ``EV_READ`` and ``EV_WRITE``.
        """
        # XXX: Win32: Need to vfd_open the fd and free the old one?
        # XXX: Win32: Need a destructor to free the old fd?
        if fd < 0:
            raise ValueError('fd must be non-negative: %r' % fd)
        allowed = libev.EV__IOFDSET | libev.EV_READ | libev.EV_WRITE
        if events & ~allowed:
            raise ValueError('illegal event mask: %r' % events)
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(fd, events))

    def start(self, callback, *args, **kwargs):
        """
        Start the watcher.

        With a true ``pass_events`` keyword argument, the callback receives
        the fired event mask as its first positional argument.
        """
        # pylint:disable=arguments-differ
        call_args = args or _NOARGS
        if kwargs.get('pass_events'):
            # The marker is replaced with the real mask in _python_callback.
            call_args = (GEVENT_CORE_EVENTS, ) + call_args
        watcher.start(self, callback, *call_args)

    def _get_fd(self):
        return vfd_get(self._watcher.fd)

    def _set_fd(self, fd):
        if libev.ev_is_active(self._watcher):
            raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active")
        new_vfd = vfd_open(fd)
        vfd_free(self._watcher.fd)
        self._watcher_init(self._watcher, self._watcher_callback, new_vfd, self._watcher.events)

    fd = property(_get_fd, _set_fd)

    def _get_events(self):
        return self._watcher.events

    def _set_events(self, events):
        if libev.ev_is_active(self._watcher):
            raise AttributeError("'io' watcher attribute 'events' is read-only while watcher is active")
        self._watcher_init(self._watcher, self._watcher_callback, self._watcher.fd, events)

    events = property(_get_events, _set_events)

    @property
    def events_str(self):
        """The watched event mask rendered by ``_events_to_str``."""
        return _events_to_str(self._watcher.events)

    def _format(self):
        return ' fd=%s events=%s' % (self.fd, self.events_str)
class timer(watcher):
    """Timer watcher (libev ``ev_timer``)."""

    _watcher_type = 'ev_timer'

    def __init__(self, loop, after=0.0, repeat=0.0, ref=True, priority=None):
        """
        :raises ValueError: If *repeat* is negative.
        """
        if repeat < 0.0:
            raise ValueError("repeat must be positive or zero: %r" % repeat)
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(after, repeat))

    def start(self, callback, *args, **kw):
        """
        Start the timer.

        Unless the keyword argument ``update`` is given and false, the
        loop's notion of the current time is refreshed first.
        """
        # pylint:disable=arguments-differ
        if kw.get("update", True):
            # Quoth the libev doc: "This is a costly operation and is
            # usually done automatically within ev_run(). This
            # function is rarely useful, but when some event callback
            # runs for a very long time without entering the event
            # loop, updating libev's idea of the current time is a
            # good idea."
            # So do we really need to default to true?
            libev.ev_now_update(self.loop._ptr)
        watcher.start(self, callback, *args)

    @property
    def at(self):
        """The ``at`` field of the underlying libev struct."""
        return self._watcher.at

    def again(self, callback, *args, **kw):
        """Like :meth:`start`, but the watcher is started with ``ev_timer_again``."""
        # Exactly the same as start(), just with a different initializer
        # function: shadow the class-level start function on the instance
        # for the duration of the call.
        self._watcher_start = libev.ev_timer_again
        try:
            self.start(callback, *args, **kw)
        finally:
            del self._watcher_start
class signal(watcher):
    """Watcher backed by libev's ``ev_signal`` watcher type."""

    _watcher_type = 'ev_signal'

    def __init__(self, loop, signalnum, ref=True, priority=None):
        """Validate *signalnum* and initialise the base watcher.

        Raises:
            ValueError: if *signalnum* is below 1 or not below
                ``signal.NSIG``.
        """
        upper_bound = signalmodule.NSIG
        if signalnum < 1 or signalnum >= upper_bound:
            raise ValueError('illegal signal number: %r' % signalnum)
        # still possible to crash on one of libev's asserts:
        # 1) "libev: ev_signal_start called with illegal signal number"
        #    EV_NSIG might be different from signal.NSIG on some platforms
        # 2) "libev: a signal must not be attached to two different loops"
        #    we probably could check that in LIBEV_EMBED mode, but not in general
        watcher.__init__(self, loop, ref=ref, priority=priority, args=(signalnum, ))
class idle(watcher):
    """Watcher backed by libev's ``ev_idle`` watcher type.

    Adds nothing beyond the type name; all behaviour comes from
    ``watcher``.
    """

    _watcher_type = 'ev_idle'
class prepare(watcher):
    """Watcher backed by libev's ``ev_prepare`` watcher type.

    Adds nothing beyond the type name; all behaviour comes from
    ``watcher``.
    """

    _watcher_type = 'ev_prepare'
class check(watcher):
    """Watcher backed by libev's ``ev_check`` watcher type.

    Adds nothing beyond the type name; all behaviour comes from
    ``watcher``.
    """

    _watcher_type = 'ev_check'
class fork(watcher):
    """Watcher backed by libev's ``ev_fork`` watcher type.

    Adds nothing beyond the type name; all behaviour comes from
    ``watcher``.
    """

    _watcher_type = 'ev_fork'
class async_(watcher):
    """Watcher backed by libev's ``ev_async`` watcher type.

    The class is defined as ``async_`` because ``async`` is a reserved
    keyword from Python 3.7 onwards and cannot appear in a ``class``
    statement there. It is still published under its historical name
    ``async`` in the module namespace (see below the class).
    """

    _watcher_type = 'ev_async'

    def send(self):
        """Call ``ev_async_send`` for this watcher on its loop."""
        libev.ev_async_send(self.loop._ptr, self._watcher)

    @property
    def pending(self):
        """``True`` if ``ev_async_pending`` reports a true value, else ``False``."""
        return bool(libev.ev_async_pending(self._watcher))


# Keep the original public name available. It has to be installed through
# the namespace dict because ``async = async_`` is itself a syntax error on
# Python 3.7+. Existing lookups of the global ``async`` keep working.
globals()['async'] = async_
class child(watcher):
    """Watcher backed by libev's ``ev_child`` watcher type."""

    _watcher_type = 'ev_child'

    def __init__(self, loop, pid, trace=0, ref=True):
        """Install the loop's SIGCHLD handling and initialise the watcher.

        Raises:
            TypeError: if *loop* is not the default loop.
        """
        if not loop.default:
            raise TypeError('child watchers are only available on the default loop')
        loop.install_sigchld()
        watcher.__init__(self, loop, ref=ref, args=(pid, trace))

    def _format(self):
        """Return the ``pid``/``rstatus`` fragment describing this watcher."""
        details = (self.pid, self.rstatus)
        return ' pid=%r rstatus=%r' % details

    @property
    def pid(self):
        """The ``pid`` field of the underlying watcher struct (read-only)."""
        return self._watcher.pid

    @property
    def rpid(self):
        """The ``rpid`` field of the underlying watcher struct."""
        return self._watcher.rpid

    @rpid.setter
    def rpid(self, value):
        self._watcher.rpid = value

    @property
    def rstatus(self):
        """The ``rstatus`` field of the underlying watcher struct."""
        return self._watcher.rstatus

    @rstatus.setter
    def rstatus(self, value):
        self._watcher.rstatus = value
class stat(watcher):
    """Watcher backed by libev's ``ev_stat`` watcher type, watching a path."""

    _watcher_type = 'ev_stat'

    @staticmethod
    def _encode_path(path):
        """Return *path* as bytes encoded for the filesystem.

        Bytes are returned unchanged. Text is encoded with the filesystem
        encoding (falling back to UTF-8) using the ``surrogateescape``
        error handler.

        Raises:
            UnicodeEncodeError: if encoding raises ``LookupError``, e.g.
                because the ``surrogateescape`` handler is unavailable.
        """
        if isinstance(path, bytes):
            return path

        # encode for the filesystem. Not all systems (e.g., Unix)
        # will have an encoding specified
        encoding = sys.getfilesystemencoding() or 'utf-8'
        try:
            return path.encode(encoding, 'surrogateescape')
        except LookupError:
            # Can't encode it, and the error handler doesn't
            # exist. Probably on Python 2 with an astral character.
            # Not sure how to handle this.
            # UnicodeEncodeError takes exactly five arguments (encoding,
            # object, start, end, reason); passing only a message would
            # itself fail with a TypeError.
            raise UnicodeEncodeError(encoding, path, 0, len(path),
                                     "Can't encode path to filesystem encoding")

    def __init__(self, _loop, path, interval=0.0, ref=True, priority=None):
        """Encode *path*, pin it in C memory and initialise the watcher."""
        # Store the encoded path in the same attribute that corecext does
        self._paths = self._encode_path(path)

        # Keep the original path to avoid re-encoding, especially on Python 3
        self._path = path

        # Although CFFI would automatically convert a bytes object into a char* when
        # calling ev_stat_init(..., char*, ...), on PyPy the char* pointer is not
        # guaranteed to live past the function call. On CPython, only with a constant/interned
        # bytes object is the pointer guaranteed to last past the function call. (And since
        # Python 3 is pretty much guaranteed to produce a newly-encoded bytes object above, that's
        # rarely the case). Therefore, we must keep a reference to the produced cdata object
        # so that the struct ev_stat_watcher's `path` pointer doesn't become invalid/deallocated
        self._cpath = ffi.new('char[]', self._paths)

        watcher.__init__(self, _loop, ref=ref, priority=priority,
                         args=(self._cpath,
                               interval))

    @property
    def path(self):
        """The path exactly as it was passed to the constructor."""
        return self._path

    @property
    def attr(self):
        """The watcher's ``attr`` struct, or ``None`` if its ``st_nlink`` is zero."""
        if not self._watcher.attr.st_nlink:
            return None
        return self._watcher.attr

    @property
    def prev(self):
        """The watcher's ``prev`` struct, or ``None`` if its ``st_nlink`` is zero."""
        if not self._watcher.prev.st_nlink:
            return None
        return self._watcher.prev

    @property
    def interval(self):
        """The ``interval`` field of the underlying watcher struct."""
        return self._watcher.interval
# Every watcher subclass has to be defined before this point. The
# one-time initialization below is more than a small optimization: it
# also guards against typos and attribute errors surfacing later at
# runtime.
watcher._init_subclasses()
def _syserr_cb(msg):
    """Forward a libev system-error message to the registered Python handler.

    *msg* is converted with ``ffi.string`` and passed, together with
    ``ffi.errno``, to ``__SYSERR_CALLBACK``. If anything raises, the
    handler is uninstalled via ``set_syserr_cb(None)`` and the exception
    is re-raised.
    """
    try:
        message = ffi.string(msg)
        __SYSERR_CALLBACK(message, ffi.errno)
    except:
        # Deliberately catch everything: the handler must be removed no
        # matter what went wrong, and the error is propagated unchanged.
        set_syserr_cb(None)
        raise  # let cffi print the traceback


# Keep the cffi callback object referenced on the function itself.
_syserr_cb._cb = ffi.callback("void(*)(char *msg)", _syserr_cb)
def set_syserr_cb(callback):
    """Install or remove the Python handler for libev system errors.

    Passing ``None`` registers ``ffi.NULL`` with libev and clears the
    stored handler. Passing a callable registers the ``_syserr_cb``
    trampoline and stores *callback* for it to invoke.

    Raises:
        TypeError: if *callback* is neither ``None`` nor callable.
    """
    global __SYSERR_CALLBACK
    if callback is not None and not callable(callback):
        raise TypeError('Expected callable or None, got %r' % (callback, ))

    c_handler = ffi.NULL if callback is None else _syserr_cb._cb
    libev.ev_set_syserr_cb(c_handler)
    __SYSERR_CALLBACK = callback
# Python-level handler most recently stored by set_syserr_cb(); None
# means no handler is installed.
__SYSERR_CALLBACK = None

# NOTE(review): presumably flags that libev is bundled (embedded) with
# this build -- confirm against the build configuration.
LIBEV_EMBED = True
|