123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- # -*- coding: utf-8 -*-
- from __future__ import unicode_literals
- import os
- import time
- import json
- import hashlib
- import string
- import random
- import requests
- from .base import Map, WeixinError
- __all__ = ("WeixinMPError", "WeixinMP")
- DEFAULT_DIR = os.getenv("HOME", os.getcwd())
- class WeixinMPError(WeixinError):
- def __init__(self, msg):
- super(WeixinMPError, self).__init__(msg)
- class WeixinMP(object):
- api_uri = "https://api.weixin.qq.com/cgi-bin"
- def __init__(self, app_id, app_secret, ac_path=None, jt_path=None):
- self.app_id = app_id
- self.app_secret = app_secret
- self.session = requests.Session()
- if ac_path is None:
- ac_path = os.path.join(DEFAULT_DIR, ".access_token")
- if jt_path is None:
- jt_path = os.path.join(DEFAULT_DIR, ".jsapi_ticket")
- self.ac_path = ac_path
- self.jt_path = jt_path
- def fetch(self, method, url, params=None, data=None, headers=None):
- req = requests.Request(method, url, params=params,
- data=data, headers=headers)
- prepped = req.prepare()
- resp = self.session.send(prepped, timeout=20)
- data = Map(resp.json())
- if data.errcode:
- msg = "%(errcode)d %(errmsg)s" % data
- raise WeixinMPError(msg)
- return data
- def get(self, path, params=None, token=True):
- url = "{0}{1}".format(self.api_uri, path)
- params = {} if not params else params
- token and params.setdefault("access_token", self.access_token)
- return self.fetch("GET", url, params)
- def post(self, path, data, json_encode=True, token=True):
- url = "{0}{1}".format(self.api_uri, path)
- params = {}
- token and params.setdefault("access_token", self.access_token)
- headers = {}
- if json_encode:
- data = json.dumps(data, ensure_ascii=False)
- headers["Content-Type"] = "application/json"
- return self.fetch("POST", url, params=params, data=data, headers=headers)
- @property
- def access_token(self):
- """
- 获取服务端凭证
- """
- timestamp = time.time()
- if not os.path.exists(self.ac_path) or \
- int(os.path.getmtime(self.ac_path)) < timestamp:
- params = dict()
- params.setdefault("grant_type", "client_credential")
- params.setdefault("appid", self.app_id)
- params.setdefault("secret", self.app_secret)
- data = self.get("/token", params, False)
- with open(self.ac_path, 'wb') as fp:
- fp.write(data.access_token.encode("utf-8"))
- os.utime(self.ac_path, (timestamp, timestamp + data.expires_in - 600))
- return open(self.ac_path).read()
- @property
- def jsapi_ticket(self):
- """
- 获取jsapi ticket
- """
- timestamp = time.time()
- if not os.path.exists(self.jt_path) or \
- int(os.path.getmtime(self.jt_path)) < timestamp:
- params = dict()
- params.setdefault("type", "jsapi")
- data = self.get("/ticket/getticket", params, True)
- with open(self.jt_path, 'wb') as fp:
- fp.write(data.ticket.encode("utf-8"))
- os.utime(self.jt_path, (timestamp, timestamp + data.expires_in - 600))
- return open(self.jt_path).read()
- @property
- def nonce_str(self):
- char = string.ascii_letters + string.digits
- return "".join(random.choice(char) for _ in range(32))
- def jsapi_sign(self, **kwargs):
- """
- 生成签名给js使用
- """
- timestamp = str(int(time.time()))
- nonce_str = self.nonce_str
- kwargs.setdefault("jsapi_ticket", self.jsapi_ticket)
- kwargs.setdefault("timestamp", timestamp)
- kwargs.setdefault("noncestr", nonce_str)
- raw = [(k, kwargs[k]) for k in sorted(kwargs.keys())]
- s = "&".join("=".join(kv) for kv in raw if kv[1])
- sign = hashlib.sha1(s.encode("utf-8")).hexdigest().lower()
- return Map(sign=sign, timestamp=timestamp, noncestr=nonce_str)
- def groups_create(self, name):
- """
- 创建分组
- :param name: 分组名
- """
- data = dict(group=dict(name=name))
- return self.post("/groups/create", data)
- def groups_get(self):
- """
- 获取所有分组
- """
- return self.get("/groups/get")
- def groups_getid(self, openid):
- """
- 查询用户所在分组
- :param openid: 用户id
- """
- data = dict(openid=openid)
- return self.post("/groups/getid", data)
- def groups_update(self, id, name):
- """
- 修改分组名
- :param id: 分组id
- :param name: 分组名
- """
- data = dict(group=dict(id=id, name=name))
- return self.post("/groups/update", data)
- def groups_members_update(self, to_groupid, openid):
- """
- 移动用户分组
- :param to_groupid: 分组id
- :param openid: 用户唯一标识符
- """
- data = dict(openid=openid, to_groupid=to_groupid)
- return self.post("/groups/members/update", data)
- def groups_members_batchupdate(self, to_groupid, *openid):
- """
- 批量移动用户分组
- :param to_groupid: 分组id
- :param openid: 用户唯一标示列表
- """
- data = dict(openid_list=openid, to_groupid=to_groupid)
- return self.post("/groups/members/batchupdate", data)
- def groups_delete(self, id):
- """
- 删除组
- :param id: 分组的id
- """
- data = dict(group=dict(id=id))
- return self.post("/groups/delete", data)
- def user_info_updateremark(self, openid, remark):
- """
- 设置备注名
- :param openid: 用户唯一标识符
- :param remark: 备注
- """
- data = dict(openid=openid, remark=remark)
- return self.post("/user/info/updateremark", data)
- def user_info(self, openid):
- """
- 获取用户信息
- 包含subscribe字段,可以用来判断用户是否关注公众号
- :param openid: 用户id
- """
- args = dict(openid=openid, lang="zh_CN")
- return self.get("/user/info", args)
- def user_info_batchget(self, *openid):
- """
- 批量获取用户信息
- """
- user_list = []
- for id in openid:
- user_list.append(dict(openid=openid, lang="zh_CN"))
- data = dict(user_list=user_list)
- return self.post("/user/info/batchget", data)
- def user_get(self, next_openid=None):
- """
- 获取公众号关注列表
- 一次最多返回1000个
- :param next_openid: 第一个拉取的openid,不填默认从头开始
- """
- args = dict()
- if next_openid:
- args.setdefault("next_openid", next_openid)
- return self.get("/user/get", args)
- def menu_create(self, data):
- data = dict(button=data)
- return self.post("/menu/create", data)
- def menu_get(self):
- return self.get("/menu/get")
- def menu_delete(self):
- return self.get("/menu/delete")
- def get_current_selfmenu_info(self):
- return self.get("/get_current_selfmenu_info")
- def shorturl(self, long_url):
- """
- 长链接转为短链接
- :param long_url: 长链接
- """
- data = dict(action="long2short", long_url=long_url)
- return self.post("/shorturl", data)
- def qrcode_create(self, scene_id, expires=30):
- """
- 创建qrcode
- """
- data = dict(
- action_name="QR_SCENE", expire_seconds=expires,
- action_info=dict(scene=dict(scene_id=scene_id)),
- )
- return self.post("/qrcode/create", data)
- def qrcode_create_limit(self, input):
- """
- 创建qrcode限制方式
- """
- data = dict()
- if isinstance(input, int):
- data["action_name"] = "QR_LIMIT_SCENE"
- data["action_info"] = dict(scene=dict(
- scene_id=input,
- ))
- elif isinstance(input, str):
- data["action_name"] = "QR_LIMIT_STR_SCENE"
- data["action_info"] = dict(scene=dict(
- scene_str=input,
- ))
- else:
- raise ValueError("invalid type")
- return self.post("/qrcode/create", data)
- def qrcode_show(self, ticket):
- """
- 显示qrcode
- """
- url = "https://mp.weixin.qq.com/cgi-bin/showqrcode"
- return self.add_query(url, dict(ticket=ticket))
|