# -*- coding: utf-8 -*- #!/usr/bin/env python """ A list of validators and schemas for data validation and transformation """ from voluptuous import * from voluptuous.schema_builder import message from voluptuous.validators import _url_validation from voluptuous.error import UrlInvalid from bson.objectid import ObjectId # validation errors class LengthInvalid(Invalid): pass class OneOfInvalid(Invalid): pass # validators def require(schema): return Required(schema=schema, msg=u'字段未传递') class Length(object): """The length of a value must be in a certain range.""" def __init__(self, min=None, max=None, msg=None): self.min = min self.max = max self.msg = msg def __call__(self, v): if self.min is not None and len(v) < self.min: raise LengthInvalid( self.msg or u'字段长度至少为 %s' % self.min) if self.max is not None and len(v) > self.max: raise LengthInvalid( self.msg or u'字段长度至多为 %s' % self.max) return v def __repr__(self): return 'Length(min=%s, max=%s)' % (self.min, self.max) class OneOf(object): def __init__(self, msg=None, *options): self.options = set(options) self.msg = msg def __call__(self, v): if v not in self.options: raise OneOfInvalid( self.msg or u'字段只能为%s中其一' ) return v @message(u'URL不合法'.encode('utf-8'), cls=UrlInvalid) def Url(v): """Verify that the value is a URL. """ try: _url_validation(v) return v except: raise ValueError def digit_length_limit(digit, min_len, max_len): try: return min_len <= len(str(digit)) <= max_len except ValueError: raise Invalid("invalid digit length") def valid_objectId(oid): if not ObjectId.is_valid(oid): raise Invalid("Expected %s to be an ObjectId" % oid) else: return True def is_valid_ipv4_address_with_port(ipaddress_with_port): try: ipaddress, port = ipaddress_with_port.split(':') except ValueError: raise Invalid('invalid ipaddress with port %s, requires a colon(:)' % ipaddress_with_port) if not ( 0 < int(port) < 65535 ): raise Invalid('invalid port number %s' % port) parts = ipaddress.split('.') if len(parts) == 4 and all(x.isdigit() for x in parts): numbers = list(int(x) for x in parts) if all(0 <= num < 256 for num in numbers): return ipaddress_with_port else: raise Invalid('invalid ip address %s' % ipaddress) raise Invalid('invalid ip address %s' % ipaddress) def numeric_str(string, mode=int): try: mode(string) except: raise Invalid('expected a %s' % mode) return string # entry function def validate_payload(payload, schema, translation): """ :param payload: 待验证的字段字典 :type payload: dict :param schema: 用于验证字段的模型 :type schema: Schema :param translation: 用于返回消息的翻译词典 :type translation: dict :return: """ try: schema(payload) except MultipleInvalid as e: return translation.get(e.path[0], u'输入有误请检查') wechatPayAppSchema = Schema( { 'appid': All(basestring, Length(min = 1)), 'mchid': All(basestring, Length(min = 1)), 'secret': All(basestring, Length(min = 1)), 'apikey': All(basestring, Length(min = 1)), 'sslcert_path': All(basestring, Length(min = 1)), 'sslkey_path': All(basestring, Length(min = 1)), 'manual_withdraw': bool, 'name': basestring, 'remarks': basestring }, extra = REMOVE_EXTRA) aliAppSchema = Schema( { 'appid': All(basestring, Length(min = 1)), 'public_key_path': All(basestring, Length(min = 1)), 'alipayPublicKeyContent': All(basestring, Length(min = 1)), 'appPublicKeyContent': All(basestring, Length(min = 1)), 'app_private_key_path': All(basestring, Length(min = 1)), 'shadow': bool, 'appName': basestring, 'companyName': basestring, 'remarks': basestring, 'aesEncryptKey': basestring }, extra = REMOVE_EXTRA) wechatManagerAppSchema = Schema( { Required('appid'): basestring, Required('secret'): basestring, Required('templateIdMap'): list, 'name': basestring, 'companyName': basestring, }, extra = REMOVE_EXTRA)