123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Test cases for L{twisted.protocols.haproxy.V2Parser}.
- """
- from twisted.trial import unittest
- from twisted.internet import address
- from .._exceptions import InvalidProxyHeader
- from .. import _v2parser
# The 12 byte protocol signature used as the default C{sig} of the header
# builders below: CR LF CR LF NUL CR LF "QUIT" LF.
V2_SIGNATURE = b'\r\n\r\n\x00\r\nQUIT\n'
def _makeHeaderIPv6(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x21',
                    addrLength=b'\x00\x24',
                    addrs=((b'\x00' * 15) + b'\x01') * 2,
                    ports=b'\x1F\x90\x22\xB8'):
    """
    Build the raw bytes of a version 2 header carrying IPv6 addresses.

    Every field can be overridden independently, which lets a test corrupt
    exactly one part of an otherwise well formed header.

    @param sig: The protocol signature; the valid L{V2_SIGNATURE} unless
        overridden.
    @type sig: L{bytes}

    @param verCom: The protocol version and command byte. The default
        describes V2 PROXY.
    @type verCom: L{bytes}

    @param famProto: The address family and protocol byte. The default
        describes AF_INET6/STREAM.
    @type famProto: L{bytes}

    @param addrLength: Network-endian byte length of the payload. The
        default (36) matches the default C{addrs} and C{ports}.
    @type addrLength: L{bytes}

    @param addrs: The address payload. The default is C{::1} for both the
        source and the destination.
    @type addrs: L{bytes}

    @param ports: The source and destination ports. The default is 8080 for
        the source and 8888 for the destination.
    @type ports: L{bytes}

    @return: A packet made of the header, the addresses and the ports.
    @rtype: L{bytes}
    """
    # Signature, version/command, family/protocol and length come first;
    # the address payload and ports follow.
    fixedPart = sig + verCom + famProto + addrLength
    return fixedPart + addrs + ports
def _makeHeaderIPv4(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x11',
                    addrLength=b'\x00\x0C',
                    addrs=b'\x7F\x00\x00\x01\x7F\x00\x00\x01',
                    ports=b'\x1F\x90\x22\xB8'):
    """
    Build the raw bytes of a version 2 header carrying IPv4 addresses.

    Every field can be overridden independently, which lets a test corrupt
    exactly one part of an otherwise well formed header.

    @param sig: The protocol signature; the valid L{V2_SIGNATURE} unless
        overridden.
    @type sig: L{bytes}

    @param verCom: The protocol version and command byte. The default
        describes V2 PROXY.
    @type verCom: L{bytes}

    @param famProto: The address family and protocol byte. The default
        describes AF_INET/STREAM.
    @type famProto: L{bytes}

    @param addrLength: Network-endian byte length of the payload. The
        default (12) matches the default C{addrs} and C{ports}.
    @type addrLength: L{bytes}

    @param addrs: The address payload. The default is 127.0.0.1 for both
        the source and the destination.
    @type addrs: L{bytes}

    @param ports: The source and destination ports. The default is 8080 for
        the source and 8888 for the destination.
    @type ports: L{bytes}

    @return: A packet made of the header, the addresses and the ports.
    @rtype: L{bytes}
    """
    # Signature, version/command, family/protocol and length come first;
    # the address payload and ports follow.
    fixedPart = sig + verCom + famProto + addrLength
    return fixedPart + addrs + ports
def _makeHeaderUnix(sig=V2_SIGNATURE, verCom=b'\x21', famProto=b'\x31',
                    addrLength=b'\x00\xD8',
                    addrs=(b'\x2F\x68\x6F\x6D\x65\x2F\x74\x65\x73\x74\x73\x2F'
                           b'\x6D\x79\x73\x6F\x63\x6B\x65\x74\x73\x2F\x73\x6F'
                           b'\x63\x6B' + (b'\x00' * 82)) * 2):
    """
    Construct a version 2 UNIX socket header with custom bytes.

    @param sig: The protocol signature; defaults to valid L{V2_SIGNATURE}.
    @type sig: L{bytes}

    @param verCom: Protocol version and command.  Defaults to V2 PROXY.
    @type verCom: L{bytes}

    @param famProto: Address family and protocol.  Defaults to
        AF_UNIX/STREAM.
    @type famProto: L{bytes}

    @param addrLength: Network-endian byte length of payload.  Defaults to
        216 bytes: two paths, each null padded to 108 bytes.
    @type addrLength: L{bytes}

    @param addrs: Address payload.  Defaults to C{/home/tests/mysockets/sock}
        for source and destination paths; each 26 byte path is followed by
        82 null bytes.
    @type addrs: L{bytes}

    @return: A packet with header and addresses.  Unlike the IPv4 and IPv6
        builders, no ports are appended.
    @rtype: L{bytes}
    """
    return sig + verCom + famProto + addrLength + addrs
class V2ParserTests(unittest.TestCase):
    """
    Test L{twisted.protocols.haproxy.V2Parser} behaviour.

    Headers are produced by the module level C{_makeHeader*} builders; each
    test overrides at most one field so that a failure points at the field
    under test.
    """

    def test_happyPathIPv4(self):
        """
        Test if a well formed IPv4 header is parsed without error.
        """
        header = _makeHeaderIPv4()
        self.assertTrue(_v2parser.V2Parser.parse(header))

    def test_happyPathIPv6(self):
        """
        Test if a well formed IPv6 header is parsed without error.
        """
        header = _makeHeaderIPv6()
        self.assertTrue(_v2parser.V2Parser.parse(header))

    def test_happyPathUnix(self):
        """
        Test if a well formed UNIX header is parsed without error.
        """
        header = _makeHeaderUnix()
        self.assertTrue(_v2parser.V2Parser.parse(header))

    def test_invalidSignature(self):
        """
        Test if an invalid signature block raises L{InvalidProxyHeader}.
        """
        # Twelve null bytes in place of the expected signature.
        header = _makeHeaderIPv4(sig=b'\x00'*12)
        self.assertRaises(
            InvalidProxyHeader,
            _v2parser.V2Parser.parse,
            header,
        )

    def test_invalidVersion(self):
        """
        Test if an invalid version raises L{InvalidProxyHeader}.
        """
        # C{0x11} instead of the builders' default C{0x21}.
        header = _makeHeaderIPv4(verCom=b'\x11')
        self.assertRaises(
            InvalidProxyHeader,
            _v2parser.V2Parser.parse,
            header,
        )

    def test_invalidCommand(self):
        """
        Test if an invalid command raises L{InvalidProxyHeader}.
        """
        # C{0x23} instead of the builders' default C{0x21}.
        header = _makeHeaderIPv4(verCom=b'\x23')
        self.assertRaises(
            InvalidProxyHeader,
            _v2parser.V2Parser.parse,
            header,
        )

    def test_invalidFamily(self):
        """
        Test if an invalid family raises L{InvalidProxyHeader}.
        """
        header = _makeHeaderIPv4(famProto=b'\x40')
        self.assertRaises(
            InvalidProxyHeader,
            _v2parser.V2Parser.parse,
            header,
        )

    def test_invalidProto(self):
        """
        Test if an invalid protocol raises L{InvalidProxyHeader}.
        """
        header = _makeHeaderIPv4(famProto=b'\x24')
        self.assertRaises(
            InvalidProxyHeader,
            _v2parser.V2Parser.parse,
            header,
        )

    def test_localCommandIpv4(self):
        """
        Test that local does not return endpoint data for IPv4 connections.
        """
        header = _makeHeaderIPv4(verCom=b'\x20')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_localCommandIpv6(self):
        """
        Test that local does not return endpoint data for IPv6 connections.
        """
        header = _makeHeaderIPv6(verCom=b'\x20')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_localCommandUnix(self):
        """
        Test that local does not return endpoint data for UNIX connections.
        """
        header = _makeHeaderUnix(verCom=b'\x20')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_proxyCommandIpv4(self):
        """
        Test that proxy returns endpoint data for IPv4 connections.
        """
        header = _makeHeaderIPv4(verCom=b'\x21')
        info = _v2parser.V2Parser.parse(header)
        self.assertTrue(info.source)
        self.assertIsInstance(info.source, address.IPv4Address)
        self.assertTrue(info.destination)
        self.assertIsInstance(info.destination, address.IPv4Address)

    def test_proxyCommandIpv6(self):
        """
        Test that proxy returns endpoint data for IPv6 connections.
        """
        header = _makeHeaderIPv6(verCom=b'\x21')
        info = _v2parser.V2Parser.parse(header)
        self.assertTrue(info.source)
        self.assertIsInstance(info.source, address.IPv6Address)
        self.assertTrue(info.destination)
        self.assertIsInstance(info.destination, address.IPv6Address)

    def test_proxyCommandUnix(self):
        """
        Test that proxy returns endpoint data for UNIX connections.
        """
        header = _makeHeaderUnix(verCom=b'\x21')
        info = _v2parser.V2Parser.parse(header)
        self.assertTrue(info.source)
        self.assertIsInstance(info.source, address.UNIXAddress)
        self.assertTrue(info.destination)
        self.assertIsInstance(info.destination, address.UNIXAddress)

    def test_unspecFamilyIpv4(self):
        """
        Test that an UNSPEC address family does not return endpoint data
        when the header carries an IPv4 sized payload.
        """
        header = _makeHeaderIPv4(famProto=b'\x01')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_unspecFamilyIpv6(self):
        """
        Test that an UNSPEC address family does not return endpoint data
        when the header carries an IPv6 sized payload.
        """
        header = _makeHeaderIPv6(famProto=b'\x01')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_unspecFamilyUnix(self):
        """
        Test that an UNSPEC address family does not return endpoint data
        when the header carries a UNIX sized payload.
        """
        header = _makeHeaderUnix(famProto=b'\x01')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_unspecProtoIpv4(self):
        """
        Test that an UNSPEC protocol does not return endpoint data for IPv4
        connections.
        """
        header = _makeHeaderIPv4(famProto=b'\x10')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_unspecProtoIpv6(self):
        """
        Test that an UNSPEC protocol does not return endpoint data for IPv6
        connections.
        """
        header = _makeHeaderIPv6(famProto=b'\x20')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_unspecProtoUnix(self):
        """
        Test that an UNSPEC protocol does not return endpoint data for UNIX
        connections.
        """
        header = _makeHeaderUnix(famProto=b'\x30')
        info = _v2parser.V2Parser.parse(header)
        self.assertFalse(info.source)
        self.assertFalse(info.destination)

    def test_overflowIpv4(self):
        """
        Test that overflow bits are preserved during feed parsing for IPv4.
        """
        testValue = b'TEST DATA\r\n\r\nTEST DATA'
        header = _makeHeaderIPv4() + testValue
        parser = _v2parser.V2Parser()
        info, overflow = parser.feed(header)
        self.assertTrue(info)
        self.assertEqual(overflow, testValue)

    def test_overflowIpv6(self):
        """
        Test that overflow bits are preserved during feed parsing for IPv6.
        """
        testValue = b'TEST DATA\r\n\r\nTEST DATA'
        header = _makeHeaderIPv6() + testValue
        parser = _v2parser.V2Parser()
        info, overflow = parser.feed(header)
        self.assertTrue(info)
        self.assertEqual(overflow, testValue)

    def test_overflowUnix(self):
        """
        Test that overflow bits are preserved during feed parsing for Unix.
        """
        testValue = b'TEST DATA\r\n\r\nTEST DATA'
        header = _makeHeaderUnix() + testValue
        parser = _v2parser.V2Parser()
        info, overflow = parser.feed(header)
        self.assertTrue(info)
        self.assertEqual(overflow, testValue)

    def test_segmentTooSmall(self):
        """
        Test that an initial payload of less than 16 bytes fails.
        """
        # Only 12 bytes are fed to the parser.
        testValue = b'NEEDMOREDATA'
        parser = _v2parser.V2Parser()
        self.assertRaises(
            InvalidProxyHeader,
            parser.feed,
            testValue,
        )
|