123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- """0MQ authentication related functions and classes."""
- # Copyright (C) PyZMQ Developers
- # Distributed under the terms of the Modified BSD License.
- import datetime
- import glob
- import io
- import os
- import zmq
- from zmq.utils.strtypes import bytes, unicode, b, u
# Comment header written at the top of a secret certificate file.
# ``{0}`` is a str.format placeholder that receives the generation timestamp.
_cert_secret_banner = u("""# **** Generated on {0} by pyzmq ****
# ZeroMQ CURVE **Secret** Certificate
# DO NOT PROVIDE THIS FILE TO OTHER USERS nor change its permissions.
""")
# Comment header written at the top of a public certificate file.
# ``{0}`` is a str.format placeholder that receives the generation timestamp.
_cert_public_banner = u("""# **** Generated on {0} by pyzmq ****
# ZeroMQ CURVE Public Certificate
# Exchange securely, or use a secure mechanism to verify the contents
# of this file after exchange. Store public certificates in your home
# directory, in the .curve subdirectory.
""")
def _write_key_file(key_filename, banner, public_key, secret_key=None, metadata=None, encoding='utf-8'):
    """Write a CURVE certificate file in ZPL layout.

    The file consists of the banner, a ``metadata`` section holding one
    ``key = value`` line per metadata entry, and a ``curve`` section holding
    the public key and, when one is given, the secret key.

    Parameters
    ----------
    key_filename : str
        Path of the file to write. An existing file is overwritten.
    banner : unicode
        Header text written first. It is passed through ``format()`` with the
        current time as ``{0}``, so it must not contain stray braces.
    public_key : bytes or unicode
        Public key to store.
    secret_key : bytes or unicode, optional
        Secret key to store. The ``secret-key`` line is omitted when falsy.
    metadata : dict, optional
        Extra entries for the ``metadata`` section (bytes or unicode).
    encoding : str
        Codec used to decode ``bytes`` keys and metadata. The file itself is
        always written as UTF-8.
    """
    # NOTE(review): the file is created with the process's default
    # permissions; nothing here restricts access to a secret certificate.
    if isinstance(public_key, bytes):
        public_key = public_key.decode(encoding)
    if isinstance(secret_key, bytes):
        secret_key = secret_key.decode(encoding)

    with io.open(key_filename, 'w', encoding='utf8') as f:
        f.write(banner.format(datetime.datetime.now()))

        f.write(u('metadata\n'))
        if metadata:
            for k, v in metadata.items():
                if isinstance(k, bytes):
                    k = k.decode(encoding)
                if isinstance(v, bytes):
                    v = v.decode(encoding)
                # ZPL marks hierarchy with exactly four spaces per level, so
                # section properties must be indented by four spaces.
                f.write(u("    {0} = {1}\n").format(k, v))

        f.write(u('curve\n'))
        f.write(u("    public-key = \"{0}\"\n").format(public_key))

        if secret_key:
            f.write(u("    secret-key = \"{0}\"\n").format(secret_key))
def create_certificates(key_dir, name, metadata=None):
    """Generate a CURVE keypair and save it as a pair of certificate files.

    ``<key_dir>/<name>.key`` receives the public key only, while
    ``<key_dir>/<name>.key_secret`` receives the public key, the secret key
    and any ``metadata`` entries.

    Returns the tuple ``(public_key_file, secret_key_file)`` of file paths.
    """
    public_key, secret_key = zmq.curve_keypair()

    stem = os.path.join(key_dir, name)
    public_key_file = "{0}.key".format(stem)
    secret_key_file = "{0}.key_secret".format(stem)

    # Both banners carry the same generation timestamp.
    stamp = datetime.datetime.now()

    public_banner = _cert_public_banner.format(stamp)
    _write_key_file(public_key_file, public_banner, public_key)

    secret_banner = _cert_secret_banner.format(stamp)
    _write_key_file(
        secret_key_file,
        secret_banner,
        public_key,
        secret_key=secret_key,
        metadata=metadata,
    )

    return public_key_file, secret_key_file
def load_certificate(filename):
    """Load public and secret key from a zmq certificate.

    The file is read as bytes. Lines starting with ``#`` are skipped, and
    only ``name = value`` lines whose name is exactly ``public-key`` or
    ``secret-key`` are used; surrounding blanks and quotes are stripped from
    the value. Reading stops once both keys have been found.

    Returns (public_key, secret_key) as bytes.

    If the certificate file only contains the public key,
    secret_key will be None.

    Raises IOError if ``filename`` does not exist.

    If there is no public key found in the file, ValueError will be raised.
    """
    if not os.path.exists(filename):
        raise IOError("Invalid certificate file: {0}".format(filename))

    public_key = None
    secret_key = None
    with open(filename, 'rb') as f:
        for line in f:
            line = line.strip()
            if line.startswith(b'#'):
                continue
            # Split on the first "=" only: key values may contain "=".
            name, sep, value = line.partition(b'=')
            if not sep:
                # Section headers, blank lines and malformed entries carry
                # no value; skip them instead of failing with IndexError.
                continue
            # Compare the whole property name so that metadata entries such
            # as "secret-key-hint" are not mistaken for the keys themselves.
            name = name.strip()
            value = value.strip(b' \t\'"')
            if name == b'public-key':
                public_key = value
            elif name == b'secret-key':
                secret_key = value
            if public_key and secret_key:
                break

    if public_key is None:
        raise ValueError("No public key found in %s" % filename)

    return public_key, secret_key
def load_certificates(directory='.'):
    """Collect the public keys of every ``*.key`` certificate in a directory.

    Returns a dict mapping each non-empty public key to ``True``.
    Raises IOError when ``directory`` is not a directory.
    """
    if not os.path.isdir(directory):
        raise IOError("Invalid certificate directory: {0}".format(directory))

    # Public certificates live in *.key files, following the czmq pattern.
    pattern = os.path.join(directory, "*.key")

    found = {}
    for path in glob.glob(pattern):
        key = load_certificate(path)[0]
        if not key:
            continue
        found[key] = True
    return found
# Public names of this module; the underscore-prefixed helpers are excluded.
__all__ = ['create_certificates', 'load_certificate', 'load_certificates']
|