# test_iocp.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.iocpreactor}.
"""
import errno
from array import array
from socket import AF_INET6, AF_INET, SOCK_STREAM, SOL_SOCKET, error, socket
from struct import pack

from zope.interface.verify import verifyClass

from twisted.internet.interfaces import IPushProducer
from twisted.python.log import msg
from twisted.trial import unittest
  14. try:
  15. from twisted.internet.iocpreactor import iocpsupport as _iocp, tcp, udp
  16. from twisted.internet.iocpreactor.reactor import IOCPReactor, EVENTS_PER_LOOP, KEY_NORMAL
  17. from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
  18. from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
  19. from twisted.internet.iocpreactor.abstract import FileHandle
  20. except ImportError:
  21. skip = 'This test only applies to IOCPReactor'
  22. try:
  23. socket(AF_INET6, SOCK_STREAM).close()
  24. except error as e:
  25. ipv6Skip = str(e)
  26. else:
  27. ipv6Skip = None
  28. class SupportTests(unittest.TestCase):
  29. """
  30. Tests for L{twisted.internet.iocpreactor.iocpsupport}, low-level reactor
  31. implementation helpers.
  32. """
  33. def _acceptAddressTest(self, family, localhost):
  34. """
  35. Create a C{SOCK_STREAM} connection to localhost using a socket with an
  36. address family of C{family} and assert that the result of
  37. L{iocpsupport.get_accept_addrs} is consistent with the result of
  38. C{socket.getsockname} and C{socket.getpeername}.
  39. """
  40. msg("family = %r" % (family,))
  41. port = socket(family, SOCK_STREAM)
  42. self.addCleanup(port.close)
  43. port.bind(('', 0))
  44. port.listen(1)
  45. client = socket(family, SOCK_STREAM)
  46. self.addCleanup(client.close)
  47. client.setblocking(False)
  48. try:
  49. client.connect((localhost, port.getsockname()[1]))
  50. except error as e:
  51. self.assertIn(e.errno, (errno.EINPROGRESS, errno.EWOULDBLOCK))
  52. server = socket(family, SOCK_STREAM)
  53. self.addCleanup(server.close)
  54. buff = array('B', b'\0' * 256)
  55. self.assertEqual(
  56. 0, _iocp.accept(port.fileno(), server.fileno(), buff, None))
  57. server.setsockopt(
  58. SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, pack('P', port.fileno()))
  59. self.assertEqual(
  60. (family, client.getpeername()[:2], client.getsockname()[:2]),
  61. _iocp.get_accept_addrs(server.fileno(), buff))
  62. def test_ipv4AcceptAddress(self):
  63. """
  64. L{iocpsupport.get_accept_addrs} returns a three-tuple of address
  65. information about the socket associated with the file descriptor passed
  66. to it. For a connection using IPv4:
  67. - the first element is C{AF_INET}
  68. - the second element is a two-tuple of a dotted decimal notation IPv4
  69. address and a port number giving the peer address of the connection
  70. - the third element is the same type giving the host address of the
  71. connection
  72. """
  73. self._acceptAddressTest(AF_INET, '127.0.0.1')
  74. def test_ipv6AcceptAddress(self):
  75. """
  76. Like L{test_ipv4AcceptAddress}, but for IPv6 connections. In this case:
  77. - the first element is C{AF_INET6}
  78. - the second element is a two-tuple of a hexadecimal IPv6 address
  79. literal and a port number giving the peer address of the connection
  80. - the third element is the same type giving the host address of the
  81. connection
  82. """
  83. self._acceptAddressTest(AF_INET6, '::1')
  84. if ipv6Skip is not None:
  85. test_ipv6AcceptAddress.skip = ipv6Skip
  86. class IOCPReactorTests(unittest.TestCase):
  87. def test_noPendingTimerEvents(self):
  88. """
  89. Test reactor behavior (doIteration) when there are no pending time
  90. events.
  91. """
  92. ir = IOCPReactor()
  93. ir.wakeUp()
  94. self.assertFalse(ir.doIteration(None))
  95. def test_reactorInterfaces(self):
  96. """
  97. Verify that IOCP socket-representing classes implement IReadWriteHandle
  98. """
  99. self.assertTrue(verifyClass(IReadWriteHandle, tcp.Connection))
  100. self.assertTrue(verifyClass(IReadWriteHandle, udp.Port))
  101. def test_fileHandleInterfaces(self):
  102. """
  103. Verify that L{Filehandle} implements L{IPushProducer}.
  104. """
  105. self.assertTrue(verifyClass(IPushProducer, FileHandle))
  106. def test_maxEventsPerIteration(self):
  107. """
  108. Verify that we don't lose an event when more than EVENTS_PER_LOOP
  109. events occur in the same reactor iteration
  110. """
  111. class FakeFD:
  112. counter = 0
  113. def logPrefix(self):
  114. return 'FakeFD'
  115. def cb(self, rc, bytes, evt):
  116. self.counter += 1
  117. ir = IOCPReactor()
  118. fd = FakeFD()
  119. event = _iocp.Event(fd.cb, fd)
  120. for _ in range(EVENTS_PER_LOOP + 1):
  121. ir.port.postEvent(0, KEY_NORMAL, event)
  122. ir.doIteration(None)
  123. self.assertEqual(fd.counter, EVENTS_PER_LOOP)
  124. ir.doIteration(0)
  125. self.assertEqual(fd.counter, EVENTS_PER_LOOP + 1)