test_telnet.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. # -*- test-case-name: twisted.conch.test.test_telnet -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for L{twisted.conch.telnet}.
  6. """
  7. from __future__ import absolute_import, division
  8. from zope.interface import implementer
  9. from zope.interface.verify import verifyObject
  10. from twisted.internet import defer
  11. from twisted.conch import telnet
  12. from twisted.trial import unittest
  13. from twisted.test import proto_helpers
  14. from twisted.python.compat import iterbytes
  15. @implementer(telnet.ITelnetProtocol)
  16. class TestProtocol:
  17. localEnableable = ()
  18. remoteEnableable = ()
  19. def __init__(self):
  20. self.data = b''
  21. self.subcmd = []
  22. self.calls = []
  23. self.enabledLocal = []
  24. self.enabledRemote = []
  25. self.disabledLocal = []
  26. self.disabledRemote = []
  27. def makeConnection(self, transport):
  28. d = transport.negotiationMap = {}
  29. d[b'\x12'] = self.neg_TEST_COMMAND
  30. d = transport.commandMap = transport.commandMap.copy()
  31. for cmd in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'):
  32. d[getattr(telnet, cmd)] = lambda arg, cmd=cmd: self.calls.append(cmd)
  33. def dataReceived(self, data):
  34. self.data += data
  35. def connectionLost(self, reason):
  36. pass
  37. def neg_TEST_COMMAND(self, payload):
  38. self.subcmd = payload
  39. def enableLocal(self, option):
  40. if option in self.localEnableable:
  41. self.enabledLocal.append(option)
  42. return True
  43. return False
  44. def disableLocal(self, option):
  45. self.disabledLocal.append(option)
  46. def enableRemote(self, option):
  47. if option in self.remoteEnableable:
  48. self.enabledRemote.append(option)
  49. return True
  50. return False
  51. def disableRemote(self, option):
  52. self.disabledRemote.append(option)
  53. class InterfacesTests(unittest.TestCase):
  54. def test_interface(self):
  55. """
  56. L{telnet.TelnetProtocol} implements L{telnet.ITelnetProtocol}
  57. """
  58. p = telnet.TelnetProtocol()
  59. verifyObject(telnet.ITelnetProtocol, p)
  60. class TelnetTransportTests(unittest.TestCase):
  61. """
  62. Tests for L{telnet.TelnetTransport}.
  63. """
  64. def setUp(self):
  65. self.p = telnet.TelnetTransport(TestProtocol)
  66. self.t = proto_helpers.StringTransport()
  67. self.p.makeConnection(self.t)
  68. def testRegularBytes(self):
  69. # Just send a bunch of bytes. None of these do anything
  70. # with telnet. They should pass right through to the
  71. # application layer.
  72. h = self.p.protocol
  73. L = [b"here are some bytes la la la",
  74. b"some more arrive here",
  75. b"lots of bytes to play with",
  76. b"la la la",
  77. b"ta de da",
  78. b"dum"]
  79. for b in L:
  80. self.p.dataReceived(b)
  81. self.assertEqual(h.data, b''.join(L))
  82. def testNewlineHandling(self):
  83. # Send various kinds of newlines and make sure they get translated
  84. # into \n.
  85. h = self.p.protocol
  86. L = [b"here is the first line\r\n",
  87. b"here is the second line\r\0",
  88. b"here is the third line\r\n",
  89. b"here is the last line\r\0"]
  90. for b in L:
  91. self.p.dataReceived(b)
  92. self.assertEqual(h.data, L[0][:-2] + b'\n' +
  93. L[1][:-2] + b'\r' +
  94. L[2][:-2] + b'\n' +
  95. L[3][:-2] + b'\r')
  96. def testIACEscape(self):
  97. # Send a bunch of bytes and a couple quoted \xFFs. Unquoted,
  98. # \xFF is a telnet command. Quoted, one of them from each pair
  99. # should be passed through to the application layer.
  100. h = self.p.protocol
  101. L = [b"here are some bytes\xff\xff with an embedded IAC",
  102. b"and here is a test of a border escape\xff",
  103. b"\xff did you get that IAC?"]
  104. for b in L:
  105. self.p.dataReceived(b)
  106. self.assertEqual(h.data, b''.join(L).replace(b'\xff\xff', b'\xff'))
  107. def _simpleCommandTest(self, cmdName):
  108. # Send a single simple telnet command and make sure
  109. # it gets noticed and the appropriate method gets
  110. # called.
  111. h = self.p.protocol
  112. cmd = telnet.IAC + getattr(telnet, cmdName)
  113. L = [b"Here's some bytes, tra la la",
  114. b"But ono!" + cmd + b" an interrupt"]
  115. for b in L:
  116. self.p.dataReceived(b)
  117. self.assertEqual(h.calls, [cmdName])
  118. self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
  119. def testInterrupt(self):
  120. self._simpleCommandTest("IP")
  121. def testNoOperation(self):
  122. self._simpleCommandTest("NOP")
  123. def testDataMark(self):
  124. self._simpleCommandTest("DM")
  125. def testBreak(self):
  126. self._simpleCommandTest("BRK")
  127. def testAbortOutput(self):
  128. self._simpleCommandTest("AO")
  129. def testAreYouThere(self):
  130. self._simpleCommandTest("AYT")
  131. def testEraseCharacter(self):
  132. self._simpleCommandTest("EC")
  133. def testEraseLine(self):
  134. self._simpleCommandTest("EL")
  135. def testGoAhead(self):
  136. self._simpleCommandTest("GA")
  137. def testSubnegotiation(self):
  138. # Send a subnegotiation command and make sure it gets
  139. # parsed and that the correct method is called.
  140. h = self.p.protocol
  141. cmd = telnet.IAC + telnet.SB + b'\x12hello world' + telnet.IAC + telnet.SE
  142. L = [b"These are some bytes but soon" + cmd,
  143. b"there will be some more"]
  144. for b in L:
  145. self.p.dataReceived(b)
  146. self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
  147. self.assertEqual(h.subcmd, list(iterbytes(b"hello world")))
  148. def testSubnegotiationWithEmbeddedSE(self):
  149. # Send a subnegotiation command with an embedded SE. Make sure
  150. # that SE gets passed to the correct method.
  151. h = self.p.protocol
  152. cmd = (telnet.IAC + telnet.SB +
  153. b'\x12' + telnet.SE +
  154. telnet.IAC + telnet.SE)
  155. L = [b"Some bytes are here" + cmd + b"and here",
  156. b"and here"]
  157. for b in L:
  158. self.p.dataReceived(b)
  159. self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
  160. self.assertEqual(h.subcmd, [telnet.SE])
  161. def testBoundarySubnegotiation(self):
  162. # Send a subnegotiation command. Split it at every possible byte boundary
  163. # and make sure it always gets parsed and that it is passed to the correct
  164. # method.
  165. cmd = (telnet.IAC + telnet.SB +
  166. b'\x12' + telnet.SE + b'hello' +
  167. telnet.IAC + telnet.SE)
  168. for i in range(len(cmd)):
  169. h = self.p.protocol = TestProtocol()
  170. h.makeConnection(self.p)
  171. a, b = cmd[:i], cmd[i:]
  172. L = [b"first part" + a,
  173. b + b"last part"]
  174. for data in L:
  175. self.p.dataReceived(data)
  176. self.assertEqual(h.data, b''.join(L).replace(cmd, b''))
  177. self.assertEqual(h.subcmd, [telnet.SE] + list(iterbytes(b'hello')))
  178. def _enabledHelper(self, o, eL=[], eR=[], dL=[], dR=[]):
  179. self.assertEqual(o.enabledLocal, eL)
  180. self.assertEqual(o.enabledRemote, eR)
  181. self.assertEqual(o.disabledLocal, dL)
  182. self.assertEqual(o.disabledRemote, dR)
  183. def testRefuseWill(self):
  184. # Try to enable an option. The server should refuse to enable it.
  185. cmd = telnet.IAC + telnet.WILL + b'\x12'
  186. data = b"surrounding bytes" + cmd + b"to spice things up"
  187. self.p.dataReceived(data)
  188. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  189. self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x12')
  190. self._enabledHelper(self.p.protocol)
  191. def testRefuseDo(self):
  192. # Try to enable an option. The server should refuse to enable it.
  193. cmd = telnet.IAC + telnet.DO + b'\x12'
  194. data = b"surrounding bytes" + cmd + b"to spice things up"
  195. self.p.dataReceived(data)
  196. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  197. self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + b'\x12')
  198. self._enabledHelper(self.p.protocol)
  199. def testAcceptDo(self):
  200. # Try to enable an option. The option is in our allowEnable
  201. # list, so we will allow it to be enabled.
  202. cmd = telnet.IAC + telnet.DO + b'\x19'
  203. data = b'padding' + cmd + b'trailer'
  204. h = self.p.protocol
  205. h.localEnableable = (b'\x19',)
  206. self.p.dataReceived(data)
  207. self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + b'\x19')
  208. self._enabledHelper(h, eL=[b'\x19'])
  209. def testAcceptWill(self):
  210. # Same as testAcceptDo, but reversed.
  211. cmd = telnet.IAC + telnet.WILL + b'\x91'
  212. data = b'header' + cmd + b'padding'
  213. h = self.p.protocol
  214. h.remoteEnableable = (b'\x91',)
  215. self.p.dataReceived(data)
  216. self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x91')
  217. self._enabledHelper(h, eR=[b'\x91'])
  218. def testAcceptWont(self):
  219. # Try to disable an option. The server must allow any option to
  220. # be disabled at any time. Make sure it disables it and sends
  221. # back an acknowledgement of this.
  222. cmd = telnet.IAC + telnet.WONT + b'\x29'
  223. # Jimmy it - after these two lines, the server will be in a state
  224. # such that it believes the option to have been previously enabled
  225. # via normal negotiation.
  226. s = self.p.getOptionState(b'\x29')
  227. s.him.state = 'yes'
  228. data = b"fiddle dee" + cmd
  229. self.p.dataReceived(data)
  230. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  231. self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x29')
  232. self.assertEqual(s.him.state, 'no')
  233. self._enabledHelper(self.p.protocol, dR=[b'\x29'])
  234. def testAcceptDont(self):
  235. # Try to disable an option. The server must allow any option to
  236. # be disabled at any time. Make sure it disables it and sends
  237. # back an acknowledgement of this.
  238. cmd = telnet.IAC + telnet.DONT + b'\x29'
  239. # Jimmy it - after these two lines, the server will be in a state
  240. # such that it believes the option to have beenp previously enabled
  241. # via normal negotiation.
  242. s = self.p.getOptionState(b'\x29')
  243. s.us.state = 'yes'
  244. data = b"fiddle dum " + cmd
  245. self.p.dataReceived(data)
  246. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  247. self.assertEqual(self.t.value(), telnet.IAC + telnet.WONT + b'\x29')
  248. self.assertEqual(s.us.state, 'no')
  249. self._enabledHelper(self.p.protocol, dL=[b'\x29'])
  250. def testIgnoreWont(self):
  251. # Try to disable an option. The option is already disabled. The
  252. # server should send nothing in response to this.
  253. cmd = telnet.IAC + telnet.WONT + b'\x47'
  254. data = b"dum de dum" + cmd + b"tra la la"
  255. self.p.dataReceived(data)
  256. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  257. self.assertEqual(self.t.value(), b'')
  258. self._enabledHelper(self.p.protocol)
  259. def testIgnoreDont(self):
  260. # Try to disable an option. The option is already disabled. The
  261. # server should send nothing in response to this. Doing so could
  262. # lead to a negotiation loop.
  263. cmd = telnet.IAC + telnet.DONT + b'\x47'
  264. data = b"dum de dum" + cmd + b"tra la la"
  265. self.p.dataReceived(data)
  266. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  267. self.assertEqual(self.t.value(), b'')
  268. self._enabledHelper(self.p.protocol)
  269. def testIgnoreWill(self):
  270. # Try to enable an option. The option is already enabled. The
  271. # server should send nothing in response to this. Doing so could
  272. # lead to a negotiation loop.
  273. cmd = telnet.IAC + telnet.WILL + b'\x56'
  274. # Jimmy it - after these two lines, the server will be in a state
  275. # such that it believes the option to have been previously enabled
  276. # via normal negotiation.
  277. s = self.p.getOptionState(b'\x56')
  278. s.him.state = 'yes'
  279. data = b"tra la la" + cmd + b"dum de dum"
  280. self.p.dataReceived(data)
  281. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  282. self.assertEqual(self.t.value(), b'')
  283. self._enabledHelper(self.p.protocol)
  284. def testIgnoreDo(self):
  285. # Try to enable an option. The option is already enabled. The
  286. # server should send nothing in response to this. Doing so could
  287. # lead to a negotiation loop.
  288. cmd = telnet.IAC + telnet.DO + b'\x56'
  289. # Jimmy it - after these two lines, the server will be in a state
  290. # such that it believes the option to have been previously enabled
  291. # via normal negotiation.
  292. s = self.p.getOptionState(b'\x56')
  293. s.us.state = 'yes'
  294. data = b"tra la la" + cmd + b"dum de dum"
  295. self.p.dataReceived(data)
  296. self.assertEqual(self.p.protocol.data, data.replace(cmd, b''))
  297. self.assertEqual(self.t.value(), b'')
  298. self._enabledHelper(self.p.protocol)
  299. def testAcceptedEnableRequest(self):
  300. # Try to enable an option through the user-level API. This
  301. # returns a Deferred that fires when negotiation about the option
  302. # finishes. Make sure it fires, make sure state gets updated
  303. # properly, make sure the result indicates the option was enabled.
  304. d = self.p.do(b'\x42')
  305. h = self.p.protocol
  306. h.remoteEnableable = (b'\x42',)
  307. self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x42')
  308. self.p.dataReceived(telnet.IAC + telnet.WILL + b'\x42')
  309. d.addCallback(self.assertEqual, True)
  310. d.addCallback(lambda _: self._enabledHelper(h, eR=[b'\x42']))
  311. return d
  312. def test_refusedEnableRequest(self):
  313. """
  314. If the peer refuses to enable an option we request it to enable, the
  315. L{Deferred} returned by L{TelnetProtocol.do} fires with an
  316. L{OptionRefused} L{Failure}.
  317. """
  318. # Try to enable an option through the user-level API. This returns a
  319. # Deferred that fires when negotiation about the option finishes. Make
  320. # sure it fires, make sure state gets updated properly, make sure the
  321. # result indicates the option was enabled.
  322. self.p.protocol.remoteEnableable = (b'\x42',)
  323. d = self.p.do(b'\x42')
  324. self.assertEqual(self.t.value(), telnet.IAC + telnet.DO + b'\x42')
  325. s = self.p.getOptionState(b'\x42')
  326. self.assertEqual(s.him.state, 'no')
  327. self.assertEqual(s.us.state, 'no')
  328. self.assertTrue(s.him.negotiating)
  329. self.assertFalse(s.us.negotiating)
  330. self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x42')
  331. d = self.assertFailure(d, telnet.OptionRefused)
  332. d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
  333. d.addCallback(
  334. lambda ignored: self.assertFalse(s.him.negotiating))
  335. return d
  336. def test_refusedEnableOffer(self):
  337. """
  338. If the peer refuses to allow us to enable an option, the L{Deferred}
  339. returned by L{TelnetProtocol.will} fires with an L{OptionRefused}
  340. L{Failure}.
  341. """
  342. # Try to offer an option through the user-level API. This returns a
  343. # Deferred that fires when negotiation about the option finishes. Make
  344. # sure it fires, make sure state gets updated properly, make sure the
  345. # result indicates the option was enabled.
  346. self.p.protocol.localEnableable = (b'\x42',)
  347. d = self.p.will(b'\x42')
  348. self.assertEqual(self.t.value(), telnet.IAC + telnet.WILL + b'\x42')
  349. s = self.p.getOptionState(b'\x42')
  350. self.assertEqual(s.him.state, 'no')
  351. self.assertEqual(s.us.state, 'no')
  352. self.assertFalse(s.him.negotiating)
  353. self.assertTrue(s.us.negotiating)
  354. self.p.dataReceived(telnet.IAC + telnet.DONT + b'\x42')
  355. d = self.assertFailure(d, telnet.OptionRefused)
  356. d.addCallback(lambda ignored: self._enabledHelper(self.p.protocol))
  357. d.addCallback(
  358. lambda ignored: self.assertFalse(s.us.negotiating))
  359. return d
  360. def testAcceptedDisableRequest(self):
  361. # Try to disable an option through the user-level API. This
  362. # returns a Deferred that fires when negotiation about the option
  363. # finishes. Make sure it fires, make sure state gets updated
  364. # properly, make sure the result indicates the option was enabled.
  365. s = self.p.getOptionState(b'\x42')
  366. s.him.state = 'yes'
  367. d = self.p.dont(b'\x42')
  368. self.assertEqual(self.t.value(), telnet.IAC + telnet.DONT + b'\x42')
  369. self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x42')
  370. d.addCallback(self.assertEqual, True)
  371. d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
  372. dR=[b'\x42']))
  373. return d
  374. def testNegotiationBlocksFurtherNegotiation(self):
  375. # Try to disable an option, then immediately try to enable it, then
  376. # immediately try to disable it. Ensure that the 2nd and 3rd calls
  377. # fail quickly with the right exception.
  378. s = self.p.getOptionState(b'\x24')
  379. s.him.state = 'yes'
  380. self.p.dont(b'\x24') # fires after the first line of _final
  381. def _do(x):
  382. d = self.p.do(b'\x24')
  383. return self.assertFailure(d, telnet.AlreadyNegotiating)
  384. def _dont(x):
  385. d = self.p.dont(b'\x24')
  386. return self.assertFailure(d, telnet.AlreadyNegotiating)
  387. def _final(x):
  388. self.p.dataReceived(telnet.IAC + telnet.WONT + b'\x24')
  389. # an assertion that only passes if d2 has fired
  390. self._enabledHelper(self.p.protocol, dR=[b'\x24'])
  391. # Make sure we allow this
  392. self.p.protocol.remoteEnableable = (b'\x24',)
  393. d = self.p.do(b'\x24')
  394. self.p.dataReceived(telnet.IAC + telnet.WILL + b'\x24')
  395. d.addCallback(self.assertEqual, True)
  396. d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
  397. eR=[b'\x24'],
  398. dR=[b'\x24']))
  399. return d
  400. d = _do(None)
  401. d.addCallback(_dont)
  402. d.addCallback(_final)
  403. return d
  404. def testSuperfluousDisableRequestRaises(self):
  405. # Try to disable a disabled option. Make sure it fails properly.
  406. d = self.p.dont(b'\xab')
  407. return self.assertFailure(d, telnet.AlreadyDisabled)
  408. def testSuperfluousEnableRequestRaises(self):
  409. # Try to disable a disabled option. Make sure it fails properly.
  410. s = self.p.getOptionState(b'\xab')
  411. s.him.state = 'yes'
  412. d = self.p.do(b'\xab')
  413. return self.assertFailure(d, telnet.AlreadyEnabled)
  414. def testLostConnectionFailsDeferreds(self):
  415. d1 = self.p.do(b'\x12')
  416. d2 = self.p.do(b'\x23')
  417. d3 = self.p.do(b'\x34')
  418. class TestException(Exception):
  419. pass
  420. self.p.connectionLost(TestException("Total failure!"))
  421. d1 = self.assertFailure(d1, TestException)
  422. d2 = self.assertFailure(d2, TestException)
  423. d3 = self.assertFailure(d3, TestException)
  424. return defer.gatherResults([d1, d2, d3])
  425. class TestTelnet(telnet.Telnet):
  426. """
  427. A trivial extension of the telnet protocol class useful to unit tests.
  428. """
  429. def __init__(self):
  430. telnet.Telnet.__init__(self)
  431. self.events = []
  432. def applicationDataReceived(self, data):
  433. """
  434. Record the given data in C{self.events}.
  435. """
  436. self.events.append(('bytes', data))
  437. def unhandledCommand(self, command, data):
  438. """
  439. Record the given command in C{self.events}.
  440. """
  441. self.events.append(('command', command, data))
  442. def unhandledSubnegotiation(self, command, data):
  443. """
  444. Record the given subnegotiation command in C{self.events}.
  445. """
  446. self.events.append(('negotiate', command, data))
  447. class TelnetTests(unittest.TestCase):
  448. """
  449. Tests for L{telnet.Telnet}.
  450. L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option
  451. and suboption negotiation, and option state tracking.
  452. """
  453. def setUp(self):
  454. """
  455. Create an unconnected L{telnet.Telnet} to be used by tests.
  456. """
  457. self.protocol = TestTelnet()
  458. def test_enableLocal(self):
  459. """
  460. L{telnet.Telnet.enableLocal} should reject all options, since
  461. L{telnet.Telnet} does not know how to implement any options.
  462. """
  463. self.assertFalse(self.protocol.enableLocal(b'\0'))
  464. def test_enableRemote(self):
  465. """
  466. L{telnet.Telnet.enableRemote} should reject all options, since
  467. L{telnet.Telnet} does not know how to implement any options.
  468. """
  469. self.assertFalse(self.protocol.enableRemote(b'\0'))
  470. def test_disableLocal(self):
  471. """
  472. It is an error for L{telnet.Telnet.disableLocal} to be called, since
  473. L{telnet.Telnet.enableLocal} will never allow any options to be enabled
  474. locally. If a subclass overrides enableLocal, it must also override
  475. disableLocal.
  476. """
  477. self.assertRaises(NotImplementedError, self.protocol.disableLocal, b'\0')
  478. def test_disableRemote(self):
  479. """
  480. It is an error for L{telnet.Telnet.disableRemote} to be called, since
  481. L{telnet.Telnet.enableRemote} will never allow any options to be
  482. enabled remotely. If a subclass overrides enableRemote, it must also
  483. override disableRemote.
  484. """
  485. self.assertRaises(NotImplementedError, self.protocol.disableRemote, b'\0')
  486. def test_requestNegotiation(self):
  487. """
  488. L{telnet.Telnet.requestNegotiation} formats the feature byte and the
  489. payload bytes into the subnegotiation format and sends them.
  490. See RFC 855.
  491. """
  492. transport = proto_helpers.StringTransport()
  493. self.protocol.makeConnection(transport)
  494. self.protocol.requestNegotiation(b'\x01', b'\x02\x03')
  495. self.assertEqual(
  496. transport.value(),
  497. # IAC SB feature bytes IAC SE
  498. b'\xff\xfa\x01\x02\x03\xff\xf0')
  499. def test_requestNegotiationEscapesIAC(self):
  500. """
  501. If the payload for a subnegotiation includes I{IAC}, it is escaped by
  502. L{telnet.Telnet.requestNegotiation} with another I{IAC}.
  503. See RFC 855.
  504. """
  505. transport = proto_helpers.StringTransport()
  506. self.protocol.makeConnection(transport)
  507. self.protocol.requestNegotiation(b'\x01', b'\xff')
  508. self.assertEqual(
  509. transport.value(),
  510. b'\xff\xfa\x01\xff\xff\xff\xf0')
  511. def _deliver(self, data, *expected):
  512. """
  513. Pass the given bytes to the protocol's C{dataReceived} method and
  514. assert that the given events occur.
  515. """
  516. received = self.protocol.events = []
  517. self.protocol.dataReceived(data)
  518. self.assertEqual(received, list(expected))
  519. def test_oneApplicationDataByte(self):
  520. """
  521. One application-data byte in the default state gets delivered right
  522. away.
  523. """
  524. self._deliver(b'a', ('bytes', b'a'))
  525. def test_twoApplicationDataBytes(self):
  526. """
  527. Two application-data bytes in the default state get delivered
  528. together.
  529. """
  530. self._deliver(b'bc', ('bytes', b'bc'))
  531. def test_threeApplicationDataBytes(self):
  532. """
  533. Three application-data bytes followed by a control byte get
  534. delivered, but the control byte doesn't.
  535. """
  536. self._deliver(b'def' + telnet.IAC, ('bytes', b'def'))
  537. def test_escapedControl(self):
  538. """
  539. IAC in the escaped state gets delivered and so does another
  540. application-data byte following it.
  541. """
  542. self._deliver(telnet.IAC)
  543. self._deliver(telnet.IAC + b'g', ('bytes', telnet.IAC + b'g'))
  544. def test_carriageReturn(self):
  545. """
  546. A carriage return only puts the protocol into the newline state. A
  547. linefeed in the newline state causes just the newline to be
  548. delivered. A nul in the newline state causes a carriage return to
  549. be delivered. An IAC in the newline state causes a carriage return
  550. to be delivered and puts the protocol into the escaped state.
  551. Anything else causes a carriage return and that thing to be
  552. delivered.
  553. """
  554. self._deliver(b'\r')
  555. self._deliver(b'\n', ('bytes', b'\n'))
  556. self._deliver(b'\r\n', ('bytes', b'\n'))
  557. self._deliver(b'\r')
  558. self._deliver(b'\0', ('bytes', b'\r'))
  559. self._deliver(b'\r\0', ('bytes', b'\r'))
  560. self._deliver(b'\r')
  561. self._deliver(b'a', ('bytes', b'\ra'))
  562. self._deliver(b'\ra', ('bytes', b'\ra'))
  563. self._deliver(b'\r')
  564. self._deliver(
  565. telnet.IAC + telnet.IAC + b'x', ('bytes', b'\r' + telnet.IAC + b'x'))
  566. def test_applicationDataBeforeSimpleCommand(self):
  567. """
  568. Application bytes received before a command are delivered before the
  569. command is processed.
  570. """
  571. self._deliver(
  572. b'x' + telnet.IAC + telnet.NOP,
  573. ('bytes', b'x'), ('command', telnet.NOP, None))
  574. def test_applicationDataBeforeCommand(self):
  575. """
  576. Application bytes received before a WILL/WONT/DO/DONT are delivered
  577. before the command is processed.
  578. """
  579. self.protocol.commandMap = {}
  580. self._deliver(
  581. b'y' + telnet.IAC + telnet.WILL + b'\x00',
  582. ('bytes', b'y'), ('command', telnet.WILL, b'\x00'))
  583. def test_applicationDataBeforeSubnegotiation(self):
  584. """
  585. Application bytes received before a subnegotiation command are
  586. delivered before the negotiation is processed.
  587. """
  588. self._deliver(
  589. b'z' + telnet.IAC + telnet.SB + b'Qx' + telnet.IAC + telnet.SE,
  590. ('bytes', b'z'), ('negotiate', b'Q', [b'x']))