test_recvline.py 25 KB


  1. # -*- test-case-name: twisted.conch.test.test_recvline -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for L{twisted.conch.recvline} and fixtures for testing related
  6. functionality.
  7. """
  8. import os
  9. import sys
  10. from twisted.conch.insults import insults
  11. from twisted.conch import recvline
  12. from twisted.python import reflect, components, filepath
  13. from twisted.python.compat import iterbytes, bytesEnviron
  14. from twisted.python.runtime import platform
  15. from twisted.internet import defer, error
  16. from twisted.trial import unittest
  17. from twisted.cred import portal
  18. from twisted.test.proto_helpers import StringTransport
  19. if platform.isWindows():
  20. properEnv = dict(os.environ)
  21. properEnv["PYTHONPATH"] = os.pathsep.join(sys.path)
  22. else:
  23. properEnv = bytesEnviron()
  24. properEnv[b"PYTHONPATH"] = os.pathsep.join(sys.path).encode(
  25. sys.getfilesystemencoding())
  26. class ArrowsTests(unittest.TestCase):
  27. def setUp(self):
  28. self.underlyingTransport = StringTransport()
  29. self.pt = insults.ServerProtocol()
  30. self.p = recvline.HistoricRecvLine()
  31. self.pt.protocolFactory = lambda: self.p
  32. self.pt.factory = self
  33. self.pt.makeConnection(self.underlyingTransport)
  34. def test_printableCharacters(self):
  35. """
  36. When L{HistoricRecvLine} receives a printable character,
  37. it adds it to the current line buffer.
  38. """
  39. self.p.keystrokeReceived(b'x', None)
  40. self.p.keystrokeReceived(b'y', None)
  41. self.p.keystrokeReceived(b'z', None)
  42. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  43. def test_horizontalArrows(self):
  44. """
  45. When L{HistoricRecvLine} receives a LEFT_ARROW or
  46. RIGHT_ARROW keystroke it moves the cursor left or right
  47. in the current line buffer, respectively.
  48. """
  49. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  50. for ch in iterbytes(b'xyz'):
  51. kR(ch)
  52. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  53. kR(self.pt.RIGHT_ARROW)
  54. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  55. kR(self.pt.LEFT_ARROW)
  56. self.assertEqual(self.p.currentLineBuffer(), (b'xy', b'z'))
  57. kR(self.pt.LEFT_ARROW)
  58. self.assertEqual(self.p.currentLineBuffer(), (b'x', b'yz'))
  59. kR(self.pt.LEFT_ARROW)
  60. self.assertEqual(self.p.currentLineBuffer(), (b'', b'xyz'))
  61. kR(self.pt.LEFT_ARROW)
  62. self.assertEqual(self.p.currentLineBuffer(), (b'', b'xyz'))
  63. kR(self.pt.RIGHT_ARROW)
  64. self.assertEqual(self.p.currentLineBuffer(), (b'x', b'yz'))
  65. kR(self.pt.RIGHT_ARROW)
  66. self.assertEqual(self.p.currentLineBuffer(), (b'xy', b'z'))
  67. kR(self.pt.RIGHT_ARROW)
  68. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  69. kR(self.pt.RIGHT_ARROW)
  70. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  71. def test_newline(self):
  72. """
  73. When {HistoricRecvLine} receives a newline, it adds the current
  74. line buffer to the end of its history buffer.
  75. """
  76. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  77. for ch in iterbytes(b'xyz\nabc\n123\n'):
  78. kR(ch)
  79. self.assertEqual(self.p.currentHistoryBuffer(),
  80. ((b'xyz', b'abc', b'123'), ()))
  81. kR(b'c')
  82. kR(b'b')
  83. kR(b'a')
  84. self.assertEqual(self.p.currentHistoryBuffer(),
  85. ((b'xyz', b'abc', b'123'), ()))
  86. kR(b'\n')
  87. self.assertEqual(self.p.currentHistoryBuffer(),
  88. ((b'xyz', b'abc', b'123', b'cba'), ()))
  89. def test_verticalArrows(self):
  90. """
  91. When L{HistoricRecvLine} receives UP_ARROW or DOWN_ARROW
  92. keystrokes it move the current index in the current history
  93. buffer up or down, and resets the current line buffer to the
  94. previous or next line in history, respectively for each.
  95. """
  96. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  97. for ch in iterbytes(b'xyz\nabc\n123\n'):
  98. kR(ch)
  99. self.assertEqual(self.p.currentHistoryBuffer(),
  100. ((b'xyz', b'abc', b'123'), ()))
  101. self.assertEqual(self.p.currentLineBuffer(), (b'', b''))
  102. kR(self.pt.UP_ARROW)
  103. self.assertEqual(self.p.currentHistoryBuffer(),
  104. ((b'xyz', b'abc'), (b'123',)))
  105. self.assertEqual(self.p.currentLineBuffer(), (b'123', b''))
  106. kR(self.pt.UP_ARROW)
  107. self.assertEqual(self.p.currentHistoryBuffer(),
  108. ((b'xyz',), (b'abc', b'123')))
  109. self.assertEqual(self.p.currentLineBuffer(), (b'abc', b''))
  110. kR(self.pt.UP_ARROW)
  111. self.assertEqual(self.p.currentHistoryBuffer(),
  112. ((), (b'xyz', b'abc', b'123')))
  113. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  114. kR(self.pt.UP_ARROW)
  115. self.assertEqual(self.p.currentHistoryBuffer(),
  116. ((), (b'xyz', b'abc', b'123')))
  117. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  118. for i in range(4):
  119. kR(self.pt.DOWN_ARROW)
  120. self.assertEqual(self.p.currentHistoryBuffer(),
  121. ((b'xyz', b'abc', b'123'), ()))
  122. def test_home(self):
  123. """
  124. When L{HistoricRecvLine} receives a HOME keystroke it moves the
  125. cursor to the beginning of the current line buffer.
  126. """
  127. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  128. for ch in iterbytes(b'hello, world'):
  129. kR(ch)
  130. self.assertEqual(self.p.currentLineBuffer(), (b'hello, world', b''))
  131. kR(self.pt.HOME)
  132. self.assertEqual(self.p.currentLineBuffer(), (b'', b'hello, world'))
  133. def test_end(self):
  134. """
  135. When L{HistoricRecvLine} receives an END keystroke it moves the cursor
  136. to the end of the current line buffer.
  137. """
  138. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  139. for ch in iterbytes(b'hello, world'):
  140. kR(ch)
  141. self.assertEqual(self.p.currentLineBuffer(), (b'hello, world', b''))
  142. kR(self.pt.HOME)
  143. kR(self.pt.END)
  144. self.assertEqual(self.p.currentLineBuffer(), (b'hello, world', b''))
  145. def test_backspace(self):
  146. """
  147. When L{HistoricRecvLine} receives a BACKSPACE keystroke it deletes
  148. the character immediately before the cursor.
  149. """
  150. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  151. for ch in iterbytes(b'xyz'):
  152. kR(ch)
  153. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  154. kR(self.pt.BACKSPACE)
  155. self.assertEqual(self.p.currentLineBuffer(), (b'xy', b''))
  156. kR(self.pt.LEFT_ARROW)
  157. kR(self.pt.BACKSPACE)
  158. self.assertEqual(self.p.currentLineBuffer(), (b'', b'y'))
  159. kR(self.pt.BACKSPACE)
  160. self.assertEqual(self.p.currentLineBuffer(), (b'', b'y'))
  161. def test_delete(self):
  162. """
  163. When L{HistoricRecvLine} receives a DELETE keystroke, it
  164. delets the character immediately after the cursor.
  165. """
  166. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  167. for ch in iterbytes(b'xyz'):
  168. kR(ch)
  169. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  170. kR(self.pt.DELETE)
  171. self.assertEqual(self.p.currentLineBuffer(), (b'xyz', b''))
  172. kR(self.pt.LEFT_ARROW)
  173. kR(self.pt.DELETE)
  174. self.assertEqual(self.p.currentLineBuffer(), (b'xy', b''))
  175. kR(self.pt.LEFT_ARROW)
  176. kR(self.pt.DELETE)
  177. self.assertEqual(self.p.currentLineBuffer(), (b'x', b''))
  178. kR(self.pt.LEFT_ARROW)
  179. kR(self.pt.DELETE)
  180. self.assertEqual(self.p.currentLineBuffer(), (b'', b''))
  181. kR(self.pt.DELETE)
  182. self.assertEqual(self.p.currentLineBuffer(), (b'', b''))
  183. def test_insert(self):
  184. """
  185. When not in INSERT mode, L{HistoricRecvLine} inserts the typed
  186. character at the cursor before the next character.
  187. """
  188. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  189. for ch in iterbytes(b'xyz'):
  190. kR(ch)
  191. kR(self.pt.LEFT_ARROW)
  192. kR(b'A')
  193. self.assertEqual(self.p.currentLineBuffer(), (b'xyA', b'z'))
  194. kR(self.pt.LEFT_ARROW)
  195. kR(b'B')
  196. self.assertEqual(self.p.currentLineBuffer(), (b'xyB', b'Az'))
  197. def test_typeover(self):
  198. """
  199. When in INSERT mode and upon receiving a keystroke with a printable
  200. character, L{HistoricRecvLine} replaces the character at
  201. the cursor with the typed character rather than inserting before.
  202. Ah, the ironies of INSERT mode.
  203. """
  204. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  205. for ch in iterbytes(b'xyz'):
  206. kR(ch)
  207. kR(self.pt.INSERT)
  208. kR(self.pt.LEFT_ARROW)
  209. kR(b'A')
  210. self.assertEqual(self.p.currentLineBuffer(), (b'xyA', b''))
  211. kR(self.pt.LEFT_ARROW)
  212. kR(b'B')
  213. self.assertEqual(self.p.currentLineBuffer(), (b'xyB', b''))
  214. def test_unprintableCharacters(self):
  215. """
  216. When L{HistoricRecvLine} receives a keystroke for an unprintable
  217. function key with no assigned behavior, the line buffer is unmodified.
  218. """
  219. kR = lambda ch: self.p.keystrokeReceived(ch, None)
  220. pt = self.pt
  221. for ch in (pt.F1, pt.F2, pt.F3, pt.F4, pt.F5, pt.F6, pt.F7, pt.F8,
  222. pt.F9, pt.F10, pt.F11, pt.F12, pt.PGUP, pt.PGDN):
  223. kR(ch)
  224. self.assertEqual(self.p.currentLineBuffer(), (b'', b''))
  225. from twisted.conch import telnet
  226. from twisted.conch.insults import helper
  227. from twisted.conch.test.loopback import LoopbackRelay
  228. class EchoServer(recvline.HistoricRecvLine):
  229. def lineReceived(self, line):
  230. self.terminal.write(line + b'\n' + self.ps[self.pn])
  231. # An insults API for this would be nice.
  232. left = b"\x1b[D"
  233. right = b"\x1b[C"
  234. up = b"\x1b[A"
  235. down = b"\x1b[B"
  236. insert = b"\x1b[2~"
  237. home = b"\x1b[1~"
  238. delete = b"\x1b[3~"
  239. end = b"\x1b[4~"
  240. backspace = b"\x7f"
  241. from twisted.cred import checkers
  242. try:
  243. from twisted.conch.ssh import (userauth, transport, channel, connection,
  244. session, keys)
  245. from twisted.conch.manhole_ssh import TerminalUser, TerminalSession, TerminalRealm, TerminalSessionTransport, ConchFactory
  246. except ImportError:
  247. ssh = False
  248. else:
  249. ssh = True
  250. class SessionChannel(channel.SSHChannel):
  251. name = b'session'
  252. def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
  253. channel.SSHChannel.__init__(self, *a, **kw)
  254. self.protocolFactory = protocolFactory
  255. self.protocolArgs = protocolArgs
  256. self.protocolKwArgs = protocolKwArgs
  257. self.width = width
  258. self.height = height
  259. def channelOpen(self, data):
  260. term = session.packRequest_pty_req(b"vt102", (self.height, self.width, 0, 0), b'')
  261. self.conn.sendRequest(self, b'pty-req', term)
  262. self.conn.sendRequest(self, b'shell', b'')
  263. self._protocolInstance = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
  264. self._protocolInstance.factory = self
  265. self._protocolInstance.makeConnection(self)
  266. def closed(self):
  267. self._protocolInstance.connectionLost(error.ConnectionDone())
  268. def dataReceived(self, data):
  269. self._protocolInstance.dataReceived(data)
  270. class TestConnection(connection.SSHConnection):
  271. def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, width, height, *a, **kw):
  272. connection.SSHConnection.__init__(self, *a, **kw)
  273. self.protocolFactory = protocolFactory
  274. self.protocolArgs = protocolArgs
  275. self.protocolKwArgs = protocolKwArgs
  276. self.width = width
  277. self.height = height
  278. def serviceStarted(self):
  279. self.__channel = SessionChannel(self.protocolFactory, self.protocolArgs, self.protocolKwArgs, self.width, self.height)
  280. self.openChannel(self.__channel)
  281. def write(self, data):
  282. return self.__channel.write(data)
  283. class TestAuth(userauth.SSHUserAuthClient):
  284. def __init__(self, username, password, *a, **kw):
  285. userauth.SSHUserAuthClient.__init__(self, username, *a, **kw)
  286. self.password = password
  287. def getPassword(self):
  288. return defer.succeed(self.password)
  289. class TestTransport(transport.SSHClientTransport):
  290. def __init__(self, protocolFactory, protocolArgs, protocolKwArgs, username, password, width, height, *a, **kw):
  291. self.protocolFactory = protocolFactory
  292. self.protocolArgs = protocolArgs
  293. self.protocolKwArgs = protocolKwArgs
  294. self.username = username
  295. self.password = password
  296. self.width = width
  297. self.height = height
  298. def verifyHostKey(self, hostKey, fingerprint):
  299. return defer.succeed(True)
  300. def connectionSecure(self):
  301. self.__connection = TestConnection(self.protocolFactory, self.protocolArgs, self.protocolKwArgs, self.width, self.height)
  302. self.requestService(
  303. TestAuth(self.username, self.password, self.__connection))
  304. def write(self, data):
  305. return self.__connection.write(data)
  306. class TestSessionTransport(TerminalSessionTransport):
  307. def protocolFactory(self):
  308. return self.avatar.conn.transport.factory.serverProtocol()
  309. class TestSession(TerminalSession):
  310. transportFactory = TestSessionTransport
  311. class TestUser(TerminalUser):
  312. pass
  313. components.registerAdapter(TestSession, TestUser, session.ISession)
  314. class NotifyingExpectableBuffer(helper.ExpectableBuffer):
  315. def __init__(self):
  316. self.onConnection = defer.Deferred()
  317. self.onDisconnection = defer.Deferred()
  318. def connectionMade(self):
  319. helper.ExpectableBuffer.connectionMade(self)
  320. self.onConnection.callback(self)
  321. def connectionLost(self, reason):
  322. self.onDisconnection.errback(reason)
  323. class _BaseMixin:
  324. WIDTH = 80
  325. HEIGHT = 24
  326. def _assertBuffer(self, lines):
  327. receivedLines = self.recvlineClient.__bytes__().splitlines()
  328. expectedLines = lines + ([b''] * (self.HEIGHT - len(lines) - 1))
  329. self.assertEqual(len(receivedLines), len(expectedLines))
  330. for i in range(len(receivedLines)):
  331. self.assertEqual(
  332. receivedLines[i], expectedLines[i],
  333. b"".join(receivedLines[max(0, i-1):i+1]) +
  334. b" != " +
  335. b"".join(expectedLines[max(0, i-1):i+1]))
  336. def _trivialTest(self, inputLine, output):
  337. done = self.recvlineClient.expect(b"done")
  338. self._testwrite(inputLine)
  339. def finished(ign):
  340. self._assertBuffer(output)
  341. return done.addCallback(finished)
  342. class _SSHMixin(_BaseMixin):
  343. def setUp(self):
  344. if not ssh:
  345. raise unittest.SkipTest(
  346. "cryptography requirements missing, can't run historic "
  347. "recvline tests over ssh")
  348. u, p = b'testuser', b'testpass'
  349. rlm = TerminalRealm()
  350. rlm.userFactory = TestUser
  351. rlm.chainedProtocolFactory = lambda: insultsServer
  352. checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
  353. checker.addUser(u, p)
  354. ptl = portal.Portal(rlm)
  355. ptl.registerChecker(checker)
  356. sshFactory = ConchFactory(ptl)
  357. sshKey = keys._getPersistentRSAKey(filepath.FilePath(self.mktemp()),
  358. keySize=512)
  359. sshFactory.publicKeys[b"ssh-rsa"] = sshKey
  360. sshFactory.privateKeys[b"ssh-rsa"] = sshKey
  361. sshFactory.serverProtocol = self.serverProtocol
  362. sshFactory.startFactory()
  363. recvlineServer = self.serverProtocol()
  364. insultsServer = insults.ServerProtocol(lambda: recvlineServer)
  365. sshServer = sshFactory.buildProtocol(None)
  366. clientTransport = LoopbackRelay(sshServer)
  367. recvlineClient = NotifyingExpectableBuffer()
  368. insultsClient = insults.ClientProtocol(lambda: recvlineClient)
  369. sshClient = TestTransport(lambda: insultsClient, (), {}, u, p, self.WIDTH, self.HEIGHT)
  370. serverTransport = LoopbackRelay(sshClient)
  371. sshClient.makeConnection(clientTransport)
  372. sshServer.makeConnection(serverTransport)
  373. self.recvlineClient = recvlineClient
  374. self.sshClient = sshClient
  375. self.sshServer = sshServer
  376. self.clientTransport = clientTransport
  377. self.serverTransport = serverTransport
  378. return recvlineClient.onConnection
  379. def _testwrite(self, data):
  380. self.sshClient.write(data)
  381. from twisted.conch.test import test_telnet
  382. class TestInsultsClientProtocol(insults.ClientProtocol,
  383. test_telnet.TestProtocol):
  384. pass
  385. class TestInsultsServerProtocol(insults.ServerProtocol,
  386. test_telnet.TestProtocol):
  387. pass
  388. class _TelnetMixin(_BaseMixin):
  389. def setUp(self):
  390. recvlineServer = self.serverProtocol()
  391. insultsServer = TestInsultsServerProtocol(lambda: recvlineServer)
  392. telnetServer = telnet.TelnetTransport(lambda: insultsServer)
  393. clientTransport = LoopbackRelay(telnetServer)
  394. recvlineClient = NotifyingExpectableBuffer()
  395. insultsClient = TestInsultsClientProtocol(lambda: recvlineClient)
  396. telnetClient = telnet.TelnetTransport(lambda: insultsClient)
  397. serverTransport = LoopbackRelay(telnetClient)
  398. telnetClient.makeConnection(clientTransport)
  399. telnetServer.makeConnection(serverTransport)
  400. serverTransport.clearBuffer()
  401. clientTransport.clearBuffer()
  402. self.recvlineClient = recvlineClient
  403. self.telnetClient = telnetClient
  404. self.clientTransport = clientTransport
  405. self.serverTransport = serverTransport
  406. return recvlineClient.onConnection
  407. def _testwrite(self, data):
  408. self.telnetClient.write(data)
  409. try:
  410. from twisted.conch import stdio
  411. except ImportError:
  412. stdio = None
  413. class _StdioMixin(_BaseMixin):
  414. def setUp(self):
  415. # A memory-only terminal emulator, into which the server will
  416. # write things and make other state changes. What ends up
  417. # here is basically what a user would have seen on their
  418. # screen.
  419. testTerminal = NotifyingExpectableBuffer()
  420. # An insults client protocol which will translate bytes
  421. # received from the child process into keystroke commands for
  422. # an ITerminalProtocol.
  423. insultsClient = insults.ClientProtocol(lambda: testTerminal)
  424. # A process protocol which will translate stdout and stderr
  425. # received from the child process to dataReceived calls and
  426. # error reporting on an insults client protocol.
  427. processClient = stdio.TerminalProcessProtocol(insultsClient)
  428. # Run twisted/conch/stdio.py with the name of a class
  429. # implementing ITerminalProtocol. This class will be used to
  430. # handle bytes we send to the child process.
  431. exe = sys.executable
  432. module = stdio.__file__
  433. if module.endswith('.pyc') or module.endswith('.pyo'):
  434. module = module[:-1]
  435. args = [exe, module, reflect.qual(self.serverProtocol)]
  436. if not platform.isWindows():
  437. args = [arg.encode(sys.getfilesystemencoding()) for arg in args]
  438. from twisted.internet import reactor
  439. clientTransport = reactor.spawnProcess(processClient, exe, args,
  440. env=properEnv, usePTY=True)
  441. self.recvlineClient = self.testTerminal = testTerminal
  442. self.processClient = processClient
  443. self.clientTransport = clientTransport
  444. # Wait for the process protocol and test terminal to become
  445. # connected before proceeding. The former should always
  446. # happen first, but it doesn't hurt to be safe.
  447. return defer.gatherResults(filter(None, [
  448. processClient.onConnection,
  449. testTerminal.expect(b">>> ")]))
  450. def tearDown(self):
  451. # Kill the child process. We're done with it.
  452. try:
  453. self.clientTransport.signalProcess("KILL")
  454. except (error.ProcessExitedAlready, OSError):
  455. pass
  456. def trap(failure):
  457. failure.trap(error.ProcessTerminated)
  458. self.assertIsNone(failure.value.exitCode)
  459. self.assertEqual(failure.value.status, 9)
  460. return self.testTerminal.onDisconnection.addErrback(trap)
  461. def _testwrite(self, data):
  462. self.clientTransport.write(data)
  463. class RecvlineLoopbackMixin:
  464. serverProtocol = EchoServer
  465. def testSimple(self):
  466. return self._trivialTest(
  467. b"first line\ndone",
  468. [b">>> first line",
  469. b"first line",
  470. b">>> done"])
  471. def testLeftArrow(self):
  472. return self._trivialTest(
  473. insert + b'first line' + left * 4 + b"xxxx\ndone",
  474. [b">>> first xxxx",
  475. b"first xxxx",
  476. b">>> done"])
  477. def testRightArrow(self):
  478. return self._trivialTest(
  479. insert + b'right line' + left * 4 + right * 2 + b"xx\ndone",
  480. [b">>> right lixx",
  481. b"right lixx",
  482. b">>> done"])
  483. def testBackspace(self):
  484. return self._trivialTest(
  485. b"second line" + backspace * 4 + b"xxxx\ndone",
  486. [b">>> second xxxx",
  487. b"second xxxx",
  488. b">>> done"])
  489. def testDelete(self):
  490. return self._trivialTest(
  491. b"delete xxxx" + left * 4 + delete * 4 + b"line\ndone",
  492. [b">>> delete line",
  493. b"delete line",
  494. b">>> done"])
  495. def testInsert(self):
  496. return self._trivialTest(
  497. b"third ine" + left * 3 + b"l\ndone",
  498. [b">>> third line",
  499. b"third line",
  500. b">>> done"])
  501. def testTypeover(self):
  502. return self._trivialTest(
  503. b"fourth xine" + left * 4 + insert + b"l\ndone",
  504. [b">>> fourth line",
  505. b"fourth line",
  506. b">>> done"])
  507. def testHome(self):
  508. return self._trivialTest(
  509. insert + b"blah line" + home + b"home\ndone",
  510. [b">>> home line",
  511. b"home line",
  512. b">>> done"])
  513. def testEnd(self):
  514. return self._trivialTest(
  515. b"end " + left * 4 + end + b"line\ndone",
  516. [b">>> end line",
  517. b"end line",
  518. b">>> done"])
  519. class RecvlineLoopbackTelnetTests(_TelnetMixin, unittest.TestCase, RecvlineLoopbackMixin):
  520. pass
  521. class RecvlineLoopbackSSHTests(_SSHMixin, unittest.TestCase, RecvlineLoopbackMixin):
  522. pass
  523. class RecvlineLoopbackStdioTests(_StdioMixin, unittest.TestCase, RecvlineLoopbackMixin):
  524. if stdio is None:
  525. skip = "Terminal requirements missing, can't run recvline tests over stdio"
  526. class HistoricRecvlineLoopbackMixin:
  527. serverProtocol = EchoServer
  528. def testUpArrow(self):
  529. return self._trivialTest(
  530. b"first line\n" + up + b"\ndone",
  531. [b">>> first line",
  532. b"first line",
  533. b">>> first line",
  534. b"first line",
  535. b">>> done"])
  536. def test_DownArrowToPartialLineInHistory(self):
  537. """
  538. Pressing down arrow to visit an entry that was added to the
  539. history by pressing the up arrow instead of return does not
  540. raise a L{TypeError}.
  541. @see: U{http://twistedmatrix.com/trac/ticket/9031}
  542. @return: A L{defer.Deferred} that fires when C{b"done"} is
  543. echoed back.
  544. """
  545. return self._trivialTest(
  546. b"first line\n" + b"partial line" + up + down + b"\ndone",
  547. [b">>> first line",
  548. b"first line",
  549. b">>> partial line",
  550. b"partial line",
  551. b">>> done"])
  552. def testDownArrow(self):
  553. return self._trivialTest(
  554. b"first line\nsecond line\n" + up * 2 + down + b"\ndone",
  555. [b">>> first line",
  556. b"first line",
  557. b">>> second line",
  558. b"second line",
  559. b">>> second line",
  560. b"second line",
  561. b">>> done"])
  562. class HistoricRecvlineLoopbackTelnetTests(_TelnetMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
  563. pass
  564. class HistoricRecvlineLoopbackSSHTests(_SSHMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
  565. pass
  566. class HistoricRecvlineLoopbackStdioTests(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
  567. if stdio is None:
  568. skip = "Terminal requirements missing, can't run historic recvline tests over stdio"
  569. class TransportSequenceTests(unittest.TestCase):
  570. """
  571. L{twisted.conch.recvline.TransportSequence}
  572. """
  573. def test_invalidSequence(self):
  574. """
  575. Initializing a L{recvline.TransportSequence} with no args
  576. raises an assertion.
  577. """
  578. self.assertRaises(AssertionError, recvline.TransportSequence)