123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- # -*- coding: utf-8 -*-
- """Python 2/3 compatibility.
- Compatibility implementations of features
- only available in newer Python versions.
- """
- from __future__ import absolute_import, unicode_literals
- import errno
- import io
- import sys
- try:
- from collections import Counter
- except ImportError: # pragma: no cover
- from collections import defaultdict
- def Counter(): # noqa
- """Create counter."""
- return defaultdict(int)
- try:
- buffer_t = buffer
- except NameError: # pragma: no cover
- # Py3 does not have buffer, only use this for isa checks.
- class buffer_t(object): # noqa
- """Python 3 does not have a buffer type."""
- bytes_t = bytes
- __all__ = [
- 'Counter', 'reload', 'UserList', 'UserDict',
- 'Queue', 'Empty', 'Full', 'LifoQueue', 'builtins', 'array',
- 'zip_longest', 'map', 'zip', 'string', 'string_t', 'bytes_t',
- 'bytes_if_py2', 'long_t', 'text_t', 'int_types', 'module_name_t',
- 'range', 'items', 'keys', 'values', 'nextfun', 'reraise',
- 'WhateverIO', 'with_metaclass', 'StringIO', 'getfullargspec',
- 'THREAD_TIMEOUT_MAX', 'format_d', 'monotonic', 'buffer_t',
- 'python_2_unicode_compatible',
- ]
- # ############# py3k ########################################################
- PY3 = sys.version_info[0] >= 3
- PY2 = sys.version_info[0] < 3
- try:
- reload = reload # noqa
- except NameError: # pragma: no cover
- try:
- from importlib import reload # noqa
- except ImportError: # pragma: no cover
- from imp import reload # noqa
- try:
- from collections import UserList # noqa
- except ImportError: # pragma: no cover
- from UserList import UserList # noqa
- try:
- from collections import UserDict # noqa
- except ImportError: # pragma: no cover
- from UserDict import UserDict # noqa
- # ############# time.monotonic #############################################
- if sys.version_info < (3, 3):
- import platform
- SYSTEM = platform.system()
- try:
- import ctypes
- except ImportError: # pragma: no cover
- ctypes = None # noqa
- if SYSTEM == 'Darwin' and ctypes is not None:
- from ctypes.util import find_library
- libSystem = ctypes.CDLL(find_library('libSystem.dylib'))
- CoreServices = ctypes.CDLL(find_library('CoreServices'),
- use_errno=True)
- mach_absolute_time = libSystem.mach_absolute_time
- mach_absolute_time.restype = ctypes.c_uint64
- absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds
- absolute_to_nanoseconds.restype = ctypes.c_uint64
- absolute_to_nanoseconds.argtypes = [ctypes.c_uint64]
- def _monotonic():
- return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
- elif SYSTEM == 'Linux' and ctypes is not None:
- # from stackoverflow:
- # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
- import os
- CLOCK_MONOTONIC = 1 # see <linux/time.h>
- class timespec(ctypes.Structure):
- _fields_ = [
- ('tv_sec', ctypes.c_long),
- ('tv_nsec', ctypes.c_long),
- ]
- try:
- librt = ctypes.CDLL('librt.so.1', use_errno=True)
- except Exception:
- try:
- librt = ctypes.CDLL('librt.so.0', use_errno=True)
- except Exception as exc:
- error = OSError(
- "Could not detect working librt library: {0}".format(
- exc))
- error.errno = errno.ENOENT
- raise error
- clock_gettime = librt.clock_gettime
- clock_gettime.argtypes = [
- ctypes.c_int, ctypes.POINTER(timespec),
- ]
- def _monotonic(): # noqa
- t = timespec()
- if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0:
- errno_ = ctypes.get_errno()
- raise OSError(errno_, os.strerror(errno_))
- return t.tv_sec + t.tv_nsec * 1e-9
- else:
- from time import time as _monotonic
- try:
- from time import monotonic
- except ImportError:
- monotonic = _monotonic # noqa
- # ############# Py3 <-> Py2 #################################################
- if PY3: # pragma: no cover
- import builtins
- from array import array
- from queue import Queue, Empty, Full, LifoQueue
- from itertools import zip_longest
- map = map
- zip = zip
- string = str
- string_t = str
- long_t = int
- text_t = str
- range = range
- int_types = (int,)
- module_name_t = str
- def bytes_if_py2(s):
- """Convert str to bytes if running under Python 2."""
- return s
- def items(d):
- """Get dict items iterator."""
- return d.items()
- def keys(d):
- """Get dict keys iterator."""
- return d.keys()
- def values(d):
- """Get dict values iterator."""
- return d.values()
- def nextfun(it):
- """Get iterator next method."""
- return it.__next__
- exec_ = getattr(builtins, 'exec')
- def reraise(tp, value, tb=None):
- """Reraise exception."""
- if value.__traceback__ is not tb:
- raise value.with_traceback(tb)
- raise value
- else:
- import __builtin__ as builtins # noqa
- from array import array as _array
- from Queue import Queue, Empty, Full, LifoQueue # noqa
- from itertools import ( # noqa
- imap as map,
- izip as zip,
- izip_longest as zip_longest,
- )
- string = unicode # noqa
- string_t = basestring # noqa
- text_t = unicode
- long_t = long # noqa
- range = xrange
- module_name_t = str
- int_types = (int, long)
- def array(typecode, *args, **kwargs):
- """Create array."""
- if isinstance(typecode, unicode):
- typecode = typecode.encode()
- return _array(typecode, *args, **kwargs)
- def bytes_if_py2(s):
- """Convert str to bytes if running under Python 2."""
- if isinstance(s, unicode):
- return s.encode()
- return s
- def items(d): # noqa
- """Return dict items iterator."""
- return d.iteritems()
- def keys(d): # noqa
- """Return dict key iterator."""
- return d.iterkeys()
- def values(d): # noqa
- """Return dict values iterator."""
- return d.itervalues()
- def nextfun(it): # noqa
- """Return iterator next method."""
- return it.next
- def exec_(code, globs=None, locs=None): # pragma: no cover
- """Execute code in a namespace."""
- if globs is None:
- frame = sys._getframe(1)
- globs = frame.f_globals
- if locs is None:
- locs = frame.f_locals
- del frame
- elif locs is None:
- locs = globs
- exec("""exec code in globs, locs""")
- exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
- def with_metaclass(Type, skip_attrs=None):
- """Class decorator to set metaclass.
- Works with both Python 2 and Python 3 and it does not add
- an extra class in the lookup order like ``six.with_metaclass`` does
- (that is -- it copies the original class instead of using inheritance).
- """
- if skip_attrs is None:
- skip_attrs = {'__dict__', '__weakref__'}
- def _clone_with_metaclass(Class):
- attrs = {key: value for key, value in items(vars(Class))
- if key not in skip_attrs}
- return Type(Class.__name__, Class.__bases__, attrs)
- return _clone_with_metaclass
- # ############# threading.TIMEOUT_MAX ########################################
- try:
- from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
- except ImportError:
- THREAD_TIMEOUT_MAX = 1e10 # noqa
- # ############# format(int, ',d') ############################################
- if sys.version_info >= (2, 7): # pragma: no cover
- def format_d(i):
- """Format number."""
- return format(i, ',d')
- else: # pragma: no cover
- def format_d(i): # noqa
- """Format number."""
- s = '%d' % i
- groups = []
- while s and s[-1].isdigit():
- groups.append(s[-3:])
- s = s[:-3]
- return s + ','.join(reversed(groups))
- StringIO = io.StringIO
- _SIO_write = StringIO.write
- _SIO_init = StringIO.__init__
- class WhateverIO(StringIO):
- """StringIO that takes bytes or str."""
- def __init__(self, v=None, *a, **kw):
- _SIO_init(self, v.decode() if isinstance(v, bytes) else v, *a, **kw)
- def write(self, data):
- _SIO_write(self, data.decode() if isinstance(data, bytes) else data)
- def python_2_unicode_compatible(cls):
- """Class decorator to ensure class is compatible with Python 2."""
- return python_2_non_unicode_str(python_2_non_unicode_repr(cls))
- def python_2_non_unicode_repr(cls):
- """Ensure cls.__repr__ returns unicode.
- A class decorator that ensures ``__repr__`` returns non-unicode
- when running under Python 2.
- """
- if PY2:
- try:
- cls.__dict__['__repr__']
- except KeyError:
- pass
- else:
- def __repr__(self, *args, **kwargs):
- return self.__unicode_repr__(*args, **kwargs).encode(
- 'utf-8', 'replace')
- cls.__unicode_repr__, cls.__repr__ = cls.__repr__, __repr__
- return cls
- def python_2_non_unicode_str(cls):
- """Python 2 class string compatibility.
- A class decorator that defines ``__unicode__`` and ``__str__`` methods
- under Python 2. Under Python 3 it does nothing.
- To support Python 2 and 3 with a single code base, define a ``__str__``
- method returning text and apply this decorator to the class.
- """
- if PY2:
- try:
- cls.__dict__['__str__']
- except KeyError:
- pass
- else:
- def __str__(self, *args, **kwargs):
- return self.__unicode__(*args, **kwargs).encode(
- 'utf-8', 'replace')
- cls.__unicode__, cls.__str__ = cls.__str__, __str__
- return cls
- try: # pragma: no cover
- from inspect import formatargspec, getfullargspec
- except ImportError: # Py2
- from collections import namedtuple
- from inspect import formatargspec, getargspec as _getargspec # noqa
- FullArgSpec = namedtuple('FullArgSpec', (
- 'args', 'varargs', 'varkw', 'defaults',
- 'kwonlyargs', 'kwonlydefaults', 'annotations',
- ))
- def getfullargspec(fun, _fill=(None, ) * 3): # noqa
- """For compatibility with Python 3."""
- s = _getargspec(fun)
- return FullArgSpec(*s + _fill)
|