# test_checkers.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.conch.checkers}.
"""

from __future__ import absolute_import, division

# C{cryptSkip} is the skip reason used by the test cases that hash passwords
# with the C{crypt} module; it is L{None} when the module imported cleanly.
try:
    import crypt
except ImportError:
    cryptSkip = 'cannot run without crypt module'
else:
    cryptSkip = None

import os
from collections import namedtuple
from io import BytesIO

from zope.interface.verify import verifyObject

from twisted.python import util
from twisted.python.compat import _b64encodebytes
from twisted.python.failure import Failure
from twisted.python.reflect import requireModule
from twisted.trial.unittest import TestCase
from twisted.python.filepath import FilePath
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.credentials import UsernamePassword, IUsernamePassword, \
    SSHPrivateKey, ISSHPrivateKey
from twisted.cred.error import UnhandledCredentials, UnauthorizedLogin
from twisted.python.fakepwd import UserDatabase, ShadowDatabase
from twisted.test.test_process import MockOS
  29. if requireModule('cryptography') and requireModule('pyasn1'):
  30. dependencySkip = None
  31. from twisted.conch.ssh import keys
  32. from twisted.conch import checkers
  33. from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey
  34. from twisted.conch.test import keydata
  35. else:
  36. dependencySkip = "can't run without cryptography and PyASN1"
  37. if getattr(os, 'geteuid', None) is None:
  38. euidSkip = "Cannot run without effective UIDs (questionable)"
  39. else:
  40. euidSkip = None
  41. class HelperTests(TestCase):
  42. """
  43. Tests for helper functions L{verifyCryptedPassword}, L{_pwdGetByName} and
  44. L{_shadowGetByName}.
  45. """
  46. skip = cryptSkip or dependencySkip
  47. def setUp(self):
  48. self.mockos = MockOS()
  49. def test_verifyCryptedPassword(self):
  50. """
  51. L{verifyCryptedPassword} returns C{True} if the plaintext password
  52. passed to it matches the encrypted password passed to it.
  53. """
  54. password = 'secret string'
  55. salt = 'salty'
  56. crypted = crypt.crypt(password, salt)
  57. self.assertTrue(
  58. checkers.verifyCryptedPassword(crypted, password),
  59. '%r supposed to be valid encrypted password for %r' % (
  60. crypted, password))
  61. def test_verifyCryptedPasswordMD5(self):
  62. """
  63. L{verifyCryptedPassword} returns True if the provided cleartext password
  64. matches the provided MD5 password hash.
  65. """
  66. password = 'password'
  67. salt = '$1$salt'
  68. crypted = crypt.crypt(password, salt)
  69. self.assertTrue(
  70. checkers.verifyCryptedPassword(crypted, password),
  71. '%r supposed to be valid encrypted password for %s' % (
  72. crypted, password))
  73. def test_refuteCryptedPassword(self):
  74. """
  75. L{verifyCryptedPassword} returns C{False} if the plaintext password
  76. passed to it does not match the encrypted password passed to it.
  77. """
  78. password = 'string secret'
  79. wrong = 'secret string'
  80. crypted = crypt.crypt(password, password)
  81. self.assertFalse(
  82. checkers.verifyCryptedPassword(crypted, wrong),
  83. '%r not supposed to be valid encrypted password for %s' % (
  84. crypted, wrong))
  85. def test_pwdGetByName(self):
  86. """
  87. L{_pwdGetByName} returns a tuple of items from the UNIX /etc/passwd
  88. database if the L{pwd} module is present.
  89. """
  90. userdb = UserDatabase()
  91. userdb.addUser(
  92. 'alice', 'secrit', 1, 2, 'first last', '/foo', '/bin/sh')
  93. self.patch(checkers, 'pwd', userdb)
  94. self.assertEqual(
  95. checkers._pwdGetByName('alice'), userdb.getpwnam('alice'))
  96. def test_pwdGetByNameWithoutPwd(self):
  97. """
  98. If the C{pwd} module isn't present, L{_pwdGetByName} returns L{None}.
  99. """
  100. self.patch(checkers, 'pwd', None)
  101. self.assertIsNone(checkers._pwdGetByName('alice'))
  102. def test_shadowGetByName(self):
  103. """
  104. L{_shadowGetByName} returns a tuple of items from the UNIX /etc/shadow
  105. database if the L{spwd} is present.
  106. """
  107. userdb = ShadowDatabase()
  108. userdb.addUser('bob', 'passphrase', 1, 2, 3, 4, 5, 6, 7)
  109. self.patch(checkers, 'spwd', userdb)
  110. self.mockos.euid = 2345
  111. self.mockos.egid = 1234
  112. self.patch(util, 'os', self.mockos)
  113. self.assertEqual(
  114. checkers._shadowGetByName('bob'), userdb.getspnam('bob'))
  115. self.assertEqual(self.mockos.seteuidCalls, [0, 2345])
  116. self.assertEqual(self.mockos.setegidCalls, [0, 1234])
  117. def test_shadowGetByNameWithoutSpwd(self):
  118. """
  119. L{_shadowGetByName} returns L{None} if C{spwd} is not present.
  120. """
  121. self.patch(checkers, 'spwd', None)
  122. self.assertIsNone(checkers._shadowGetByName('bob'))
  123. self.assertEqual(self.mockos.seteuidCalls, [])
  124. self.assertEqual(self.mockos.setegidCalls, [])
  125. class SSHPublicKeyDatabaseTests(TestCase):
  126. """
  127. Tests for L{SSHPublicKeyDatabase}.
  128. """
  129. skip = euidSkip or dependencySkip
  130. def setUp(self):
  131. self.checker = checkers.SSHPublicKeyDatabase()
  132. self.key1 = _b64encodebytes(b"foobar")
  133. self.key2 = _b64encodebytes(b"eggspam")
  134. self.content = (b"t1 " + self.key1 + b" foo\nt2 " + self.key2 +
  135. b" egg\n")
  136. self.mockos = MockOS()
  137. self.mockos.path = FilePath(self.mktemp())
  138. self.mockos.path.makedirs()
  139. self.patch(util, 'os', self.mockos)
  140. self.sshDir = self.mockos.path.child('.ssh')
  141. self.sshDir.makedirs()
  142. userdb = UserDatabase()
  143. userdb.addUser(
  144. b'user', b'password', 1, 2, b'first last',
  145. self.mockos.path.path, b'/bin/shell')
  146. self.checker._userdb = userdb
  147. def test_deprecated(self):
  148. """
  149. L{SSHPublicKeyDatabase} is deprecated as of version 15.0
  150. """
  151. warningsShown = self.flushWarnings(
  152. offendingFunctions=[self.setUp])
  153. self.assertEqual(warningsShown[0]['category'], DeprecationWarning)
  154. self.assertEqual(
  155. warningsShown[0]['message'],
  156. "twisted.conch.checkers.SSHPublicKeyDatabase "
  157. "was deprecated in Twisted 15.0.0: Please use "
  158. "twisted.conch.checkers.SSHPublicKeyChecker, "
  159. "initialized with an instance of "
  160. "twisted.conch.checkers.UNIXAuthorizedKeysFiles instead.")
  161. self.assertEqual(len(warningsShown), 1)
  162. def _testCheckKey(self, filename):
  163. self.sshDir.child(filename).setContent(self.content)
  164. user = UsernamePassword(b"user", b"password")
  165. user.blob = b"foobar"
  166. self.assertTrue(self.checker.checkKey(user))
  167. user.blob = b"eggspam"
  168. self.assertTrue(self.checker.checkKey(user))
  169. user.blob = b"notallowed"
  170. self.assertFalse(self.checker.checkKey(user))
  171. def test_checkKey(self):
  172. """
  173. L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
  174. authorized_keys file and check the keys against that file.
  175. """
  176. self._testCheckKey("authorized_keys")
  177. self.assertEqual(self.mockos.seteuidCalls, [])
  178. self.assertEqual(self.mockos.setegidCalls, [])
  179. def test_checkKey2(self):
  180. """
  181. L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
  182. authorized_keys2 file and check the keys against that file.
  183. """
  184. self._testCheckKey("authorized_keys2")
  185. self.assertEqual(self.mockos.seteuidCalls, [])
  186. self.assertEqual(self.mockos.setegidCalls, [])
  187. def test_checkKeyAsRoot(self):
  188. """
  189. If the key file is readable, L{SSHPublicKeyDatabase.checkKey} should
  190. switch its uid/gid to the ones of the authenticated user.
  191. """
  192. keyFile = self.sshDir.child("authorized_keys")
  193. keyFile.setContent(self.content)
  194. # Fake permission error by changing the mode
  195. keyFile.chmod(0o000)
  196. self.addCleanup(keyFile.chmod, 0o777)
  197. # And restore the right mode when seteuid is called
  198. savedSeteuid = self.mockos.seteuid
  199. def seteuid(euid):
  200. keyFile.chmod(0o777)
  201. return savedSeteuid(euid)
  202. self.mockos.euid = 2345
  203. self.mockos.egid = 1234
  204. self.patch(self.mockos, "seteuid", seteuid)
  205. self.patch(util, 'os', self.mockos)
  206. user = UsernamePassword(b"user", b"password")
  207. user.blob = b"foobar"
  208. self.assertTrue(self.checker.checkKey(user))
  209. self.assertEqual(self.mockos.seteuidCalls, [0, 1, 0, 2345])
  210. self.assertEqual(self.mockos.setegidCalls, [2, 1234])
  211. def test_requestAvatarId(self):
  212. """
  213. L{SSHPublicKeyDatabase.requestAvatarId} should return the avatar id
  214. passed in if its C{_checkKey} method returns True.
  215. """
  216. def _checkKey(ignored):
  217. return True
  218. self.patch(self.checker, 'checkKey', _checkKey)
  219. credentials = SSHPrivateKey(
  220. b'test', b'ssh-rsa', keydata.publicRSA_openssh, b'foo',
  221. keys.Key.fromString(keydata.privateRSA_openssh).sign(b'foo'))
  222. d = self.checker.requestAvatarId(credentials)
  223. def _verify(avatarId):
  224. self.assertEqual(avatarId, b'test')
  225. return d.addCallback(_verify)
  226. def test_requestAvatarIdWithoutSignature(self):
  227. """
  228. L{SSHPublicKeyDatabase.requestAvatarId} should raise L{ValidPublicKey}
  229. if the credentials represent a valid key without a signature. This
  230. tells the user that the key is valid for login, but does not actually
  231. allow that user to do so without a signature.
  232. """
  233. def _checkKey(ignored):
  234. return True
  235. self.patch(self.checker, 'checkKey', _checkKey)
  236. credentials = SSHPrivateKey(
  237. b'test', b'ssh-rsa', keydata.publicRSA_openssh, None, None)
  238. d = self.checker.requestAvatarId(credentials)
  239. return self.assertFailure(d, ValidPublicKey)
  240. def test_requestAvatarIdInvalidKey(self):
  241. """
  242. If L{SSHPublicKeyDatabase.checkKey} returns False,
  243. C{_cbRequestAvatarId} should raise L{UnauthorizedLogin}.
  244. """
  245. def _checkKey(ignored):
  246. return False
  247. self.patch(self.checker, 'checkKey', _checkKey)
  248. d = self.checker.requestAvatarId(None);
  249. return self.assertFailure(d, UnauthorizedLogin)
  250. def test_requestAvatarIdInvalidSignature(self):
  251. """
  252. Valid keys with invalid signatures should cause
  253. L{SSHPublicKeyDatabase.requestAvatarId} to return a {UnauthorizedLogin}
  254. failure
  255. """
  256. def _checkKey(ignored):
  257. return True
  258. self.patch(self.checker, 'checkKey', _checkKey)
  259. credentials = SSHPrivateKey(
  260. b'test', b'ssh-rsa', keydata.publicRSA_openssh, b'foo',
  261. keys.Key.fromString(keydata.privateDSA_openssh).sign(b'foo'))
  262. d = self.checker.requestAvatarId(credentials)
  263. return self.assertFailure(d, UnauthorizedLogin)
  264. def test_requestAvatarIdNormalizeException(self):
  265. """
  266. Exceptions raised while verifying the key should be normalized into an
  267. C{UnauthorizedLogin} failure.
  268. """
  269. def _checkKey(ignored):
  270. return True
  271. self.patch(self.checker, 'checkKey', _checkKey)
  272. credentials = SSHPrivateKey(b'test', None, b'blob', b'sigData', b'sig')
  273. d = self.checker.requestAvatarId(credentials)
  274. def _verifyLoggedException(failure):
  275. errors = self.flushLoggedErrors(keys.BadKeyError)
  276. self.assertEqual(len(errors), 1)
  277. return failure
  278. d.addErrback(_verifyLoggedException)
  279. return self.assertFailure(d, UnauthorizedLogin)
  280. class SSHProtocolCheckerTests(TestCase):
  281. """
  282. Tests for L{SSHProtocolChecker}.
  283. """
  284. skip = dependencySkip
  285. def test_registerChecker(self):
  286. """
  287. L{SSHProcotolChecker.registerChecker} should add the given checker to
  288. the list of registered checkers.
  289. """
  290. checker = checkers.SSHProtocolChecker()
  291. self.assertEqual(checker.credentialInterfaces, [])
  292. checker.registerChecker(checkers.SSHPublicKeyDatabase(), )
  293. self.assertEqual(checker.credentialInterfaces, [ISSHPrivateKey])
  294. self.assertIsInstance(checker.checkers[ISSHPrivateKey],
  295. checkers.SSHPublicKeyDatabase)
  296. def test_registerCheckerWithInterface(self):
  297. """
  298. If a specific interface is passed into
  299. L{SSHProtocolChecker.registerChecker}, that interface should be
  300. registered instead of what the checker specifies in
  301. credentialIntefaces.
  302. """
  303. checker = checkers.SSHProtocolChecker()
  304. self.assertEqual(checker.credentialInterfaces, [])
  305. checker.registerChecker(checkers.SSHPublicKeyDatabase(),
  306. IUsernamePassword)
  307. self.assertEqual(checker.credentialInterfaces, [IUsernamePassword])
  308. self.assertIsInstance(checker.checkers[IUsernamePassword],
  309. checkers.SSHPublicKeyDatabase)
  310. def test_requestAvatarId(self):
  311. """
  312. L{SSHProtocolChecker.requestAvatarId} should defer to one if its
  313. registered checkers to authenticate a user.
  314. """
  315. checker = checkers.SSHProtocolChecker()
  316. passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
  317. passwordDatabase.addUser(b'test', b'test')
  318. checker.registerChecker(passwordDatabase)
  319. d = checker.requestAvatarId(UsernamePassword(b'test', b'test'))
  320. def _callback(avatarId):
  321. self.assertEqual(avatarId, b'test')
  322. return d.addCallback(_callback)
  323. def test_requestAvatarIdWithNotEnoughAuthentication(self):
  324. """
  325. If the client indicates that it is never satisfied, by always returning
  326. False from _areDone, then L{SSHProtocolChecker} should raise
  327. L{NotEnoughAuthentication}.
  328. """
  329. checker = checkers.SSHProtocolChecker()
  330. def _areDone(avatarId):
  331. return False
  332. self.patch(checker, 'areDone', _areDone)
  333. passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
  334. passwordDatabase.addUser(b'test', b'test')
  335. checker.registerChecker(passwordDatabase)
  336. d = checker.requestAvatarId(UsernamePassword(b'test', b'test'))
  337. return self.assertFailure(d, NotEnoughAuthentication)
  338. def test_requestAvatarIdInvalidCredential(self):
  339. """
  340. If the passed credentials aren't handled by any registered checker,
  341. L{SSHProtocolChecker} should raise L{UnhandledCredentials}.
  342. """
  343. checker = checkers.SSHProtocolChecker()
  344. d = checker.requestAvatarId(UsernamePassword(b'test', b'test'))
  345. return self.assertFailure(d, UnhandledCredentials)
  346. def test_areDone(self):
  347. """
  348. The default L{SSHProcotolChecker.areDone} should simply return True.
  349. """
  350. self.assertTrue(checkers.SSHProtocolChecker().areDone(None))
  351. class UNIXPasswordDatabaseTests(TestCase):
  352. """
  353. Tests for L{UNIXPasswordDatabase}.
  354. """
  355. skip = cryptSkip or dependencySkip
  356. def assertLoggedIn(self, d, username):
  357. """
  358. Assert that the L{Deferred} passed in is called back with the value
  359. 'username'. This represents a valid login for this TestCase.
  360. NOTE: To work, this method's return value must be returned from the
  361. test method, or otherwise hooked up to the test machinery.
  362. @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method.
  363. @type d: L{Deferred}
  364. @rtype: L{Deferred}
  365. """
  366. result = []
  367. d.addBoth(result.append)
  368. self.assertEqual(len(result), 1, "login incomplete")
  369. if isinstance(result[0], Failure):
  370. result[0].raiseException()
  371. self.assertEqual(result[0], username)
  372. def test_defaultCheckers(self):
  373. """
  374. L{UNIXPasswordDatabase} with no arguments has checks the C{pwd} database
  375. and then the C{spwd} database.
  376. """
  377. checker = checkers.UNIXPasswordDatabase()
  378. def crypted(username, password):
  379. salt = crypt.crypt(password, username)
  380. crypted = crypt.crypt(password, '$1$' + salt)
  381. return crypted
  382. pwd = UserDatabase()
  383. pwd.addUser('alice', crypted('alice', 'password'),
  384. 1, 2, 'foo', '/foo', '/bin/sh')
  385. # x and * are convention for "look elsewhere for the password"
  386. pwd.addUser('bob', 'x', 1, 2, 'bar', '/bar', '/bin/sh')
  387. spwd = ShadowDatabase()
  388. spwd.addUser('alice', 'wrong', 1, 2, 3, 4, 5, 6, 7)
  389. spwd.addUser('bob', crypted('bob', 'password'),
  390. 8, 9, 10, 11, 12, 13, 14)
  391. self.patch(checkers, 'pwd', pwd)
  392. self.patch(checkers, 'spwd', spwd)
  393. mockos = MockOS()
  394. self.patch(util, 'os', mockos)
  395. mockos.euid = 2345
  396. mockos.egid = 1234
  397. cred = UsernamePassword(b"alice", b"password")
  398. self.assertLoggedIn(checker.requestAvatarId(cred), b'alice')
  399. self.assertEqual(mockos.seteuidCalls, [])
  400. self.assertEqual(mockos.setegidCalls, [])
  401. cred.username = b"bob"
  402. self.assertLoggedIn(checker.requestAvatarId(cred), b'bob')
  403. self.assertEqual(mockos.seteuidCalls, [0, 2345])
  404. self.assertEqual(mockos.setegidCalls, [0, 1234])
  405. def assertUnauthorizedLogin(self, d):
  406. """
  407. Asserts that the L{Deferred} passed in is erred back with an
  408. L{UnauthorizedLogin} L{Failure}. This reprsents an invalid login for
  409. this TestCase.
  410. NOTE: To work, this method's return value must be returned from the
  411. test method, or otherwise hooked up to the test machinery.
  412. @param d: a L{Deferred} from an L{IChecker.requestAvatarId} method.
  413. @type d: L{Deferred}
  414. @rtype: L{None}
  415. """
  416. self.assertRaises(
  417. checkers.UnauthorizedLogin, self.assertLoggedIn, d, 'bogus value')
  418. def test_passInCheckers(self):
  419. """
  420. L{UNIXPasswordDatabase} takes a list of functions to check for UNIX
  421. user information.
  422. """
  423. password = crypt.crypt('secret', 'secret')
  424. userdb = UserDatabase()
  425. userdb.addUser('anybody', password, 1, 2, 'foo', '/bar', '/bin/sh')
  426. checker = checkers.UNIXPasswordDatabase([userdb.getpwnam])
  427. self.assertLoggedIn(
  428. checker.requestAvatarId(UsernamePassword(b'anybody', b'secret')),
  429. b'anybody')
  430. def test_verifyPassword(self):
  431. """
  432. If the encrypted password provided by the getpwnam function is valid
  433. (verified by the L{verifyCryptedPassword} function), we callback the
  434. C{requestAvatarId} L{Deferred} with the username.
  435. """
  436. def verifyCryptedPassword(crypted, pw):
  437. return crypted == pw
  438. def getpwnam(username):
  439. return [username, username]
  440. self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword)
  441. checker = checkers.UNIXPasswordDatabase([getpwnam])
  442. credential = UsernamePassword(b'username', b'username')
  443. self.assertLoggedIn(checker.requestAvatarId(credential), b'username')
  444. def test_failOnKeyError(self):
  445. """
  446. If the getpwnam function raises a KeyError, the login fails with an
  447. L{UnauthorizedLogin} exception.
  448. """
  449. def getpwnam(username):
  450. raise KeyError(username)
  451. checker = checkers.UNIXPasswordDatabase([getpwnam])
  452. credential = UsernamePassword(b'username', b'username')
  453. self.assertUnauthorizedLogin(checker.requestAvatarId(credential))
  454. def test_failOnBadPassword(self):
  455. """
  456. If the verifyCryptedPassword function doesn't verify the password, the
  457. login fails with an L{UnauthorizedLogin} exception.
  458. """
  459. def verifyCryptedPassword(crypted, pw):
  460. return False
  461. def getpwnam(username):
  462. return [username, username]
  463. self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword)
  464. checker = checkers.UNIXPasswordDatabase([getpwnam])
  465. credential = UsernamePassword(b'username', b'username')
  466. self.assertUnauthorizedLogin(checker.requestAvatarId(credential))
  467. def test_loopThroughFunctions(self):
  468. """
  469. UNIXPasswordDatabase.requestAvatarId loops through each getpwnam
  470. function associated with it and returns a L{Deferred} which fires with
  471. the result of the first one which returns a value other than None.
  472. ones do not verify the password.
  473. """
  474. def verifyCryptedPassword(crypted, pw):
  475. return crypted == pw
  476. def getpwnam1(username):
  477. return [username, 'not the password']
  478. def getpwnam2(username):
  479. return [username, username]
  480. self.patch(checkers, 'verifyCryptedPassword', verifyCryptedPassword)
  481. checker = checkers.UNIXPasswordDatabase([getpwnam1, getpwnam2])
  482. credential = UsernamePassword(b'username', b'username')
  483. self.assertLoggedIn(checker.requestAvatarId(credential), b'username')
  484. def test_failOnSpecial(self):
  485. """
  486. If the password returned by any function is C{""}, C{"x"}, or C{"*"} it
  487. is not compared against the supplied password. Instead it is skipped.
  488. """
  489. pwd = UserDatabase()
  490. pwd.addUser('alice', '', 1, 2, '', 'foo', 'bar')
  491. pwd.addUser('bob', 'x', 1, 2, '', 'foo', 'bar')
  492. pwd.addUser('carol', '*', 1, 2, '', 'foo', 'bar')
  493. self.patch(checkers, 'pwd', pwd)
  494. checker = checkers.UNIXPasswordDatabase([checkers._pwdGetByName])
  495. cred = UsernamePassword(b'alice', b'')
  496. self.assertUnauthorizedLogin(checker.requestAvatarId(cred))
  497. cred = UsernamePassword(b'bob', b'x')
  498. self.assertUnauthorizedLogin(checker.requestAvatarId(cred))
  499. cred = UsernamePassword(b'carol', b'*')
  500. self.assertUnauthorizedLogin(checker.requestAvatarId(cred))
  501. class AuthorizedKeyFileReaderTests(TestCase):
  502. """
  503. Tests for L{checkers.readAuthorizedKeyFile}
  504. """
  505. skip = dependencySkip
  506. def test_ignoresComments(self):
  507. """
  508. L{checkers.readAuthorizedKeyFile} does not attempt to turn comments
  509. into keys
  510. """
  511. fileobj = BytesIO(b'# this comment is ignored\n'
  512. b'this is not\n'
  513. b'# this is again\n'
  514. b'and this is not')
  515. result = checkers.readAuthorizedKeyFile(fileobj, lambda x: x)
  516. self.assertEqual([b'this is not', b'and this is not'], list(result))
  517. def test_ignoresLeadingWhitespaceAndEmptyLines(self):
  518. """
  519. L{checkers.readAuthorizedKeyFile} ignores leading whitespace in
  520. lines, as well as empty lines
  521. """
  522. fileobj = BytesIO(b"""
  523. # ignore
  524. not ignored
  525. """)
  526. result = checkers.readAuthorizedKeyFile(fileobj, parseKey=lambda x: x)
  527. self.assertEqual([b'not ignored'], list(result))
  528. def test_ignoresUnparsableKeys(self):
  529. """
  530. L{checkers.readAuthorizedKeyFile} does not raise an exception
  531. when a key fails to parse (raises a
  532. L{twisted.conch.ssh.keys.BadKeyError}), but rather just keeps going
  533. """
  534. def failOnSome(line):
  535. if line.startswith(b'f'):
  536. raise keys.BadKeyError('failed to parse')
  537. return line
  538. fileobj = BytesIO(b'failed key\ngood key')
  539. result = checkers.readAuthorizedKeyFile(fileobj,
  540. parseKey=failOnSome)
  541. self.assertEqual([b'good key'], list(result))
  542. class InMemorySSHKeyDBTests(TestCase):
  543. """
  544. Tests for L{checkers.InMemorySSHKeyDB}
  545. """
  546. skip = dependencySkip
  547. def test_implementsInterface(self):
  548. """
  549. L{checkers.InMemorySSHKeyDB} implements
  550. L{checkers.IAuthorizedKeysDB}
  551. """
  552. keydb = checkers.InMemorySSHKeyDB({b'alice': [b'key']})
  553. verifyObject(checkers.IAuthorizedKeysDB, keydb)
  554. def test_noKeysForUnauthorizedUser(self):
  555. """
  556. If the user is not in the mapping provided to
  557. L{checkers.InMemorySSHKeyDB}, an empty iterator is returned
  558. by L{checkers.InMemorySSHKeyDB.getAuthorizedKeys}
  559. """
  560. keydb = checkers.InMemorySSHKeyDB({b'alice': [b'keys']})
  561. self.assertEqual([], list(keydb.getAuthorizedKeys(b'bob')))
  562. def test_allKeysForAuthorizedUser(self):
  563. """
  564. If the user is in the mapping provided to
  565. L{checkers.InMemorySSHKeyDB}, an iterator with all the keys
  566. is returned by L{checkers.InMemorySSHKeyDB.getAuthorizedKeys}
  567. """
  568. keydb = checkers.InMemorySSHKeyDB({b'alice': [b'a', b'b']})
  569. self.assertEqual([b'a', b'b'], list(keydb.getAuthorizedKeys(b'alice')))
  570. class UNIXAuthorizedKeysFilesTests(TestCase):
  571. """
  572. Tests for L{checkers.UNIXAuthorizedKeysFiles}.
  573. """
  574. skip = dependencySkip
  575. def setUp(self):
  576. mockos = MockOS()
  577. mockos.path = FilePath(self.mktemp())
  578. mockos.path.makedirs()
  579. self.userdb = UserDatabase()
  580. self.userdb.addUser(b'alice', b'password', 1, 2, b'alice lastname',
  581. mockos.path.path, b'/bin/shell')
  582. self.sshDir = mockos.path.child('.ssh')
  583. self.sshDir.makedirs()
  584. authorizedKeys = self.sshDir.child('authorized_keys')
  585. authorizedKeys.setContent(b'key 1\nkey 2')
  586. self.expectedKeys = [b'key 1', b'key 2']
  587. def test_implementsInterface(self):
  588. """
  589. L{checkers.UNIXAuthorizedKeysFiles} implements
  590. L{checkers.IAuthorizedKeysDB}.
  591. """
  592. keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb)
  593. verifyObject(checkers.IAuthorizedKeysDB, keydb)
  594. def test_noKeysForUnauthorizedUser(self):
  595. """
  596. If the user is not in the user database provided to
  597. L{checkers.UNIXAuthorizedKeysFiles}, an empty iterator is returned
  598. by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}.
  599. """
  600. keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb,
  601. parseKey=lambda x: x)
  602. self.assertEqual([], list(keydb.getAuthorizedKeys('bob')))
  603. def test_allKeysInAllAuthorizedFilesForAuthorizedUser(self):
  604. """
  605. If the user is in the user database provided to
  606. L{checkers.UNIXAuthorizedKeysFiles}, an iterator with all the keys in
  607. C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2} is returned
  608. by L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys}.
  609. """
  610. self.sshDir.child('authorized_keys2').setContent(b'key 3')
  611. keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb,
  612. parseKey=lambda x: x)
  613. self.assertEqual(self.expectedKeys + [b'key 3'],
  614. list(keydb.getAuthorizedKeys(b'alice')))
  615. def test_ignoresNonexistantFile(self):
  616. """
  617. L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only
  618. the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2}
  619. if they exist.
  620. """
  621. keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb,
  622. parseKey=lambda x: x)
  623. self.assertEqual(self.expectedKeys,
  624. list(keydb.getAuthorizedKeys(b'alice')))
  625. def test_ignoresUnreadableFile(self):
  626. """
  627. L{checkers.UNIXAuthorizedKeysFiles.getAuthorizedKeys} returns only
  628. the keys in C{~/.ssh/authorized_keys} and C{~/.ssh/authorized_keys2}
  629. if they are readable.
  630. """
  631. self.sshDir.child('authorized_keys2').makedirs()
  632. keydb = checkers.UNIXAuthorizedKeysFiles(self.userdb,
  633. parseKey=lambda x: x)
  634. self.assertEqual(self.expectedKeys,
  635. list(keydb.getAuthorizedKeys(b'alice')))
  636. _KeyDB = namedtuple('KeyDB', ['getAuthorizedKeys'])
  637. class _DummyException(Exception):
  638. """
  639. Fake exception to be used for testing.
  640. """
  641. pass
  642. class SSHPublicKeyCheckerTests(TestCase):
  643. """
  644. Tests for L{checkers.SSHPublicKeyChecker}.
  645. """
  646. skip = dependencySkip
  647. def setUp(self):
  648. self.credentials = SSHPrivateKey(
  649. b'alice', b'ssh-rsa', keydata.publicRSA_openssh, b'foo',
  650. keys.Key.fromString(keydata.privateRSA_openssh).sign(b'foo'))
  651. self.keydb = _KeyDB(lambda _: [
  652. keys.Key.fromString(keydata.publicRSA_openssh)])
  653. self.checker = checkers.SSHPublicKeyChecker(self.keydb)
  654. def test_credentialsWithoutSignature(self):
  655. """
  656. Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
  657. credentials that do not have a signature fails with L{ValidPublicKey}.
  658. """
  659. self.credentials.signature = None
  660. self.failureResultOf(self.checker.requestAvatarId(self.credentials),
  661. ValidPublicKey)
  662. def test_credentialsWithBadKey(self):
  663. """
  664. Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
  665. credentials that have a bad key fails with L{keys.BadKeyError}.
  666. """
  667. self.credentials.blob = b''
  668. self.failureResultOf(self.checker.requestAvatarId(self.credentials),
  669. keys.BadKeyError)
  670. def test_credentialsNoMatchingKey(self):
  671. """
  672. If L{checkers.IAuthorizedKeysDB.getAuthorizedKeys} returns no keys
  673. that match the credentials,
  674. L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with
  675. L{UnauthorizedLogin}.
  676. """
  677. self.credentials.blob = keydata.publicDSA_openssh
  678. self.failureResultOf(self.checker.requestAvatarId(self.credentials),
  679. UnauthorizedLogin)
  680. def test_credentialsInvalidSignature(self):
  681. """
  682. Calling L{checkers.SSHPublicKeyChecker.requestAvatarId} with
  683. credentials that are incorrectly signed fails with
  684. L{UnauthorizedLogin}.
  685. """
  686. self.credentials.signature = (
  687. keys.Key.fromString(keydata.privateDSA_openssh).sign(b'foo'))
  688. self.failureResultOf(self.checker.requestAvatarId(self.credentials),
  689. UnauthorizedLogin)
  690. def test_failureVerifyingKey(self):
  691. """
  692. If L{keys.Key.verify} raises an exception,
  693. L{checkers.SSHPublicKeyChecker.requestAvatarId} fails with
  694. L{UnauthorizedLogin}.
  695. """
  696. def fail(*args, **kwargs):
  697. raise _DummyException()
  698. self.patch(keys.Key, 'verify', fail)
  699. self.failureResultOf(self.checker.requestAvatarId(self.credentials),
  700. UnauthorizedLogin)
  701. self.flushLoggedErrors(_DummyException)
  702. def test_usernameReturnedOnSuccess(self):
  703. """
  704. L{checker.SSHPublicKeyChecker.requestAvatarId}, if successful,
  705. callbacks with the username.
  706. """
  707. d = self.checker.requestAvatarId(self.credentials)
  708. self.assertEqual(b'alice', self.successResultOf(d))