123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- """
- Helper functions for dealing with Twisted deferreds
- """
- from twisted.internet import defer, reactor, task
- from twisted.python import failure
- from scrapy.exceptions import IgnoreRequest
def defer_fail(_failure):
    """Return a Deferred that errbacks with ``_failure`` after a short delay.

    Works like ``twisted.internet.defer.fail``, except that the errback is
    not fired right away: it is scheduled on the reactor so that it runs in a
    later reactor iteration.

    The delay is 100ms so the reactor has a chance to go through its readers
    and writers before attending pending delayed calls; do not set it to zero.
    """
    delay = 0.1  # seconds; deliberately non-zero, see docstring
    deferred = defer.Deferred()
    reactor.callLater(delay, deferred.errback, _failure)
    return deferred
def defer_succeed(result):
    """Return a Deferred that fires with ``result`` after a short delay.

    Works like ``twisted.internet.defer.succeed``, except that the callback
    is not fired right away: it is scheduled on the reactor so that it runs
    in a later reactor iteration.

    The delay is 100ms so the reactor has a chance to go through its readers
    and writers before attending pending delayed calls; do not set it to zero.
    """
    delay = 0.1  # seconds; deliberately non-zero, see docstring
    deferred = defer.Deferred()
    reactor.callLater(delay, deferred.callback, result)
    return deferred
def defer_result(result):
    """Turn ``result`` into a Deferred.

    * a Deferred is returned unchanged;
    * a Failure is wrapped with ``defer_fail`` (delayed errback);
    * anything else is wrapped with ``defer_succeed`` (delayed callback).
    """
    if isinstance(result, defer.Deferred):
        return result
    if isinstance(result, failure.Failure):
        return defer_fail(result)
    return defer_succeed(result)
def mustbe_deferred(f, *args, **kw):
    """Call ``f(*args, **kw)`` and always hand back a Deferred.

    Same idea as ``twisted.internet.defer.maybeDeferred``, but the
    callback/errback is delayed to a later reactor loop (via
    ``defer_result`` / ``defer_fail``). Exceptions raised by ``f`` are
    converted into a failed Deferred instead of propagating.
    """
    try:
        outcome = f(*args, **kw)
    # FIXME: Hack to avoid introspecting tracebacks. This to speed up
    # processing of IgnoreRequest errors which are, by far, the most common
    # exception in Scrapy - see #125
    except IgnoreRequest as exc:
        # Build the Failure from the exception instance directly.
        return defer_fail(failure.Failure(exc))
    except Exception:
        # Failure() with no arguments captures the exception being handled.
        return defer_fail(failure.Failure())
    return defer_result(outcome)
def parallel(iterable, count, callable, *args, **named):
    """Run ``callable(elem, *args, **named)`` for every element of
    ``iterable``, in parallel, using no more than ``count`` concurrent calls.

    Returns a DeferredList over the ``count`` ``coiterate()`` deferreds.

    Taken from: https://jcalderone.livejournal.com/24285.html
    """
    cooperator = task.Cooperator()
    # A single lazy generator shared by every worker: each worker pulls the
    # next pending call from it, which is what caps the concurrency.
    pending_calls = (callable(item, *args, **named) for item in iterable)
    workers = []
    for _ in range(count):
        workers.append(cooperator.coiterate(pending_calls))
    return defer.DeferredList(workers)
def process_chain(callbacks, input, *a, **kw):
    """Build a Deferred that runs ``callbacks`` one after another.

    Each callback is added with the extra positional ``a`` and keyword ``kw``
    arguments; the Deferred is then fired with ``input`` and returned.
    """
    chain = defer.Deferred()
    for callback in callbacks:
        chain.addCallback(callback, *a, **kw)
    chain.callback(input)
    return chain
def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Build a Deferred by chaining paired callbacks and errbacks.

    ``callbacks`` and ``errbacks`` are consumed in lockstep with ``zip``;
    every pair receives the extra ``a`` / ``kw`` arguments. The chain is
    started through its errback when ``input`` is a Failure, and through its
    callback otherwise.
    """
    chain = defer.Deferred()
    for callback, errback in zip(callbacks, errbacks):
        chain.addCallbacks(
            callback,
            errback,
            callbackArgs=a,
            callbackKeywords=kw,
            errbackArgs=a,
            errbackKeywords=kw,
        )
    # Pick the entry point matching the kind of input we were given.
    fire = chain.errback if isinstance(input, failure.Failure) else chain.callback
    fire(input)
    return chain
def process_parallel(callbacks, input, *a, **kw):
    """Return a Deferred with the output of all successful calls to the
    given callbacks.

    Every callback is attached to its own already-fired Deferred carrying
    ``input`` (plus the extra ``a`` / ``kw`` arguments). The results are
    gathered with a DeferredList that errbacks on the first failure.
    """

    def _only_values(results):
        # Keep the second item of each DeferredList entry.
        return [entry[1] for entry in results]

    def _unwrap(first_error):
        # Hand back the ``subFailure`` carried by the DeferredList error.
        return first_error.value.subFailure

    deferreds = [
        defer.succeed(input).addCallback(callback, *a, **kw)
        for callback in callbacks
    ]
    gathered = defer.DeferredList(
        deferreds, fireOnOneErrback=True, consumeErrors=True
    )
    gathered.addCallbacks(_only_values, _unwrap)
    return gathered
def iter_errback(iterable, errback, *a, **kw):
    """Yield the items of ``iterable``, reporting iteration errors.

    When an ``Exception`` is caught while iterating, ``errback`` is called
    with a ``Failure`` for it plus the extra ``a`` / ``kw`` arguments, and
    iteration is attempted again. The generator finishes when the underlying
    iterator raises ``StopIteration``.
    """
    iterator = iter(iterable)
    while True:
        try:
            # The yield stays inside the try block on purpose, so exceptions
            # raised at this point are handled by the clauses below.
            yield next(iterator)
        except StopIteration:
            return
        except Exception:
            errback(failure.Failure(), *a, **kw)
|