frame.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. """Frame objects that do the frame demarshaling and marshaling."""
  2. import logging
  3. import struct
  4. from pika import amqp_object
  5. from pika import exceptions
  6. from pika import spec
  7. from pika.compat import byte
  8. LOGGER = logging.getLogger(__name__)
  9. class Frame(amqp_object.AMQPObject):
  10. """Base Frame object mapping. Defines a behavior for all child classes for
  11. assignment of core attributes and implementation of the a core _marshal
  12. method which child classes use to create the binary AMQP frame.
  13. """
  14. NAME = 'Frame'
  15. def __init__(self, frame_type, channel_number):
  16. """Create a new instance of a frame
  17. :param int frame_type: The frame type
  18. :param int channel_number: The channel number for the frame
  19. """
  20. self.frame_type = frame_type
  21. self.channel_number = channel_number
  22. def _marshal(self, pieces):
  23. """Create the full AMQP wire protocol frame data representation
  24. :rtype: bytes
  25. """
  26. payload = b''.join(pieces)
  27. return struct.pack('>BHI', self.frame_type, self.channel_number,
  28. len(payload)) + payload + byte(spec.FRAME_END)
  29. def marshal(self):
  30. """To be ended by child classes
  31. :raises NotImplementedError
  32. """
  33. raise NotImplementedError
  34. class Method(Frame):
  35. """Base Method frame object mapping. AMQP method frames are mapped on top
  36. of this class for creating or accessing their data and attributes.
  37. """
  38. NAME = 'METHOD'
  39. def __init__(self, channel_number, method):
  40. """Create a new instance of a frame
  41. :param int channel_number: The frame type
  42. :param pika.Spec.Class.Method method: The AMQP Class.Method
  43. """
  44. Frame.__init__(self, spec.FRAME_METHOD, channel_number)
  45. self.method = method
  46. def marshal(self):
  47. """Return the AMQP binary encoded value of the frame
  48. :rtype: str
  49. """
  50. pieces = self.method.encode()
  51. pieces.insert(0, struct.pack('>I', self.method.INDEX))
  52. return self._marshal(pieces)
  53. class Header(Frame):
  54. """Header frame object mapping. AMQP content header frames are mapped
  55. on top of this class for creating or accessing their data and attributes.
  56. """
  57. NAME = 'Header'
  58. def __init__(self, channel_number, body_size, props):
  59. """Create a new instance of a AMQP ContentHeader object
  60. :param int channel_number: The channel number for the frame
  61. :param int body_size: The number of bytes for the body
  62. :param pika.spec.BasicProperties props: Basic.Properties object
  63. """
  64. Frame.__init__(self, spec.FRAME_HEADER, channel_number)
  65. self.body_size = body_size
  66. self.properties = props
  67. def marshal(self):
  68. """Return the AMQP binary encoded value of the frame
  69. :rtype: str
  70. """
  71. pieces = self.properties.encode()
  72. pieces.insert(
  73. 0, struct.pack('>HxxQ', self.properties.INDEX, self.body_size))
  74. return self._marshal(pieces)
  75. class Body(Frame):
  76. """Body frame object mapping class. AMQP content body frames are mapped on
  77. to this base class for getting/setting of attributes/data.
  78. """
  79. NAME = 'Body'
  80. def __init__(self, channel_number, fragment):
  81. """
  82. Parameters:
  83. - channel_number: int
  84. - fragment: unicode or str
  85. """
  86. Frame.__init__(self, spec.FRAME_BODY, channel_number)
  87. self.fragment = fragment
  88. def marshal(self):
  89. """Return the AMQP binary encoded value of the frame
  90. :rtype: str
  91. """
  92. return self._marshal([self.fragment])
  93. class Heartbeat(Frame):
  94. """Heartbeat frame object mapping class. AMQP Heartbeat frames are mapped
  95. on to this class for a common access structure to the attributes/data
  96. values.
  97. """
  98. NAME = 'Heartbeat'
  99. def __init__(self):
  100. """Create a new instance of the Heartbeat frame"""
  101. Frame.__init__(self, spec.FRAME_HEARTBEAT, 0)
  102. def marshal(self):
  103. """Return the AMQP binary encoded value of the frame
  104. :rtype: str
  105. """
  106. return self._marshal(list())
  107. class ProtocolHeader(amqp_object.AMQPObject):
  108. """AMQP Protocol header frame class which provides a pythonic interface
  109. for creating AMQP Protocol headers
  110. """
  111. NAME = 'ProtocolHeader'
  112. def __init__(self, major=None, minor=None, revision=None):
  113. """Construct a Protocol Header frame object for the specified AMQP
  114. version
  115. :param int major: Major version number
  116. :param int minor: Minor version number
  117. :param int revision: Revision
  118. """
  119. self.frame_type = -1
  120. self.major = major or spec.PROTOCOL_VERSION[0]
  121. self.minor = minor or spec.PROTOCOL_VERSION[1]
  122. self.revision = revision or spec.PROTOCOL_VERSION[2]
  123. def marshal(self):
  124. """Return the full AMQP wire protocol frame data representation of the
  125. ProtocolHeader frame
  126. :rtype: str
  127. """
  128. return b'AMQP' + struct.pack('BBBB', 0, self.major, self.minor,
  129. self.revision)
  130. def decode_frame(data_in): # pylint: disable=R0911,R0914
  131. """Receives raw socket data and attempts to turn it into a frame.
  132. Returns bytes used to make the frame and the frame
  133. :param str data_in: The raw data stream
  134. :rtype: tuple(bytes consumed, frame)
  135. :raises: pika.exceptions.InvalidFrameError
  136. """
  137. # Look to see if it's a protocol header frame
  138. try:
  139. if data_in[0:4] == b'AMQP':
  140. major, minor, revision = struct.unpack_from('BBB', data_in, 5)
  141. return 8, ProtocolHeader(major, minor, revision)
  142. except (IndexError, struct.error):
  143. return 0, None
  144. # Get the Frame Type, Channel Number and Frame Size
  145. try:
  146. (frame_type, channel_number, frame_size) = struct.unpack(
  147. '>BHL', data_in[0:7])
  148. except struct.error:
  149. return 0, None
  150. # Get the frame data
  151. frame_end = spec.FRAME_HEADER_SIZE + frame_size + spec.FRAME_END_SIZE
  152. # We don't have all of the frame yet
  153. if frame_end > len(data_in):
  154. return 0, None
  155. # The Frame termination chr is wrong
  156. if data_in[frame_end - 1:frame_end] != byte(spec.FRAME_END):
  157. raise exceptions.InvalidFrameError("Invalid FRAME_END marker")
  158. # Get the raw frame data
  159. frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]
  160. if frame_type == spec.FRAME_METHOD:
  161. # Get the Method ID from the frame data
  162. method_id = struct.unpack_from('>I', frame_data)[0]
  163. # Get a Method object for this method_id
  164. method = spec.methods[method_id]()
  165. # Decode the content
  166. method.decode(frame_data, 4)
  167. # Return the amount of data consumed and the Method object
  168. return frame_end, Method(channel_number, method)
  169. elif frame_type == spec.FRAME_HEADER:
  170. # Return the header class and body size
  171. class_id, weight, body_size = struct.unpack_from('>HHQ', frame_data)
  172. # Get the Properties type
  173. properties = spec.props[class_id]()
  174. # Decode the properties
  175. out = properties.decode(frame_data[12:])
  176. # Return a Header frame
  177. return frame_end, Header(channel_number, body_size, properties)
  178. elif frame_type == spec.FRAME_BODY:
  179. # Return the amount of data consumed and the Body frame w/ data
  180. return frame_end, Body(channel_number, frame_data)
  181. elif frame_type == spec.FRAME_HEARTBEAT:
  182. # Return the amount of data and a Heartbeat frame
  183. return frame_end, Heartbeat()
  184. raise exceptions.InvalidFrameError("Unknown frame type: %i" % frame_type)