utils.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. import base64
  4. import copy
  5. import hashlib
  6. import hmac
  7. import socket
  8. import logging
  9. import six
  10. from library import to_text, to_binary
  11. logger = logging.getLogger(__name__)
  12. def format_url(params, api_key=None):
  13. data = [to_binary('{0}={1}'.format(k, params[k])) for k in sorted(params) if params[k]]
  14. if api_key:
  15. data.append(to_binary('key={0}'.format(api_key)))
  16. return b"&".join(data)
  17. def calculate_signature(params, api_key):
  18. url = format_url(params, api_key)
  19. logger.debug("Calculate Signature URL: %s", url)
  20. return to_text(hashlib.md5(url).hexdigest().upper())
  21. def calculate_signature_hmac(params, api_key):
  22. url = format_url(params, api_key)
  23. sign = to_text(hmac.new(api_key.encode(), msg=url,
  24. digestmod=hashlib.sha256).hexdigest().upper())
  25. return sign
  26. def _check_signature(params, api_key):
  27. _params = copy.deepcopy(params)
  28. sign = _params.pop('sign', '')
  29. return sign == calculate_signature(_params, api_key)
  30. def dict_to_xml(d, sign):
  31. xml = ['<xml>\n']
  32. for k in sorted(d):
  33. # use sorted to avoid test error on Py3k
  34. v = d[k]
  35. if isinstance(v, six.integer_types) or (isinstance(v, six.string_types) and v.isdigit()):
  36. xml.append('<{0}>{1}</{0}>\n'.format(to_text(k), to_text(v)))
  37. else:
  38. xml.append(
  39. '<{0}><![CDATA[{1}]]></{0}>\n'.format(to_text(k), to_text(v))
  40. )
  41. xml.append('<sign><![CDATA[{0}]]></sign>\n</xml>'.format(to_text(sign)))
  42. return ''.join(xml)
  43. def get_external_ip():
  44. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  45. try:
  46. wechat_ip = socket.gethostbyname('api.mch.weixin.qq.com')
  47. sock.connect((wechat_ip, 80))
  48. addr, port = sock.getsockname()
  49. sock.close()
  50. return addr
  51. except socket.error:
  52. return '127.0.0.1'
  53. def rsa_encrypt(data, pem, b64_encode=True):
  54. """
  55. rsa 加密
  56. :param data: 待加密字符串/binary
  57. :param pem: RSA public key 内容/binary
  58. :param b64_encode: 是否对输出进行 base64 encode
  59. :return: 如果 b64_encode=True 的话,返回加密并 base64 处理后的 string;否则返回加密后的 binary
  60. """
  61. from cryptography.hazmat.backends import default_backend
  62. from cryptography.hazmat.primitives import serialization
  63. from cryptography.hazmat.primitives import hashes
  64. from cryptography.hazmat.primitives.asymmetric import padding
  65. encoded_data = to_binary(data)
  66. pem = to_binary(pem)
  67. public_key = serialization.load_pem_public_key(pem, backend=default_backend())
  68. encrypted_data = public_key.encrypt(
  69. encoded_data,
  70. padding=padding.OAEP(
  71. mgf=padding.MGF1(hashes.SHA1()),
  72. algorithm=hashes.SHA1(),
  73. label=None,
  74. )
  75. )
  76. if b64_encode:
  77. encrypted_data = base64.b64encode(encrypted_data).decode('utf-8')
  78. return encrypted_data
  79. def rsa_decrypt(encrypted_data, pem, password=None):
  80. """
  81. rsa 解密
  82. :param encrypted_data: 待解密 bytes
  83. :param pem: RSA private key 内容/binary
  84. :param password: RSA private key pass phrase
  85. :return: 解密后的 binary
  86. """
  87. from cryptography.hazmat.backends import default_backend
  88. from cryptography.hazmat.primitives import serialization
  89. from cryptography.hazmat.primitives import hashes
  90. from cryptography.hazmat.primitives.asymmetric import padding
  91. encrypted_data = to_binary(encrypted_data)
  92. pem = to_binary(pem)
  93. private_key = serialization.load_pem_private_key(pem, password, backend=default_backend())
  94. data = private_key.decrypt(
  95. encrypted_data,
  96. padding=padding.OAEP(
  97. mgf=padding.MGF1(hashes.SHA1()),
  98. algorithm=hashes.SHA1(),
  99. label=None,
  100. )
  101. )
  102. return data