123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from __future__ import absolute_import
- import collections
- import threading
- from .. import errors as Errors
- from ..future import Future
- class FutureProduceResult(Future):
- def __init__(self, topic_partition):
- super(FutureProduceResult, self).__init__()
- self.topic_partition = topic_partition
- self._latch = threading.Event()
- def success(self, value):
- ret = super(FutureProduceResult, self).success(value)
- self._latch.set()
- return ret
- def failure(self, error):
- ret = super(FutureProduceResult, self).failure(error)
- self._latch.set()
- return ret
- def wait(self, timeout=None):
- # wait() on python2.6 returns None instead of the flag value
- return self._latch.wait(timeout) or self._latch.is_set()
- class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
- super(FutureRecordMetadata, self).__init__()
- self._produce_future = produce_future
- # packing args as a tuple is a minor speed optimization
- self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
- produce_future.add_callback(self._produce_success)
- produce_future.add_errback(self.failure)
- def _produce_success(self, offset_and_timestamp):
- offset, produce_timestamp_ms = offset_and_timestamp
- # Unpacking from args tuple is minor speed optimization
- (relative_offset, timestamp_ms, checksum,
- serialized_key_size, serialized_value_size) = self.args
- if produce_timestamp_ms is not None:
- timestamp_ms = produce_timestamp_ms
- if offset != -1 and relative_offset is not None:
- offset += relative_offset
- tp = self._produce_future.topic_partition
- metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
- checksum, serialized_key_size,
- serialized_value_size)
- self.success(metadata)
- def get(self, timeout=None):
- if not self.is_done and not self._produce_future.wait(timeout):
- raise Errors.KafkaTimeoutError(
- "Timeout after waiting for %s secs." % timeout)
- assert self.is_done
- if self.failed():
- raise self.exception # pylint: disable-msg=raising-bad-type
- return self.value
- RecordMetadata = collections.namedtuple(
- 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
- 'checksum', 'serialized_key_size', 'serialized_value_size'])
|