123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- # -*- coding: utf-8 -*-
- """
- wechatpy.utils
- ~~~~~~~~~~~~~~~
- This module provides some useful utilities.
- :copyright: (c) 2014 by messense.
- :license: MIT, see LICENSE for more details.
- """
- from __future__ import absolute_import, unicode_literals
- import base64
- import hashlib
- from base64 import b64decode
- from Crypto.Cipher import AES
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.ciphers.aead import AESGCM
- from cryptography.hazmat.primitives.serialization import load_pem_private_key
- from cryptography.x509 import load_pem_x509_certificate
- from library import to_binary
- try:
- '''Use simplejson if we can, fallback to json otherwise.'''
- import simplejson as json
- except ImportError:
- import json # NOQA
class WeChatSigner(object):
    """Collect pieces of data and compute their SHA-1 signature.

    Every piece is converted with ``to_binary`` when it is added. The
    signature is the SHA-1 hex digest of the sorted pieces joined by the
    configured delimiter.
    """

    def __init__(self, delimiter=b''):
        """Create an empty signer.

        :param delimiter: separator placed between the sorted pieces before
                          hashing; converted with ``to_binary``
        """
        self._data = []
        self._delimiter = to_binary(delimiter)

    def add_data(self, *args):
        """Add data to signer

        :param args: values to include in the signature; each one is
                     converted with ``to_binary`` before being stored
        """
        for data in args:
            self._data.append(to_binary(data))

    @property
    def signature(self):
        """Get data signature

        :return: SHA-1 hex digest of the sorted pieces joined by the delimiter
        """
        # Sort a copy: reading a property must not reorder the stored pieces.
        ordered = sorted(self._data)
        str_to_sign = self._delimiter.join(ordered)
        return hashlib.sha1(str_to_sign).hexdigest()
def check_signature(token, signature, timestamp, nonce):
    """Check WeChat callback signature, raises InvalidSignatureException
    if check failed.

    The expected signature is computed by ``WeChatSigner`` from ``token``,
    ``timestamp`` and ``nonce`` and compared with ``signature``.

    :param token: WeChat callback token
    :param signature: WeChat callback signature sent by WeChat server
    :param timestamp: WeChat callback timestamp sent by WeChat server
    :param nonce: WeChat callback nonce sent by WeChat server
    :raises: InvalidSignatureException if the signatures differ
    :return: None
    """
    import hmac

    signer = WeChatSigner()
    signer.add_data(token, timestamp, nonce)
    expected = signer.signature
    try:
        # Constant-time comparison: ``signature`` is caller-supplied and must
        # not be able to probe the expected digest through timing.
        is_valid = hmac.compare_digest(expected, signature)
    except TypeError:
        # compare_digest rejects mixed or non-ASCII types; fall back to plain
        # equality so such inputs are judged exactly as they were before.
        is_valid = expected == signature
    if not is_valid:
        # Imported lazily, as in the original code.
        from library.wechatbase.exceptions import InvalidSignatureException
        raise InvalidSignatureException()
def check_wxa_signature(session_key, raw_data, client_signature):
    """Verify the signature of the rawData sent by the mini program front end.

    The expected signature is the SHA-1 hex digest of ``raw_data`` followed by
    ``session_key``, encoded as UTF-8.

    For details see
    https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/signature.html # noqa

    :param session_key: session_key obtained in exchange for the login code
    :param raw_data: rawData received by the front end
    :param client_signature: signature received by the front end
    :raises: InvalidSignatureException if the signatures differ
    :return: None
    """
    import hmac

    str2sign = (raw_data + session_key).encode("utf-8")
    expected = hashlib.sha1(str2sign).hexdigest()
    try:
        # Constant-time comparison: ``client_signature`` is untrusted input and
        # must not be able to probe the expected digest through timing.
        is_valid = hmac.compare_digest(expected, client_signature)
    except TypeError:
        # compare_digest rejects mixed or non-ASCII types; fall back to plain
        # equality so such inputs are judged exactly as they were before.
        is_valid = expected == client_signature
    if not is_valid:
        # Imported lazily, as in the original code.
        from library.wechatbase.exceptions import InvalidSignatureException
        raise InvalidSignatureException()
class WXBizDataCrypt:
    """Decrypt encrypted user data using an app id and a session key."""

    def __init__(self, appid, session_key):
        """Store the credentials used by ``decrypt``.

        :param appid: app id compared against the decrypted watermark
        :param session_key: base64-encoded AES key
        """
        self.session_key = session_key
        self.appid = appid

    def decrypt(self, encrypted_data, iv):
        """Decrypt AES-CBC encrypted data and parse it as JSON.

        :param encrypted_data: base64-encoded encrypted data containing the
                               full user information, sensitive data included
        :param iv: base64-encoded initialization vector of the cipher
        :return: the decrypted data, parsed from JSON
        :raises Exception: if the watermark app id differs from ``self.appid``
        """
        # All three inputs arrive base64-encoded.
        key = base64.b64decode(self.session_key)
        ciphertext = base64.b64decode(encrypted_data)
        init_vector = base64.b64decode(iv)

        plaintext = AES.new(key, AES.MODE_CBC, init_vector).decrypt(ciphertext)
        payload = json.loads(self._unpad(plaintext))

        # The watermark must name this app, otherwise the data is rejected.
        if payload['watermark']['appid'] == self.appid:
            return payload
        raise Exception('Invalid Buffer')

    def _unpad(self, s):
        """Strip trailing padding; the last byte gives the padding length."""
        pad_length = ord(s[-1:])
        return s[:-pad_length]
def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
    """Decrypt data with the API v3 key using AES-GCM.

    :param nonce: nonce string; UTF-8 encoded before use
    :param ciphertext: base64-encoded encrypted data
    :param associated_data: associated data string; UTF-8 encoded before use
    :param apiv3_key: API v3 key string; UTF-8 encoded and used as the AES key
    :return: the decrypted data decoded as UTF-8 text
    """
    key = apiv3_key.encode('UTF-8')
    iv = nonce.encode('UTF-8')
    aad = associated_data.encode('UTF-8')
    sealed = b64decode(ciphertext)

    cipher = AESGCM(key=key)
    plaintext = cipher.decrypt(nonce=iv, data=sealed, associated_data=aad)
    return plaintext.decode("UTF-8")
def load_certificate(certificate_str):
    """Load an X.509 certificate from a PEM string.

    :param certificate_str: PEM certificate text; UTF-8 encoded before parsing
    :return: the object returned by ``load_pem_x509_certificate``
    """
    pem_bytes = certificate_str.encode('UTF-8')
    return load_pem_x509_certificate(data=pem_bytes, backend=default_backend())
def load_private_key(private_key_str):
    """Load an unencrypted private key from a PEM string.

    :param private_key_str: PEM private key text; UTF-8 encoded before parsing
    :return: the object returned by ``load_pem_private_key``
    """
    pem_bytes = private_key_str.encode('UTF-8')
    # No password is supplied to the loader.
    return load_pem_private_key(pem_bytes, password=None, backend=default_backend())
|