context.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. """
  2. Context manager to commit/rollback consumer offsets.
  3. """
  4. from __future__ import absolute_import
  5. from logging import getLogger
  6. from kafka.errors import check_error, OffsetOutOfRangeError
  7. from kafka.structs import OffsetCommitRequestPayload
  8. class OffsetCommitContext(object):
  9. """
  10. Provides commit/rollback semantics around a `SimpleConsumer`.
  11. Usage assumes that `auto_commit` is disabled, that messages are consumed in
  12. batches, and that the consuming process will record its own successful
  13. processing of each message. Both the commit and rollback operations respect
  14. a "high-water mark" to ensure that last unsuccessfully processed message
  15. will be retried.
  16. Example:
  17. .. code:: python
  18. consumer = SimpleConsumer(client, group, topic, auto_commit=False)
  19. consumer.provide_partition_info()
  20. consumer.fetch_last_known_offsets()
  21. while some_condition:
  22. with OffsetCommitContext(consumer) as context:
  23. messages = consumer.get_messages(count, block=False)
  24. for partition, message in messages:
  25. if can_process(message):
  26. context.mark(partition, message.offset)
  27. else:
  28. break
  29. if not context:
  30. sleep(delay)
  31. These semantics allow for deferred message processing (e.g. if `can_process`
  32. compares message time to clock time) and for repeated processing of the last
  33. unsuccessful message (until some external error is resolved).
  34. """
  35. def __init__(self, consumer):
  36. """
  37. :param consumer: an instance of `SimpleConsumer`
  38. """
  39. self.consumer = consumer
  40. self.initial_offsets = None
  41. self.high_water_mark = None
  42. self.logger = getLogger("kafka.context")
  43. def mark(self, partition, offset):
  44. """
  45. Set the high-water mark in the current context.
  46. In order to know the current partition, it is helpful to initialize
  47. the consumer to provide partition info via:
  48. .. code:: python
  49. consumer.provide_partition_info()
  50. """
  51. max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
  52. self.logger.debug("Setting high-water mark to: %s",
  53. {partition: max_offset})
  54. self.high_water_mark[partition] = max_offset
  55. def __nonzero__(self):
  56. """
  57. Return whether any operations were marked in the context.
  58. """
  59. return bool(self.high_water_mark)
  60. def __enter__(self):
  61. """
  62. Start a new context:
  63. - Record the initial offsets for rollback
  64. - Reset the high-water mark
  65. """
  66. self.initial_offsets = dict(self.consumer.offsets)
  67. self.high_water_mark = dict()
  68. self.logger.debug("Starting context at: %s", self.initial_offsets)
  69. return self
  70. def __exit__(self, exc_type, exc_value, traceback):
  71. """
  72. End a context.
  73. - If there was no exception, commit up to the current high-water mark.
  74. - If there was an offset of range error, attempt to find the correct
  75. initial offset.
  76. - If there was any other error, roll back to the initial offsets.
  77. """
  78. if exc_type is None:
  79. self.commit()
  80. elif isinstance(exc_value, OffsetOutOfRangeError):
  81. self.handle_out_of_range()
  82. return True
  83. else:
  84. self.rollback()
  85. def commit(self):
  86. """
  87. Commit this context's offsets:
  88. - If the high-water mark has moved, commit up to and position the
  89. consumer at the high-water mark.
  90. - Otherwise, reset to the consumer to the initial offsets.
  91. """
  92. if self.high_water_mark:
  93. self.logger.info("Committing offsets: %s", self.high_water_mark)
  94. self.commit_partition_offsets(self.high_water_mark)
  95. self.update_consumer_offsets(self.high_water_mark)
  96. else:
  97. self.update_consumer_offsets(self.initial_offsets)
  98. def rollback(self):
  99. """
  100. Rollback this context:
  101. - Position the consumer at the initial offsets.
  102. """
  103. self.logger.info("Rolling back context: %s", self.initial_offsets)
  104. self.update_consumer_offsets(self.initial_offsets)
  105. def commit_partition_offsets(self, partition_offsets):
  106. """
  107. Commit explicit partition/offset pairs.
  108. """
  109. self.logger.debug("Committing partition offsets: %s", partition_offsets)
  110. commit_requests = [
  111. OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None)
  112. for partition, offset in partition_offsets.items()
  113. ]
  114. commit_responses = self.consumer.client.send_offset_commit_request(
  115. self.consumer.group,
  116. commit_requests,
  117. )
  118. for commit_response in commit_responses:
  119. check_error(commit_response)
  120. def update_consumer_offsets(self, partition_offsets):
  121. """
  122. Update consumer offsets to explicit positions.
  123. """
  124. self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
  125. for partition, offset in partition_offsets.items():
  126. self.consumer.offsets[partition] = offset
  127. # consumer keeps other offset states beyond its `offsets` dictionary,
  128. # a relative seek with zero delta forces the consumer to reset to the
  129. # current value of the `offsets` dictionary
  130. self.consumer.seek(0, 1)
  131. def handle_out_of_range(self):
  132. """
  133. Handle out of range condition by seeking to the beginning of valid
  134. ranges.
  135. This assumes that an out of range doesn't happen by seeking past the end
  136. of valid ranges -- which is far less likely.
  137. """
  138. self.logger.info("Seeking beginning of partition on out of range error")
  139. self.consumer.seek(0, 0)