search.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. from __future__ import absolute_import
  2. import logging
  3. import sys
  4. import textwrap
  5. from collections import OrderedDict
  6. from pip._vendor import pkg_resources
  7. from pip._vendor.packaging.version import parse as parse_version
  8. # NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
  9. # why we ignore the type on this import
  10. from pip._vendor.six.moves import xmlrpc_client # type: ignore
  11. from pip._internal.cli.base_command import Command
  12. from pip._internal.cli.req_command import SessionCommandMixin
  13. from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS
  14. from pip._internal.exceptions import CommandError
  15. from pip._internal.models.index import PyPI
  16. from pip._internal.network.xmlrpc import PipXmlrpcTransport
  17. from pip._internal.utils.compat import get_terminal_size
  18. from pip._internal.utils.logging import indent_log
  19. from pip._internal.utils.misc import get_distribution, write_output
  20. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  21. if MYPY_CHECK_RUNNING:
  22. from optparse import Values
  23. from typing import Dict, List, Optional
  24. from typing_extensions import TypedDict
  25. TransformedHit = TypedDict(
  26. 'TransformedHit',
  27. {'name': str, 'summary': str, 'versions': List[str]},
  28. )
  29. logger = logging.getLogger(__name__)
  30. class SearchCommand(Command, SessionCommandMixin):
  31. """Search for PyPI packages whose name or summary contains <query>."""
  32. usage = """
  33. %prog [options] <query>"""
  34. ignore_require_venv = True
  35. def add_options(self):
  36. # type: () -> None
  37. self.cmd_opts.add_option(
  38. '-i', '--index',
  39. dest='index',
  40. metavar='URL',
  41. default=PyPI.pypi_url,
  42. help='Base URL of Python Package Index (default %default)')
  43. self.parser.insert_option_group(0, self.cmd_opts)
  44. def run(self, options, args):
  45. # type: (Values, List[str]) -> int
  46. if not args:
  47. raise CommandError('Missing required argument (search query).')
  48. query = args
  49. pypi_hits = self.search(query, options)
  50. hits = transform_hits(pypi_hits)
  51. terminal_width = None
  52. if sys.stdout.isatty():
  53. terminal_width = get_terminal_size()[0]
  54. print_results(hits, terminal_width=terminal_width)
  55. if pypi_hits:
  56. return SUCCESS
  57. return NO_MATCHES_FOUND
  58. def search(self, query, options):
  59. # type: (List[str], Values) -> List[Dict[str, str]]
  60. index_url = options.index
  61. session = self.get_default_session(options)
  62. transport = PipXmlrpcTransport(index_url, session)
  63. pypi = xmlrpc_client.ServerProxy(index_url, transport)
  64. try:
  65. hits = pypi.search({'name': query, 'summary': query}, 'or')
  66. except xmlrpc_client.Fault as fault:
  67. message = "XMLRPC request failed [code: {code}]\n{string}".format(
  68. code=fault.faultCode,
  69. string=fault.faultString,
  70. )
  71. raise CommandError(message)
  72. return hits
  73. def transform_hits(hits):
  74. # type: (List[Dict[str, str]]) -> List[TransformedHit]
  75. """
  76. The list from pypi is really a list of versions. We want a list of
  77. packages with the list of versions stored inline. This converts the
  78. list from pypi into one we can use.
  79. """
  80. packages = OrderedDict() # type: OrderedDict[str, TransformedHit]
  81. for hit in hits:
  82. name = hit['name']
  83. summary = hit['summary']
  84. version = hit['version']
  85. if name not in packages.keys():
  86. packages[name] = {
  87. 'name': name,
  88. 'summary': summary,
  89. 'versions': [version],
  90. }
  91. else:
  92. packages[name]['versions'].append(version)
  93. # if this is the highest version, replace summary and score
  94. if version == highest_version(packages[name]['versions']):
  95. packages[name]['summary'] = summary
  96. return list(packages.values())
  97. def print_results(hits, name_column_width=None, terminal_width=None):
  98. # type: (List[TransformedHit], Optional[int], Optional[int]) -> None
  99. if not hits:
  100. return
  101. if name_column_width is None:
  102. name_column_width = max([
  103. len(hit['name']) + len(highest_version(hit.get('versions', ['-'])))
  104. for hit in hits
  105. ]) + 4
  106. installed_packages = [p.project_name for p in pkg_resources.working_set]
  107. for hit in hits:
  108. name = hit['name']
  109. summary = hit['summary'] or ''
  110. latest = highest_version(hit.get('versions', ['-']))
  111. if terminal_width is not None:
  112. target_width = terminal_width - name_column_width - 5
  113. if target_width > 10:
  114. # wrap and indent summary to fit terminal
  115. summary_lines = textwrap.wrap(summary, target_width)
  116. summary = ('\n' + ' ' * (name_column_width + 3)).join(
  117. summary_lines)
  118. line = '{name_latest:{name_column_width}} - {summary}'.format(
  119. name_latest='{name} ({latest})'.format(**locals()),
  120. **locals())
  121. try:
  122. write_output(line)
  123. if name in installed_packages:
  124. dist = get_distribution(name)
  125. assert dist is not None
  126. with indent_log():
  127. if dist.version == latest:
  128. write_output('INSTALLED: %s (latest)', dist.version)
  129. else:
  130. write_output('INSTALLED: %s', dist.version)
  131. if parse_version(latest).pre:
  132. write_output('LATEST: %s (pre-release; install'
  133. ' with "pip install --pre")', latest)
  134. else:
  135. write_output('LATEST: %s', latest)
  136. except UnicodeEncodeError:
  137. pass
  138. def highest_version(versions):
  139. # type: (List[str]) -> str
  140. return max(versions, key=parse_version)