grequests.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # -*- coding: utf-8 -*-
  2. """
  3. grequests
  4. ~~~~~~~~~
  5. This module contains an asynchronous replica of ``requests.api``, powered
  6. by gevent. All API methods return a ``Request`` instance (as opposed to
  7. ``Response``). A list of requests can be sent with ``map()``.
  8. """
  9. from functools import partial
  10. import traceback
  11. try:
  12. import gevent
  13. from gevent import monkey as curious_george
  14. from gevent.pool import Pool
  15. except ImportError:
  16. raise RuntimeError('Gevent is required for grequests.')
  17. # Monkey-patch.
  18. curious_george.patch_all(thread=False, select=False)
  19. from requests import Session
  20. __all__ = (
  21. 'map', 'imap',
  22. 'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
  23. )
  24. class AsyncRequest(object):
  25. """ Asynchronous request.
  26. Accept same parameters as ``Session.request`` and some additional:
  27. :param session: Session which will do request
  28. :param callback: Callback called on response.
  29. Same as passing ``hooks={'response': callback}``
  30. """
  31. def __init__(self, method, url, **kwargs):
  32. #: Request method
  33. self.method = method
  34. #: URL to request
  35. self.url = url
  36. #: Associated ``Session``
  37. self.session = kwargs.pop('session', None)
  38. if self.session is None:
  39. self.session = Session()
  40. callback = kwargs.pop('callback', None)
  41. if callback:
  42. kwargs['hooks'] = {'response': callback}
  43. #: The rest arguments for ``Session.request``
  44. self.kwargs = kwargs
  45. #: Resulting ``Response``
  46. self.response = None
  47. def send(self, **kwargs):
  48. """
  49. Prepares request based on parameter passed to constructor and optional ``kwargs```.
  50. Then sends request and saves response to :attr:`response`
  51. :returns: ``Response``
  52. """
  53. merged_kwargs = {}
  54. merged_kwargs.update(self.kwargs)
  55. merged_kwargs.update(kwargs)
  56. try:
  57. self.response = self.session.request(self.method,
  58. self.url, **merged_kwargs)
  59. except Exception as e:
  60. self.exception = e
  61. self.traceback = traceback.format_exc()
  62. return self
  63. def send(r, pool=None, stream=False):
  64. """Sends the request object using the specified pool. If a pool isn't
  65. specified this method blocks. Pools are useful because you can specify size
  66. and can hence limit concurrency."""
  67. if pool is not None:
  68. return pool.spawn(r.send, stream=stream)
  69. return gevent.spawn(r.send, stream=stream)
  70. # Shortcuts for creating AsyncRequest with appropriate HTTP method
  71. get = partial(AsyncRequest, 'GET')
  72. options = partial(AsyncRequest, 'OPTIONS')
  73. head = partial(AsyncRequest, 'HEAD')
  74. post = partial(AsyncRequest, 'POST')
  75. put = partial(AsyncRequest, 'PUT')
  76. patch = partial(AsyncRequest, 'PATCH')
  77. delete = partial(AsyncRequest, 'DELETE')
  78. # synonym
  79. def request(method, url, **kwargs):
  80. return AsyncRequest(method, url, **kwargs)
  81. def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
  82. """Concurrently converts a list of Requests to Responses.
  83. :param requests: a collection of Request objects.
  84. :param stream: If True, the content will not be downloaded immediately.
  85. :param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
  86. :param exception_handler: Callback function, called when exception occured. Params: Request, Exception
  87. :param gtimeout: Gevent joinall timeout in seconds. (Note: unrelated to requests timeout)
  88. """
  89. requests = list(requests)
  90. pool = Pool(size) if size else None
  91. jobs = [send(r, pool, stream=stream) for r in requests]
  92. gevent.joinall(jobs, timeout=gtimeout)
  93. ret = []
  94. for request in requests:
  95. if request.response is not None:
  96. ret.append(request.response)
  97. elif exception_handler and hasattr(request, 'exception'):
  98. ret.append(exception_handler(request, request.exception))
  99. else:
  100. ret.append(None)
  101. return ret
  102. def imap(requests, stream=False, size=2, exception_handler=None):
  103. """Concurrently converts a generator object of Requests to
  104. a generator of Responses.
  105. :param requests: a generator of Request objects.
  106. :param stream: If True, the content will not be downloaded immediately.
  107. :param size: Specifies the number of requests to make at a time. default is 2
  108. :param exception_handler: Callback function, called when exception occurred. Params: Request, Exception
  109. """
  110. pool = Pool(size)
  111. def send(r):
  112. return r.send(stream=stream)
  113. for request in pool.imap_unordered(send, requests):
  114. if request.response is not None:
  115. yield request.response
  116. elif exception_handler:
  117. ex_result = exception_handler(request, request.exception)
  118. if ex_result is not None:
  119. yield ex_result
  120. pool.join()