ed25519key.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. # This file is part of paramiko.
  2. #
  3. # Paramiko is free software; you can redistribute it and/or modify it under the
  4. # terms of the GNU Lesser General Public License as published by the Free
  5. # Software Foundation; either version 2.1 of the License, or (at your option)
  6. # any later version.
  7. #
  8. # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
  9. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  10. # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
  11. # details.
  12. #
  13. # You should have received a copy of the GNU Lesser General Public License
  14. # along with Paramiko; if not, write to the Free Software Foundation, Inc.,
  15. # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
  16. import bcrypt
  17. from cryptography.hazmat.backends import default_backend
  18. from cryptography.hazmat.primitives.ciphers import Cipher
  19. import nacl.signing
  20. import six
  21. from paramiko.message import Message
  22. from paramiko.pkey import PKey
  23. from paramiko.ssh_exception import SSHException, PasswordRequiredException
  24. OPENSSH_AUTH_MAGIC = b"openssh-key-v1\x00"
  25. def unpad(data):
  26. # At the moment, this is only used for unpadding private keys on disk. This
  27. # really ought to be made constant time (possibly by upstreaming this logic
  28. # into pyca/cryptography).
  29. padding_length = six.indexbytes(data, -1)
  30. if padding_length > 16:
  31. raise SSHException("Invalid key")
  32. for i in range(1, padding_length + 1):
  33. if six.indexbytes(data, -i) != (padding_length - i + 1):
  34. raise SSHException("Invalid key")
  35. return data[:-padding_length]
  36. class Ed25519Key(PKey):
  37. def __init__(self, msg=None, data=None, filename=None, password=None):
  38. verifying_key = signing_key = None
  39. if msg is None and data is not None:
  40. msg = Message(data)
  41. if msg is not None:
  42. if msg.get_text() != "ssh-ed25519":
  43. raise SSHException("Invalid key")
  44. verifying_key = nacl.signing.VerifyKey(msg.get_binary())
  45. elif filename is not None:
  46. with open(filename, "r") as f:
  47. data = self._read_private_key("OPENSSH", f)
  48. signing_key = self._parse_signing_key_data(data, password)
  49. if signing_key is None and verifying_key is None:
  50. raise ValueError("need a key")
  51. self._signing_key = signing_key
  52. self._verifying_key = verifying_key
  53. def _parse_signing_key_data(self, data, password):
  54. from paramiko.transport import Transport
  55. # We may eventually want this to be usable for other key types, as
  56. # OpenSSH moves to it, but for now this is just for Ed25519 keys.
  57. # This format is described here:
  58. # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key
  59. # The description isn't totally complete, and I had to refer to the
  60. # source for a full implementation.
  61. message = Message(data)
  62. if message.get_bytes(len(OPENSSH_AUTH_MAGIC)) != OPENSSH_AUTH_MAGIC:
  63. raise SSHException("Invalid key")
  64. ciphername = message.get_text()
  65. kdfname = message.get_text()
  66. kdfoptions = message.get_binary()
  67. num_keys = message.get_int()
  68. if kdfname == "none":
  69. # kdfname of "none" must have an empty kdfoptions, the ciphername
  70. # must be "none"
  71. if kdfoptions or ciphername != "none":
  72. raise SSHException("Invalid key")
  73. elif kdfname == "bcrypt":
  74. if not password:
  75. raise PasswordRequiredException(
  76. "Private key file is encrypted"
  77. )
  78. kdf = Message(kdfoptions)
  79. bcrypt_salt = kdf.get_binary()
  80. bcrypt_rounds = kdf.get_int()
  81. else:
  82. raise SSHException("Invalid key")
  83. if ciphername != "none" and ciphername not in Transport._cipher_info:
  84. raise SSHException("Invalid key")
  85. public_keys = []
  86. for _ in range(num_keys):
  87. pubkey = Message(message.get_binary())
  88. if pubkey.get_text() != "ssh-ed25519":
  89. raise SSHException("Invalid key")
  90. public_keys.append(pubkey.get_binary())
  91. private_ciphertext = message.get_binary()
  92. if ciphername == "none":
  93. private_data = private_ciphertext
  94. else:
  95. cipher = Transport._cipher_info[ciphername]
  96. key = bcrypt.kdf(
  97. password=password,
  98. salt=bcrypt_salt,
  99. desired_key_bytes=cipher["key-size"] + cipher["block-size"],
  100. rounds=bcrypt_rounds,
  101. # We can't control how many rounds are on disk, so no sense
  102. # warning about it.
  103. ignore_few_rounds=True,
  104. )
  105. decryptor = Cipher(
  106. cipher["class"](key[:cipher["key-size"]]),
  107. cipher["mode"](key[cipher["key-size"]:]),
  108. backend=default_backend()
  109. ).decryptor()
  110. private_data = (
  111. decryptor.update(private_ciphertext) + decryptor.finalize()
  112. )
  113. message = Message(unpad(private_data))
  114. if message.get_int() != message.get_int():
  115. raise SSHException("Invalid key")
  116. signing_keys = []
  117. for i in range(num_keys):
  118. if message.get_text() != "ssh-ed25519":
  119. raise SSHException("Invalid key")
  120. # A copy of the public key, again, ignore.
  121. public = message.get_binary()
  122. key_data = message.get_binary()
  123. # The second half of the key data is yet another copy of the public
  124. # key...
  125. signing_key = nacl.signing.SigningKey(key_data[:32])
  126. # Verify that all the public keys are the same...
  127. assert (
  128. signing_key.verify_key.encode() == public == public_keys[i] ==
  129. key_data[32:]
  130. )
  131. signing_keys.append(signing_key)
  132. # Comment, ignore.
  133. message.get_binary()
  134. if len(signing_keys) != 1:
  135. raise SSHException("Invalid key")
  136. return signing_keys[0]
  137. def asbytes(self):
  138. if self.can_sign():
  139. v = self._signing_key.verify_key
  140. else:
  141. v = self._verifying_key
  142. m = Message()
  143. m.add_string("ssh-ed25519")
  144. m.add_string(v.encode())
  145. return m.asbytes()
  146. def __hash__(self):
  147. if self.can_sign():
  148. v = self._signing_key.verify_key
  149. else:
  150. v = self._verifying_key
  151. return hash((self.get_name(), v))
  152. def get_name(self):
  153. return "ssh-ed25519"
  154. def get_bits(self):
  155. return 256
  156. def can_sign(self):
  157. return self._signing_key is not None
  158. def sign_ssh_data(self, data):
  159. m = Message()
  160. m.add_string("ssh-ed25519")
  161. m.add_string(self._signing_key.sign(data).signature)
  162. return m
  163. def verify_ssh_sig(self, data, msg):
  164. if msg.get_text() != "ssh-ed25519":
  165. return False
  166. try:
  167. self._verifying_key.verify(data, msg.get_binary())
  168. except nacl.exceptions.BadSignatureError:
  169. return False
  170. else:
  171. return True