produce.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. from __future__ import absolute_import
  2. from .api import Request, Response
  3. from .message import MessageSet
  4. from .types import Int16, Int32, Int64, String, Array, Schema
  5. class ProduceResponse_v0(Response):
  6. API_KEY = 0
  7. API_VERSION = 0
  8. SCHEMA = Schema(
  9. ('topics', Array(
  10. ('topic', String('utf-8')),
  11. ('partitions', Array(
  12. ('partition', Int32),
  13. ('error_code', Int16),
  14. ('offset', Int64)))))
  15. )
  16. class ProduceResponse_v1(Response):
  17. API_KEY = 0
  18. API_VERSION = 1
  19. SCHEMA = Schema(
  20. ('topics', Array(
  21. ('topic', String('utf-8')),
  22. ('partitions', Array(
  23. ('partition', Int32),
  24. ('error_code', Int16),
  25. ('offset', Int64))))),
  26. ('throttle_time_ms', Int32)
  27. )
  28. class ProduceResponse_v2(Response):
  29. API_KEY = 0
  30. API_VERSION = 2
  31. SCHEMA = Schema(
  32. ('topics', Array(
  33. ('topic', String('utf-8')),
  34. ('partitions', Array(
  35. ('partition', Int32),
  36. ('error_code', Int16),
  37. ('offset', Int64),
  38. ('timestamp', Int64))))),
  39. ('throttle_time_ms', Int32)
  40. )
  41. class ProduceResponse_v3(Response):
  42. API_KEY = 0
  43. API_VERSION = 3
  44. SCHEMA = ProduceResponse_v2.SCHEMA
  45. class ProduceRequest_v0(Request):
  46. API_KEY = 0
  47. API_VERSION = 0
  48. RESPONSE_TYPE = ProduceResponse_v0
  49. SCHEMA = Schema(
  50. ('required_acks', Int16),
  51. ('timeout', Int32),
  52. ('topics', Array(
  53. ('topic', String('utf-8')),
  54. ('partitions', Array(
  55. ('partition', Int32),
  56. ('messages', MessageSet)))))
  57. )
  58. def expect_response(self):
  59. if self.required_acks == 0: # pylint: disable=no-member
  60. return False
  61. return True
  62. class ProduceRequest_v1(Request):
  63. API_KEY = 0
  64. API_VERSION = 1
  65. RESPONSE_TYPE = ProduceResponse_v1
  66. SCHEMA = ProduceRequest_v0.SCHEMA
  67. def expect_response(self):
  68. if self.required_acks == 0: # pylint: disable=no-member
  69. return False
  70. return True
  71. class ProduceRequest_v2(Request):
  72. API_KEY = 0
  73. API_VERSION = 2
  74. RESPONSE_TYPE = ProduceResponse_v2
  75. SCHEMA = ProduceRequest_v1.SCHEMA
  76. def expect_response(self):
  77. if self.required_acks == 0: # pylint: disable=no-member
  78. return False
  79. return True
  80. class ProduceRequest_v3(Request):
  81. API_KEY = 0
  82. API_VERSION = 3
  83. RESPONSE_TYPE = ProduceResponse_v3
  84. SCHEMA = Schema(
  85. ('transactional_id', String('utf-8')),
  86. ('required_acks', Int16),
  87. ('timeout', Int32),
  88. ('topics', Array(
  89. ('topic', String('utf-8')),
  90. ('partitions', Array(
  91. ('partition', Int32),
  92. ('messages', MessageSet)))))
  93. )
  94. def expect_response(self):
  95. if self.required_acks == 0: # pylint: disable=no-member
  96. return False
  97. return True
  98. ProduceRequest = [
  99. ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
  100. ProduceRequest_v3
  101. ]
  102. ProduceResponse = [
  103. ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
  104. ProduceResponse_v2
  105. ]