compat.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. """Stuff that differs in different Python versions and platform
  2. distributions."""
  3. # The following comment should be removed at some point in the future.
  4. # mypy: disallow-untyped-defs=False
  5. from __future__ import absolute_import, division
  6. import codecs
  7. import functools
  8. import locale
  9. import logging
  10. import os
  11. import shutil
  12. import sys
  13. from pip._vendor.six import PY2, text_type
  14. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  15. if MYPY_CHECK_RUNNING:
  16. from typing import Callable, Optional, Protocol, Text, Tuple, TypeVar, Union
  17. # Used in the @lru_cache polyfill.
  18. F = TypeVar('F')
  19. class LruCache(Protocol):
  20. def __call__(self, maxsize=None):
  21. # type: (Optional[int]) -> Callable[[F], F]
  22. raise NotImplementedError
  23. try:
  24. import ipaddress
  25. except ImportError:
  26. try:
  27. from pip._vendor import ipaddress # type: ignore
  28. except ImportError:
  29. import ipaddr as ipaddress # type: ignore
  30. ipaddress.ip_address = ipaddress.IPAddress # type: ignore
  31. ipaddress.ip_network = ipaddress.IPNetwork # type: ignore
  32. __all__ = [
  33. "ipaddress", "uses_pycache", "console_to_str",
  34. "get_path_uid", "stdlib_pkgs", "WINDOWS", "samefile", "get_terminal_size",
  35. ]
  36. logger = logging.getLogger(__name__)
  37. if PY2:
  38. import imp
  39. try:
  40. cache_from_source = imp.cache_from_source # type: ignore
  41. except AttributeError:
  42. # does not use __pycache__
  43. cache_from_source = None
  44. uses_pycache = cache_from_source is not None
  45. else:
  46. uses_pycache = True
  47. from importlib.util import cache_from_source
  48. if PY2:
  49. # In Python 2.7, backslashreplace exists
  50. # but does not support use for decoding.
  51. # We implement our own replace handler for this
  52. # situation, so that we can consistently use
  53. # backslash replacement for all versions.
  54. def backslashreplace_decode_fn(err):
  55. raw_bytes = (err.object[i] for i in range(err.start, err.end))
  56. # Python 2 gave us characters - convert to numeric bytes
  57. raw_bytes = (ord(b) for b in raw_bytes)
  58. return u"".join(map(u"\\x{:x}".format, raw_bytes)), err.end
  59. codecs.register_error(
  60. "backslashreplace_decode",
  61. backslashreplace_decode_fn,
  62. )
  63. backslashreplace_decode = "backslashreplace_decode"
  64. else:
  65. backslashreplace_decode = "backslashreplace"
  66. def has_tls():
  67. # type: () -> bool
  68. try:
  69. import _ssl # noqa: F401 # ignore unused
  70. return True
  71. except ImportError:
  72. pass
  73. from pip._vendor.urllib3.util import IS_PYOPENSSL
  74. return IS_PYOPENSSL
  75. def str_to_display(data, desc=None):
  76. # type: (Union[bytes, Text], Optional[str]) -> Text
  77. """
  78. For display or logging purposes, convert a bytes object (or text) to
  79. text (e.g. unicode in Python 2) safe for output.
  80. :param desc: An optional phrase describing the input data, for use in
  81. the log message if a warning is logged. Defaults to "Bytes object".
  82. This function should never error out and so can take a best effort
  83. approach. It is okay to be lossy if needed since the return value is
  84. just for display.
  85. We assume the data is in the locale preferred encoding. If it won't
  86. decode properly, we warn the user but decode as best we can.
  87. We also ensure that the output can be safely written to standard output
  88. without encoding errors.
  89. """
  90. if isinstance(data, text_type):
  91. return data
  92. # Otherwise, data is a bytes object (str in Python 2).
  93. # First, get the encoding we assume. This is the preferred
  94. # encoding for the locale, unless that is not found, or
  95. # it is ASCII, in which case assume UTF-8
  96. encoding = locale.getpreferredencoding()
  97. if (not encoding) or codecs.lookup(encoding).name == "ascii":
  98. encoding = "utf-8"
  99. # Now try to decode the data - if we fail, warn the user and
  100. # decode with replacement.
  101. try:
  102. decoded_data = data.decode(encoding)
  103. except UnicodeDecodeError:
  104. logger.warning(
  105. '%s does not appear to be encoded as %s',
  106. desc or 'Bytes object',
  107. encoding,
  108. )
  109. decoded_data = data.decode(encoding, errors=backslashreplace_decode)
  110. # Make sure we can print the output, by encoding it to the output
  111. # encoding with replacement of unencodable characters, and then
  112. # decoding again.
  113. # We use stderr's encoding because it's less likely to be
  114. # redirected and if we don't find an encoding we skip this
  115. # step (on the assumption that output is wrapped by something
  116. # that won't fail).
  117. # The double getattr is to deal with the possibility that we're
  118. # being called in a situation where sys.__stderr__ doesn't exist,
  119. # or doesn't have an encoding attribute. Neither of these cases
  120. # should occur in normal pip use, but there's no harm in checking
  121. # in case people use pip in (unsupported) unusual situations.
  122. output_encoding = getattr(getattr(sys, "__stderr__", None),
  123. "encoding", None)
  124. if output_encoding:
  125. output_encoded = decoded_data.encode(
  126. output_encoding,
  127. errors="backslashreplace"
  128. )
  129. decoded_data = output_encoded.decode(output_encoding)
  130. return decoded_data
  131. def console_to_str(data):
  132. # type: (bytes) -> Text
  133. """Return a string, safe for output, of subprocess output.
  134. """
  135. return str_to_display(data, desc='Subprocess output')
  136. def get_path_uid(path):
  137. # type: (str) -> int
  138. """
  139. Return path's uid.
  140. Does not follow symlinks:
  141. https://github.com/pypa/pip/pull/935#discussion_r5307003
  142. Placed this function in compat due to differences on AIX and
  143. Jython, that should eventually go away.
  144. :raises OSError: When path is a symlink or can't be read.
  145. """
  146. if hasattr(os, 'O_NOFOLLOW'):
  147. fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
  148. file_uid = os.fstat(fd).st_uid
  149. os.close(fd)
  150. else: # AIX and Jython
  151. # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
  152. if not os.path.islink(path):
  153. # older versions of Jython don't have `os.fstat`
  154. file_uid = os.stat(path).st_uid
  155. else:
  156. # raise OSError for parity with os.O_NOFOLLOW above
  157. raise OSError(
  158. "{} is a symlink; Will not return uid for symlinks".format(
  159. path)
  160. )
  161. return file_uid
  162. def expanduser(path):
  163. # type: (str) -> str
  164. """
  165. Expand ~ and ~user constructions.
  166. Includes a workaround for https://bugs.python.org/issue14768
  167. """
  168. expanded = os.path.expanduser(path)
  169. if path.startswith('~/') and expanded.startswith('//'):
  170. expanded = expanded[1:]
  171. return expanded
  172. # packages in the stdlib that may have installation metadata, but should not be
  173. # considered 'installed'. this theoretically could be determined based on
  174. # dist.location (py27:`sysconfig.get_paths()['stdlib']`,
  175. # py26:sysconfig.get_config_vars('LIBDEST')), but fear platform variation may
  176. # make this ineffective, so hard-coding
  177. stdlib_pkgs = {"python", "wsgiref", "argparse"}
  178. # windows detection, covers cpython and ironpython
  179. WINDOWS = (sys.platform.startswith("win") or
  180. (sys.platform == 'cli' and os.name == 'nt'))
  181. def samefile(file1, file2):
  182. # type: (str, str) -> bool
  183. """Provide an alternative for os.path.samefile on Windows/Python2"""
  184. if hasattr(os.path, 'samefile'):
  185. return os.path.samefile(file1, file2)
  186. else:
  187. path1 = os.path.normcase(os.path.abspath(file1))
  188. path2 = os.path.normcase(os.path.abspath(file2))
  189. return path1 == path2
  190. if hasattr(shutil, 'get_terminal_size'):
  191. def get_terminal_size():
  192. # type: () -> Tuple[int, int]
  193. """
  194. Returns a tuple (x, y) representing the width(x) and the height(y)
  195. in characters of the terminal window.
  196. """
  197. return tuple(shutil.get_terminal_size()) # type: ignore
  198. else:
  199. def get_terminal_size():
  200. # type: () -> Tuple[int, int]
  201. """
  202. Returns a tuple (x, y) representing the width(x) and the height(y)
  203. in characters of the terminal window.
  204. """
  205. def ioctl_GWINSZ(fd):
  206. try:
  207. import fcntl
  208. import struct
  209. import termios
  210. cr = struct.unpack_from(
  211. 'hh',
  212. fcntl.ioctl(fd, termios.TIOCGWINSZ, '12345678')
  213. )
  214. except Exception:
  215. return None
  216. if cr == (0, 0):
  217. return None
  218. return cr
  219. cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
  220. if not cr:
  221. if sys.platform != "win32":
  222. try:
  223. fd = os.open(os.ctermid(), os.O_RDONLY)
  224. cr = ioctl_GWINSZ(fd)
  225. os.close(fd)
  226. except Exception:
  227. pass
  228. if not cr:
  229. cr = (os.environ.get('LINES', 25), os.environ.get('COLUMNS', 80))
  230. return int(cr[1]), int(cr[0])
  231. # Fallback to noop_lru_cache in Python 2
  232. # TODO: this can be removed when python 2 support is dropped!
  233. def noop_lru_cache(maxsize=None):
  234. # type: (Optional[int]) -> Callable[[F], F]
  235. def _wrapper(f):
  236. # type: (F) -> F
  237. return f
  238. return _wrapper
  239. lru_cache = getattr(functools, "lru_cache", noop_lru_cache) # type: LruCache