test_v2parser.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test cases for L{twisted.protocols.haproxy.V2Parser}.
  5. """
  6. from twisted.trial import unittest
  7. from twisted.internet import address
  8. from .._exceptions import InvalidProxyHeader
  9. from .. import _v2parser
  10. V2_SIGNATURE = b'\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A'
  11. def _makeHeaderIPv6(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x21',
  12. addrLength=b'\x00\x24',
  13. addrs=((b'\x00' * 15) + b'\x01') * 2,
  14. ports=b'\x1F\x90\x22\xB8'):
  15. """
  16. Construct a version 2 IPv6 header with custom bytes.
  17. @param sig: The protocol signature; defaults to valid L{V2_SIGNATURE}.
  18. @type sig: L{bytes}
  19. @param verCom: Protocol version and command. Defaults to V2 PROXY.
  20. @type verCom: L{bytes}
  21. @param famProto: Address family and protocol. Defaults to AF_INET6/STREAM.
  22. @type famProto: L{bytes}
  23. @param addrLength: Network-endian byte length of payload. Defaults to
  24. description of default addrs/ports.
  25. @type addrLength: L{bytes}
  26. @param addrs: Address payload. Defaults to C{::1} for source and
  27. destination.
  28. @type addrs: L{bytes}
  29. @param ports: Source and destination ports. Defaults to 8080 for source
  30. 8888 for destination.
  31. @type ports: L{bytes}
  32. @return: A packet with header, addresses, and ports.
  33. @rtype: L{bytes}
  34. """
  35. return sig + verCom + famProto + addrLength + addrs + ports
  36. def _makeHeaderIPv4(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x11',
  37. addrLength=b'\x00\x0C',
  38. addrs=b'\x7F\x00\x00\x01\x7F\x00\x00\x01',
  39. ports=b'\x1F\x90\x22\xB8'):
  40. """
  41. Construct a version 2 IPv4 header with custom bytes.
  42. @param sig: The protocol signature; defaults to valid L{V2_SIGNATURE}.
  43. @type sig: L{bytes}
  44. @param verCom: Protocol version and command. Defaults to V2 PROXY.
  45. @type verCom: L{bytes}
  46. @param famProto: Address family and protocol. Defaults to AF_INET/STREAM.
  47. @type famProto: L{bytes}
  48. @param addrLength: Network-endian byte length of payload. Defaults to
  49. description of default addrs/ports.
  50. @type addrLength: L{bytes}
  51. @param addrs: Address payload. Defaults to 127.0.0.1 for source and
  52. destination.
  53. @type addrs: L{bytes}
  54. @param ports: Source and destination ports. Defaults to 8080 for source
  55. 8888 for destination.
  56. @type ports: L{bytes}
  57. @return: A packet with header, addresses, and ports.
  58. @rtype: L{bytes}
  59. """
  60. return sig + verCom + famProto + addrLength + addrs + ports
  61. def _makeHeaderUnix(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x31',
  62. addrLength=b'\x00\xD8',
  63. addrs=(b'\x2F\x68\x6F\x6D\x65\x2F\x74\x65\x73\x74\x73\x2F'
  64. b'\x6D\x79\x73\x6F\x63\x6B\x65\x74\x73\x2F\x73\x6F'
  65. b'\x63\x6B' + (b'\x00' * 82)) * 2):
  66. """
  67. Construct a version 2 IPv4 header with custom bytes.
  68. @param sig: The protocol signature; defaults to valid L{V2_SIGNATURE}.
  69. @type sig: L{bytes}
  70. @param verCom: Protocol version and command. Defaults to V2 PROXY.
  71. @type verCom: L{bytes}
  72. @param famProto: Address family and protocol. Defaults to AF_UNIX/STREAM.
  73. @type famProto: L{bytes}
  74. @param addrLength: Network-endian byte length of payload. Defaults to 108
  75. bytes for 2 null terminated paths.
  76. @type addrLength: L{bytes}
  77. @param addrs: Address payload. Defaults to C{/home/tests/mysockets/sock}
  78. for source and destination paths.
  79. @type addrs: L{bytes}
  80. @return: A packet with header, addresses, and8 ports.
  81. @rtype: L{bytes}
  82. """
  83. return sig + verCom + famProto + addrLength + addrs
  84. class V2ParserTests(unittest.TestCase):
  85. """
  86. Test L{twisted.protocols.haproxy.V2Parser} behaviour.
  87. """
  88. def test_happyPathIPv4(self):
  89. """
  90. Test if a well formed IPv4 header is parsed without error.
  91. """
  92. header = _makeHeaderIPv4()
  93. self.assertTrue(_v2parser.V2Parser.parse(header))
  94. def test_happyPathIPv6(self):
  95. """
  96. Test if a well formed IPv6 header is parsed without error.
  97. """
  98. header = _makeHeaderIPv6()
  99. self.assertTrue(_v2parser.V2Parser.parse(header))
  100. def test_happyPathUnix(self):
  101. """
  102. Test if a well formed UNIX header is parsed without error.
  103. """
  104. header = _makeHeaderUnix()
  105. self.assertTrue(_v2parser.V2Parser.parse(header))
  106. def test_invalidSignature(self):
  107. """
  108. Test if an invalid signature block raises InvalidProxyError.
  109. """
  110. header = _makeHeaderIPv4(sig=b'\x00'*12)
  111. self.assertRaises(
  112. InvalidProxyHeader,
  113. _v2parser.V2Parser.parse,
  114. header,
  115. )
  116. def test_invalidVersion(self):
  117. """
  118. Test if an invalid version raises InvalidProxyError.
  119. """
  120. header = _makeHeaderIPv4(verCom=b'\x11')
  121. self.assertRaises(
  122. InvalidProxyHeader,
  123. _v2parser.V2Parser.parse,
  124. header,
  125. )
  126. def test_invalidCommand(self):
  127. """
  128. Test if an invalid command raises InvalidProxyError.
  129. """
  130. header = _makeHeaderIPv4(verCom=b'\x23')
  131. self.assertRaises(
  132. InvalidProxyHeader,
  133. _v2parser.V2Parser.parse,
  134. header,
  135. )
  136. def test_invalidFamily(self):
  137. """
  138. Test if an invalid family raises InvalidProxyError.
  139. """
  140. header = _makeHeaderIPv4(famProto=b'\x40')
  141. self.assertRaises(
  142. InvalidProxyHeader,
  143. _v2parser.V2Parser.parse,
  144. header,
  145. )
  146. def test_invalidProto(self):
  147. """
  148. Test if an invalid protocol raises InvalidProxyError.
  149. """
  150. header = _makeHeaderIPv4(famProto=b'\x24')
  151. self.assertRaises(
  152. InvalidProxyHeader,
  153. _v2parser.V2Parser.parse,
  154. header,
  155. )
  156. def test_localCommandIpv4(self):
  157. """
  158. Test that local does not return endpoint data for IPv4 connections.
  159. """
  160. header = _makeHeaderIPv4(verCom=b'\x20')
  161. info = _v2parser.V2Parser.parse(header)
  162. self.assertFalse(info.source)
  163. self.assertFalse(info.destination)
  164. def test_localCommandIpv6(self):
  165. """
  166. Test that local does not return endpoint data for IPv6 connections.
  167. """
  168. header = _makeHeaderIPv6(verCom=b'\x20')
  169. info = _v2parser.V2Parser.parse(header)
  170. self.assertFalse(info.source)
  171. self.assertFalse(info.destination)
  172. def test_localCommandUnix(self):
  173. """
  174. Test that local does not return endpoint data for UNIX connections.
  175. """
  176. header = _makeHeaderUnix(verCom=b'\x20')
  177. info = _v2parser.V2Parser.parse(header)
  178. self.assertFalse(info.source)
  179. self.assertFalse(info.destination)
  180. def test_proxyCommandIpv4(self):
  181. """
  182. Test that proxy returns endpoint data for IPv4 connections.
  183. """
  184. header = _makeHeaderIPv4(verCom=b'\x21')
  185. info = _v2parser.V2Parser.parse(header)
  186. self.assertTrue(info.source)
  187. self.assertIsInstance(info.source, address.IPv4Address)
  188. self.assertTrue(info.destination)
  189. self.assertIsInstance(info.destination, address.IPv4Address)
  190. def test_proxyCommandIpv6(self):
  191. """
  192. Test that proxy returns endpoint data for IPv6 connections.
  193. """
  194. header = _makeHeaderIPv6(verCom=b'\x21')
  195. info = _v2parser.V2Parser.parse(header)
  196. self.assertTrue(info.source)
  197. self.assertIsInstance(info.source, address.IPv6Address)
  198. self.assertTrue(info.destination)
  199. self.assertIsInstance(info.destination, address.IPv6Address)
  200. def test_proxyCommandUnix(self):
  201. """
  202. Test that proxy returns endpoint data for UNIX connections.
  203. """
  204. header = _makeHeaderUnix(verCom=b'\x21')
  205. info = _v2parser.V2Parser.parse(header)
  206. self.assertTrue(info.source)
  207. self.assertIsInstance(info.source, address.UNIXAddress)
  208. self.assertTrue(info.destination)
  209. self.assertIsInstance(info.destination, address.UNIXAddress)
  210. def test_unspecFamilyIpv4(self):
  211. """
  212. Test that UNSPEC does not return endpoint data for IPv4 connections.
  213. """
  214. header = _makeHeaderIPv4(famProto=b'\x01')
  215. info = _v2parser.V2Parser.parse(header)
  216. self.assertFalse(info.source)
  217. self.assertFalse(info.destination)
  218. def test_unspecFamilyIpv6(self):
  219. """
  220. Test that UNSPEC does not return endpoint data for IPv6 connections.
  221. """
  222. header = _makeHeaderIPv6(famProto=b'\x01')
  223. info = _v2parser.V2Parser.parse(header)
  224. self.assertFalse(info.source)
  225. self.assertFalse(info.destination)
  226. def test_unspecFamilyUnix(self):
  227. """
  228. Test that UNSPEC does not return endpoint data for UNIX connections.
  229. """
  230. header = _makeHeaderUnix(famProto=b'\x01')
  231. info = _v2parser.V2Parser.parse(header)
  232. self.assertFalse(info.source)
  233. self.assertFalse(info.destination)
  234. def test_unspecProtoIpv4(self):
  235. """
  236. Test that UNSPEC does not return endpoint data for IPv4 connections.
  237. """
  238. header = _makeHeaderIPv4(famProto=b'\x10')
  239. info = _v2parser.V2Parser.parse(header)
  240. self.assertFalse(info.source)
  241. self.assertFalse(info.destination)
  242. def test_unspecProtoIpv6(self):
  243. """
  244. Test that UNSPEC does not return endpoint data for IPv6 connections.
  245. """
  246. header = _makeHeaderIPv6(famProto=b'\x20')
  247. info = _v2parser.V2Parser.parse(header)
  248. self.assertFalse(info.source)
  249. self.assertFalse(info.destination)
  250. def test_unspecProtoUnix(self):
  251. """
  252. Test that UNSPEC does not return endpoint data for UNIX connections.
  253. """
  254. header = _makeHeaderUnix(famProto=b'\x30')
  255. info = _v2parser.V2Parser.parse(header)
  256. self.assertFalse(info.source)
  257. self.assertFalse(info.destination)
  258. def test_overflowIpv4(self):
  259. """
  260. Test that overflow bits are preserved during feed parsing for IPv4.
  261. """
  262. testValue = b'TEST DATA\r\n\r\nTEST DATA'
  263. header = _makeHeaderIPv4() + testValue
  264. parser = _v2parser.V2Parser()
  265. info, overflow = parser.feed(header)
  266. self.assertTrue(info)
  267. self.assertEqual(overflow, testValue)
  268. def test_overflowIpv6(self):
  269. """
  270. Test that overflow bits are preserved during feed parsing for IPv6.
  271. """
  272. testValue = b'TEST DATA\r\n\r\nTEST DATA'
  273. header = _makeHeaderIPv6() + testValue
  274. parser = _v2parser.V2Parser()
  275. info, overflow = parser.feed(header)
  276. self.assertTrue(info)
  277. self.assertEqual(overflow, testValue)
  278. def test_overflowUnix(self):
  279. """
  280. Test that overflow bits are preserved during feed parsing for Unix.
  281. """
  282. testValue = b'TEST DATA\r\n\r\nTEST DATA'
  283. header = _makeHeaderUnix() + testValue
  284. parser = _v2parser.V2Parser()
  285. info, overflow = parser.feed(header)
  286. self.assertTrue(info)
  287. self.assertEqual(overflow, testValue)
  288. def test_segmentTooSmall(self):
  289. """
  290. Test that an initial payload of less than 16 bytes fails.
  291. """
  292. testValue = b'NEEDMOREDATA'
  293. parser = _v2parser.V2Parser()
  294. self.assertRaises(
  295. InvalidProxyHeader,
  296. parser.feed,
  297. testValue,
  298. )