mp.py 8.5 KB


  1. # -*- coding: utf-8 -*-
  2. from __future__ import unicode_literals
  3. import os
  4. import time
  5. import json
  6. import hashlib
  7. import string
  8. import random
  9. import requests
  10. from .base import Map, WeixinError
  11. __all__ = ("WeixinMPError", "WeixinMP")
  12. DEFAULT_DIR = os.getenv("HOME", os.getcwd())
  13. class WeixinMPError(WeixinError):
  14. def __init__(self, msg):
  15. super(WeixinMPError, self).__init__(msg)
  16. class WeixinMP(object):
  17. api_uri = "https://api.weixin.qq.com/cgi-bin"
  18. def __init__(self, app_id, app_secret, ac_path=None, jt_path=None):
  19. self.app_id = app_id
  20. self.app_secret = app_secret
  21. self.session = requests.Session()
  22. if ac_path is None:
  23. ac_path = os.path.join(DEFAULT_DIR, ".access_token")
  24. if jt_path is None:
  25. jt_path = os.path.join(DEFAULT_DIR, ".jsapi_ticket")
  26. self.ac_path = ac_path
  27. self.jt_path = jt_path
  28. def fetch(self, method, url, params=None, data=None, headers=None):
  29. req = requests.Request(method, url, params=params,
  30. data=data, headers=headers)
  31. prepped = req.prepare()
  32. resp = self.session.send(prepped, timeout=20)
  33. data = Map(resp.json())
  34. if data.errcode:
  35. msg = "%(errcode)d %(errmsg)s" % data
  36. raise WeixinMPError(msg)
  37. return data
  38. def get(self, path, params=None, token=True):
  39. url = "{0}{1}".format(self.api_uri, path)
  40. params = {} if not params else params
  41. token and params.setdefault("access_token", self.access_token)
  42. return self.fetch("GET", url, params)
  43. def post(self, path, data, json_encode=True, token=True):
  44. url = "{0}{1}".format(self.api_uri, path)
  45. params = {}
  46. token and params.setdefault("access_token", self.access_token)
  47. headers = {}
  48. if json_encode:
  49. data = json.dumps(data, ensure_ascii=False)
  50. headers["Content-Type"] = "application/json"
  51. return self.fetch("POST", url, params=params, data=data, headers=headers)
  52. @property
  53. def access_token(self):
  54. """
  55. 获取服务端凭证
  56. """
  57. timestamp = time.time()
  58. if not os.path.exists(self.ac_path) or \
  59. int(os.path.getmtime(self.ac_path)) < timestamp:
  60. params = dict()
  61. params.setdefault("grant_type", "client_credential")
  62. params.setdefault("appid", self.app_id)
  63. params.setdefault("secret", self.app_secret)
  64. data = self.get("/token", params, False)
  65. with open(self.ac_path, 'wb') as fp:
  66. fp.write(data.access_token.encode("utf-8"))
  67. os.utime(self.ac_path, (timestamp, timestamp + data.expires_in - 600))
  68. return open(self.ac_path).read()
  69. @property
  70. def jsapi_ticket(self):
  71. """
  72. 获取jsapi ticket
  73. """
  74. timestamp = time.time()
  75. if not os.path.exists(self.jt_path) or \
  76. int(os.path.getmtime(self.jt_path)) < timestamp:
  77. params = dict()
  78. params.setdefault("type", "jsapi")
  79. data = self.get("/ticket/getticket", params, True)
  80. with open(self.jt_path, 'wb') as fp:
  81. fp.write(data.ticket.encode("utf-8"))
  82. os.utime(self.jt_path, (timestamp, timestamp + data.expires_in - 600))
  83. return open(self.jt_path).read()
  84. @property
  85. def nonce_str(self):
  86. char = string.ascii_letters + string.digits
  87. return "".join(random.choice(char) for _ in range(32))
  88. def jsapi_sign(self, **kwargs):
  89. """
  90. 生成签名给js使用
  91. """
  92. timestamp = str(int(time.time()))
  93. nonce_str = self.nonce_str
  94. kwargs.setdefault("jsapi_ticket", self.jsapi_ticket)
  95. kwargs.setdefault("timestamp", timestamp)
  96. kwargs.setdefault("noncestr", nonce_str)
  97. raw = [(k, kwargs[k]) for k in sorted(kwargs.keys())]
  98. s = "&".join("=".join(kv) for kv in raw if kv[1])
  99. sign = hashlib.sha1(s.encode("utf-8")).hexdigest().lower()
  100. return Map(sign=sign, timestamp=timestamp, noncestr=nonce_str)
  101. def groups_create(self, name):
  102. """
  103. 创建分组
  104. :param name: 分组名
  105. """
  106. data = dict(group=dict(name=name))
  107. return self.post("/groups/create", data)
  108. def groups_get(self):
  109. """
  110. 获取所有分组
  111. """
  112. return self.get("/groups/get")
  113. def groups_getid(self, openid):
  114. """
  115. 查询用户所在分组
  116. :param openid: 用户id
  117. """
  118. data = dict(openid=openid)
  119. return self.post("/groups/getid", data)
  120. def groups_update(self, id, name):
  121. """
  122. 修改分组名
  123. :param id: 分组id
  124. :param name: 分组名
  125. """
  126. data = dict(group=dict(id=id, name=name))
  127. return self.post("/groups/update", data)
  128. def groups_members_update(self, to_groupid, openid):
  129. """
  130. 移动用户分组
  131. :param to_groupid: 分组id
  132. :param openid: 用户唯一标识符
  133. """
  134. data = dict(openid=openid, to_groupid=to_groupid)
  135. return self.post("/groups/members/update", data)
  136. def groups_members_batchupdate(self, to_groupid, *openid):
  137. """
  138. 批量移动用户分组
  139. :param to_groupid: 分组id
  140. :param openid: 用户唯一标示列表
  141. """
  142. data = dict(openid_list=openid, to_groupid=to_groupid)
  143. return self.post("/groups/members/batchupdate", data)
  144. def groups_delete(self, id):
  145. """
  146. 删除组
  147. :param id: 分组的id
  148. """
  149. data = dict(group=dict(id=id))
  150. return self.post("/groups/delete", data)
  151. def user_info_updateremark(self, openid, remark):
  152. """
  153. 设置备注名
  154. :param openid: 用户唯一标识符
  155. :param remark: 备注
  156. """
  157. data = dict(openid=openid, remark=remark)
  158. return self.post("/user/info/updateremark", data)
  159. def user_info(self, openid):
  160. """
  161. 获取用户信息
  162. 包含subscribe字段,可以用来判断用户是否关注公众号
  163. :param openid: 用户id
  164. """
  165. args = dict(openid=openid, lang="zh_CN")
  166. return self.get("/user/info", args)
  167. def user_info_batchget(self, *openid):
  168. """
  169. 批量获取用户信息
  170. """
  171. user_list = []
  172. for id in openid:
  173. user_list.append(dict(openid=openid, lang="zh_CN"))
  174. data = dict(user_list=user_list)
  175. return self.post("/user/info/batchget", data)
  176. def user_get(self, next_openid=None):
  177. """
  178. 获取公众号关注列表
  179. 一次最多返回1000个
  180. :param next_openid: 第一个拉取的openid,不填默认从头开始
  181. """
  182. args = dict()
  183. if next_openid:
  184. args.setdefault("next_openid", next_openid)
  185. return self.get("/user/get", args)
  186. def menu_create(self, data):
  187. data = dict(button=data)
  188. return self.post("/menu/create", data)
  189. def menu_get(self):
  190. return self.get("/menu/get")
  191. def menu_delete(self):
  192. return self.get("/menu/delete")
  193. def get_current_selfmenu_info(self):
  194. return self.get("/get_current_selfmenu_info")
  195. def shorturl(self, long_url):
  196. """
  197. 长链接转为短链接
  198. :param long_url: 长链接
  199. """
  200. data = dict(action="long2short", long_url=long_url)
  201. return self.post("/shorturl", data)
  202. def qrcode_create(self, scene_id, expires=30):
  203. """
  204. 创建qrcode
  205. """
  206. data = dict(
  207. action_name="QR_SCENE", expire_seconds=expires,
  208. action_info=dict(scene=dict(scene_id=scene_id)),
  209. )
  210. return self.post("/qrcode/create", data)
  211. def qrcode_create_limit(self, input):
  212. """
  213. 创建qrcode限制方式
  214. """
  215. data = dict()
  216. if isinstance(input, int):
  217. data["action_name"] = "QR_LIMIT_SCENE"
  218. data["action_info"] = dict(scene=dict(
  219. scene_id=input,
  220. ))
  221. elif isinstance(input, str):
  222. data["action_name"] = "QR_LIMIT_STR_SCENE"
  223. data["action_info"] = dict(scene=dict(
  224. scene_str=input,
  225. ))
  226. else:
  227. raise ValueError("invalid type")
  228. return self.post("/qrcode/create", data)
  229. def qrcode_show(self, ticket):
  230. """
  231. 显示qrcode
  232. """
  233. url = "https://mp.weixin.qq.com/cgi-bin/showqrcode"
  234. return self.add_query(url, dict(ticket=ticket))