123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.protocols.socks}, an implementation of the SOCKSv4 and
- SOCKSv4a protocols.
- """
- import struct, socket
- from twisted.internet import defer, address
- from twisted.internet.error import DNSLookupError
- from twisted.python.compat import iterbytes
- from twisted.protocols import socks
- from twisted.test import proto_helpers
- from twisted.trial import unittest
class StringTCPTransport(proto_helpers.StringTransport):
    """
    A L{proto_helpers.StringTransport} subclass used by these tests in place
    of a TCP transport.

    It reports a fixed host address, a peer address that can be assigned by
    the test, and remembers whether it was asked to close.

    @ivar stringTCPTransport_closing: C{False} until L{loseConnection} is
        called, C{True} afterwards.
    @ivar peer: The object returned by L{getPeer}; C{None} until assigned.
    """

    peer = None
    stringTCPTransport_closing = False

    def loseConnection(self):
        """
        Record that closing the connection was requested.  The base class
        implementation is not invoked.
        """
        self.stringTCPTransport_closing = True

    def getHost(self):
        """
        Return a fixed IPv4 TCP address (C{2.3.4.5}, port C{42}).
        """
        hostAddress = address.IPv4Address('TCP', '2.3.4.5', 42)
        return hostAddress

    def getPeer(self):
        """
        Return whatever was assigned to the C{peer} attribute.
        """
        return self.peer
class FakeResolverReactor:
    """
    Bare-bones reactor with deterministic behavior for the resolve method.
    """

    def __init__(self, names):
        """
        @type names: L{dict} containing L{bytes} keys and L{str} values.
        @param names: A hostname to IP address mapping.  The hostnames are
            byte strings; the IP addresses are stringified dotted quads.
        """
        self.names = names

    def resolve(self, hostname):
        """
        Resolve a hostname by looking it up in the C{names} dictionary.

        @param hostname: The name to look up; normally L{bytes}.

        @return: A L{Deferred} which has already fired with the mapped
            address, or which has already failed with L{DNSLookupError} when
            C{hostname} is not a key of C{names}.
        """
        try:
            resolved = self.names[hostname]
        except KeyError:
            # Only bytes need decoding to build the message.  Anything else
            # is stringified so that an unknown text hostname still produces
            # a failed Deferred instead of an AttributeError.
            if isinstance(hostname, bytes):
                displayName = hostname.decode("utf-8")
            else:
                displayName = str(hostname)
            return defer.fail(
                DNSLookupError(
                    "FakeResolverReactor couldn't find " + displayName))
        return defer.succeed(resolved)
class SOCKSv4Driver(socks.SOCKSv4):
    """
    A L{socks.SOCKSv4} subclass whose C{connectClass} and C{listenClass}
    methods are faked so that no real network activity takes place, and
    which remembers the objects those fakes created.
    """

    # last SOCKSv4IncomingFactory instantiated
    driver_listen = None

    # last SOCKSv4Outgoing instantiated
    driver_outgoing = None

    def connectClass(self, host, port, klass, *args):
        """
        Fake an outgoing connection.

        C{klass} is instantiated with C{args}, given a
        L{StringTCPTransport} whose peer is C{host}/C{port}, told that its
        connection was made and stored as C{driver_outgoing}.

        @return: A L{Deferred} already fired with the new protocol.
        """
        outgoing = klass(*args)
        fakeTransport = StringTCPTransport()
        outgoing.transport = fakeTransport
        fakeTransport.peer = address.IPv4Address('TCP', host, port)
        outgoing.connectionMade()
        self.driver_outgoing = outgoing
        return defer.succeed(outgoing)

    def listenClass(self, port, klass, *args):
        """
        Fake a listening port.

        C{klass} is instantiated with C{args} and stored as
        C{driver_listen}.  A requested port of C{0} is reported as C{1234}.

        @return: A L{Deferred} already fired with a C{(host, port)} tuple
            whose host is always C{'6.7.8.9'}.
        """
        self.driver_listen = klass(*args)
        reportedPort = 1234 if port == 0 else port
        return defer.succeed(('6.7.8.9', reportedPort))
class ConnectTests(unittest.TestCase):
    """
    Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
    """

    def setUp(self):
        """
        Create a connected L{SOCKSv4Driver} whose fake resolver only knows
        the name C{b"localhost"}.
        """
        protocol = self.sock = SOCKSv4Driver()
        protocol.transport = StringTCPTransport()
        protocol.connectionMade()
        protocol.reactor = FakeResolverReactor({b"localhost": "127.0.0.1"})

    def tearDown(self):
        """
        If an outgoing connection was created, verify it was asked to close.
        """
        outgoing = self.sock.driver_outgoing
        if outgoing is None:
            return
        self.assertTrue(
            outgoing.transport.stringTCPTransport_closing,
            "Outgoing SOCKS connections need to be closed.")

    def test_simple(self):
        """
        A version 4 request with command code 1 for a dotted-quad address
        is answered with a code 90 reply carrying the same port and address,
        after which data is relayed in both directions.
        """
        request = (
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        self.sock.dataReceived(request)

        reply = self.sock.transport.value()
        self.sock.transport.clear()
        expectedReply = (
            struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('1.2.3.4'))
        self.assertEqual(reply, expectedReply)
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNotNone(self.sock.driver_outgoing)
        outgoing = self.sock.driver_outgoing

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        self.assertEqual(outgoing.transport.value(), b'hello, world')

        # the other way around
        outgoing.dataReceived(b'hi there')
        self.assertEqual(self.sock.transport.value(), b'hi there')

        self.sock.connectionLost('fake reason')

    def test_socks4aSuccessfulResolution(self):
        """
        If the destination IP address has zeros for the first three octets and
        non-zero for the fourth octet, the client is attempting a v4a
        connection.  A hostname is specified after the user ID string and the
        server connects to the address that hostname resolves to.

        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
        """
        # send the domain name "localhost" to be resolved
        header = struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('0.0.0.1')
        clientRequest = header + b'fooBAZ\0' + b'localhost\0'

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for chunk in iterbytes(clientRequest):
            self.sock.dataReceived(chunk)

        reply = self.sock.transport.value()
        self.sock.transport.clear()

        # Verify that the server responded with the address which will be
        # connected to.
        expectedReply = (
            struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
        self.assertEqual(reply, expectedReply)
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNotNone(self.sock.driver_outgoing)
        outgoing = self.sock.driver_outgoing

        # Pass some data through and verify it is forwarded to the outgoing
        # connection.
        self.sock.dataReceived(b'hello, world')
        self.assertEqual(outgoing.transport.value(), b'hello, world')

        # Deliver some data from the output connection and verify it is
        # passed along to the incoming side.
        outgoing.dataReceived(b'hi there')
        self.assertEqual(self.sock.transport.value(), b'hi there')

        self.sock.connectionLost('fake reason')

    def test_socks4aFailedResolution(self):
        """
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
        response and the connection getting closed.
        """
        # send the domain name "failinghost" to be resolved
        header = struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('0.0.0.1')
        clientRequest = header + b'fooBAZ\0' + b'failinghost\0'

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for chunk in iterbytes(clientRequest):
            self.sock.dataReceived(chunk)

        # Verify that the server responds with a 91 error.
        reply = self.sock.transport.value()
        expectedReply = (
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
        self.assertEqual(reply, expectedReply)

        # A failed resolution causes the transport to drop the connection.
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNone(self.sock.driver_outgoing)

    def test_accessDenied(self):
        """
        When C{authorize} returns a false value the request is answered with
        a 91 reply, the transport is asked to close and no outgoing
        connection is created.
        """
        def denyAll(code, server, port, user):
            return 0

        self.sock.authorize = denyAll
        request = (
            struct.pack('!BBH', 4, 1, 4242)
            + socket.inet_aton('10.2.3.4')
            + b'fooBAR'
            + b'\0')
        self.sock.dataReceived(request)

        expectedReply = (
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
        self.assertEqual(self.sock.transport.value(), expectedReply)
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNone(self.sock.driver_outgoing)

    def test_eofRemote(self):
        """
        Exercise the outgoing side of an established connection being lost;
        the check that the outgoing transport was closed lives in
        C{tearDown}.
        """
        request = (
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        self.sock.dataReceived(request)
        self.sock.transport.clear()

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        outgoing = self.sock.driver_outgoing
        self.assertEqual(outgoing.transport.value(), b'hello, world')

        # now close it from the server side
        outgoing.transport.loseConnection()
        outgoing.connectionLost('fake reason')

    def test_eofLocal(self):
        """
        Exercise the client side of an established connection being lost;
        the check that the outgoing transport was closed lives in
        C{tearDown}.
        """
        request = (
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        self.sock.dataReceived(request)
        self.sock.transport.clear()

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        outgoing = self.sock.driver_outgoing
        self.assertEqual(outgoing.transport.value(), b'hello, world')

        # now close it from the client side
        self.sock.connectionLost('fake reason')
class BindTests(unittest.TestCase):
    """
    Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
    """

    def setUp(self):
        """
        Create a connected L{SOCKSv4Driver} whose fake resolver only knows
        the name C{b"localhost"}.
        """
        self.sock = SOCKSv4Driver()
        self.sock.transport = StringTCPTransport()
        self.sock.connectionMade()
        self.sock.reactor = FakeResolverReactor({b"localhost": "127.0.0.1"})

    # TODO: add a tearDown which ensures the listen port is closed.

    def test_simple(self):
        """
        A version 4 request with command code 2 is answered with a code 90
        reply carrying the listening address.  Once the expected peer
        connects a second code 90 reply is sent and data is relayed in both
        directions.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNotNone(self.sock.driver_listen)

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertIsNotNone(incoming)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        self.assertEqual(incoming.transport.value(), b'hello, world')

        # the other way around
        incoming.dataReceived(b'hi there')
        self.assertEqual(self.sock.transport.value(), b'hi there')

        self.sock.connectionLost('fake reason')

    def test_socks4a(self):
        """
        If the destination IP address has zeros for the first three octets and
        non-zero for the fourth octet, the client is attempting a v4a
        connection.  A hostname is specified after the user ID string and the
        server accepts an incoming connection from the address that hostname
        resolves to.

        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
        """
        # send the domain name "localhost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('0.0.0.1')
            + b'fooBAZ\0'
            + b'localhost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in iterbytes(clientRequest):
            self.sock.dataReceived(byte)

        sent = self.sock.transport.value()
        self.sock.transport.clear()

        # Verify that the server responded with the address it is listening
        # on.
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNotNone(self.sock.driver_listen)

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
        self.assertIsNotNone(incoming)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0'))
        # The flag is always a bool, so comparing it with None could never
        # fail; assert that the connection is still open instead.
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # Deliver some data from the client and verify it is passed along
        # to the incoming connection.
        self.sock.dataReceived(b'hi there')
        self.assertEqual(incoming.transport.value(), b'hi there')

        # the other way around
        incoming.dataReceived(b'hi there')
        self.assertEqual(self.sock.transport.value(), b'hi there')

        self.sock.connectionLost('fake reason')

    def test_socks4aFailedResolution(self):
        """
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
        response and the connection getting closed.
        """
        # send the domain name "failinghost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('0.0.0.1')
            + b'fooBAZ\0'
            + b'failinghost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in iterbytes(clientRequest):
            self.sock.dataReceived(byte)

        # Verify that the server responds with a 91 error.
        sent = self.sock.transport.value()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))

        # A failed resolution causes the transport to drop the connection
        # and, this being a bind request, nothing must be listening.
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNone(self.sock.driver_listen)

    def test_accessDenied(self):
        """
        When C{authorize} returns a false value the request is answered with
        a 91 reply, the transport is asked to close and no listening factory
        is created.
        """
        self.sock.authorize = lambda code, server, port, user: 0
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 4242)
            + socket.inet_aton('10.2.3.4')
            + b'fooBAR'
            + b'\0')
        self.assertEqual(
            self.sock.transport.value(),
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIsNone(self.sock.driver_listen)

    def test_eofRemote(self):
        """
        Exercise the incoming side of an established bind connection being
        lost after data has been relayed to it.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        # Discard the first reply; test_simple verifies its contents.
        self.sock.transport.clear()

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertIsNotNone(incoming)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        self.assertEqual(incoming.transport.value(), b'hello, world')

        # now close it from the server side
        incoming.transport.loseConnection()
        incoming.connectionLost('fake reason')

    def test_eofLocal(self):
        """
        Exercise the client side of an established bind connection being
        lost after data has been relayed to the incoming connection.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        # Discard the first reply; test_simple verifies its contents.
        self.sock.transport.clear()

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertIsNotNone(incoming)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived(b'hello, world')
        self.assertEqual(incoming.transport.value(), b'hello, world')

        # now close it from the client side
        self.sock.connectionLost('fake reason')

    def test_badSource(self):
        """
        A connection arriving from an address other than the one named in
        the bind request is refused: no protocol is built, a 91 reply is
        sent and the transport is asked to close.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + b'fooBAR'
            + b'\0')
        # Discard the first reply; test_simple verifies its contents.
        self.sock.transport.clear()

        # connect from WRONG address
        incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
        self.assertIsNone(incoming)

        # Now we should have the second reply packet and it should
        # be a failure. The connection should be closing.
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
|