# -*- coding: utf-8 -*- """ wechatpy.utils ~~~~~~~~~~~~~~~ This module provides some useful utilities. :copyright: (c) 2014 by messense. :license: MIT, see LICENSE for more details. """ from __future__ import absolute_import, unicode_literals import base64 import hashlib from base64 import b64decode from Crypto.Cipher import AES from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.x509 import load_pem_x509_certificate from library import to_binary try: '''Use simplejson if we can, fallback to json otherwise.''' import simplejson as json except ImportError: import json # NOQA class WeChatSigner(object): """WeChat data signer""" def __init__(self, delimiter=b''): self._data = [] self._delimiter = to_binary(delimiter) def add_data(self, *args): """Add data to signer""" for data in args: self._data.append(to_binary(data)) @property def signature(self): """Get data signature""" self._data.sort() str_to_sign = self._delimiter.join(self._data) return hashlib.sha1(str_to_sign).hexdigest() def check_signature(token, signature, timestamp, nonce): """Check WeChat callback signature, raises InvalidSignatureException if check failed. :param token: WeChat callback token :param signature: WeChat callback signature sent by WeChat server :param timestamp: WeChat callback timestamp sent by WeChat server :param nonce: WeChat callback nonce sent by WeChat sever """ signer = WeChatSigner() signer.add_data(token, timestamp, nonce) if signer.signature != signature: from library.wechatbase.exceptions import InvalidSignatureException raise InvalidSignatureException() def check_wxa_signature(session_key, raw_data, client_signature): """校验前端传来的rawData签名正确 详情请参考 https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/signature.html # noqa :param session_key: code换取的session_key :param raw_data: 前端拿到的rawData :param client_signature: 前端拿到的signature :raises: InvalidSignatureException :return: 返回数据dict """ str2sign = (raw_data + session_key).encode("utf-8") signature = hashlib.sha1(str2sign).hexdigest() if signature != client_signature: from library.wechatbase.exceptions import InvalidSignatureException raise InvalidSignatureException() class WXBizDataCrypt: def __init__(self, appid, session_key): self.appid = appid self.session_key = session_key def decrypt(self, encrypted_data, iv): ''' aes decode 将加密后的信息解密 @param encrypted_data: 包括敏感数据在内的完整用户信息的加密数据 @param iv: 加密算法的初始向量 @return: 解密后数据 ''' session_key = base64.b64decode(self.session_key) encrypted_data = base64.b64decode(encrypted_data) iv = base64.b64decode(iv) cipher = AES.new(session_key, AES.MODE_CBC, iv) decrypted = json.loads(self._unpad(cipher.decrypt(encrypted_data))) if decrypted['watermark']['appid'] != self.appid: raise Exception('Invalid Buffer') return decrypted def _unpad(self, s): return s[:-ord(s[len(s) - 1:])] def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key): """使用api v3 key 解密数据""" key_bytes = apiv3_key.encode('UTF-8') nonce_bytes = nonce.encode('UTF-8') associated_data_bytes = associated_data.encode('UTF-8') data = b64decode(ciphertext) return AESGCM(key=key_bytes).decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode("UTF-8") # result = aesgcm.decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode('UTF-8') def load_certificate(certificate_str): """加载证书字符串""" return load_pem_x509_certificate(data=certificate_str.encode('UTF-8'), backend=default_backend()) def load_private_key(private_key_str): """加载私钥字符串""" return load_pem_private_key(private_key_str.encode('UTF-8'), password=None, backend=default_backend())