__init__.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # -*- coding: utf-8 -*-
  2. """
  3. wechatpy.crypto
  4. ~~~~~~~~~~~~~~~~
  5. This module provides some crypto tools for WeChat and WeChat enterprise
  6. :copyright: (c) 2014 by messense.
  7. :license: MIT, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import, unicode_literals
  10. import json
  11. import time
  12. import base64
  13. from wechatpy.utils import to_text, to_binary, WeChatSigner
  14. from wechatpy.exceptions import (
  15. InvalidAppIdException,
  16. InvalidSignatureException
  17. )
  18. from wechatpy.crypto.base import BasePrpCrypto, WeChatCipher
  19. from wechatpy.crypto.pkcs7 import PKCS7Encoder
  20. def _get_signature(token, timestamp, nonce, encrypt):
  21. signer = WeChatSigner()
  22. signer.add_data(token, timestamp, nonce, encrypt)
  23. return signer.signature
  24. class PrpCrypto(BasePrpCrypto):
  25. def encrypt(self, text, app_id):
  26. return self._encrypt(text, app_id)
  27. def decrypt(self, text, app_id):
  28. return self._decrypt(text, app_id, InvalidAppIdException)
  29. class BaseWeChatCrypto(object):
  30. def __init__(self, token, encoding_aes_key, _id):
  31. encoding_aes_key = to_binary(encoding_aes_key + '=')
  32. self.key = base64.b64decode(encoding_aes_key)
  33. assert len(self.key) == 32
  34. self.token = token
  35. self._id = _id
  36. def _check_signature(self,
  37. signature,
  38. timestamp,
  39. nonce,
  40. echo_str,
  41. crypto_class=None):
  42. _signature = _get_signature(self.token, timestamp, nonce, echo_str)
  43. if _signature != signature:
  44. raise InvalidSignatureException()
  45. pc = crypto_class(self.key)
  46. return pc.decrypt(echo_str, self._id)
  47. def _encrypt_message(self,
  48. msg,
  49. nonce,
  50. timestamp=None,
  51. crypto_class=None):
  52. from wechatpy.replies import BaseReply
  53. xml = """<xml>
  54. <Encrypt><![CDATA[{encrypt}]]></Encrypt>
  55. <MsgSignature><![CDATA[{signature}]]></MsgSignature>
  56. <TimeStamp>{timestamp}</TimeStamp>
  57. <Nonce><![CDATA[{nonce}]]></Nonce>
  58. </xml>"""
  59. if isinstance(msg, BaseReply):
  60. msg = msg.render()
  61. timestamp = timestamp or to_text(int(time.time()))
  62. pc = crypto_class(self.key)
  63. encrypt = to_text(pc.encrypt(msg, self._id))
  64. signature = _get_signature(self.token, timestamp, nonce, encrypt)
  65. return to_text(xml.format(
  66. encrypt=encrypt,
  67. signature=signature,
  68. timestamp=timestamp,
  69. nonce=nonce
  70. ))
  71. def _decrypt_message(self,
  72. msg,
  73. signature,
  74. timestamp,
  75. nonce,
  76. crypto_class=None):
  77. if not isinstance(msg, dict):
  78. import xmltodict
  79. msg = xmltodict.parse(to_text(msg))['xml']
  80. encrypt = msg['Encrypt']
  81. _signature = _get_signature(self.token, timestamp, nonce, encrypt)
  82. if _signature != signature:
  83. raise InvalidSignatureException()
  84. pc = crypto_class(self.key)
  85. return pc.decrypt(encrypt, self._id)
  86. class WeChatCrypto(BaseWeChatCrypto):
  87. def __init__(self, token, encoding_aes_key, app_id):
  88. super(WeChatCrypto, self).__init__(token, encoding_aes_key, app_id)
  89. self.app_id = app_id
  90. def encrypt_message(self, msg, nonce, timestamp=None):
  91. return self._encrypt_message(msg, nonce, timestamp, PrpCrypto)
  92. def decrypt_message(self, msg, signature, timestamp, nonce):
  93. return self._decrypt_message(
  94. msg,
  95. signature,
  96. timestamp,
  97. nonce,
  98. PrpCrypto
  99. )
  100. class WeChatWxaCrypto(object):
  101. def __init__(self, key, iv, app_id):
  102. self.cipher = WeChatCipher(base64.b64decode(key), base64.b64decode(iv))
  103. self.app_id = app_id
  104. def decrypt_message(self, msg):
  105. raw_data = base64.b64decode(msg)
  106. decrypted = self.cipher.decrypt(raw_data)
  107. plaintext = PKCS7Encoder.decode(decrypted)
  108. decrypted_msg = json.loads(to_text(plaintext))
  109. if decrypted_msg['watermark']['appid'] != self.app_id:
  110. raise InvalidAppIdException()
  111. return decrypted_msg