select_response.py 9.8 KB


  1. import platform
  2. import struct
  3. import requests
  4. from .compat import to_bytes
  5. from .exceptions import RequestError
  6. from .exceptions import SelectOperationFailed
  7. from .exceptions import SelectOperationClientError
  8. from .exceptions import InconsistentError
  9. from . import utils
  10. import logging
  11. logger = logging.getLogger(__name__)
  12. """
  13. The adapter class for Select object's response.
  14. The response consists of frames. Each frame has the following format:
  15. Type | Payload Length | Header Checksum | Payload | Payload Checksum
  16. |<4-->| <--4 bytes------><---4 bytes-------><-n/a-----><--4 bytes--------->
  17. And we have three kind of frames.
  18. Data Frame:
  19. Type:8388609
  20. Payload: Offset | Data
  21. <-8 bytes>
  22. Continuous Frame
  23. Type:8388612
  24. Payload: Offset (8-bytes)
  25. End Frame
  26. Type:8388613
  27. Payload: Offset | total scanned bytes | http status code | error message
  28. <-- 8bytes--><-----8 bytes--------><---4 bytes-------><---variabe--->
  29. """
  30. class SelectResponseAdapter(object):
  31. _CHUNK_SIZE = 8 * 1024
  32. _CONTINIOUS_FRAME_TYPE=8388612
  33. _DATA_FRAME_TYPE = 8388609
  34. _END_FRAME_TYPE = 8388613
  35. _META_END_FRAME_TYPE = 8388614
  36. _JSON_META_END_FRAME_TYPE = 8388615
  37. _FRAMES_FOR_PROGRESS_UPDATE = 10
  38. def __init__(self, response, progress_callback = None, content_length = None, enable_crc = False):
  39. self.response = response
  40. self.frame_off_set = 0
  41. self.frame_length = 0
  42. self.frame_data = b''
  43. self.check_sum_flag = 0
  44. self.file_offset = 0
  45. self.finished = 0
  46. self.raw_buffer = b''
  47. self.raw_buffer_offset = 0
  48. #self.resp_content_iter = response.__iter__()
  49. self.callback = progress_callback
  50. self.frames_since_last_progress_report = 0
  51. self.content_length = content_length
  52. self.resp_content_iter = response.__iter__()
  53. self.enable_crc = enable_crc
  54. self.payload = b''
  55. self.output_raw_data = response.headers.get("x-oss-select-output-raw", '') == "true"
  56. self.request_id = response.headers.get("x-oss-request-id",'')
  57. self.splits = 0
  58. self.rows = 0
  59. self.columns = 0
  60. def read(self):
  61. if self.finished:
  62. return b''
  63. content=b''
  64. for data in self:
  65. content += data
  66. return content
  67. def __iter__(self):
  68. return self
  69. def __next__(self):
  70. return self.next()
  71. def next(self):
  72. if self.output_raw_data == True:
  73. data = next(self.resp_content_iter)
  74. if len(data) != 0:
  75. return data
  76. else: raise StopIteration
  77. while self.finished == 0:
  78. if self.frame_off_set < self.frame_length:
  79. data = self.frame_data[self.frame_off_set : self.frame_length]
  80. self.frame_length = self.frame_off_set = 0
  81. return data
  82. else:
  83. self.read_next_frame()
  84. self.frames_since_last_progress_report += 1
  85. if (self.frames_since_last_progress_report >= SelectResponseAdapter._FRAMES_FOR_PROGRESS_UPDATE and self.callback is not None):
  86. self.callback(self.file_offset, self.content_length)
  87. self.frames_since_last_progress_report = 0
  88. raise StopIteration
  89. def read_raw(self, amt):
  90. ret = b''
  91. read_count = 0
  92. while amt > 0 and self.finished == 0:
  93. size = len(self.raw_buffer)
  94. if size == 0:
  95. self.raw_buffer = next(self.resp_content_iter)
  96. self.raw_buffer_offset = 0
  97. size = len(self.raw_buffer)
  98. if size == 0:
  99. break
  100. if size - self.raw_buffer_offset >= amt:
  101. data = self.raw_buffer[self.raw_buffer_offset:self.raw_buffer_offset + amt]
  102. data_size = len(data)
  103. self.raw_buffer_offset += data_size
  104. ret += data
  105. read_count += data_size
  106. amt -= data_size
  107. else:
  108. data = self.raw_buffer[self.raw_buffer_offset:]
  109. data_len = len(data)
  110. ret += data
  111. read_count += data_len
  112. amt -= data_len
  113. self.raw_buffer = b''
  114. return ret
  115. def read_next_frame(self):
  116. frame_type = bytearray(self.read_raw(4))
  117. payload_length = bytearray(self.read_raw(4))
  118. utils.change_endianness_if_needed(payload_length) # convert to little endian
  119. payload_length_val = struct.unpack("I", bytes(payload_length))[0]
  120. header_checksum = bytearray(self.read_raw(4))
  121. frame_type[0] = 0 #mask the version bit
  122. utils.change_endianness_if_needed(frame_type) # convert to little endian
  123. frame_type_val = struct.unpack("I", bytes(frame_type))[0]
  124. if (frame_type_val != SelectResponseAdapter._DATA_FRAME_TYPE and
  125. frame_type_val != SelectResponseAdapter._CONTINIOUS_FRAME_TYPE and
  126. frame_type_val != SelectResponseAdapter._END_FRAME_TYPE and
  127. frame_type_val != SelectResponseAdapter._META_END_FRAME_TYPE and
  128. frame_type_val != SelectResponseAdapter._JSON_META_END_FRAME_TYPE):
  129. logger.warning("Unexpected frame type: {0}. RequestId:{1}. This could be due to the old version of client.".format(frame_type_val, self.request_id))
  130. raise SelectOperationClientError(self.request_id, "Unexpected frame type:" + str(frame_type_val))
  131. self.payload = self.read_raw(payload_length_val)
  132. file_offset_bytes = bytearray(self.payload[0:8])
  133. utils.change_endianness_if_needed(file_offset_bytes)
  134. self.file_offset = struct.unpack("Q", bytes(file_offset_bytes))[0]
  135. if frame_type_val == SelectResponseAdapter._DATA_FRAME_TYPE:
  136. self.frame_length = payload_length_val - 8
  137. self.frame_off_set = 0
  138. self.check_sum_flag=1
  139. self.frame_data = self.payload[8:]
  140. checksum = bytearray(self.read_raw(4)) #read checksum crc32
  141. utils.change_endianness_if_needed(checksum)
  142. checksum_val = struct.unpack("I", bytes(checksum))[0]
  143. if self.enable_crc:
  144. crc32 = utils.Crc32()
  145. crc32.update(self.payload)
  146. checksum_calc = crc32.crc
  147. if checksum_val != checksum_calc:
  148. logger.warning("Incorrect checksum: Actual {0} and calculated {1}. RequestId:{2}".format(checksum_val, checksum_calc, self.request_id))
  149. raise InconsistentError("Incorrect checksum: Actual" + str(checksum_val) + ". Calculated:" + str(checksum_calc), self.request_id)
  150. elif frame_type_val == SelectResponseAdapter._CONTINIOUS_FRAME_TYPE:
  151. self.frame_length = self.frame_off_set = 0
  152. self.check_sum_flag=1
  153. self.read_raw(4)
  154. elif frame_type_val == SelectResponseAdapter._END_FRAME_TYPE:
  155. self.frame_off_set = 0
  156. scanned_size_bytes = bytearray(self.payload[8:16])
  157. status_bytes = bytearray(self.payload[16:20])
  158. utils.change_endianness_if_needed(status_bytes)
  159. status = struct.unpack("I", bytes(status_bytes))[0]
  160. error_msg_size = payload_length_val - 20
  161. error_msg=b''
  162. error_code = b''
  163. if error_msg_size > 0:
  164. error_msg = self.payload[20:error_msg_size + 20]
  165. error_code_index = error_msg.find(b'.')
  166. if error_code_index >= 0 and error_code_index < error_msg_size - 1:
  167. error_code = error_msg[0:error_code_index]
  168. error_msg = error_msg[error_code_index + 1:]
  169. if status // 100 != 2:
  170. raise SelectOperationFailed(status, error_code, error_msg)
  171. self.frame_length = 0
  172. if self.callback is not None:
  173. self.callback(self.file_offset, self.content_length)
  174. self.read_raw(4) # read the payload checksum
  175. self.frame_length = 0
  176. self.finished = 1
  177. elif frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE or frame_type_val == SelectResponseAdapter._JSON_META_END_FRAME_TYPE:
  178. self.frame_off_set = 0
  179. scanned_size_bytes = bytearray(self.payload[8:16])
  180. status_bytes = bytearray(self.payload[16:20])
  181. utils.change_endianness_if_needed(status_bytes)
  182. status = struct.unpack("I", bytes(status_bytes))[0]
  183. splits_bytes = bytearray(self.payload[20:24])
  184. utils.change_endianness_if_needed(splits_bytes)
  185. self.splits = struct.unpack("I", bytes(splits_bytes))[0]
  186. lines_bytes = bytearray(self.payload[24:32])
  187. utils.change_endianness_if_needed(lines_bytes)
  188. self.rows = struct.unpack("Q", bytes(lines_bytes))[0]
  189. error_index = 36
  190. if frame_type_val == SelectResponseAdapter._META_END_FRAME_TYPE:
  191. column_bytes = bytearray(self.payload[32:36])
  192. utils.change_endianness_if_needed(column_bytes)
  193. self.columns = struct.unpack("I", bytes(column_bytes))[0]
  194. else:
  195. error_index = 32
  196. error_size = payload_length_val - error_index
  197. error_msg = b''
  198. error_code = b''
  199. if (error_size > 0):
  200. error_msg = self.payload[error_index:error_index + error_size]
  201. error_code_index = error_msg.find(b'.')
  202. if error_code_index >= 0 and error_code_index < error_size - 1:
  203. error_code = error_msg[0:error_code_index]
  204. error_msg = error_msg[error_code_index + 1:]
  205. self.read_raw(4) # read the payload checksum
  206. self.final_status = status
  207. self.frame_length = 0
  208. self.finished = 1
  209. if (status / 100 != 2):
  210. raise SelectOperationFailed(status, error_code, error_msg)