protocol_spy.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. #! python
  2. #
  3. # This module implements a special URL handler that wraps another port and
  4. # prints the traffic for debugging purposes. With this, it is possible
  5. # to debug the serial port traffic of every application that uses
  6. # serial_for_url.
  7. #
  8. # This file is part of pySerial. https://github.com/pyserial/pyserial
  9. # (C) 2015 Chris Liechti <cliechti@gmx.net>
  10. #
  11. # SPDX-License-Identifier: BSD-3-Clause
  12. #
  13. # URL format: spy://port[?option[=value][&option[=value]]]
  14. # options:
  15. # - file=X a file or device to write to
  16. # - color use escape code to colorize output
  17. # - raw forward raw bytes instead of hexdump
  18. #
  19. # example:
  20. # redirect output to another terminal window on Posix (Linux):
  21. # python -m serial.tools.miniterm spy:///dev/ttyUSB0?file=/dev/pts/14\&color
  22. import sys
  23. import time
  24. import serial
  25. try:
  26. import urlparse
  27. except ImportError:
  28. import urllib.parse as urlparse
  def sixteen(data):
      """\
      Yield ``(hex, ascii)`` display cells for *data* in rows of 16 bytes.

      Each byte produces one tuple: a three character hex cell (e.g.
      ``'41 '``) and a one character ASCII cell (``'.'`` for bytes outside
      the printable range 0x20..0x7e). An extra ``(' ', '')`` gap cell is
      emitted in the middle of each row and ``(None, None)`` marks the end
      of every row. An incomplete final row is padded with blank cells that
      are as wide as real cells, so every row has the same width. Empty
      *data* yields nothing.
      """
      n = 0
      # bytearray() iterates as integers on Python 2 and 3 alike and accepts
      # bytes, bytearray and memoryview input.
      for value in bytearray(data):
          yield ('{:02X} '.format(value), chr(value) if 0x20 <= value < 0x7f else '.')
          n += 1
          if n == 8:
              yield (' ', '')
          elif n >= 16:
              yield (None, None)
              n = 0
      if n > 0:
          # Fill the remainder of the last row. The blank hex cell must be
          # three characters wide (same as 'XX ') to keep the ASCII column
          # aligned with the full rows.
          while n < 16:
              n += 1
              if n == 8:
                  yield (' ', '')
              yield ('   ', ' ')
          yield (None, None)
  def hexdump(data):
      """\
      Yield ``(offset, text)`` tuples, one per row of the hex dump of *data*.

      *offset* starts at 0 and advances by 0x10 per row; *text* is the hex
      column and the ASCII column of that row joined by a single space.
      """
      hex_cells = []
      text_cells = []
      row_offset = 0
      for hex_cell, text_cell in sixteen(data):
          if hex_cell is not None:
              # ordinary cell: keep collecting for the current row
              hex_cells.append(hex_cell)
              text_cells.append(text_cell)
              continue
          # (None, None) is the end-of-row marker: emit the row and start over
          hex_column = ''.join(hex_cells)
          text_column = ''.join(text_cells)
          yield (row_offset, ' '.join([hex_column, text_column]))
          hex_cells = []
          text_cells = []
          row_offset += 0x10
  class FormatRaw(object):
      """\
      Formatter that passes RX and TX data through to the output unchanged.

      When *color* is true, an escape sequence is written in front of each
      chunk (green for RX, red for TX). Control calls are not shown.
      """

      def __init__(self, output, color):
          self.output = output
          self.color = color
          self.rx_color = '\x1b[32m'
          self.tx_color = '\x1b[31m'

      def _forward(self, escape, data):
          """write the optional color escape, then the data, and flush"""
          stream = self.output
          if self.color:
              stream.write(escape)
          stream.write(data)
          stream.flush()

      def rx(self, data):
          """forward received data"""
          self._forward(self.rx_color, data)

      def tx(self, data):
          """forward transmitted data"""
          self._forward(self.tx_color, data)

      def control(self, name, value):
          """control calls are intentionally ignored by this formatter"""
  class FormatHexdump(object):
      """\
      Formatter that writes a time-stamped hex dump of RX and TX data and a
      line for every control call.

      Each output line consists of the seconds elapsed since the formatter
      was created, a label (``RX``, ``TX`` or the control name) and the
      value. For data, the value is the row offset followed by the hex dump
      row. Control lines look like::

          000002.469 RTS  inactive
          000002.773 RTS  active
      """

      def __init__(self, output, color):
          self.start_time = time.time()
          self.output = output
          self.color = color
          self.rx_color = '\x1b[32m'
          self.tx_color = '\x1b[31m'
          self.control_color = '\x1b[37m'

      def _elapsed(self):
          """seconds since this formatter was created"""
          return time.time() - self.start_time

      def write_line(self, timestamp, label, value, value2=''):
          """format one line, write it to the output and flush"""
          line = '{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2)
          self.output.write(line)
          self.output.flush()

      def rx(self, data):
          """show received data as hex dump, or '<empty>' when there is none"""
          if self.color:
              self.output.write(self.rx_color)
          if not data:
              self.write_line(self._elapsed(), 'RX', '<empty>')
              return
          for offset, row in hexdump(data):
              self.write_line(self._elapsed(), 'RX', '{:04X} '.format(offset), row)

      def tx(self, data):
          """show transmitted data as hex dump"""
          if self.color:
              self.output.write(self.tx_color)
          for offset, row in hexdump(data):
              self.write_line(self._elapsed(), 'TX', '{:04X} '.format(offset), row)

      def control(self, name, value):
          """show a control call, using *name* as the line label"""
          if self.color:
              self.output.write(self.control_color)
          self.write_line(self._elapsed(), name, value)
  class Serial(serial.Serial):
      """\
      Inherit the native Serial port implementation and wrap the methods and
      attributes so that data and control activity is reported to a formatter.

      The formatter is created by from_url() when a ``spy://`` URL is
      assigned to ``port``; until then ``formatter`` is None.
      """
      # pylint: disable=no-member

      def __init__(self, *args, **kwargs):
          # Initialise our own state *before* the base class constructor runs.
          # If the base constructor assigns ``port`` (i.e. a spy:// URL was
          # passed directly), the port setter below configures the formatter;
          # setting these attributes afterwards would overwrite that
          # configuration with None again.
          self.formatter = None
          self.show_all = False
          super(Serial, self).__init__(*args, **kwargs)

      @serial.Serial.port.setter
      def port(self, value):
          """parse the spy:// URL and hand the wrapped port name to the base class"""
          # None is ignored, so the wrapped port is only changed by a real URL
          if value is not None:
              serial.Serial.port.__set__(self, self.from_url(value))

      def from_url(self, url):
          """\
          Parse a ``spy://port[?option[=value][&option[=value]]]`` URL.

          The options configure this instance directly: ``file=X`` (or its
          documented alias ``dev=X``) selects the output file, ``color``
          enables colored output, ``raw`` selects the raw formatter instead
          of the hex dump and ``all`` also reports empty reads and
          ``in_waiting`` queries. Output goes to sys.stderr by default.

          Returns the name of the wrapped port (netloc + path of the URL).
          Raises serial.SerialException if the scheme is not ``spy`` or an
          unknown option is given.
          """
          parts = urlparse.urlsplit(url)
          if parts.scheme != 'spy':
              raise serial.SerialException(
                  'expected a string in the form '
                  '"spy://port[?option[=value][&option[=value]]]": '
                  'not starting with spy:// ({!r})'.format(parts.scheme))
          # process options now, directly altering self
          formatter = FormatHexdump
          color = False
          output = sys.stderr
          try:
              # keep_blank_values=True so that value-less flags such as
              # "color" show up in the result
              for option, values in urlparse.parse_qs(parts.query, True).items():
                  if option in ('file', 'dev'):
                      # 'dev' is the spelling used in the module header,
                      # 'file' the one originally implemented; accept both.
                      # NOTE: the file is not closed by this class.
                      output = open(values[0], 'w')
                  elif option == 'color':
                      color = True
                  elif option == 'raw':
                      formatter = FormatRaw
                  elif option == 'all':
                      self.show_all = True
                  else:
                      raise ValueError('unknown option: {!r}'.format(option))
          except ValueError as e:
              raise serial.SerialException(
                  'expected a string in the form '
                  '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
          self.formatter = formatter(output, color)
          return ''.join([parts.netloc, parts.path])

      def write(self, tx):
          """report the data to the formatter, then write it to the port"""
          self.formatter.tx(tx)
          return super(Serial, self).write(tx)

      def read(self, size=1):
          """read from the port and report the result to the formatter"""
          rx = super(Serial, self).read(size)
          # empty reads (e.g. timeouts) are only reported with the 'all' option
          if rx or self.show_all:
              self.formatter.rx(rx)
          return rx

      # cancel_read/cancel_write are only wrapped when the base class has them
      if hasattr(serial.Serial, 'cancel_read'):
          def cancel_read(self):
              self.formatter.control('Q-RX', 'cancel_read')
              super(Serial, self).cancel_read()

      if hasattr(serial.Serial, 'cancel_write'):
          def cancel_write(self):
              self.formatter.control('Q-TX', 'cancel_write')
              super(Serial, self).cancel_write()

      @property
      def in_waiting(self):
          """value of the base class' in_waiting; reported with the 'all' option"""
          n = super(Serial, self).in_waiting
          if self.show_all:
              self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
          return n

      def flush(self):
          self.formatter.control('Q-TX', 'flush')
          super(Serial, self).flush()

      def reset_input_buffer(self):
          self.formatter.control('Q-RX', 'reset_input_buffer')
          super(Serial, self).reset_input_buffer()

      def reset_output_buffer(self):
          self.formatter.control('Q-TX', 'reset_output_buffer')
          super(Serial, self).reset_output_buffer()

      def send_break(self, duration=0.25):
          self.formatter.control('BRK', 'send_break {}s'.format(duration))
          super(Serial, self).send_break(duration)

      # Setters of the output control lines: report the new level, then
      # delegate to the base class property.

      @serial.Serial.break_condition.setter
      def break_condition(self, level):
          self.formatter.control('BRK', 'active' if level else 'inactive')
          serial.Serial.break_condition.__set__(self, level)

      @serial.Serial.rts.setter
      def rts(self, level):
          self.formatter.control('RTS', 'active' if level else 'inactive')
          serial.Serial.rts.__set__(self, level)

      @serial.Serial.dtr.setter
      def dtr(self, level):
          self.formatter.control('DTR', 'active' if level else 'inactive')
          serial.Serial.dtr.__set__(self, level)

      # Getters of the input control lines: read the level from the base
      # class, report it, and return it unchanged.

      @serial.Serial.cts.getter
      def cts(self):
          level = super(Serial, self).cts
          self.formatter.control('CTS', 'active' if level else 'inactive')
          return level

      @serial.Serial.dsr.getter
      def dsr(self):
          level = super(Serial, self).dsr
          self.formatter.control('DSR', 'active' if level else 'inactive')
          return level

      @serial.Serial.ri.getter
      def ri(self):
          level = super(Serial, self).ri
          self.formatter.control('RI', 'active' if level else 'inactive')
          return level

      @serial.Serial.cd.getter
      def cd(self):
          level = super(Serial, self).cd
          self.formatter.control('CD', 'active' if level else 'inactive')
          return level
  237. # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  if __name__ == '__main__':
      # Simple manual check: create the wrapper without a port, assign a
      # spy:// URL through the port setter (which parses the URL and sets up
      # the formatter) and print the resulting object.
      ser = Serial(None)
      ser.port = 'spy:///dev/ttyS0'
      print(ser)
  241. print(ser)