123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- """Frame objects that do the frame demarshaling and marshaling."""
- import logging
- import struct
- from pika import amqp_object
- from pika import exceptions
- from pika import spec
- from pika.compat import byte
- LOGGER = logging.getLogger(__name__)
class Frame(amqp_object.AMQPObject):
    """Base class for AMQP frames.

    Holds the attributes shared by every frame (frame type and channel
    number) and implements ``_marshal``, which child classes call to wrap
    their payload in the binary AMQP frame envelope.

    """
    NAME = 'Frame'

    def __init__(self, frame_type, channel_number):
        """Create a new instance of a frame

        :param int frame_type: The frame type
        :param int channel_number: The channel number for the frame

        """
        self.frame_type = frame_type
        self.channel_number = channel_number

    def _marshal(self, pieces):
        """Create the full AMQP wire protocol frame data representation

        :param pieces: Iterable of byte strings joined to form the payload
        :rtype: bytes

        """
        body = b''.join(pieces)
        # Big-endian envelope: frame type (1 byte), channel number (2 bytes)
        # and payload length (4 bytes).
        envelope = struct.pack('>BHI', self.frame_type, self.channel_number,
                               len(body))
        terminator = byte(spec.FRAME_END)
        return envelope + body + terminator

    def marshal(self):
        """To be extended by child classes

        :raises: NotImplementedError

        """
        raise NotImplementedError
class Method(Frame):
    """Method frame object mapping.

    Wraps an AMQP Class.Method object so it can be marshaled as a method
    frame on a given channel.

    """
    NAME = 'METHOD'

    def __init__(self, channel_number, method):
        """Create a new instance of a method frame

        :param int channel_number: The channel number for the frame
        :param method: The AMQP Class.Method object; ``marshal`` relies on
            its ``encode()`` method and ``INDEX`` attribute

        """
        Frame.__init__(self, spec.FRAME_METHOD, channel_number)
        self.method = method

    def marshal(self):
        """Return the AMQP binary encoded value of the frame

        :rtype: bytes

        """
        frame_pieces = self.method.encode()
        # The payload starts with the method index as a 4-byte big-endian
        # unsigned integer, followed by the encoded method arguments.
        method_index = struct.pack('>I', self.method.INDEX)
        frame_pieces.insert(0, method_index)
        return self._marshal(frame_pieces)
class Header(Frame):
    """Content header frame object mapping.

    Carries the body size and the properties object that precede the body
    frames of a message.

    """
    NAME = 'Header'

    def __init__(self, channel_number, body_size, props):
        """Create a new instance of an AMQP ContentHeader object

        :param int channel_number: The channel number for the frame
        :param int body_size: The number of bytes for the body
        :param props: The properties object (e.g. Basic.Properties);
            ``marshal`` relies on its ``encode()`` method and ``INDEX``
            attribute

        """
        Frame.__init__(self, spec.FRAME_HEADER, channel_number)
        self.body_size = body_size
        self.properties = props

    def marshal(self):
        """Return the AMQP binary encoded value of the frame

        :rtype: bytes

        """
        frame_pieces = self.properties.encode()
        # Big-endian prefix: properties index (2 bytes), two zero pad bytes,
        # then the body size (8 bytes).
        content_header = struct.pack('>HxxQ', self.properties.INDEX,
                                     self.body_size)
        frame_pieces.insert(0, content_header)
        return self._marshal(frame_pieces)
class Body(Frame):
    """Content body frame object mapping.

    Carries one fragment of message body data on a given channel.

    """
    NAME = 'Body'

    def __init__(self, channel_number, fragment):
        """Create a new instance of a body frame

        :param int channel_number: The channel number for the frame
        :param fragment: The body data carried by this frame; it is used
            unchanged as the frame payload when marshaling

        """
        Frame.__init__(self, spec.FRAME_BODY, channel_number)
        self.fragment = fragment

    def marshal(self):
        """Return the AMQP binary encoded value of the frame

        :rtype: bytes

        """
        payload_pieces = [self.fragment]
        return self._marshal(payload_pieces)
class Heartbeat(Frame):
    """Heartbeat frame object mapping.

    A heartbeat frame has no payload and is always created for channel 0.

    """
    NAME = 'Heartbeat'

    def __init__(self):
        """Create a new instance of the Heartbeat frame"""
        Frame.__init__(self, spec.FRAME_HEARTBEAT, 0)

    def marshal(self):
        """Return the AMQP binary encoded value of the frame

        :rtype: bytes

        """
        # No payload pieces: the frame is just the envelope and end marker.
        return self._marshal([])
class ProtocolHeader(amqp_object.AMQPObject):
    """AMQP Protocol header frame class which provides a pythonic interface
    for creating AMQP Protocol headers

    """
    NAME = 'ProtocolHeader'

    def __init__(self, major=None, minor=None, revision=None):
        """Construct a Protocol Header frame object for the specified AMQP
        version

        Any component left as ``None`` falls back to the matching entry of
        ``spec.PROTOCOL_VERSION``. An explicit ``0`` is a valid version
        component and is kept as given.

        :param int major: Major version number
        :param int minor: Minor version number
        :param int revision: Revision

        """
        self.frame_type = -1
        # Compare against None rather than using ``or``: 0 is falsy, so
        # ``minor or default`` would turn a header announcing e.g. version
        # 1.0.0 into one carrying the default minor and revision.
        self.major = spec.PROTOCOL_VERSION[0] if major is None else major
        self.minor = spec.PROTOCOL_VERSION[1] if minor is None else minor
        self.revision = (spec.PROTOCOL_VERSION[2]
                         if revision is None else revision)

    def marshal(self):
        """Return the full AMQP wire protocol frame data representation of the
        ProtocolHeader frame

        :rtype: bytes

        """
        # Literal 'AMQP', a zero byte, then major, minor and revision as
        # single unsigned bytes.
        return b'AMQP' + struct.pack('BBBB', 0, self.major, self.minor,
                                     self.revision)
def decode_frame(data_in):  # pylint: disable=R0911,R0914
    """Receives raw socket data and attempts to turn it into a frame.

    Returns the number of bytes used to make the frame together with the
    frame object. When ``data_in`` does not yet hold a complete frame the
    result is ``(0, None)`` and nothing is consumed.

    :param bytes data_in: The raw data stream
    :rtype: tuple(bytes consumed, frame)
    :raises: pika.exceptions.InvalidFrameError

    """
    # Look to see if it's a protocol header frame
    try:
        if data_in[0:4] == b'AMQP':
            # Byte 4 is skipped; bytes 5-7 hold major, minor and revision
            major, minor, revision = struct.unpack_from('BBB', data_in, 5)
            return 8, ProtocolHeader(major, minor, revision)
    except (IndexError, struct.error):
        # Not enough data for a full protocol header yet
        return 0, None

    # Get the Frame Type, Channel Number and Frame Size
    try:
        frame_type, channel_number, frame_size = struct.unpack(
            '>BHL', data_in[0:7])
    except struct.error:
        # Fewer than 7 bytes available: the frame header is incomplete
        return 0, None

    # Offset just past the frame end marker
    frame_end = spec.FRAME_HEADER_SIZE + frame_size + spec.FRAME_END_SIZE

    # We don't have all of the frame yet
    if frame_end > len(data_in):
        return 0, None

    # The Frame termination chr is wrong
    if data_in[frame_end - 1:frame_end] != byte(spec.FRAME_END):
        raise exceptions.InvalidFrameError("Invalid FRAME_END marker")

    # Get the raw frame data (everything between header and end marker)
    frame_data = data_in[spec.FRAME_HEADER_SIZE:frame_end - 1]

    if frame_type == spec.FRAME_METHOD:
        # The first 4 bytes are the method ID; arguments start at offset 4
        method_id = struct.unpack_from('>I', frame_data)[0]
        method = spec.methods[method_id]()
        method.decode(frame_data, 4)
        return frame_end, Method(channel_number, method)

    if frame_type == spec.FRAME_HEADER:
        # Class ID (2 bytes), an unused 2-byte field, body size (8 bytes);
        # the encoded properties follow from offset 12
        class_id, _, body_size = struct.unpack_from('>HHQ', frame_data)
        properties = spec.props[class_id]()
        properties.decode(frame_data[12:])
        return frame_end, Header(channel_number, body_size, properties)

    if frame_type == spec.FRAME_BODY:
        # The whole payload is the body fragment
        return frame_end, Body(channel_number, frame_data)

    if frame_type == spec.FRAME_HEARTBEAT:
        return frame_end, Heartbeat()

    raise exceptions.InvalidFrameError("Unknown frame type: %i" % frame_type)
|