test_tap.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.conch.tap}.
  5. """
  6. try:
  7. import cryptography
  8. except ImportError:
  9. cryptography = None
  10. try:
  11. import pyasn1
  12. except ImportError:
  13. pyasn1 = None
  14. try:
  15. from twisted.conch import unix
  16. except ImportError:
  17. unix = None
  18. if cryptography and pyasn1 and unix:
  19. from twisted.conch import tap
  20. from twisted.conch.openssh_compat.factory import OpenSSHFactory
  21. from twisted.application.internet import StreamServerEndpointService
  22. from twisted.cred import error
  23. from twisted.cred.credentials import ISSHPrivateKey
  24. from twisted.cred.credentials import IUsernamePassword, UsernamePassword
  25. from twisted.trial.unittest import TestCase
  26. class MakeServiceTests(TestCase):
  27. """
  28. Tests for L{tap.makeService}.
  29. """
  30. if not cryptography:
  31. skip = "can't run without cryptography"
  32. if not pyasn1:
  33. skip = "Cannot run without PyASN1"
  34. if not unix:
  35. skip = "can't run on non-posix computers"
  36. usernamePassword = (b'iamuser', b'thisispassword')
  37. def setUp(self):
  38. """
  39. Create a file with two users.
  40. """
  41. self.filename = self.mktemp()
  42. with open(self.filename, 'wb+') as f:
  43. f.write(b':'.join(self.usernamePassword))
  44. self.options = tap.Options()
  45. def test_basic(self):
  46. """
  47. L{tap.makeService} returns a L{StreamServerEndpointService} instance
  48. running on TCP port 22, and the linked protocol factory is an instance
  49. of L{OpenSSHFactory}.
  50. """
  51. config = tap.Options()
  52. service = tap.makeService(config)
  53. self.assertIsInstance(service, StreamServerEndpointService)
  54. self.assertEqual(service.endpoint._port, 22)
  55. self.assertIsInstance(service.factory, OpenSSHFactory)
  56. def test_defaultAuths(self):
  57. """
  58. Make sure that if the C{--auth} command-line option is not passed,
  59. the default checkers are (for backwards compatibility): SSH and UNIX
  60. """
  61. numCheckers = 2
  62. self.assertIn(ISSHPrivateKey, self.options['credInterfaces'],
  63. "SSH should be one of the default checkers")
  64. self.assertIn(IUsernamePassword, self.options['credInterfaces'],
  65. "UNIX should be one of the default checkers")
  66. self.assertEqual(numCheckers, len(self.options['credCheckers']),
  67. "There should be %d checkers by default" % (numCheckers,))
  68. def test_authAdded(self):
  69. """
  70. The C{--auth} command-line option will add a checker to the list of
  71. checkers, and it should be the only auth checker
  72. """
  73. self.options.parseOptions(['--auth', 'file:' + self.filename])
  74. self.assertEqual(len(self.options['credCheckers']), 1)
  75. def test_multipleAuthAdded(self):
  76. """
  77. Multiple C{--auth} command-line options will add all checkers specified
  78. to the list ofcheckers, and there should only be the specified auth
  79. checkers (no default checkers).
  80. """
  81. self.options.parseOptions(['--auth', 'file:' + self.filename,
  82. '--auth', 'memory:testuser:testpassword'])
  83. self.assertEqual(len(self.options['credCheckers']), 2)
  84. def test_authFailure(self):
  85. """
  86. The checker created by the C{--auth} command-line option returns a
  87. L{Deferred} that fails with L{UnauthorizedLogin} when
  88. presented with credentials that are unknown to that checker.
  89. """
  90. self.options.parseOptions(['--auth', 'file:' + self.filename])
  91. checker = self.options['credCheckers'][-1]
  92. invalid = UsernamePassword(self.usernamePassword[0], 'fake')
  93. # Wrong password should raise error
  94. return self.assertFailure(
  95. checker.requestAvatarId(invalid), error.UnauthorizedLogin)
  96. def test_authSuccess(self):
  97. """
  98. The checker created by the C{--auth} command-line option returns a
  99. L{Deferred} that returns the avatar id when presented with credentials
  100. that are known to that checker.
  101. """
  102. self.options.parseOptions(['--auth', 'file:' + self.filename])
  103. checker = self.options['credCheckers'][-1]
  104. correct = UsernamePassword(*self.usernamePassword)
  105. d = checker.requestAvatarId(correct)
  106. def checkSuccess(username):
  107. self.assertEqual(username, correct.username)
  108. return d.addCallback(checkSuccess)
  109. def test_checkers(self):
  110. """
  111. The L{OpenSSHFactory} built by L{tap.makeService} has a portal with
  112. L{ISSHPrivateKey} and L{IUsernamePassword} interfaces registered as
  113. checkers.
  114. """
  115. config = tap.Options()
  116. service = tap.makeService(config)
  117. portal = service.factory.portal
  118. self.assertEqual(
  119. set(portal.checkers.keys()),
  120. set([ISSHPrivateKey, IUsernamePassword]))