fetch.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import absolute_import
  2. from .api import Request, Response
  3. from .message import MessageSet
  4. from .types import Array, Int8, Int16, Int32, Int64, Schema, String
  5. class FetchResponse_v0(Response):
  6. API_KEY = 1
  7. API_VERSION = 0
  8. SCHEMA = Schema(
  9. ('topics', Array(
  10. ('topics', String('utf-8')),
  11. ('partitions', Array(
  12. ('partition', Int32),
  13. ('error_code', Int16),
  14. ('highwater_offset', Int64),
  15. ('message_set', MessageSet)))))
  16. )
  17. class FetchResponse_v1(Response):
  18. API_KEY = 1
  19. API_VERSION = 1
  20. SCHEMA = Schema(
  21. ('throttle_time_ms', Int32),
  22. ('topics', Array(
  23. ('topics', String('utf-8')),
  24. ('partitions', Array(
  25. ('partition', Int32),
  26. ('error_code', Int16),
  27. ('highwater_offset', Int64),
  28. ('message_set', MessageSet)))))
  29. )
  30. class FetchResponse_v2(Response):
  31. API_KEY = 1
  32. API_VERSION = 2
  33. SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
  34. class FetchResponse_v3(Response):
  35. API_KEY = 1
  36. API_VERSION = 3
  37. SCHEMA = FetchResponse_v2.SCHEMA
  38. class FetchResponse_v4(Response):
  39. API_KEY = 1
  40. API_VERSION = 4
  41. SCHEMA = Schema(
  42. ('throttle_time_ms', Int32),
  43. ('topics', Array(
  44. ('topics', String('utf-8')),
  45. ('partitions', Array(
  46. ('partition', Int32),
  47. ('error_code', Int16),
  48. ('highwater_offset', Int64),
  49. ('last_stable_offset', Int64),
  50. ('aborted_transactions', Array(
  51. ('producer_id', Int64),
  52. ('first_offset', Int64))),
  53. ('message_set', MessageSet)))))
  54. )
  55. class FetchResponse_v5(Response):
  56. API_KEY = 1
  57. API_VERSION = 5
  58. SCHEMA = Schema(
  59. ('throttle_time_ms', Int32),
  60. ('topics', Array(
  61. ('topics', String('utf-8')),
  62. ('partitions', Array(
  63. ('partition', Int32),
  64. ('error_code', Int16),
  65. ('highwater_offset', Int64),
  66. ('last_stable_offset', Int64),
  67. ('log_start_offset', Int64),
  68. ('aborted_transactions', Array(
  69. ('producer_id', Int64),
  70. ('first_offset', Int64))),
  71. ('message_set', MessageSet)))))
  72. )
  73. class FetchRequest_v0(Request):
  74. API_KEY = 1
  75. API_VERSION = 0
  76. RESPONSE_TYPE = FetchResponse_v0
  77. SCHEMA = Schema(
  78. ('replica_id', Int32),
  79. ('max_wait_time', Int32),
  80. ('min_bytes', Int32),
  81. ('topics', Array(
  82. ('topic', String('utf-8')),
  83. ('partitions', Array(
  84. ('partition', Int32),
  85. ('offset', Int64),
  86. ('max_bytes', Int32)))))
  87. )
  88. class FetchRequest_v1(Request):
  89. API_KEY = 1
  90. API_VERSION = 1
  91. RESPONSE_TYPE = FetchResponse_v1
  92. SCHEMA = FetchRequest_v0.SCHEMA
  93. class FetchRequest_v2(Request):
  94. API_KEY = 1
  95. API_VERSION = 2
  96. RESPONSE_TYPE = FetchResponse_v2
  97. SCHEMA = FetchRequest_v1.SCHEMA
  98. class FetchRequest_v3(Request):
  99. API_KEY = 1
  100. API_VERSION = 3
  101. RESPONSE_TYPE = FetchResponse_v3
  102. SCHEMA = Schema(
  103. ('replica_id', Int32),
  104. ('max_wait_time', Int32),
  105. ('min_bytes', Int32),
  106. ('max_bytes', Int32), # This new field is only difference from FR_v2
  107. ('topics', Array(
  108. ('topic', String('utf-8')),
  109. ('partitions', Array(
  110. ('partition', Int32),
  111. ('offset', Int64),
  112. ('max_bytes', Int32)))))
  113. )
  114. class FetchRequest_v4(Request):
  115. # Adds isolation_level field
  116. API_KEY = 1
  117. API_VERSION = 4
  118. RESPONSE_TYPE = FetchResponse_v4
  119. SCHEMA = Schema(
  120. ('replica_id', Int32),
  121. ('max_wait_time', Int32),
  122. ('min_bytes', Int32),
  123. ('max_bytes', Int32),
  124. ('isolation_level', Int8),
  125. ('topics', Array(
  126. ('topic', String('utf-8')),
  127. ('partitions', Array(
  128. ('partition', Int32),
  129. ('offset', Int64),
  130. ('max_bytes', Int32)))))
  131. )
  132. class FetchRequest_v5(Request):
  133. # This may only be used in broker-broker api calls
  134. API_KEY = 1
  135. API_VERSION = 5
  136. RESPONSE_TYPE = FetchResponse_v5
  137. SCHEMA = Schema(
  138. ('replica_id', Int32),
  139. ('max_wait_time', Int32),
  140. ('min_bytes', Int32),
  141. ('max_bytes', Int32),
  142. ('isolation_level', Int8),
  143. ('topics', Array(
  144. ('topic', String('utf-8')),
  145. ('partitions', Array(
  146. ('partition', Int32),
  147. ('fetch_offset', Int64),
  148. ('log_start_offset', Int64),
  149. ('max_bytes', Int32)))))
  150. )
  151. FetchRequest = [
  152. FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
  153. FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
  154. ]
  155. FetchResponse = [
  156. FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
  157. FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
  158. ]