test_unix.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for implementations of L{IReactorUNIX}.
  5. """
  6. from __future__ import division, absolute_import
  7. from stat import S_IMODE
  8. from os import stat, close, urandom, unlink, fstat
  9. from tempfile import mktemp, mkstemp
  10. from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, socket
  11. from pprint import pformat
  12. from hashlib import md5
  13. from struct import pack
  14. try:
  15. from socket import AF_UNIX
  16. except ImportError:
  17. AF_UNIX = None
  18. from zope.interface import implementer
  19. from twisted.internet import interfaces
  20. from twisted.internet.address import UNIXAddress
  21. from twisted.internet.defer import Deferred, fail
  22. from twisted.internet.endpoints import UNIXServerEndpoint, UNIXClientEndpoint
  23. from twisted.internet.error import ConnectionClosed, FileDescriptorOverrun
  24. from twisted.internet.interfaces import IFileDescriptorReceiver, IReactorUNIX
  25. from twisted.internet.protocol import DatagramProtocol
  26. from twisted.internet.protocol import ServerFactory, ClientFactory
  27. from twisted.internet.task import LoopingCall
  28. from twisted.internet.test.connectionmixins import EndpointCreator
  29. from twisted.internet.test.reactormixins import ReactorBuilder
  30. from twisted.internet.test.test_core import ObjectModelIntegrationMixin
  31. from twisted.internet.test.test_tcp import StreamTransportTestsMixin
  32. from twisted.internet.test.connectionmixins import ConnectableProtocol
  33. from twisted.internet.test.connectionmixins import ConnectionTestsMixin
  34. from twisted.internet.test.connectionmixins import StreamClientTestsMixin
  35. from twisted.internet.test.connectionmixins import runProtocolsWithReactor
  36. from twisted.python.compat import nativeString, _PY3, iteritems
  37. from twisted.python.failure import Failure
  38. from twisted.python.log import addObserver, removeObserver, err
  39. from twisted.python.runtime import platform
  40. from twisted.python.reflect import requireModule
  41. if requireModule("twisted.python.sendmsg") is not None:
  42. sendmsgSkip = None
  43. else:
  44. sendmsgSkip = (
  45. "sendmsg extension unavailable, extended UNIX features disabled")
  46. class UNIXFamilyMixin(object):
  47. """
  48. Test-helper defining mixin for things related to AF_UNIX sockets.
  49. """
  50. def _modeTest(self, methodName, path, factory):
  51. """
  52. Assert that the mode of the created unix socket is set to the mode
  53. specified to the reactor method.
  54. """
  55. mode = 0o600
  56. reactor = self.buildReactor()
  57. unixPort = getattr(reactor, methodName)(path, factory, mode=mode)
  58. unixPort.stopListening()
  59. self.assertEqual(S_IMODE(stat(path).st_mode), mode)
  60. def _abstractPath(case):
  61. """
  62. Return a new, unique abstract namespace path to be listened on.
  63. """
  64. return md5(urandom(100)).hexdigest()
  65. class UNIXCreator(EndpointCreator):
  66. """
  67. Create UNIX socket end points.
  68. """
  69. requiredInterfaces = (interfaces.IReactorUNIX,)
  70. def server(self, reactor):
  71. """
  72. Construct a UNIX server endpoint.
  73. """
  74. # self.mktemp() often returns a path which is too long to be used.
  75. path = mktemp(suffix='.sock', dir='.')
  76. return UNIXServerEndpoint(reactor, path)
  77. def client(self, reactor, serverAddress):
  78. """
  79. Construct a UNIX client endpoint.
  80. """
  81. return UNIXClientEndpoint(reactor, serverAddress.name)
  82. class SendFileDescriptor(ConnectableProtocol):
  83. """
  84. L{SendFileDescriptorAndBytes} sends a file descriptor and optionally some
  85. normal bytes and then closes its connection.
  86. @ivar reason: The reason the connection was lost, after C{connectionLost}
  87. is called.
  88. """
  89. reason = None
  90. def __init__(self, fd, data):
  91. """
  92. @param fd: A C{int} giving a file descriptor to send over the
  93. connection.
  94. @param data: A C{str} giving data to send over the connection, or
  95. L{None} if no data is to be sent.
  96. """
  97. self.fd = fd
  98. self.data = data
  99. def connectionMade(self):
  100. """
  101. Send C{self.fd} and, if it is not L{None}, C{self.data}. Then close the
  102. connection.
  103. """
  104. self.transport.sendFileDescriptor(self.fd)
  105. if self.data:
  106. self.transport.write(self.data)
  107. self.transport.loseConnection()
  108. def connectionLost(self, reason):
  109. ConnectableProtocol.connectionLost(self, reason)
  110. self.reason = reason
  111. @implementer(IFileDescriptorReceiver)
  112. class ReceiveFileDescriptor(ConnectableProtocol):
  113. """
  114. L{ReceiveFileDescriptor} provides an API for waiting for file descriptors to
  115. be received.
  116. @ivar reason: The reason the connection was lost, after C{connectionLost}
  117. is called.
  118. @ivar waiting: A L{Deferred} which fires with a file descriptor once one is
  119. received, or with a failure if the connection is lost with no descriptor
  120. arriving.
  121. """
  122. reason = None
  123. waiting = None
  124. def waitForDescriptor(self):
  125. """
  126. Return a L{Deferred} which will fire with the next file descriptor
  127. received, or with a failure if the connection is or has already been
  128. lost.
  129. """
  130. if self.reason is None:
  131. self.waiting = Deferred()
  132. return self.waiting
  133. else:
  134. return fail(self.reason)
  135. def fileDescriptorReceived(self, descriptor):
  136. """
  137. Fire the waiting Deferred, initialized by C{waitForDescriptor}, with the
  138. file descriptor just received.
  139. """
  140. self.waiting.callback(descriptor)
  141. self.waiting = None
  142. def dataReceived(self, data):
  143. """
  144. Fail the waiting Deferred, if it has not already been fired by
  145. C{fileDescriptorReceived}. The bytes sent along with a file descriptor
  146. are guaranteed to be delivered to the protocol's C{dataReceived} method
  147. only after the file descriptor has been delivered to the protocol's
  148. C{fileDescriptorReceived}.
  149. """
  150. if self.waiting is not None:
  151. self.waiting.errback(Failure(Exception(
  152. "Received bytes (%r) before descriptor." % (data,))))
  153. self.waiting = None
  154. def connectionLost(self, reason):
  155. """
  156. Fail the waiting Deferred, initialized by C{waitForDescriptor}, if there
  157. is one.
  158. """
  159. ConnectableProtocol.connectionLost(self, reason)
  160. if self.waiting is not None:
  161. self.waiting.errback(reason)
  162. self.waiting = None
  163. self.reason = reason
  164. class UNIXTestsBuilder(UNIXFamilyMixin, ReactorBuilder, ConnectionTestsMixin):
  165. """
  166. Builder defining tests relating to L{IReactorUNIX}.
  167. """
  168. requiredInterfaces = (IReactorUNIX,)
  169. endpoints = UNIXCreator()
  170. def test_mode(self):
  171. """
  172. The UNIX socket created by L{IReactorUNIX.listenUNIX} is created with
  173. the mode specified.
  174. """
  175. self._modeTest('listenUNIX', self.mktemp(), ServerFactory())
  176. def test_listenOnLinuxAbstractNamespace(self):
  177. """
  178. On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
  179. in the abstract namespace. L{IReactorUNIX.listenUNIX} accepts such a
  180. path.
  181. """
  182. # Don't listen on a path longer than the maximum allowed.
  183. path = _abstractPath(self)
  184. reactor = self.buildReactor()
  185. port = reactor.listenUNIX('\0' + path, ServerFactory())
  186. self.assertEqual(port.getHost(), UNIXAddress('\0' + path))
  187. if not platform.isLinux():
  188. test_listenOnLinuxAbstractNamespace.skip = (
  189. 'Abstract namespace UNIX sockets only supported on Linux.')
  190. def test_connectToLinuxAbstractNamespace(self):
  191. """
  192. L{IReactorUNIX.connectUNIX} also accepts a Linux abstract namespace
  193. path.
  194. """
  195. path = _abstractPath(self)
  196. reactor = self.buildReactor()
  197. connector = reactor.connectUNIX('\0' + path, ClientFactory())
  198. self.assertEqual(connector.getDestination(), UNIXAddress('\0' + path))
  199. if not platform.isLinux():
  200. test_connectToLinuxAbstractNamespace.skip = (
  201. 'Abstract namespace UNIX sockets only supported on Linux.')
  202. def test_addresses(self):
  203. """
  204. A client's transport's C{getHost} and C{getPeer} return L{UNIXAddress}
  205. instances which have the filesystem path of the host and peer ends of
  206. the connection.
  207. """
  208. class SaveAddress(ConnectableProtocol):
  209. def makeConnection(self, transport):
  210. self.addresses = dict(
  211. host=transport.getHost(), peer=transport.getPeer())
  212. transport.loseConnection()
  213. server = SaveAddress()
  214. client = SaveAddress()
  215. runProtocolsWithReactor(self, server, client, self.endpoints)
  216. self.assertEqual(server.addresses['host'], client.addresses['peer'])
  217. self.assertEqual(server.addresses['peer'], client.addresses['host'])
  218. def test_sendFileDescriptor(self):
  219. """
  220. L{IUNIXTransport.sendFileDescriptor} accepts an integer file descriptor
  221. and sends a copy of it to the process reading from the connection.
  222. """
  223. from socket import fromfd
  224. s = socket()
  225. s.bind(('', 0))
  226. server = SendFileDescriptor(s.fileno(), b"junk")
  227. client = ReceiveFileDescriptor()
  228. d = client.waitForDescriptor()
  229. def checkDescriptor(descriptor):
  230. received = fromfd(descriptor, AF_INET, SOCK_STREAM)
  231. # Thanks for the free dup, fromfd()
  232. close(descriptor)
  233. # If the sockets have the same local address, they're probably the
  234. # same.
  235. self.assertEqual(s.getsockname(), received.getsockname())
  236. # But it would be cheating for them to be identified by the same
  237. # file descriptor. The point was to get a copy, as we might get if
  238. # there were two processes involved here.
  239. self.assertNotEqual(s.fileno(), received.fileno())
  240. d.addCallback(checkDescriptor)
  241. d.addErrback(err, "Sending file descriptor encountered a problem")
  242. d.addBoth(lambda ignored: server.transport.loseConnection())
  243. runProtocolsWithReactor(self, server, client, self.endpoints)
  244. if sendmsgSkip is not None:
  245. test_sendFileDescriptor.skip = sendmsgSkip
  246. def test_sendFileDescriptorTriggersPauseProducing(self):
  247. """
  248. If a L{IUNIXTransport.sendFileDescriptor} call fills up the send buffer,
  249. any registered producer is paused.
  250. """
  251. class DoesNotRead(ConnectableProtocol):
  252. def connectionMade(self):
  253. self.transport.pauseProducing()
  254. class SendsManyFileDescriptors(ConnectableProtocol):
  255. paused = False
  256. def connectionMade(self):
  257. self.socket = socket()
  258. self.transport.registerProducer(self, True)
  259. def sender():
  260. self.transport.sendFileDescriptor(self.socket.fileno())
  261. self.transport.write(b"x")
  262. self.task = LoopingCall(sender)
  263. self.task.clock = self.transport.reactor
  264. self.task.start(0).addErrback(err, "Send loop failure")
  265. def stopProducing(self):
  266. self._disconnect()
  267. def resumeProducing(self):
  268. self._disconnect()
  269. def pauseProducing(self):
  270. self.paused = True
  271. self.transport.unregisterProducer()
  272. self._disconnect()
  273. def _disconnect(self):
  274. self.task.stop()
  275. self.transport.abortConnection()
  276. self.other.transport.abortConnection()
  277. server = SendsManyFileDescriptors()
  278. client = DoesNotRead()
  279. server.other = client
  280. runProtocolsWithReactor(self, server, client, self.endpoints)
  281. self.assertTrue(
  282. server.paused, "sendFileDescriptor producer was not paused")
  283. if sendmsgSkip is not None:
  284. test_sendFileDescriptorTriggersPauseProducing.skip = sendmsgSkip
  285. def test_fileDescriptorOverrun(self):
  286. """
  287. If L{IUNIXTransport.sendFileDescriptor} is used to queue a greater
  288. number of file descriptors than the number of bytes sent using
  289. L{ITransport.write}, the connection is closed and the protocol connected
  290. to the transport has its C{connectionLost} method called with a failure
  291. wrapping L{FileDescriptorOverrun}.
  292. """
  293. cargo = socket()
  294. server = SendFileDescriptor(cargo.fileno(), None)
  295. client = ReceiveFileDescriptor()
  296. result = []
  297. d = client.waitForDescriptor()
  298. d.addBoth(result.append)
  299. d.addBoth(lambda ignored: server.transport.loseConnection())
  300. runProtocolsWithReactor(self, server, client, self.endpoints)
  301. self.assertIsInstance(result[0], Failure)
  302. result[0].trap(ConnectionClosed)
  303. self.assertIsInstance(server.reason.value, FileDescriptorOverrun)
  304. if sendmsgSkip is not None:
  305. test_fileDescriptorOverrun.skip = sendmsgSkip
  306. def _sendmsgMixinFileDescriptorReceivedDriver(self, ancillaryPacker):
  307. """
  308. Drive _SendmsgMixin via sendmsg socket calls to check that
  309. L{IFileDescriptorReceiver.fileDescriptorReceived} is called once
  310. for each file descriptor received in the ancillary messages.
  311. @param ancillaryPacker: A callable that will be given a list of
  312. two file descriptors and should return a two-tuple where:
  313. The first item is an iterable of zero or more (cmsg_level,
  314. cmsg_type, cmsg_data) tuples in the same order as the given
  315. list for actual sending via sendmsg; the second item is an
  316. integer indicating the expected number of FDs to be received.
  317. """
  318. # Strategy:
  319. # - Create a UNIX socketpair.
  320. # - Associate one end to a FakeReceiver and FakeProtocol.
  321. # - Call sendmsg on the other end to send FDs as ancillary data.
  322. # Ancillary data is obtained calling ancillaryPacker with
  323. # the two FDs associated to two temp files (using the socket
  324. # FDs for this fails the device/inode verification tests on
  325. # Mac OS X 10.10, so temp files are used instead).
  326. # - Call doRead in the FakeReceiver.
  327. # - Verify results on FakeProtocol.
  328. # Using known device/inodes to verify correct order.
  329. # TODO: replace FakeReceiver test approach with one based in
  330. # IReactorSocket.adoptStreamConnection once AF_UNIX support is
  331. # implemented; see https://twistedmatrix.com/trac/ticket/5573.
  332. from socket import socketpair
  333. from twisted.internet.unix import _SendmsgMixin
  334. from twisted.python.sendmsg import sendmsg
  335. def deviceInodeTuple(fd):
  336. fs = fstat(fd)
  337. return (fs.st_dev, fs.st_ino)
  338. @implementer(IFileDescriptorReceiver)
  339. class FakeProtocol(ConnectableProtocol):
  340. def __init__(self):
  341. self.fds = []
  342. self.deviceInodesReceived = []
  343. def fileDescriptorReceived(self, fd):
  344. self.fds.append(fd)
  345. self.deviceInodesReceived.append(deviceInodeTuple(fd))
  346. close(fd)
  347. class FakeReceiver(_SendmsgMixin):
  348. bufferSize = 1024
  349. def __init__(self, skt, proto):
  350. self.socket = skt
  351. self.protocol = proto
  352. def _dataReceived(self, data):
  353. pass
  354. def getHost(self):
  355. pass
  356. def getPeer(self):
  357. pass
  358. def _getLogPrefix(self, o):
  359. pass
  360. sendSocket, recvSocket = socketpair(AF_UNIX, SOCK_STREAM)
  361. self.addCleanup(sendSocket.close)
  362. self.addCleanup(recvSocket.close)
  363. proto = FakeProtocol()
  364. receiver = FakeReceiver(recvSocket, proto)
  365. # Temp files give us two FDs to send/receive/verify.
  366. fileOneFD, fileOneName = mkstemp()
  367. fileTwoFD, fileTwoName = mkstemp()
  368. self.addCleanup(unlink, fileOneName)
  369. self.addCleanup(unlink, fileTwoName)
  370. dataToSend = b'some data needs to be sent'
  371. fdsToSend = [fileOneFD, fileTwoFD]
  372. ancillary, expectedCount = ancillaryPacker(fdsToSend)
  373. sendmsg(sendSocket, dataToSend, ancillary)
  374. receiver.doRead()
  375. # Verify that fileDescriptorReceived was called twice.
  376. self.assertEqual(len(proto.fds), expectedCount)
  377. # Verify that received FDs are different from the sent ones.
  378. self.assertFalse(set(fdsToSend).intersection(set(proto.fds)))
  379. # Verify that FDs were received in the same order, if any.
  380. if proto.fds:
  381. deviceInodesSent = [deviceInodeTuple(fd) for fd in fdsToSend]
  382. self.assertEqual(deviceInodesSent, proto.deviceInodesReceived)
  383. def test_multiFileDescriptorReceivedPerRecvmsgOneCMSG(self):
  384. """
  385. _SendmsgMixin handles multiple file descriptors per recvmsg, calling
  386. L{IFileDescriptorReceiver.fileDescriptorReceived} once per received
  387. file descriptor. Scenario: single CMSG with two FDs.
  388. """
  389. from twisted.python.sendmsg import SCM_RIGHTS
  390. def ancillaryPacker(fdsToSend):
  391. ancillary = [(SOL_SOCKET, SCM_RIGHTS, pack('ii', *fdsToSend))]
  392. expectedCount = 2
  393. return ancillary, expectedCount
  394. self._sendmsgMixinFileDescriptorReceivedDriver(ancillaryPacker)
  395. if sendmsgSkip is not None:
  396. test_multiFileDescriptorReceivedPerRecvmsgOneCMSG.skip = sendmsgSkip
  397. def test_multiFileDescriptorReceivedPerRecvmsgTwoCMSGs(self):
  398. """
  399. _SendmsgMixin handles multiple file descriptors per recvmsg, calling
  400. L{IFileDescriptorReceiver.fileDescriptorReceived} once per received
  401. file descriptor. Scenario: two CMSGs with one FD each.
  402. """
  403. from twisted.python.sendmsg import SCM_RIGHTS
  404. def ancillaryPacker(fdsToSend):
  405. ancillary = [
  406. (SOL_SOCKET, SCM_RIGHTS, pack('i', fd))
  407. for fd in fdsToSend
  408. ]
  409. expectedCount = 2
  410. return ancillary, expectedCount
  411. self._sendmsgMixinFileDescriptorReceivedDriver(ancillaryPacker)
  412. if platform.isMacOSX():
  413. test_multiFileDescriptorReceivedPerRecvmsgTwoCMSGs.skip = (
  414. "Multi control message ancillary sendmsg not supported on Mac.")
  415. elif sendmsgSkip is not None:
  416. test_multiFileDescriptorReceivedPerRecvmsgTwoCMSGs.skip = sendmsgSkip
  417. def test_multiFileDescriptorReceivedPerRecvmsgBadCMSG(self):
  418. """
  419. _SendmsgMixin handles multiple file descriptors per recvmsg, calling
  420. L{IFileDescriptorReceiver.fileDescriptorReceived} once per received
  421. file descriptor. Scenario: unsupported CMSGs.
  422. """
  423. # Given that we can't just send random/invalid ancillary data via the
  424. # packer for it to be sent via sendmsg -- the kernel would not accept
  425. # it -- we'll temporarily replace recvmsg with a fake one that produces
  426. # a non-supported ancillary message level/type. This being said, from
  427. # the perspective of the ancillaryPacker, all that is required is to
  428. # let the test driver know that 0 file descriptors are expected.
  429. from twisted.python import sendmsg
  430. def ancillaryPacker(fdsToSend):
  431. ancillary = []
  432. expectedCount = 0
  433. return ancillary, expectedCount
  434. def fakeRecvmsgUnsupportedAncillary(skt, *args, **kwargs):
  435. data = b'some data'
  436. ancillary = [(None, None, b'')]
  437. flags = 0
  438. return sendmsg.RecievedMessage(data, ancillary, flags)
  439. events = []
  440. addObserver(events.append)
  441. self.addCleanup(removeObserver, events.append)
  442. self.patch(sendmsg, "recvmsg", fakeRecvmsgUnsupportedAncillary)
  443. self._sendmsgMixinFileDescriptorReceivedDriver(ancillaryPacker)
  444. # Verify the expected message was logged.
  445. expectedMessage = 'received unsupported ancillary data'
  446. found = any(expectedMessage in e['format'] for e in events)
  447. self.assertTrue(found, 'Expected message not found in logged events')
  448. if sendmsgSkip is not None:
  449. test_multiFileDescriptorReceivedPerRecvmsgBadCMSG.skip = sendmsgSkip
  450. def test_avoidLeakingFileDescriptors(self):
  451. """
  452. If associated with a protocol which does not provide
  453. L{IFileDescriptorReceiver}, file descriptors received by the
  454. L{IUNIXTransport} implementation are closed and a warning is emitted.
  455. """
  456. # To verify this, establish a connection. Send one end of the
  457. # connection over the IUNIXTransport implementation. After the copy
  458. # should no longer exist, close the original. If the opposite end of
  459. # the connection decides the connection is closed, the copy does not
  460. # exist.
  461. from socket import socketpair
  462. probeClient, probeServer = socketpair()
  463. events = []
  464. addObserver(events.append)
  465. self.addCleanup(removeObserver, events.append)
  466. class RecordEndpointAddresses(SendFileDescriptor):
  467. def connectionMade(self):
  468. self.hostAddress = self.transport.getHost()
  469. self.peerAddress = self.transport.getPeer()
  470. SendFileDescriptor.connectionMade(self)
  471. server = RecordEndpointAddresses(probeClient.fileno(), b"junk")
  472. client = ConnectableProtocol()
  473. runProtocolsWithReactor(self, server, client, self.endpoints)
  474. # Get rid of the original reference to the socket.
  475. probeClient.close()
  476. # A non-blocking recv will return "" if the connection is closed, as
  477. # desired. If the connection has not been closed, because the
  478. # duplicate file descriptor is still open, it will fail with EAGAIN
  479. # instead.
  480. probeServer.setblocking(False)
  481. self.assertEqual(b"", probeServer.recv(1024))
  482. # This is a surprising circumstance, so it should be logged.
  483. format = (
  484. "%(protocolName)s (on %(hostAddress)r) does not "
  485. "provide IFileDescriptorReceiver; closing file "
  486. "descriptor received (from %(peerAddress)r).")
  487. clsName = "ConnectableProtocol"
  488. # Reverse host and peer, since the log event is from the client
  489. # perspective.
  490. expectedEvent = dict(hostAddress=server.peerAddress,
  491. peerAddress=server.hostAddress,
  492. protocolName=clsName,
  493. format=format)
  494. for logEvent in events:
  495. for k, v in iteritems(expectedEvent):
  496. if v != logEvent.get(k):
  497. break
  498. else:
  499. # No mismatches were found, stop looking at events
  500. break
  501. else:
  502. # No fully matching events were found, fail the test.
  503. self.fail(
  504. "Expected event (%s) not found in logged events (%s)" % (
  505. expectedEvent, pformat(events,)))
  506. if sendmsgSkip is not None:
  507. test_avoidLeakingFileDescriptors.skip = sendmsgSkip
  508. def test_descriptorDeliveredBeforeBytes(self):
  509. """
  510. L{IUNIXTransport.sendFileDescriptor} sends file descriptors before
  511. L{ITransport.write} sends normal bytes.
  512. """
  513. @implementer(IFileDescriptorReceiver)
  514. class RecordEvents(ConnectableProtocol):
  515. def connectionMade(self):
  516. ConnectableProtocol.connectionMade(self)
  517. self.events = []
  518. def fileDescriptorReceived(innerSelf, descriptor):
  519. self.addCleanup(close, descriptor)
  520. innerSelf.events.append(type(descriptor))
  521. def dataReceived(self, data):
  522. self.events.extend(data)
  523. cargo = socket()
  524. server = SendFileDescriptor(cargo.fileno(), b"junk")
  525. client = RecordEvents()
  526. runProtocolsWithReactor(self, server, client, self.endpoints)
  527. self.assertEqual(int, client.events[0])
  528. if _PY3:
  529. self.assertEqual(b"junk", bytes(client.events[1:]))
  530. else:
  531. self.assertEqual(b"junk", b"".join(client.events[1:]))
  532. if sendmsgSkip is not None:
  533. test_descriptorDeliveredBeforeBytes.skip = sendmsgSkip
  534. class UNIXDatagramTestsBuilder(UNIXFamilyMixin, ReactorBuilder):
  535. """
  536. Builder defining tests relating to L{IReactorUNIXDatagram}.
  537. """
  538. requiredInterfaces = (interfaces.IReactorUNIXDatagram,)
  539. # There's no corresponding test_connectMode because the mode parameter to
  540. # connectUNIXDatagram has been completely ignored since that API was first
  541. # introduced.
  542. def test_listenMode(self):
  543. """
  544. The UNIX socket created by L{IReactorUNIXDatagram.listenUNIXDatagram}
  545. is created with the mode specified.
  546. """
  547. self._modeTest('listenUNIXDatagram', self.mktemp(), DatagramProtocol())
  548. def test_listenOnLinuxAbstractNamespace(self):
  549. """
  550. On Linux, a UNIX socket path may begin with C{'\0'} to indicate a socket
  551. in the abstract namespace. L{IReactorUNIX.listenUNIXDatagram} accepts
  552. such a path.
  553. """
  554. path = _abstractPath(self)
  555. reactor = self.buildReactor()
  556. port = reactor.listenUNIXDatagram('\0' + path, DatagramProtocol())
  557. self.assertEqual(port.getHost(), UNIXAddress('\0' + path))
  558. if not platform.isLinux():
  559. test_listenOnLinuxAbstractNamespace.skip = (
  560. 'Abstract namespace UNIX sockets only supported on Linux.')
  561. class UNIXPortTestsBuilder(ReactorBuilder, ObjectModelIntegrationMixin,
  562. StreamTransportTestsMixin):
  563. """
  564. Tests for L{IReactorUNIX.listenUnix}
  565. """
  566. requiredInterfaces = (interfaces.IReactorUNIX,)
  567. def getListeningPort(self, reactor, factory):
  568. """
  569. Get a UNIX port from a reactor
  570. """
  571. # self.mktemp() often returns a path which is too long to be used.
  572. path = mktemp(suffix='.sock', dir='.')
  573. return reactor.listenUNIX(path, factory)
  574. def getExpectedStartListeningLogMessage(self, port, factory):
  575. """
  576. Get the message expected to be logged when a UNIX port starts listening.
  577. """
  578. return "%s starting on %r" % (factory,
  579. nativeString(port.getHost().name))
  580. def getExpectedConnectionLostLogMsg(self, port):
  581. """
  582. Get the expected connection lost message for a UNIX port
  583. """
  584. return "(UNIX Port %s Closed)" % (nativeString(port.getHost().name),)
  585. globals().update(UNIXTestsBuilder.makeTestCaseClasses())
  586. globals().update(UNIXDatagramTestsBuilder.makeTestCaseClasses())
  587. globals().update(UNIXPortTestsBuilder.makeTestCaseClasses())
  588. class UnixClientTestsBuilder(ReactorBuilder, StreamClientTestsMixin):
  589. """
  590. Define tests for L{IReactorUNIX.connectUNIX}.
  591. """
  592. requiredInterfaces = (IReactorUNIX,)
  593. _path = None
  594. @property
  595. def path(self):
  596. """
  597. Return a path usable by C{connectUNIX} and C{listenUNIX}.
  598. @return: A path instance, built with C{_abstractPath}.
  599. """
  600. if self._path is None:
  601. self._path = _abstractPath(self)
  602. return self._path
  603. def listen(self, reactor, factory):
  604. """
  605. Start an UNIX server with the given C{factory}.
  606. @param reactor: The reactor to create the UNIX port in.
  607. @param factory: The server factory.
  608. @return: A UNIX port instance.
  609. """
  610. return reactor.listenUNIX(self.path, factory)
  611. def connect(self, reactor, factory):
  612. """
  613. Start an UNIX client with the given C{factory}.
  614. @param reactor: The reactor to create the connection in.
  615. @param factory: The client factory.
  616. @return: A UNIX connector instance.
  617. """
  618. return reactor.connectUNIX(self.path, factory)
  619. globals().update(UnixClientTestsBuilder.makeTestCaseClasses())