defer.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
"""
Helper functions for dealing with Twisted Deferreds.
"""
  4. from twisted.internet import defer, reactor, task
  5. from twisted.python import failure
  6. from scrapy.exceptions import IgnoreRequest
  7. def defer_fail(_failure):
  8. """Same as twisted.internet.defer.fail but delay calling errback until
  9. next reactor loop
  10. It delays by 100ms so reactor has a chance to go through readers and writers
  11. before attending pending delayed calls, so do not set delay to zero.
  12. """
  13. d = defer.Deferred()
  14. reactor.callLater(0.1, d.errback, _failure)
  15. return d
  16. def defer_succeed(result):
  17. """Same as twisted.internet.defer.succeed but delay calling callback until
  18. next reactor loop
  19. It delays by 100ms so reactor has a chance to go trough readers and writers
  20. before attending pending delayed calls, so do not set delay to zero.
  21. """
  22. d = defer.Deferred()
  23. reactor.callLater(0.1, d.callback, result)
  24. return d
  25. def defer_result(result):
  26. if isinstance(result, defer.Deferred):
  27. return result
  28. elif isinstance(result, failure.Failure):
  29. return defer_fail(result)
  30. else:
  31. return defer_succeed(result)
  32. def mustbe_deferred(f, *args, **kw):
  33. """Same as twisted.internet.defer.maybeDeferred, but delay calling
  34. callback/errback to next reactor loop
  35. """
  36. try:
  37. result = f(*args, **kw)
  38. # FIXME: Hack to avoid introspecting tracebacks. This to speed up
  39. # processing of IgnoreRequest errors which are, by far, the most common
  40. # exception in Scrapy - see #125
  41. except IgnoreRequest as e:
  42. return defer_fail(failure.Failure(e))
  43. except Exception:
  44. return defer_fail(failure.Failure())
  45. else:
  46. return defer_result(result)
  47. def parallel(iterable, count, callable, *args, **named):
  48. """Execute a callable over the objects in the given iterable, in parallel,
  49. using no more than ``count`` concurrent calls.
  50. Taken from: https://jcalderone.livejournal.com/24285.html
  51. """
  52. coop = task.Cooperator()
  53. work = (callable(elem, *args, **named) for elem in iterable)
  54. return defer.DeferredList([coop.coiterate(work) for _ in range(count)])
  55. def process_chain(callbacks, input, *a, **kw):
  56. """Return a Deferred built by chaining the given callbacks"""
  57. d = defer.Deferred()
  58. for x in callbacks:
  59. d.addCallback(x, *a, **kw)
  60. d.callback(input)
  61. return d
  62. def process_chain_both(callbacks, errbacks, input, *a, **kw):
  63. """Return a Deferred built by chaining the given callbacks and errbacks"""
  64. d = defer.Deferred()
  65. for cb, eb in zip(callbacks, errbacks):
  66. d.addCallbacks(cb, eb, callbackArgs=a, callbackKeywords=kw,
  67. errbackArgs=a, errbackKeywords=kw)
  68. if isinstance(input, failure.Failure):
  69. d.errback(input)
  70. else:
  71. d.callback(input)
  72. return d
  73. def process_parallel(callbacks, input, *a, **kw):
  74. """Return a Deferred with the output of all successful calls to the given
  75. callbacks
  76. """
  77. dfds = [defer.succeed(input).addCallback(x, *a, **kw) for x in callbacks]
  78. d = defer.DeferredList(dfds, fireOnOneErrback=1, consumeErrors=1)
  79. d.addCallbacks(lambda r: [x[1] for x in r], lambda f: f.value.subFailure)
  80. return d
  81. def iter_errback(iterable, errback, *a, **kw):
  82. """Wraps an iterable calling an errback if an error is caught while
  83. iterating it.
  84. """
  85. it = iter(iterable)
  86. while True:
  87. try:
  88. yield next(it)
  89. except StopIteration:
  90. break
  91. except Exception:
  92. errback(failure.Failure(), *a, **kw)