# -*- coding: utf-8 -*- """ wechatpy.replies ~~~~~~~~~~~~~~~~~~ This module defines all kinds of replies you can send to WeChat :copyright: (c) 2014 by messense. :license: MIT, see LICENSE for more details. """ from __future__ import absolute_import, unicode_literals import time import six import xmltodict from library.wechatpy.fields import ( StringField, IntegerField, ImageField, VoiceField, VideoField, MusicField, ArticlesField, Base64EncodeField, HardwareField, ) from library.wechatpy.messages import BaseMessage, MessageMetaClass from library import to_text, to_binary REPLY_TYPES = {} def register_reply(reply_type): def register(cls): REPLY_TYPES[reply_type] = cls return cls return register class BaseReply(six.with_metaclass(MessageMetaClass)): """Base class for all replies""" source = StringField('FromUserName') target = StringField('ToUserName') time = IntegerField('CreateTime', time.time()) type = 'unknown' def __init__(self, **kwargs): self._data = {} message = kwargs.pop('message', None) if message and isinstance(message, BaseMessage): if 'source' not in kwargs: kwargs['source'] = message.target if 'target' not in kwargs: kwargs['target'] = message.source if hasattr(message, 'agent') and 'agent' not in kwargs: kwargs['agent'] = message.agent if 'time' not in kwargs: kwargs['time'] = time.time() for name, value in kwargs.items(): field = self._fields.get(name) if field: self._data[field.name] = value else: setattr(self, name, value) def render(self): """Render reply from Python object to XML string""" tpl = '\n{data}\n' nodes = [] msg_type = ''.format( msg_type=self.type ) nodes.append(msg_type) for name, field in self._fields.items(): value = getattr(self, name, field.default) node_xml = field.to_xml(value) nodes.append(node_xml) data = '\n'.join(nodes) return tpl.format(data=data) def __str__(self): if six.PY2: return to_binary(self.render()) else: return to_text(self.render()) @register_reply('empty') class EmptyReply(BaseReply): """ 回复空串 微信服务器不会对此作任何处理,并且不会发起重试 """ def __init__(self): pass def render(self): return '' @register_reply('text') class TextReply(BaseReply): """ 文本回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'text' content = StringField('Content') @register_reply('image') class ImageReply(BaseReply): """ 图片回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'image' image = ImageField('Image') @property def media_id(self): return self.image @media_id.setter def media_id(self, value): self.image = value @register_reply('voice') class VoiceReply(BaseReply): """ 语音回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'voice' voice = VoiceField('Voice') @property def media_id(self): return self.voice @media_id.setter def media_id(self, value): self.voice = value @register_reply('video') class VideoReply(BaseReply): """ 视频回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'video' video = VideoField('Video', {}) @property def media_id(self): return self.video.get('media_id') @media_id.setter def media_id(self, value): video = self.video video['media_id'] = value self.video = video @property def title(self): return self.video.get('title') @title.setter def title(self, value): video = self.video video['title'] = value self.video = video @property def description(self): return self.video.get('description') @description.setter def description(self, value): video = self.video video['description'] = value self.video = video @register_reply('music') class MusicReply(BaseReply): """ 音乐回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'music' music = MusicField('Music', {}) @property def thumb_media_id(self): return self.music.get('thumb_media_id') @thumb_media_id.setter def thumb_media_id(self, value): music = self.music music['thumb_media_id'] = value self.music = music @property def title(self): return self.music.get('title') @title.setter def title(self, value): music = self.music music['title'] = value self.music = music @property def description(self): return self.music.get('description') @description.setter def description(self, value): music = self.music music['description'] = value self.music = music @property def music_url(self): return self.music.get('music_url') @music_url.setter def music_url(self, value): music = self.music music['music_url'] = value self.music = music @property def hq_music_url(self): return self.music.get('hq_music_url') @hq_music_url.setter def hq_music_url(self, value): music = self.music music['hq_music_url'] = value self.music = music @register_reply('news') class ArticlesReply(BaseReply): """ 图文回复 详情请参阅 http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html """ type = 'news' articles = ArticlesField('Articles', []) def add_article(self, article): if len(self.articles) == 10: raise AttributeError("Can't add more than 10 articles" " in an ArticlesReply") articles = self.articles articles.append(article) self.articles = articles @register_reply('transfer_customer_service') class TransferCustomerServiceReply(BaseReply): """ 将消息转发到多客服 详情请参阅 http://mp.weixin.qq.com/wiki/5/ae230189c9bd07a6b221f48619aeef35.html """ type = 'transfer_customer_service' @register_reply('device_text') class DeviceTextReply(BaseReply): type = 'device_text' device_type = StringField('DeviceType') device_id = StringField('DeviceID') session_id = StringField('SessionID') content = Base64EncodeField('Content') @register_reply('device_event') class DeviceEventReply(BaseReply): type = 'device_event' event = StringField('Event') device_type = StringField('DeviceType') device_id = StringField('DeviceID') session_id = StringField('SessionID') content = Base64EncodeField('Content') @register_reply('device_status') class DeviceStatusReply(BaseReply): type = 'device_status' device_type = StringField('DeviceType') device_id = StringField('DeviceID') status = IntegerField('DeviceStatus') @register_reply('hardware') class HardwareReply(BaseReply): type = 'hardware' func_flag = IntegerField('FuncFlag', 0) hardware = HardwareField('HardWare') def create_reply(reply, message=None, render=False): """ Create a reply quickly """ r = None if not reply: r = EmptyReply() elif isinstance(reply, BaseReply): r = reply if message: r.source = message.target r.target = message.source elif isinstance(reply, six.string_types): r = TextReply( message=message, content=reply ) elif isinstance(reply, (tuple, list)): if len(reply) > 10: raise AttributeError("Can't add more than 10 articles" " in an ArticlesReply") r = ArticlesReply( message=message, articles=reply ) if r and render: return r.render() return r def deserialize_reply(xml, update_time=False): """ 反序列化被动回复 :param xml: 待反序列化的xml :param update_time: 是否用当前时间替换xml中的时间 :raises ValueError: 不能辨识的reply xml :rtype: wechatpy.replies.BaseReply """ if not xml: return EmptyReply() try: reply_dict = xmltodict.parse(xml)["xml"] msg_type = reply_dict["MsgType"] except (xmltodict.expat.ExpatError, KeyError): raise ValueError("bad reply xml") if msg_type not in REPLY_TYPES: raise ValueError("unknown reply type") cls = REPLY_TYPES[msg_type] kwargs = dict() for attr, field in cls._fields.items(): if field.name in reply_dict: str_value = reply_dict[field.name] kwargs[attr] = field.from_xml(str_value) if update_time: kwargs["time"] = time.time() return cls(**kwargs)