123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373 |
- # -*- coding: utf-8 -*-
- """
- wechatpy.replies
- ~~~~~~~~~~~~~~~~~~
- This module defines all kinds of replies you can send to WeChat
- :copyright: (c) 2014 by messense.
- :license: MIT, see LICENSE for more details.
- """
- from __future__ import absolute_import, unicode_literals
- import time
- import six
- import xmltodict
- from library.wechatpy.fields import (
- StringField,
- IntegerField,
- ImageField,
- VoiceField,
- VideoField,
- MusicField,
- ArticlesField,
- Base64EncodeField,
- HardwareField,
- )
- from library.wechatpy.messages import BaseMessage, MessageMetaClass
- from library import to_text, to_binary
# Registry of reply classes, keyed by the reply type name they were
# registered under (for example 'text' or 'news').
REPLY_TYPES = {}


def register_reply(reply_type):
    """Build a class decorator that records a class in ``REPLY_TYPES``.

    :param reply_type: key under which the decorated class is stored
    :return: a decorator that registers its argument and hands it back
        unchanged
    """
    def _record(reply_cls):
        # A later registration under the same key replaces the earlier one.
        REPLY_TYPES[reply_type] = reply_cls
        return reply_cls

    return _record
class BaseReply(six.with_metaclass(MessageMetaClass)):
    """Common behaviour shared by every reply type.

    Keyword arguments that name a declared field are stored in ``_data``
    under that field's ``name``; every other keyword argument is set as a
    plain attribute on the instance.
    """

    source = StringField('FromUserName')
    target = StringField('ToUserName')
    time = IntegerField('CreateTime', time.time())
    type = 'unknown'

    def __init__(self, **kwargs):
        """Create a reply, optionally addressed from a received message.

        :param message: optional ``BaseMessage`` being answered; it supplies
            defaults for ``source``, ``target`` and (when present) ``agent``
        :param kwargs: field values and extra attributes for the reply
        """
        self._data = {}
        incoming = kwargs.pop('message', None)
        if incoming and isinstance(incoming, BaseMessage):
            # The reply travels in the opposite direction to the message,
            # so each side defaults to the message's other side.
            for reply_key, message_attr in (('source', 'target'),
                                            ('target', 'source')):
                if reply_key not in kwargs:
                    kwargs[reply_key] = getattr(incoming, message_attr)
            if hasattr(incoming, 'agent') and 'agent' not in kwargs:
                kwargs['agent'] = incoming.agent
        if 'time' not in kwargs:
            kwargs['time'] = time.time()

        for key in kwargs:
            declared = self._fields.get(key)
            if not declared:
                # Not a declared field: keep it as an ordinary attribute.
                setattr(self, key, kwargs[key])
                continue
            self._data[declared.name] = kwargs[key]

    def render(self):
        """Render reply from Python object to XML string"""
        # The message type node comes first, followed by one node per field.
        nodes = [
            '<MsgType><![CDATA[{msg_type}]]></MsgType>'.format(
                msg_type=self.type
            )
        ]
        nodes.extend(
            field.to_xml(getattr(self, name, field.default))
            for name, field in self._fields.items()
        )
        return '<xml>\n{data}\n</xml>'.format(data='\n'.join(nodes))

    def __str__(self):
        """Return the rendered XML as the native ``str`` type."""
        # Python 2 expects bytes from __str__, Python 3 expects text.
        convert = to_binary if six.PY2 else to_text
        return convert(self.render())
@register_reply('empty')
class EmptyReply(BaseReply):
    """
    Reply with an empty string.

    The WeChat server does not process it in any way and will not retry.
    """

    def __init__(self, **kwargs):
        """Create an empty reply.

        Keyword arguments are accepted so the class can be built the same
        way as every other registered reply (``deserialize_reply`` calls
        ``cls(**kwargs)`` for whichever class is registered), and the base
        initialiser is run so the instance has the ``_data`` store that
        ``BaseReply`` sets up.

        :param kwargs: same keyword arguments as ``BaseReply``; they do not
            affect the rendered output
        """
        super(EmptyReply, self).__init__(**kwargs)

    def render(self):
        """Return the empty string, regardless of any stored field values."""
        return ''
@register_reply('text')
class TextReply(BaseReply):
    """
    Text reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'text'

    # Text of the reply, declared under the name 'Content'.
    content = StringField('Content')
@register_reply('image')
class ImageReply(BaseReply):
    """
    Image reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'image'

    image = ImageField('Image')

    # ``media_id`` is an alias: it reads and writes the ``image`` field.
    @property
    def media_id(self):
        """Return the current value of the ``image`` field."""
        return self.image

    @media_id.setter
    def media_id(self, value):
        self.image = value
@register_reply('voice')
class VoiceReply(BaseReply):
    """
    Voice reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'voice'

    voice = VoiceField('Voice')

    # ``media_id`` is an alias: it reads and writes the ``voice`` field.
    @property
    def media_id(self):
        """Return the current value of the ``voice`` field."""
        return self.voice

    @media_id.setter
    def media_id(self, value):
        self.voice = value
@register_reply('video')
class VideoReply(BaseReply):
    """
    Video reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'video'

    video = VideoField('Video', {})

    def _store_video_item(self, key, value):
        """Set one key of the ``video`` mapping and write the mapping back."""
        # Read, modify, then re-assign the field, exactly as each setter
        # below needs; the re-assignment is kept on purpose.
        payload = self.video
        payload[key] = value
        self.video = payload

    @property
    def media_id(self):
        """Return the ``media_id`` entry of ``video`` (``None`` if unset)."""
        return self.video.get('media_id')

    @media_id.setter
    def media_id(self, value):
        self._store_video_item('media_id', value)

    @property
    def title(self):
        """Return the ``title`` entry of ``video`` (``None`` if unset)."""
        return self.video.get('title')

    @title.setter
    def title(self, value):
        self._store_video_item('title', value)

    @property
    def description(self):
        """Return the ``description`` entry of ``video`` (``None`` if unset)."""
        return self.video.get('description')

    @description.setter
    def description(self, value):
        self._store_video_item('description', value)
@register_reply('music')
class MusicReply(BaseReply):
    """
    Music reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'music'

    music = MusicField('Music', {})

    def _store_music_item(self, key, value):
        """Set one key of the ``music`` mapping and write the mapping back."""
        # Read, modify, then re-assign the field, exactly as each setter
        # below needs; the re-assignment is kept on purpose.
        payload = self.music
        payload[key] = value
        self.music = payload

    @property
    def thumb_media_id(self):
        """Return the ``thumb_media_id`` entry of ``music`` (``None`` if unset)."""
        return self.music.get('thumb_media_id')

    @thumb_media_id.setter
    def thumb_media_id(self, value):
        self._store_music_item('thumb_media_id', value)

    @property
    def title(self):
        """Return the ``title`` entry of ``music`` (``None`` if unset)."""
        return self.music.get('title')

    @title.setter
    def title(self, value):
        self._store_music_item('title', value)

    @property
    def description(self):
        """Return the ``description`` entry of ``music`` (``None`` if unset)."""
        return self.music.get('description')

    @description.setter
    def description(self, value):
        self._store_music_item('description', value)

    @property
    def music_url(self):
        """Return the ``music_url`` entry of ``music`` (``None`` if unset)."""
        return self.music.get('music_url')

    @music_url.setter
    def music_url(self, value):
        self._store_music_item('music_url', value)

    @property
    def hq_music_url(self):
        """Return the ``hq_music_url`` entry of ``music`` (``None`` if unset)."""
        return self.music.get('hq_music_url')

    @hq_music_url.setter
    def hq_music_url(self, value):
        self._store_music_item('hq_music_url', value)
@register_reply('news')
class ArticlesReply(BaseReply):
    """
    News (articles) reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'news'

    articles = ArticlesField('Articles', [])

    def add_article(self, article):
        """Append one article to the reply.

        :param article: the article to add
        :raises AttributeError: if the reply already holds 10 or more
            articles
        """
        articles = self.articles
        # ``>=`` rather than ``==``: a reply constructed with more than ten
        # articles must not be allowed to keep growing.
        if len(articles) >= 10:
            raise AttributeError("Can't add more than 10 articles"
                                 " in an ArticlesReply")
        articles.append(article)
        # Write the list back so the change is stored on the field.
        self.articles = articles
@register_reply('transfer_customer_service')
class TransferCustomerServiceReply(BaseReply):
    """
    Forward the message to the multi-customer-service system.

    For details see
    http://mp.weixin.qq.com/wiki/5/ae230189c9bd07a6b221f48619aeef35.html
    """

    # No fields beyond the ones declared on BaseReply.
    type = 'transfer_customer_service'
@register_reply('device_text')
class DeviceTextReply(BaseReply):
    """Reply registered under the ``device_text`` type."""

    type = 'device_text'

    # Device identification and session fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')

    # Declared as a Base64EncodeField rather than a plain string field.
    content = Base64EncodeField('Content')
@register_reply('device_event')
class DeviceEventReply(BaseReply):
    """Reply registered under the ``device_event`` type."""

    type = 'device_event'

    event = StringField('Event')

    # Device identification and session fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')

    # Declared as a Base64EncodeField rather than a plain string field.
    content = Base64EncodeField('Content')
@register_reply('device_status')
class DeviceStatusReply(BaseReply):
    """Reply registered under the ``device_status`` type."""

    type = 'device_status'

    # Device identification fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')

    # Integer status, declared under the name 'DeviceStatus'.
    status = IntegerField('DeviceStatus')
@register_reply('hardware')
class HardwareReply(BaseReply):
    """Reply registered under the ``hardware`` type."""

    type = 'hardware'

    # Integer flag declared with a default of 0.
    func_flag = IntegerField('FuncFlag', 0)

    # Note the element name is spelled 'HardWare' (capital W).
    hardware = HardwareField('HardWare')
def _build_reply(reply, message):
    """Turn ``reply`` into a reply object, or ``None`` for unsupported input."""
    if not reply:
        # Any falsy value (None, '', empty list, ...) means "say nothing".
        return EmptyReply()
    if isinstance(reply, BaseReply):
        if message:
            # Address the ready-made reply back to the message's sender.
            reply.source = message.target
            reply.target = message.source
        return reply
    if isinstance(reply, six.string_types):
        return TextReply(message=message, content=reply)
    if isinstance(reply, (tuple, list)):
        if len(reply) > 10:
            raise AttributeError("Can't add more than 10 articles"
                                 " in an ArticlesReply")
        return ArticlesReply(message=message, articles=reply)
    return None


def create_reply(reply, message=None, render=False):
    """
    Create a reply quickly

    :param reply: a falsy value (empty reply), a ``BaseReply`` instance,
        a string (text reply) or a tuple/list of articles (news reply)
    :param message: optional message being answered
    :param render: when true, return the rendered reply instead of the
        reply object
    :return: the reply object, its rendered form when ``render`` is true,
        or ``None`` when ``reply`` is of an unsupported type
    :raises AttributeError: if more than 10 articles are given
    """
    built = _build_reply(reply, message)
    if built and render:
        return built.render()
    return built
def deserialize_reply(xml, update_time=False):
    """
    Deserialize a passive reply from its XML form.

    :param xml: the XML to deserialize
    :param update_time: whether to replace the time found in the XML with
        the current time
    :raises ValueError: if the XML cannot be recognised as a reply
    :rtype: wechatpy.replies.BaseReply
    """
    if not xml:
        return EmptyReply()

    try:
        reply_dict = xmltodict.parse(xml)["xml"]
        msg_type = reply_dict["MsgType"]
    except (xmltodict.expat.ExpatError, KeyError, TypeError):
        # TypeError covers an <xml> root that is empty or holds only text:
        # the parsed value is then not a mapping, so the "MsgType" lookup
        # fails with TypeError rather than KeyError.
        raise ValueError("bad reply xml")

    # A <MsgType> with child elements parses to a mapping, which is
    # unhashable and would make the registry lookup raise TypeError.
    if not isinstance(msg_type, six.string_types) \
            or msg_type not in REPLY_TYPES:
        raise ValueError("unknown reply type")

    cls = REPLY_TYPES[msg_type]
    # Convert only the fields that actually appear in the XML.
    kwargs = {
        attr: field.from_xml(reply_dict[field.name])
        for attr, field in cls._fields.items()
        if field.name in reply_dict
    }

    if update_time:
        kwargs["time"] = time.time()

    return cls(**kwargs)
|