utils.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. """
  3. wechatpy.utils
  4. ~~~~~~~~~~~~~~~
  5. This module provides some useful utilities.
  6. :copyright: (c) 2014 by messense.
  7. :license: MIT, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import, unicode_literals
  10. import base64
  11. import hashlib
  12. from base64 import b64decode
  13. from Crypto.Cipher import AES
  14. from cryptography.hazmat.backends import default_backend
  15. from cryptography.hazmat.primitives.ciphers.aead import AESGCM
  16. from cryptography.hazmat.primitives.serialization import load_pem_private_key
  17. from cryptography.x509 import load_pem_x509_certificate
  18. from library import to_binary
  19. try:
  20. '''Use simplejson if we can, fallback to json otherwise.'''
  21. import simplejson as json
  22. except ImportError:
  23. import json # NOQA
  24. class WeChatSigner(object):
  25. """WeChat data signer"""
  26. def __init__(self, delimiter=b''):
  27. self._data = []
  28. self._delimiter = to_binary(delimiter)
  29. def add_data(self, *args):
  30. """Add data to signer"""
  31. for data in args:
  32. self._data.append(to_binary(data))
  33. @property
  34. def signature(self):
  35. """Get data signature"""
  36. self._data.sort()
  37. str_to_sign = self._delimiter.join(self._data)
  38. return hashlib.sha1(str_to_sign).hexdigest()
  39. def check_signature(token, signature, timestamp, nonce):
  40. """Check WeChat callback signature, raises InvalidSignatureException
  41. if check failed.
  42. :param token: WeChat callback token
  43. :param signature: WeChat callback signature sent by WeChat server
  44. :param timestamp: WeChat callback timestamp sent by WeChat server
  45. :param nonce: WeChat callback nonce sent by WeChat sever
  46. """
  47. signer = WeChatSigner()
  48. signer.add_data(token, timestamp, nonce)
  49. if signer.signature != signature:
  50. from library.wechatbase.exceptions import InvalidSignatureException
  51. raise InvalidSignatureException()
  52. def check_wxa_signature(session_key, raw_data, client_signature):
  53. """校验前端传来的rawData签名正确
  54. 详情请参考
  55. https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/signature.html # noqa
  56. :param session_key: code换取的session_key
  57. :param raw_data: 前端拿到的rawData
  58. :param client_signature: 前端拿到的signature
  59. :raises: InvalidSignatureException
  60. :return: 返回数据dict
  61. """
  62. str2sign = (raw_data + session_key).encode("utf-8")
  63. signature = hashlib.sha1(str2sign).hexdigest()
  64. if signature != client_signature:
  65. from library.wechatbase.exceptions import InvalidSignatureException
  66. raise InvalidSignatureException()
  67. class WXBizDataCrypt:
  68. def __init__(self, appid, session_key):
  69. self.appid = appid
  70. self.session_key = session_key
  71. def decrypt(self, encrypted_data, iv):
  72. '''
  73. aes decode
  74. 将加密后的信息解密
  75. @param encrypted_data: 包括敏感数据在内的完整用户信息的加密数据
  76. @param iv: 加密算法的初始向量
  77. @return: 解密后数据
  78. '''
  79. session_key = base64.b64decode(self.session_key)
  80. encrypted_data = base64.b64decode(encrypted_data)
  81. iv = base64.b64decode(iv)
  82. cipher = AES.new(session_key, AES.MODE_CBC, iv)
  83. decrypted = json.loads(self._unpad(cipher.decrypt(encrypted_data)))
  84. if decrypted['watermark']['appid'] != self.appid:
  85. raise Exception('Invalid Buffer')
  86. return decrypted
  87. def _unpad(self, s):
  88. return s[:-ord(s[len(s) - 1:])]
  89. def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
  90. """使用api v3 key 解密数据"""
  91. key_bytes = apiv3_key.encode('UTF-8')
  92. nonce_bytes = nonce.encode('UTF-8')
  93. associated_data_bytes = associated_data.encode('UTF-8')
  94. data = b64decode(ciphertext)
  95. return AESGCM(key=key_bytes).decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode("UTF-8")
  96. # result = aesgcm.decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode('UTF-8')
  97. def load_certificate(certificate_str):
  98. """加载证书字符串"""
  99. return load_pem_x509_certificate(data=certificate_str.encode('UTF-8'), backend=default_backend())
  100. def load_private_key(private_key_str):
  101. """加载私钥字符串"""
  102. return load_pem_private_key(private_key_str.encode('UTF-8'), password=None, backend=default_backend())