future.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from __future__ import absolute_import
  2. import collections
  3. import threading
  4. from .. import errors as Errors
  5. from ..future import Future
  6. class FutureProduceResult(Future):
  7. def __init__(self, topic_partition):
  8. super(FutureProduceResult, self).__init__()
  9. self.topic_partition = topic_partition
  10. self._latch = threading.Event()
  11. def success(self, value):
  12. ret = super(FutureProduceResult, self).success(value)
  13. self._latch.set()
  14. return ret
  15. def failure(self, error):
  16. ret = super(FutureProduceResult, self).failure(error)
  17. self._latch.set()
  18. return ret
  19. def wait(self, timeout=None):
  20. # wait() on python2.6 returns None instead of the flag value
  21. return self._latch.wait(timeout) or self._latch.is_set()
  22. class FutureRecordMetadata(Future):
  23. def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
  24. super(FutureRecordMetadata, self).__init__()
  25. self._produce_future = produce_future
  26. # packing args as a tuple is a minor speed optimization
  27. self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
  28. produce_future.add_callback(self._produce_success)
  29. produce_future.add_errback(self.failure)
  30. def _produce_success(self, offset_and_timestamp):
  31. offset, produce_timestamp_ms = offset_and_timestamp
  32. # Unpacking from args tuple is minor speed optimization
  33. (relative_offset, timestamp_ms, checksum,
  34. serialized_key_size, serialized_value_size) = self.args
  35. if produce_timestamp_ms is not None:
  36. timestamp_ms = produce_timestamp_ms
  37. if offset != -1 and relative_offset is not None:
  38. offset += relative_offset
  39. tp = self._produce_future.topic_partition
  40. metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
  41. checksum, serialized_key_size,
  42. serialized_value_size)
  43. self.success(metadata)
  44. def get(self, timeout=None):
  45. if not self.is_done and not self._produce_future.wait(timeout):
  46. raise Errors.KafkaTimeoutError(
  47. "Timeout after waiting for %s secs." % timeout)
  48. assert self.is_done
  49. if self.failed():
  50. raise self.exception # pylint: disable-msg=raising-bad-type
  51. return self.value
  52. RecordMetadata = collections.namedtuple(
  53. 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
  54. 'checksum', 'serialized_key_size', 'serialized_value_size'])