__init__.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # -*- coding: utf-8 -*-
  2. """
  3. wechatpy.crypto
  4. ~~~~~~~~~~~~~~~~
  5. This module provides some crypto tools for WeChat and WeChat enterprise
  6. :copyright: (c) 2014 by messense.
  7. :license: MIT, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import, unicode_literals
  10. import json
  11. import time
  12. import base64
  13. from library.wechatpy.utils import WeChatSigner
  14. from library import to_text, to_binary
  15. from library.wechatbase.exceptions import (
  16. InvalidAppIdException,
  17. InvalidSignatureException
  18. )
  19. from library.wechatpy.crypto.base import BasePrpCrypto, WeChatCipher
  20. from library.wechatpy.crypto.pkcs7 import PKCS7Encoder
  21. def _get_signature(token, timestamp, nonce, encrypt):
  22. signer = WeChatSigner()
  23. signer.add_data(token, timestamp, nonce, encrypt)
  24. return signer.signature
  25. class PrpCrypto(BasePrpCrypto):
  26. def encrypt(self, text, app_id):
  27. return self._encrypt(text, app_id)
  28. def decrypt(self, text, app_id):
  29. return self._decrypt(text, app_id, InvalidAppIdException)
  30. class BaseWeChatCrypto(object):
  31. def __init__(self, token, encoding_aes_key, _id):
  32. encoding_aes_key = to_binary(encoding_aes_key + '=')
  33. self.key = base64.b64decode(encoding_aes_key)
  34. assert len(self.key) == 32
  35. self.token = token
  36. self._id = _id
  37. def _check_signature(self,
  38. signature,
  39. timestamp,
  40. nonce,
  41. echo_str,
  42. crypto_class=None):
  43. _signature = _get_signature(self.token, timestamp, nonce, echo_str)
  44. if _signature != signature:
  45. raise InvalidSignatureException()
  46. pc = crypto_class(self.key)
  47. return pc.decrypt(echo_str, self._id)
  48. def _encrypt_message(self,
  49. msg,
  50. nonce,
  51. timestamp=None,
  52. crypto_class=None):
  53. from library.wechatpy.replies import BaseReply
  54. xml = """<xml>
  55. <Encrypt><![CDATA[{encrypt}]]></Encrypt>
  56. <MsgSignature><![CDATA[{signature}]]></MsgSignature>
  57. <TimeStamp>{timestamp}</TimeStamp>
  58. <Nonce><![CDATA[{nonce}]]></Nonce>
  59. </xml>"""
  60. if isinstance(msg, BaseReply):
  61. msg = msg.render()
  62. timestamp = timestamp or to_text(int(time.time()))
  63. pc = crypto_class(self.key)
  64. encrypt = to_text(pc.encrypt(msg, self._id))
  65. signature = _get_signature(self.token, timestamp, nonce, encrypt)
  66. return to_text(xml.format(
  67. encrypt=encrypt,
  68. signature=signature,
  69. timestamp=timestamp,
  70. nonce=nonce
  71. ))
  72. def _decrypt_message(self,
  73. msg,
  74. signature,
  75. timestamp,
  76. nonce,
  77. crypto_class=None):
  78. if not isinstance(msg, dict):
  79. import xmltodict
  80. msg = xmltodict.parse(to_text(msg))['xml']
  81. encrypt = msg['Encrypt']
  82. _signature = _get_signature(self.token, timestamp, nonce, encrypt)
  83. if _signature != signature:
  84. raise InvalidSignatureException()
  85. pc = crypto_class(self.key)
  86. return pc.decrypt(encrypt, self._id)
  87. class WeChatCrypto(BaseWeChatCrypto):
  88. def __init__(self, token, encoding_aes_key, app_id):
  89. super(WeChatCrypto, self).__init__(token, encoding_aes_key, app_id)
  90. self.app_id = app_id
  91. def encrypt_message(self, msg, nonce, timestamp=None):
  92. return self._encrypt_message(msg, nonce, timestamp, PrpCrypto)
  93. def decrypt_message(self, msg, signature, timestamp, nonce):
  94. return self._decrypt_message(
  95. msg,
  96. signature,
  97. timestamp,
  98. nonce,
  99. PrpCrypto
  100. )
  101. class WeChatWxaCrypto(object):
  102. def __init__(self, key, iv, app_id):
  103. self.cipher = WeChatCipher(base64.b64decode(key), base64.b64decode(iv))
  104. self.app_id = app_id
  105. def decrypt_message(self, msg):
  106. raw_data = base64.b64decode(msg)
  107. decrypted = self.cipher.decrypt(raw_data)
  108. plaintext = PKCS7Encoder.decode(decrypted)
  109. decrypted_msg = json.loads(to_text(plaintext))
  110. if decrypted_msg['watermark']['appid'] != self.app_id:
  111. raise InvalidAppIdException()
  112. return decrypted_msg