direct_url.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. """ PEP 610 """
  2. import json
  3. import re
  4. from pip._vendor import six
  5. from pip._vendor.six.moves.urllib import parse as urllib_parse
  6. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  7. if MYPY_CHECK_RUNNING:
  8. from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
  9. T = TypeVar("T")
  10. DIRECT_URL_METADATA_NAME = "direct_url.json"
  11. ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  12. __all__ = [
  13. "DirectUrl",
  14. "DirectUrlValidationError",
  15. "DirInfo",
  16. "ArchiveInfo",
  17. "VcsInfo",
  18. ]
  19. class DirectUrlValidationError(Exception):
  20. pass
  21. def _get(d, expected_type, key, default=None):
  22. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
  23. """Get value from dictionary and verify expected type."""
  24. if key not in d:
  25. return default
  26. value = d[key]
  27. if six.PY2 and expected_type is str:
  28. expected_type = six.string_types # type: ignore
  29. if not isinstance(value, expected_type):
  30. raise DirectUrlValidationError(
  31. "{!r} has unexpected type for {} (expected {})".format(
  32. value, key, expected_type
  33. )
  34. )
  35. return value
  36. def _get_required(d, expected_type, key, default=None):
  37. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
  38. value = _get(d, expected_type, key, default)
  39. if value is None:
  40. raise DirectUrlValidationError("{} must have a value".format(key))
  41. return value
  42. def _exactly_one_of(infos):
  43. # type: (Iterable[Optional[InfoType]]) -> InfoType
  44. infos = [info for info in infos if info is not None]
  45. if not infos:
  46. raise DirectUrlValidationError(
  47. "missing one of archive_info, dir_info, vcs_info"
  48. )
  49. if len(infos) > 1:
  50. raise DirectUrlValidationError(
  51. "more than one of archive_info, dir_info, vcs_info"
  52. )
  53. assert infos[0] is not None
  54. return infos[0]
  55. def _filter_none(**kwargs):
  56. # type: (Any) -> Dict[str, Any]
  57. """Make dict excluding None values."""
  58. return {k: v for k, v in kwargs.items() if v is not None}
  59. class VcsInfo(object):
  60. name = "vcs_info"
  61. def __init__(
  62. self,
  63. vcs, # type: str
  64. commit_id, # type: str
  65. requested_revision=None, # type: Optional[str]
  66. resolved_revision=None, # type: Optional[str]
  67. resolved_revision_type=None, # type: Optional[str]
  68. ):
  69. self.vcs = vcs
  70. self.requested_revision = requested_revision
  71. self.commit_id = commit_id
  72. self.resolved_revision = resolved_revision
  73. self.resolved_revision_type = resolved_revision_type
  74. @classmethod
  75. def _from_dict(cls, d):
  76. # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
  77. if d is None:
  78. return None
  79. return cls(
  80. vcs=_get_required(d, str, "vcs"),
  81. commit_id=_get_required(d, str, "commit_id"),
  82. requested_revision=_get(d, str, "requested_revision"),
  83. resolved_revision=_get(d, str, "resolved_revision"),
  84. resolved_revision_type=_get(d, str, "resolved_revision_type"),
  85. )
  86. def _to_dict(self):
  87. # type: () -> Dict[str, Any]
  88. return _filter_none(
  89. vcs=self.vcs,
  90. requested_revision=self.requested_revision,
  91. commit_id=self.commit_id,
  92. resolved_revision=self.resolved_revision,
  93. resolved_revision_type=self.resolved_revision_type,
  94. )
  95. class ArchiveInfo(object):
  96. name = "archive_info"
  97. def __init__(
  98. self,
  99. hash=None, # type: Optional[str]
  100. ):
  101. self.hash = hash
  102. @classmethod
  103. def _from_dict(cls, d):
  104. # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
  105. if d is None:
  106. return None
  107. return cls(hash=_get(d, str, "hash"))
  108. def _to_dict(self):
  109. # type: () -> Dict[str, Any]
  110. return _filter_none(hash=self.hash)
  111. class DirInfo(object):
  112. name = "dir_info"
  113. def __init__(
  114. self,
  115. editable=False, # type: bool
  116. ):
  117. self.editable = editable
  118. @classmethod
  119. def _from_dict(cls, d):
  120. # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
  121. if d is None:
  122. return None
  123. return cls(
  124. editable=_get_required(d, bool, "editable", default=False)
  125. )
  126. def _to_dict(self):
  127. # type: () -> Dict[str, Any]
  128. return _filter_none(editable=self.editable or None)
  129. if MYPY_CHECK_RUNNING:
  130. InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  131. class DirectUrl(object):
  132. def __init__(
  133. self,
  134. url, # type: str
  135. info, # type: InfoType
  136. subdirectory=None, # type: Optional[str]
  137. ):
  138. self.url = url
  139. self.info = info
  140. self.subdirectory = subdirectory
  141. def _remove_auth_from_netloc(self, netloc):
  142. # type: (str) -> str
  143. if "@" not in netloc:
  144. return netloc
  145. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  146. if (
  147. isinstance(self.info, VcsInfo) and
  148. self.info.vcs == "git" and
  149. user_pass == "git"
  150. ):
  151. return netloc
  152. if ENV_VAR_RE.match(user_pass):
  153. return netloc
  154. return netloc_no_user_pass
  155. @property
  156. def redacted_url(self):
  157. # type: () -> str
  158. """url with user:password part removed unless it is formed with
  159. environment variables as specified in PEP 610, or it is ``git``
  160. in the case of a git URL.
  161. """
  162. purl = urllib_parse.urlsplit(self.url)
  163. netloc = self._remove_auth_from_netloc(purl.netloc)
  164. surl = urllib_parse.urlunsplit(
  165. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  166. )
  167. return surl
  168. def validate(self):
  169. # type: () -> None
  170. self.from_dict(self.to_dict())
  171. @classmethod
  172. def from_dict(cls, d):
  173. # type: (Dict[str, Any]) -> DirectUrl
  174. return DirectUrl(
  175. url=_get_required(d, str, "url"),
  176. subdirectory=_get(d, str, "subdirectory"),
  177. info=_exactly_one_of(
  178. [
  179. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  180. DirInfo._from_dict(_get(d, dict, "dir_info")),
  181. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  182. ]
  183. ),
  184. )
  185. def to_dict(self):
  186. # type: () -> Dict[str, Any]
  187. res = _filter_none(
  188. url=self.redacted_url,
  189. subdirectory=self.subdirectory,
  190. )
  191. res[self.info.name] = self.info._to_dict()
  192. return res
  193. @classmethod
  194. def from_json(cls, s):
  195. # type: (str) -> DirectUrl
  196. return cls.from_dict(json.loads(s))
  197. def to_json(self):
  198. # type: () -> str
  199. return json.dumps(self.to_dict(), sort_keys=True)