certs.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. """0MQ authentication related functions and classes."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import datetime
  5. import glob
  6. import io
  7. import os
  8. import zmq
  9. from zmq.utils.strtypes import bytes, unicode, b, u
  10. _cert_secret_banner = u("""# **** Generated on {0} by pyzmq ****
  11. # ZeroMQ CURVE **Secret** Certificate
  12. # DO NOT PROVIDE THIS FILE TO OTHER USERS nor change its permissions.
  13. """)
  14. _cert_public_banner = u("""# **** Generated on {0} by pyzmq ****
  15. # ZeroMQ CURVE Public Certificate
  16. # Exchange securely, or use a secure mechanism to verify the contents
  17. # of this file after exchange. Store public certificates in your home
  18. # directory, in the .curve subdirectory.
  19. """)
  20. def _write_key_file(key_filename, banner, public_key, secret_key=None, metadata=None, encoding='utf-8'):
  21. """Create a certificate file"""
  22. if isinstance(public_key, bytes):
  23. public_key = public_key.decode(encoding)
  24. if isinstance(secret_key, bytes):
  25. secret_key = secret_key.decode(encoding)
  26. with io.open(key_filename, 'w', encoding='utf8') as f:
  27. f.write(banner.format(datetime.datetime.now()))
  28. f.write(u('metadata\n'))
  29. if metadata:
  30. for k, v in metadata.items():
  31. if isinstance(k, bytes):
  32. k = k.decode(encoding)
  33. if isinstance(v, bytes):
  34. v = v.decode(encoding)
  35. f.write(u(" {0} = {1}\n").format(k, v))
  36. f.write(u('curve\n'))
  37. f.write(u(" public-key = \"{0}\"\n").format(public_key))
  38. if secret_key:
  39. f.write(u(" secret-key = \"{0}\"\n").format(secret_key))
  40. def create_certificates(key_dir, name, metadata=None):
  41. """Create zmq certificates.
  42. Returns the file paths to the public and secret certificate files.
  43. """
  44. public_key, secret_key = zmq.curve_keypair()
  45. base_filename = os.path.join(key_dir, name)
  46. secret_key_file = "{0}.key_secret".format(base_filename)
  47. public_key_file = "{0}.key".format(base_filename)
  48. now = datetime.datetime.now()
  49. _write_key_file(public_key_file,
  50. _cert_public_banner.format(now),
  51. public_key)
  52. _write_key_file(secret_key_file,
  53. _cert_secret_banner.format(now),
  54. public_key,
  55. secret_key=secret_key,
  56. metadata=metadata)
  57. return public_key_file, secret_key_file
  58. def load_certificate(filename):
  59. """Load public and secret key from a zmq certificate.
  60. Returns (public_key, secret_key)
  61. If the certificate file only contains the public key,
  62. secret_key will be None.
  63. If there is no public key found in the file, ValueError will be raised.
  64. """
  65. public_key = None
  66. secret_key = None
  67. if not os.path.exists(filename):
  68. raise IOError("Invalid certificate file: {0}".format(filename))
  69. with open(filename, 'rb') as f:
  70. for line in f:
  71. line = line.strip()
  72. if line.startswith(b'#'):
  73. continue
  74. if line.startswith(b'public-key'):
  75. public_key = line.split(b"=", 1)[1].strip(b' \t\'"')
  76. if line.startswith(b'secret-key'):
  77. secret_key = line.split(b"=", 1)[1].strip(b' \t\'"')
  78. if public_key and secret_key:
  79. break
  80. if public_key is None:
  81. raise ValueError("No public key found in %s" % filename)
  82. return public_key, secret_key
  83. def load_certificates(directory='.'):
  84. """Load public keys from all certificates in a directory"""
  85. certs = {}
  86. if not os.path.isdir(directory):
  87. raise IOError("Invalid certificate directory: {0}".format(directory))
  88. # Follow czmq pattern of public keys stored in *.key files.
  89. glob_string = os.path.join(directory, "*.key")
  90. cert_files = glob.glob(glob_string)
  91. for cert_file in cert_files:
  92. public_key, _ = load_certificate(cert_file)
  93. if public_key:
  94. certs[public_key] = True
  95. return certs
  96. __all__ = ['create_certificates', 'load_certificate', 'load_certificates']