utils.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import json
  4. import time
  5. import uuid
  6. from base64 import b64decode, b64encode
  7. from cryptography.exceptions import InvalidSignature, InvalidTag
  8. from cryptography.hazmat.backends import default_backend
  9. from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15
  10. from cryptography.hazmat.primitives.ciphers.aead import AESGCM
  11. from cryptography.hazmat.primitives.hashes import SHA1, SHA256, Hash
  12. from cryptography.hazmat.primitives.hmac import HMAC
  13. from cryptography.hazmat.primitives.serialization import load_pem_private_key
  14. from cryptography.x509 import load_pem_x509_certificate
  15. def build_authorization(path,
  16. method,
  17. mchid,
  18. serial_no,
  19. private_key,
  20. data = None,
  21. nonce_str = None):
  22. timeStamp = str(int(time.time()))
  23. nonce_str = nonce_str or ''.join(str(uuid.uuid4()).split('-')).upper()
  24. body = data if isinstance(data, str) else json.dumps(data) if data else ''
  25. sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timeStamp, nonce_str, body)
  26. signature = rsa_sign(private_key = private_key, sign_str = sign_str)
  27. authorization = 'WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",timestamp="%s",serial_no="%s"' % (
  28. mchid, nonce_str, signature, timeStamp, serial_no)
  29. return authorization
  30. def rsa_sign(private_key, sign_str):
  31. message = sign_str.encode('UTF-8')
  32. signature = private_key.sign(data = message, padding = PKCS1v15(), algorithm = SHA256())
  33. sign = b64encode(signature).decode('UTF-8').replace('\n', '')
  34. return sign
  35. def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
  36. key_bytes = apiv3_key.encode('UTF-8')
  37. nonce_bytes = nonce.encode('UTF-8')
  38. associated_data_bytes = associated_data.encode('UTF-8')
  39. data = b64decode(ciphertext)
  40. aesgcm = AESGCM(key = key_bytes)
  41. try:
  42. result = aesgcm.decrypt(nonce = nonce_bytes, data = data, associated_data = associated_data_bytes).decode(
  43. 'UTF-8')
  44. except InvalidTag:
  45. result = None
  46. return result
  47. def format_private_key(private_key_str):
  48. pem_start = '-----BEGIN PRIVATE KEY-----\n'
  49. pem_end = '\n-----END PRIVATE KEY-----'
  50. if not private_key_str.startswith(pem_start):
  51. private_key_str = pem_start + private_key_str
  52. if not private_key_str.endswith(pem_end):
  53. private_key_str = private_key_str + pem_end
  54. return private_key_str
  55. def load_certificate(certificate_str):
  56. """加载证书字符串"""
  57. try:
  58. return load_pem_x509_certificate(data = certificate_str.encode('UTF-8'), backend = default_backend())
  59. except:
  60. return None
  61. def load_private_key(private_key_str):
  62. try:
  63. return load_pem_private_key(data = format_private_key(private_key_str).encode('UTF-8'), password = None,
  64. backend = default_backend())
  65. except:
  66. raise Exception('failed to load private key.')
  67. def rsa_verify(timestamp, nonce, body, signature, certificate):
  68. sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body)
  69. public_key = certificate.public_key()
  70. message = sign_str.encode('UTF-8')
  71. signature = b64decode(signature)
  72. try:
  73. public_key.verify(signature, message, PKCS1v15(), SHA256())
  74. except InvalidSignature:
  75. return False
  76. return True
  77. def rsa_encrypt(text, certificate):
  78. data = text.encode('UTF-8')
  79. public_key = certificate.public_key()
  80. cipherbyte = public_key.encrypt(
  81. plaintext = data,
  82. padding = OAEP(mgf = MGF1(algorithm = SHA1()), algorithm = SHA1(), label = None)
  83. )
  84. return b64encode(cipherbyte).decode('UTF-8')
  85. def rsa_decrypt(ciphertext, private_key):
  86. data = private_key.decrypt(
  87. ciphertext = b64decode(ciphertext),
  88. padding = OAEP(mgf = MGF1(algorithm = SHA1()), algorithm = SHA1(), label = None)
  89. )
  90. result = data.decode('UTF-8')
  91. return result
  92. def hmac_sign(key, sign_str):
  93. hmac = HMAC(key.encode('UTF-8'), SHA256())
  94. hmac.update(sign_str.encode('UTF-8'))
  95. sign = hmac.finalize().hex().upper()
  96. return sign
  97. def sha256(data):
  98. hash = Hash(SHA256())
  99. hash.update(data)
  100. return hash.finalize().hex()