test_ethernet.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. from twisted.trial import unittest
  4. from twisted.python import components
  5. from twisted.pair import ethernet, raw
  6. from zope.interface import implementer
  7. @implementer(raw.IRawPacketProtocol)
  8. class MyProtocol:
  9. def __init__(self, expecting):
  10. self.expecting = list(expecting)
  11. def datagramReceived(self, data, **kw):
  12. assert self.expecting, 'Got a packet when not expecting anymore.'
  13. expect = self.expecting.pop(0)
  14. assert expect == (data, kw), \
  15. "Expected %r, got %r" % (
  16. expect, (data, kw),
  17. )
  18. class EthernetTests(unittest.TestCase):
  19. def testPacketParsing(self):
  20. proto = ethernet.EthernetProtocol()
  21. p1 = MyProtocol([
  22. (b'foobar', {
  23. 'partial': 0,
  24. 'dest': b"123456",
  25. 'source': b"987654",
  26. 'protocol': 0x0800,
  27. }),
  28. ])
  29. proto.addProto(0x0800, p1)
  30. proto.datagramReceived(b"123456987654\x08\x00foobar",
  31. partial=0)
  32. assert not p1.expecting, \
  33. 'Should not expect any more packets, but still want %r' % p1.expecting
  34. def testMultiplePackets(self):
  35. proto = ethernet.EthernetProtocol()
  36. p1 = MyProtocol([
  37. (b'foobar', {
  38. 'partial': 0,
  39. 'dest': b"123456",
  40. 'source': b"987654",
  41. 'protocol': 0x0800,
  42. }),
  43. (b'quux', {
  44. 'partial': 1,
  45. 'dest': b"012345",
  46. 'source': b"abcdef",
  47. 'protocol': 0x0800,
  48. }),
  49. ])
  50. proto.addProto(0x0800, p1)
  51. proto.datagramReceived(b"123456987654\x08\x00foobar",
  52. partial=0)
  53. proto.datagramReceived(b"012345abcdef\x08\x00quux",
  54. partial=1)
  55. assert not p1.expecting, \
  56. 'Should not expect any more packets, but still want %r' % p1.expecting
  57. def testMultipleSameProtos(self):
  58. proto = ethernet.EthernetProtocol()
  59. p1 = MyProtocol([
  60. (b'foobar', {
  61. 'partial': 0,
  62. 'dest': b"123456",
  63. 'source': b"987654",
  64. 'protocol': 0x0800,
  65. }),
  66. ])
  67. p2 = MyProtocol([
  68. (b'foobar', {
  69. 'partial': 0,
  70. 'dest': b"123456",
  71. 'source': b"987654",
  72. 'protocol': 0x0800,
  73. }),
  74. ])
  75. proto.addProto(0x0800, p1)
  76. proto.addProto(0x0800, p2)
  77. proto.datagramReceived(b"123456987654\x08\x00foobar",
  78. partial=0)
  79. assert not p1.expecting, \
  80. 'Should not expect any more packets, but still want %r' % p1.expecting
  81. assert not p2.expecting, \
  82. 'Should not expect any more packets, but still want %r' % p2.expecting
  83. def testWrongProtoNotSeen(self):
  84. proto = ethernet.EthernetProtocol()
  85. p1 = MyProtocol([])
  86. proto.addProto(0x0801, p1)
  87. proto.datagramReceived(b"123456987654\x08\x00foobar",
  88. partial=0)
  89. proto.datagramReceived(b"012345abcdef\x08\x00quux",
  90. partial=1)
  91. def testDemuxing(self):
  92. proto = ethernet.EthernetProtocol()
  93. p1 = MyProtocol([
  94. (b'foobar', {
  95. 'partial': 0,
  96. 'dest': b"123456",
  97. 'source': b"987654",
  98. 'protocol': 0x0800,
  99. }),
  100. (b'quux', {
  101. 'partial': 1,
  102. 'dest': b"012345",
  103. 'source': b"abcdef",
  104. 'protocol': 0x0800,
  105. }),
  106. ])
  107. proto.addProto(0x0800, p1)
  108. p2 = MyProtocol([
  109. (b'quux', {
  110. 'partial': 1,
  111. 'dest': b"012345",
  112. 'source': b"abcdef",
  113. 'protocol': 0x0806,
  114. }),
  115. (b'foobar', {
  116. 'partial': 0,
  117. 'dest': b"123456",
  118. 'source': b"987654",
  119. 'protocol': 0x0806,
  120. }),
  121. ])
  122. proto.addProto(0x0806, p2)
  123. proto.datagramReceived(b"123456987654\x08\x00foobar",
  124. partial=0)
  125. proto.datagramReceived(b"012345abcdef\x08\x06quux",
  126. partial=1)
  127. proto.datagramReceived(b"123456987654\x08\x06foobar",
  128. partial=0)
  129. proto.datagramReceived(b"012345abcdef\x08\x00quux",
  130. partial=1)
  131. assert not p1.expecting, \
  132. 'Should not expect any more packets, but still want %r' % p1.expecting
  133. assert not p2.expecting, \
  134. 'Should not expect any more packets, but still want %r' % p2.expecting
  135. def testAddingBadProtos_WrongLevel(self):
  136. """Adding a wrong level protocol raises an exception."""
  137. e = ethernet.EthernetProtocol()
  138. try:
  139. e.addProto(42, "silliness")
  140. except components.CannotAdapt:
  141. pass
  142. else:
  143. raise AssertionError('addProto must raise an exception for bad protocols')
  144. def testAddingBadProtos_TooSmall(self):
  145. """Adding a protocol with a negative number raises an exception."""
  146. e = ethernet.EthernetProtocol()
  147. try:
  148. e.addProto(-1, MyProtocol([]))
  149. except TypeError as e:
  150. if e.args == ('Added protocol must be positive or zero',):
  151. pass
  152. else:
  153. raise
  154. else:
  155. raise AssertionError('addProto must raise an exception for bad protocols')
  156. def testAddingBadProtos_TooBig(self):
  157. """Adding a protocol with a number >=2**16 raises an exception."""
  158. e = ethernet.EthernetProtocol()
  159. try:
  160. e.addProto(2**16, MyProtocol([]))
  161. except TypeError as e:
  162. if e.args == ('Added protocol must fit in 16 bits',):
  163. pass
  164. else:
  165. raise
  166. else:
  167. raise AssertionError('addProto must raise an exception for bad protocols')
  168. def testAddingBadProtos_TooBig2(self):
  169. """Adding a protocol with a number >=2**16 raises an exception."""
  170. e = ethernet.EthernetProtocol()
  171. try:
  172. e.addProto(2**16+1, MyProtocol([]))
  173. except TypeError as e:
  174. if e.args == ('Added protocol must fit in 16 bits',):
  175. pass
  176. else:
  177. raise
  178. else:
  179. raise AssertionError('addProto must raise an exception for bad protocols')