utils.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import base64
  4. import hashlib
  5. import hmac
  6. import simplejson as json
  7. import time
  8. import uuid
  9. from cryptography.exceptions import InvalidSignature, InvalidTag
  10. from cryptography.hazmat.backends import default_backend
  11. from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15
  12. from cryptography.hazmat.primitives.ciphers.aead import AESGCM
  13. from cryptography.hazmat.primitives.hashes import SHA1, SHA256
  14. from cryptography.hazmat.primitives.serialization import load_pem_private_key
  15. from cryptography.x509 import load_pem_x509_certificate
  16. def build_authorization(path, method, mchid, serial_no, private_key, data = None, nonce_str = None):
  17. timeStamp = str(int(time.time()))
  18. nonce_str = nonce_str or ''.join(str(uuid.uuid4()).split('-')).upper()
  19. data = data or data or ""
  20. body = json.dumps(data) if not isinstance(data, basestring) else data
  21. sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timeStamp, nonce_str, body)
  22. signature = rsa_sign(private_key = private_key, sign_str = sign_str)
  23. authorization = 'WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",timestamp="%s",serial_no="%s"' % (
  24. mchid, nonce_str, signature, timeStamp, serial_no)
  25. return authorization
  26. def rsa_sign(private_key, sign_str):
  27. message = sign_str.encode('utf-8')
  28. signature = private_key.sign(data = message, padding = PKCS1v15(), algorithm = SHA256())
  29. sign = base64.b64encode(signature).decode('utf8').replace('\n', '')
  30. return sign
  31. def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
  32. key_bytes = apiv3_key.encode('UTF-8')
  33. nonce_bytes = nonce.encode('UTF-8')
  34. associated_data_bytes = associated_data.encode('UTF-8')
  35. data = base64.b64decode(ciphertext)
  36. aesgcm = AESGCM(key = key_bytes)
  37. try:
  38. result = aesgcm.decrypt(nonce = nonce_bytes, data = data, associated_data = associated_data_bytes).decode(
  39. 'UTF-8')
  40. except InvalidTag:
  41. result = None
  42. return result
  43. def format_private_key(private_key_str):
  44. pem_start = '-----BEGIN PRIVATE KEY-----\n'
  45. pem_end = '\n-----END PRIVATE KEY-----'
  46. if not private_key_str.startswith(pem_start):
  47. private_key_str = pem_start + private_key_str
  48. if not private_key_str.endswith(pem_end):
  49. private_key_str = private_key_str + pem_end
  50. return private_key_str
  51. def load_certificate(certificate_str):
  52. """加载证书字符串"""
  53. try:
  54. return load_pem_x509_certificate(data = certificate_str.encode('UTF-8'), backend = default_backend())
  55. except:
  56. return None
  57. def load_private_key(private_key_str):
  58. try:
  59. return load_pem_private_key(data = format_private_key(private_key_str).encode('UTF-8'), password = None,
  60. backend = default_backend())
  61. except:
  62. raise Exception('failed to load private key.')
  63. def rsa_verify(timestamp, nonce, body, signature, certificate):
  64. sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body)
  65. public_key = certificate.public_key()
  66. message = sign_str.encode('UTF-8')
  67. signature = base64.b64decode(signature)
  68. try:
  69. public_key.verify(signature, message, PKCS1v15(), SHA256())
  70. except InvalidSignature:
  71. return False
  72. return True
  73. def rsa_encrypt(text, certificate):
  74. data = text.encode('UTF-8')
  75. public_key = certificate.public_key()
  76. cipherbyte = public_key.encrypt(
  77. plaintext = data,
  78. padding = OAEP(mgf = MGF1(algorithm = SHA1()), algorithm = SHA1(), label = None)
  79. )
  80. return base64.b64encode(cipherbyte).decode('UTF-8')
  81. def rsa_decrypt(ciphertext, private_key):
  82. data = private_key.decrypt(
  83. ciphertext = base64.b64decode(ciphertext),
  84. padding = OAEP(mgf = MGF1(algorithm = SHA1()), algorithm = SHA1(), label = None)
  85. )
  86. result = data.decode('UTF-8')
  87. return result
  88. def hmac_sign(key, sign_str):
  89. return hmac.new(key.encode('UTF-8'), msg = sign_str, digestmod = hashlib.sha256).hexdigest().upper()
  90. def sha256(data):
  91. return hashlib.sha256(data).hexdigest()