message.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. """
  2. The pyro wire protocol message.
  3. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  4. """
  5. import hashlib
  6. import hmac
  7. import struct
  8. import logging
  9. import sys
  10. from Pyro4 import errors, constants
  11. import Pyro4.constants
# Public names exported by a star-import of this module.
__all__ = ["Message", "secure_compare"]

# Module-level logger used for protocol errors and warnings raised while receiving messages.
log = logging.getLogger("Pyro4.message")
  14. MSG_CONNECT = 1
  15. MSG_CONNECTOK = 2
  16. MSG_CONNECTFAIL = 3
  17. MSG_INVOKE = 4
  18. MSG_RESULT = 5
  19. MSG_PING = 6
  20. FLAGS_EXCEPTION = 1 << 0
  21. FLAGS_COMPRESSED = 1 << 1
  22. FLAGS_ONEWAY = 1 << 2
  23. FLAGS_BATCH = 1 << 3
  24. FLAGS_META_ON_CONNECT = 1 << 4
  25. FLAGS_ITEMSTREAMRESULT = 1 << 5
  26. SERIALIZER_SERPENT = 1
  27. SERIALIZER_JSON = 2
  28. SERIALIZER_MARSHAL = 3
  29. SERIALIZER_PICKLE = 4
  30. SERIALIZER_DILL = 5
  31. class Message(object):
  32. """
  33. Pyro write protocol message.
  34. Wire messages contains of a fixed size header, an optional set of annotation chunks,
  35. and then the payload data. This class doesn't deal with the payload data:
  36. (de)serialization and handling of that data is done elsewhere.
  37. Annotation chunks are only parsed, except the 'HMAC' chunk: that is created
  38. and validated because it is used as a message digest.
  39. The header format is::
  40. 4 id ('PYRO')
  41. 2 protocol version
  42. 2 message type
  43. 2 message flags
  44. 2 sequence number
  45. 4 data length (i.e. 2 Gb data size limitation)
  46. 2 data serialization format (serializer id)
  47. 2 annotations length (total of all chunks, 0 if no annotation chunks present)
  48. 2 (reserved)
  49. 2 checksum
  50. After the header, zero or more annotation chunks may follow, of the format::
  51. 4 id (ASCII)
  52. 2 chunk length
  53. x annotation chunk databytes
  54. After that, the actual payload data bytes follow.
  55. The sequencenumber is used to check if response messages correspond to the
  56. actual request message. This prevents the situation where Pyro would perhaps return
  57. the response data from another remote call (which would not result in an error otherwise!)
  58. This could happen for instance if the socket data stream gets out of sync, perhaps due To
  59. some form of signal that interrupts I/O.
  60. The header checksum is a simple sum of the header fields to make reasonably sure
  61. that we are dealing with an actual correct PYRO protocol header and not some random
  62. data that happens to start with the 'PYRO' protocol identifier.
  63. Pyro now uses two annotation chunks that you should not touch yourself:
  64. 'HMAC' contains the hmac digest of the message data bytes and
  65. all of the annotation chunk data bytes (except those of the HMAC chunk itself).
  66. 'CORR' contains the correlation id (guid bytes)
  67. Other chunk names are free to use for custom purposes, but Pyro has the right
  68. to reserve more of them for internal use in the future.
  69. """
  70. __slots__ = ["type", "flags", "seq", "data", "data_size", "serializer_id", "annotations", "annotations_size", "hmac_key"]
  71. header_format = '!4sHHHHiHHHH'
  72. header_size = struct.calcsize(header_format)
  73. checksum_magic = 0x34E9
  74. def __init__(self, msgType, databytes, serializer_id, flags, seq, annotations=None, hmac_key=None):
  75. self.type = msgType
  76. self.flags = flags
  77. self.seq = seq
  78. self.data = databytes
  79. self.data_size = len(self.data)
  80. self.serializer_id = serializer_id
  81. self.annotations = annotations or {}
  82. self.hmac_key = hmac_key
  83. if self.hmac_key:
  84. self.annotations["HMAC"] = self.hmac() # should be done last because it calculates hmac over other annotations
  85. self.annotations_size = sum([6 + len(v) for v in self.annotations.values()])
  86. if 0 < Pyro4.config.MAX_MESSAGE_SIZE < (self.data_size + self.annotations_size):
  87. raise errors.MessageTooLargeError("max message size exceeded (%d where max=%d)" % (self.data_size + self.annotations_size, Pyro4.config.MAX_MESSAGE_SIZE))
  88. def __repr__(self):
  89. return "<%s.%s at %x; type=%d flags=%d seq=%d datasize=%d #ann=%d>" % (self.__module__, self.__class__.__name__, id(self), self.type, self.flags, self.seq, self.data_size, len(self.annotations))
  90. def to_bytes(self):
  91. """creates a byte stream containing the header followed by annotations (if any) followed by the data"""
  92. return self.__header_bytes() + self.__annotations_bytes() + self.data
  93. def __header_bytes(self):
  94. if not (0 <= self.data_size <= 0x7fffffff):
  95. raise ValueError("invalid message size (outside range 0..2Gb)")
  96. checksum = (self.type + constants.PROTOCOL_VERSION + self.data_size + self.annotations_size + self.serializer_id + self.flags + self.seq + self.checksum_magic) & 0xffff
  97. return struct.pack(self.header_format, b"PYRO", constants.PROTOCOL_VERSION, self.type, self.flags, self.seq, self.data_size, self.serializer_id, self.annotations_size, 0, checksum)
  98. def __annotations_bytes(self):
  99. if self.annotations:
  100. a = []
  101. for k, v in self.annotations.items():
  102. if len(k) != 4:
  103. raise errors.ProtocolError("annotation key must be of length 4")
  104. if sys.version_info >= (3, 0):
  105. k = k.encode("ASCII")
  106. a.append(struct.pack("!4sH", k, len(v)))
  107. a.append(v)
  108. return b"".join(a)
  109. return b""
  110. # Note: this 'chunked' way of sending is not used because it triggers Nagle's algorithm
  111. # on some systems (linux). This causes big delays, unless you change the socket option
  112. # TCP_NODELAY to disable the algorithm. What also works, is sending all the message bytes
  113. # in one go: connection.send(message.to_bytes()). This is what Pyro does.
  114. def send(self, connection):
  115. """send the message as bytes over the connection"""
  116. connection.send(self.__header_bytes())
  117. if self.annotations:
  118. connection.send(self.__annotations_bytes())
  119. connection.send(self.data)
  120. @classmethod
  121. def from_header(cls, headerData):
  122. """Parses a message header. Does not yet process the annotations chunks and message data."""
  123. if not headerData or len(headerData) != cls.header_size:
  124. raise errors.ProtocolError("header data size mismatch")
  125. tag, ver, msg_type, flags, seq, data_size, serializer_id, annotations_size, _, checksum = struct.unpack(cls.header_format, headerData)
  126. if tag != b"PYRO" or ver != constants.PROTOCOL_VERSION:
  127. raise errors.ProtocolError("invalid data or unsupported protocol version")
  128. if checksum != (msg_type + ver + data_size + annotations_size + flags + serializer_id + seq + cls.checksum_magic) & 0xffff:
  129. raise errors.ProtocolError("header checksum mismatch")
  130. msg = Message(msg_type, b"", serializer_id, flags, seq)
  131. msg.data_size = data_size
  132. msg.annotations_size = annotations_size
  133. return msg
  134. @classmethod
  135. def recv(cls, connection, requiredMsgTypes=None, hmac_key=None):
  136. """
  137. Receives a pyro message from a given connection.
  138. Accepts the given message types (None=any, or pass a sequence).
  139. Also reads annotation chunks and the actual payload data.
  140. Validates a HMAC chunk if present.
  141. """
  142. msg = cls.from_header(connection.recv(cls.header_size))
  143. msg.hmac_key = hmac_key
  144. if 0 < Pyro4.config.MAX_MESSAGE_SIZE < (msg.data_size + msg.annotations_size):
  145. errorMsg = "max message size exceeded (%d where max=%d)" % (msg.data_size + msg.annotations_size, Pyro4.config.MAX_MESSAGE_SIZE)
  146. log.error("connection " + str(connection) + ": " + errorMsg)
  147. connection.close() # close the socket because at this point we can't return the correct sequence number for returning an error message
  148. exc = errors.MessageTooLargeError(errorMsg)
  149. exc.pyroMsg = msg
  150. raise exc
  151. if requiredMsgTypes and msg.type not in requiredMsgTypes:
  152. err = "invalid msg type %d received" % msg.type
  153. log.error(err)
  154. exc = errors.ProtocolError(err)
  155. exc.pyroMsg = msg
  156. raise exc
  157. if msg.annotations_size:
  158. # read annotation chunks
  159. annotations_data = connection.recv(msg.annotations_size)
  160. msg.annotations = {}
  161. i = 0
  162. while i < msg.annotations_size:
  163. anno, length = struct.unpack("!4sH", annotations_data[i:i + 6])
  164. if sys.version_info >= (3, 0):
  165. anno = anno.decode("ASCII")
  166. msg.annotations[anno] = annotations_data[i + 6:i + 6 + length]
  167. if sys.platform == "cli":
  168. msg.annotations[anno] = bytes(msg.annotations[anno])
  169. i += 6 + length
  170. # read data
  171. msg.data = connection.recv(msg.data_size)
  172. if "HMAC" in msg.annotations and hmac_key:
  173. if not secure_compare(msg.annotations["HMAC"], msg.hmac()):
  174. exc = errors.SecurityError("message hmac mismatch")
  175. exc.pyroMsg = msg
  176. raise exc
  177. elif ("HMAC" in msg.annotations) != bool(hmac_key):
  178. # Not allowed: message contains hmac but hmac_key is not set, or vice versa.
  179. err = "hmac key config not symmetric"
  180. log.warning(err)
  181. exc = errors.SecurityError(err)
  182. exc.pyroMsg = msg
  183. raise exc
  184. return msg
  185. def hmac(self):
  186. """returns the hmac of the data and the annotation chunk values (except HMAC chunk itself)"""
  187. mac = hmac.new(self.hmac_key, self.data, digestmod=hashlib.sha1)
  188. for k, v in sorted(self.annotations.items()): # note: sorted because we need fixed order to get the same hmac
  189. if k != "HMAC":
  190. mac.update(v)
  191. return mac.digest() if sys.platform != "cli" else bytes(mac.digest())
  192. @staticmethod
  193. def ping(pyroConnection, hmac_key=None):
  194. """Convenience method to send a 'ping' message and wait for the 'pong' response"""
  195. ping = Message(MSG_PING, b"ping", 42, 0, 0, hmac_key=hmac_key)
  196. pyroConnection.send(ping.to_bytes())
  197. Message.recv(pyroConnection, [MSG_PING])
  198. try:
  199. from hmac import compare_digest as secure_compare
  200. except ImportError:
  201. # Python version doesn't have it natively, use a python fallback implementation
  202. import operator
  203. try:
  204. reduce
  205. except NameError:
  206. from functools import reduce
  207. def secure_compare(a, b):
  208. if type(a) != type(b):
  209. raise TypeError("arguments must both be same type")
  210. if len(a) != len(b):
  211. return False
  212. return reduce(operator.and_, map(operator.eq, a, b), True)