# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""Assorted helpers: iterator utilities, a ChainMap backport, an immutable
record type, encoding conversion, RSA key files, pickling and dict tools.

The module is written to import on both Python 2 and Python 3.
"""
import codecs
import collections
import itertools
import math
import os
import pickle
import platform
import re
import time
from collections import OrderedDict
from functools import partial
from itertools import islice, takewhile
from typing import NamedTuple, Optional, Tuple

try:  # Python 3.3+; the aliases in ``collections`` were removed in 3.10
    from collections.abc import Mapping as _Mapping
    from collections.abc import MutableMapping as _MutableMapping
except ImportError:  # Python 2
    from collections import Mapping as _Mapping
    from collections import MutableMapping as _MutableMapping

try:  # Python 3.3+
    from threading import get_ident as _get_ident
except ImportError:  # Python 2
    from thread import get_ident as _get_ident

# Each third-party package below is used by exactly one helper, so a missing
# package only disables that helper instead of making the whole module
# un-importable.  The helper raises ImportError when it is actually called.
try:
    import addressparser
except ImportError:
    addressparser = None
try:
    import chardet
except ImportError:
    chardet = None
try:
    from Crypto.PublicKey import RSA
except ImportError:
    RSA = None

is_windows = any(platform.win32_ver())


def _require(module, package):
    """Return *module*, or raise ImportError naming the missing *package*."""
    if module is None:
        raise ImportError(
            'the {!r} package is required for this function'.format(package))
    return module


def xor_encrypt_buffer(buffer, offset, key):
    """Placeholder for a function whose body is missing from the source.

    NOTE(review): the source of this function is truncated in the middle of
    its loop; everything after ``struct.pack_into('`` is gone.  The surviving
    prologue was::

        import struct
        lkey = len(key)
        step = 0
        num = offset
        for _ in buffer[offset:]:
            struct.pack_into('...

    Presumably it XORs ``buffer[offset:]`` in place with a repeating *key*,
    but the exact format string, key indexing and return value cannot be
    confirmed, so the body is deliberately not reconstructed by guesswork.
    Restore it from version control.

    :raises NotImplementedError: always, until the original body is restored.
    """
    raise NotImplementedError(
        'xor_encrypt_buffer: original implementation was lost; '
        'restore it from version control')


def write_file(name, content):
    # type: (str, str) -> None
    """Write *content* to the file *name* in text mode, truncating it first.

    NOTE(review): the ``def`` line of this function was lost in the source;
    the name and parameter order are reconstructed from the surviving body
    and from the calls in ``write_RSA_key_pairs`` -- confirm.
    """
    with open(name, 'w') as f:
        f.write(content)


KeyPair = NamedTuple('KeyPair', [('public', str), ('private', str)])


def generate_RSA_key_pairs(bits=2048):
    # type: (int) -> KeyPair
    """Generate a fresh RSA key pair.

    :param bits: key size passed to ``RSA.generate``.
    :return: ``KeyPair`` holding the PEM-exported public and private keys,
        each decoded to an ASCII string.
    :raises ImportError: if the ``Crypto`` package is not installed.
    """
    key = _require(RSA, 'Crypto').generate(bits)
    return KeyPair(public=key.publickey().exportKey('PEM').decode('ascii'),
                   private=key.exportKey('PEM').decode('ascii'))


def write_RSA_key_pairs(path, pair=None):
    # type: (str, Optional[KeyPair]) -> Tuple[str, str]
    """Write a key pair to ``<path>_public.pem`` and ``<path>_private.pem``.

    :param path: file name prefix (may include a directory).
    :param pair: the pair to write; a new one is generated when omitted.
    :return: ``(public_key_file_name, private_key_file_name)``.
    """
    if pair is None:
        pair = generate_RSA_key_pairs()  # type: KeyPair

    def key_name(name, kind):
        return '{name}_{kind}.pem'.format(name=name, kind=kind)

    public_key_file_name = key_name(path, 'public')
    private_key_file_name = key_name(path, 'private')
    write_file(public_key_file_name, pair.public)
    write_file(private_key_file_name, pair.private)
    return public_key_file_name, private_key_file_name


def head(iterable, default=None):
    """Return the first item of *iterable*, or *default* if it is empty."""
    return next(iter(iterable), default)


def first_true(iterable, pred=None, default=None):
    """Return the first true value in the iterable.

    If no true value is found, return *default*.
    If *pred* is not None, return the first item for which pred(item) is true.
    """
    # first_true([a,b,c], default=x) --> a or b or c or x
    # first_true([a,b], fn, x) --> a if fn(a) else b if fn(b) else x
    if pred is None:
        pred = bool
    # A generator expression instead of filter(): on Python 2 filter()
    # returns a list, which next() cannot consume.
    return next((item for item in iterable if pred(item)), default)


def nth(iterable, n, default=None):
    """Return the nth item of iterable, or a default value."""
    return next(islice(iterable, n, None), default)


def upto(iterable, max_val):
    """From a monotonically increasing iterable, generate all the values <= max_val."""
    # Why <= max_val rather than < max_val? In part because that's how Ruby's upto does it.
    return takewhile(lambda x: x <= max_val, iterable)


def ilen(iterable):
    """Length of any iterable (consumes generators)."""
    return sum(1 for _ in iterable)


count_iterable = ilen


def recursive_repr(fill_value='...'):
    """Back-ported from Python 3.

    Decorator to make a repr function return *fill_value* for a recursive
    call (tracked per object id and per thread).
    """

    def decorating_function(user_function):
        repr_running = set()

        def wrapper(self):
            key = id(self), _get_ident()
            if key in repr_running:
                return fill_value
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                repr_running.discard(key)
            return result

        # Upstream note: can't use functools.wraps() here because of
        # bootstrap issues, so the metadata is copied by hand.
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function


class ChainMap(_MutableMapping):
    """Back-ported from Python 3.

    A ChainMap groups multiple dicts (or other mappings) together
    to create a single, updateable view.

    The underlying mappings are stored in a list.  That list is public and can
    be accessed or updated using the *maps* attribute.  There is no other state.

    Lookups search the underlying mappings successively until a key is found.
    In contrast, writes, updates, and deletions only operate on the first
    mapping.
    """

    def __init__(self, *maps):
        """Initialize a ChainMap by setting *maps* to the given mappings.

        If no mappings are provided, a single empty dictionary is used.
        """
        self.maps = list(maps) or [{}]  # always at least one map

    def __missing__(self, key):
        raise KeyError(key)

    def __getitem__(self, key):
        for mapping in self.maps:
            try:
                return mapping[key]  # can't use 'key in mapping' with defaultdict
            except KeyError:
                pass
        return self.__missing__(key)  # support subclasses that define __missing__

    def get(self, key, default=None):
        return self[key] if key in self else default

    def __len__(self):
        return len(set().union(*self.maps))  # reuses stored hash values if possible

    def __iter__(self):
        d = {}
        for mapping in reversed(self.maps):
            d.update(mapping)  # reuses stored hash values if possible
        return iter(d)

    def __contains__(self, key):
        return any(key in m for m in self.maps)

    def __bool__(self):
        return any(self.maps)

    __nonzero__ = __bool__  # Python 2 spelling of __bool__

    @recursive_repr()
    def __repr__(self):
        return '{name}({maps})'.format(name=self.__class__.__name__,
                                       maps=", ".join(map(repr, self.maps)))

    @classmethod
    def fromkeys(cls, iterable, *args):
        """Create a ChainMap with a single dict created from the iterable."""
        return cls(dict.fromkeys(iterable, *args))

    def copy(self):
        """New ChainMap or subclass with a new copy of maps[0] and refs to maps[1:]"""
        return self.__class__(self.maps[0].copy(), *self.maps[1:])

    __copy__ = copy

    def new_child(self, m=None):  # like Django's Context.push()
        """New ChainMap with a new map followed by all previous maps.

        If no map is provided, an empty dict is used.
        """
        if m is None:
            m = {}
        return self.__class__(m, *self.maps)

    @property
    def parents(self):  # like Django's Context.pop()
        """New ChainMap from maps[1:]."""
        return self.__class__(*self.maps[1:])

    def __setitem__(self, key, value):
        self.maps[0][key] = value

    def __delitem__(self, key):
        try:
            del self.maps[0][key]
        except KeyError:
            raise KeyError('Key not found in the first mapping: {!r}'.format(key))

    def popitem(self):
        """Remove and return an item pair from maps[0]. Raise KeyError if maps[0] is empty."""
        try:
            return self.maps[0].popitem()
        except KeyError:
            raise KeyError('No keys found in the first mapping.')

    def pop(self, key, *args):
        """Remove *key* from maps[0] and return its value. Raise KeyError if *key* not in maps[0]."""
        try:
            return self.maps[0].pop(key, *args)
        except KeyError:
            raise KeyError('Key not found in the first mapping: {!r}'.format(key))

    def clear(self):
        """Clear maps[0], leaving maps[1:] intact."""
        self.maps[0].clear()


class _ImmutableAttributeError(AttributeError, KeyError):
    """Raised by ``Immutable.__getattr__`` for an unknown name.

    It is an AttributeError so that ``hasattr``/``getattr(obj, name, default)``
    work on Python 3, and still a KeyError so that existing callers catching
    KeyError keep working.
    """


class Immutable(object):
    """
    Immutable object which adheres to the Mapping and the Sequence protocols.

    * Attributes are kept in `self._ordered_dict`. NEVER MUTATE THIS!
    * Instantiate with kwargs or args. Instantiating with args preserves order.
    * Mapping methods get(), items(), keys(), and values() are also included.
    * Sequence methods index() and count() are also included.
    """

    class ImmutableError(Exception):
        pass

    def __init__(self, *args, **kwargs):
        """
        Instantiate an Immutable instance.

            >>> # (key, value) pairs as positional args --> preserves order!
            >>> obj = Immutable(('val_0', 0), ('val_1', 1))

            >>> # key=value pairs
            >>> obj1 = Immutable(val_0=0, val_1=1)

            >>> # same as above, but by unpacking a dict
            >>> attribute_dict = {'val_0': 0, 'val_1': 1}
            >>> obj2 = Immutable(**attribute_dict)

            >>> # access via '.' or '[]'
            >>> obj.val_0
            0
            >>> obj['val_0']
            0

        :param args: (key, value) pairs to add *in order*, one pair per
            positional argument.
        :param kwargs: Allows you to unpack a dict to create this.
        :raises ImmutableError: on a key given both ways, an unhashable key
            or value, an integer key, or a reserved key name.
        """
        reserved_keys = ('get', 'keys', 'values', 'items', 'count', 'index',
                         '_ordered_dict', '_tuple')
        ordered_dict = OrderedDict()
        for key, val in args:
            if key in kwargs:
                raise self.ImmutableError('Key in args duplicated in kwargs.')
            ordered_dict[key] = val
        ordered_dict.update(kwargs)
        for key, val in ordered_dict.items():
            try:
                hash(key)
                hash(val)
            except TypeError:
                raise self.ImmutableError('Keys and vals must be hashable.')
            if isinstance(key, int):
                raise self.ImmutableError('Keys cannot be integers.')
            if key in reserved_keys:
                raise self.ImmutableError('Keys cannot be any of these: {}.'
                                          .format(reserved_keys))
        # Written through __dict__ because __setattr__ always raises.
        self.__dict__['_ordered_dict'] = ordered_dict
        self.__dict__['_tuple'] = tuple(ordered_dict.values())

    def __contains__(self, item):
        raise self.ImmutableError('Containment not implemented. Try with '
                                  'keys(), values(), or items().')

    def __reversed__(self):
        raise self.ImmutableError('Reversal not implemented. Try with '
                                  'keys(), values(), or items().')

    def __getitem__(self, key):
        # Integers index by position, anything else looks up by key.
        if isinstance(key, int):
            return self.__dict__['_tuple'][key]
        else:
            return self.__dict__['_ordered_dict'][key]

    def __setitem__(self, key, value):
        raise self.ImmutableError('Cannot set items on Immutable.')

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails.
        state = self.__dict__
        try:
            if key in ('items', 'keys', 'values'):
                return getattr(state['_ordered_dict'], key)
            if key in ('index', 'count'):
                return getattr(state['_tuple'], key)
            return state['_ordered_dict'][key]
        except KeyError:
            # Also covers instances created without __init__ (copy/pickle),
            # whose __dict__ has no '_ordered_dict' yet.
            raise _ImmutableAttributeError(key)

    def __setattr__(self, key, value):
        raise self.ImmutableError('Cannot set attributes on Immutable.')

    def __cmp__(self, other):
        raise self.ImmutableError('Only equality comparisons implemented.')

    def __eq__(self, other):
        if not isinstance(other, Immutable):
            return False
        # Compare the ordered items themselves: comparing hashes reports
        # distinct values as equal on a collision (hash(-1) == hash(-2)).
        return tuple(self.items()) == tuple(other.items())

    def __ne__(self, other):
        return not self == other

    def __len__(self):
        return len(self.__dict__['_tuple'])

    def __iter__(self):
        raise self.ImmutableError('Iteration not implemented. Try with '
                                  'keys(), values(), or items().')

    def __hash__(self):
        return hash(tuple(self._ordered_dict.items()))

    def __str__(self):
        # bytes(text) without an encoding raises TypeError on Python 3.
        return self.__repr__()

    def __unicode__(self):
        return '{}'.format(self.__repr__())

    def __repr__(self):
        keys_repr = ', '.join('{}={}'.format(key, repr(val))
                              for key, val in self.items())
        return 'Immutable({})'.format(keys_repr)

    def __dir__(self):
        return list(self.keys())


def guess_encoding(file_path):
    # type: (str) -> Optional[str]
    """Predict a file's encoding using chardet.

    The whole file is read as bytes and handed to ``chardet.detect``; its
    ``'encoding'`` entry is returned as is.

    :raises ImportError: if the ``chardet`` package is not installed.
    """
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    return _require(chardet, 'chardet').detect(raw_data)['encoding']


def convert_encoding(source_file, target_file=None, source_encoding=None,
                     target_encoding="utf-8"):
    # type: (str, Optional[str], Optional[str], Optional[str]) -> str
    """Re-encode a text file and return the path of the converted copy.

    :param source_file: path of the file to read.
    :param target_file: path to write; when empty, the file is written next
        to the source as
        ``converted-<name>-<source_encoding>-<target_encoding>.<suffix>``.
    :param source_encoding: encoding of the source; guessed with
        ``guess_encoding`` when None.
    :param target_encoding: encoding of the output.
    :return: the path that was written.
    """
    if source_encoding is None:
        source_encoding = guess_encoding(source_file)
    if source_encoding in ('gb2312', 'GB2312'):
        #: gb18030 is a superset of gb2312, it can cover more corner cases
        source_encoding = 'gb18030'
    if not target_file:
        # Split only the base name, and only on its last dot, so paths with
        # dotted directories, several dots or no dot at all are handled.
        directory, base_name = os.path.split(source_file)
        stem, dot, suffix = base_name.rpartition('.')
        if not dot:
            stem, suffix = base_name, ''
        target_name = 'converted-{filename}-{source_encoding}-{target_encoding}' \
            .format(filename=stem, source_encoding=source_encoding,
                    target_encoding=target_encoding)
        if dot:
            target_name += '.' + suffix
        target_file = os.path.join(directory, target_name)
    block_size = 1048576
    with codecs.open(source_file, "r", source_encoding) as source:
        with codecs.open(target_file, "w", target_encoding) as target:
            while True:
                contents = source.read(block_size)
                if not contents:
                    break
                target.write(contents)
    return target_file


def rec_update_dict(d, update_dict, firstLevelOverwrite=False):
    """Recursively update a dict in place.

    :param d: the dict to update.
    :param update_dict: mapping whose values are merged into *d*.
    :param firstLevelOverwrite: when true, every top-level key of
        *update_dict* simply replaces the value in *d* (no merging).
    :return: *d*.
    """
    for k, v in update_dict.items():
        if firstLevelOverwrite or not isinstance(v, _Mapping):
            d[k] = v
            continue
        current = d.get(k)
        if not isinstance(current, _Mapping):
            # Nothing mergeable there (missing or a scalar): start afresh.
            current = {}
        d[k] = rec_update_dict(current, v)
    return d


flatten = itertools.chain.from_iterable


def convert(text):
    """Return ``int(text)`` for an all-digit string, else *text* unchanged."""
    return int(text) if text.isdigit() else text


def alphanum_key(key):
    """Split *key* into text and integer chunks for natural ordering."""
    return [convert(c) for c in re.split(r'([0-9]+)', key)]


def natural_sort(array, key, reverse):
    """Sort mappings by the natural (human) order of their ``[key]`` string."""
    return sorted(array, key=lambda d: alphanum_key(d[key]), reverse=reverse)


def paginated(dataList, pageIndex, pageSize):
    """Return page *pageIndex* (1-based) of *dataList* with *pageSize* items."""
    return dataList[(pageIndex - 1) * pageSize:pageIndex * pageSize]


def is_number(s):
    """Return True if *s* parses with ``float()`` or is a numeric character."""
    try:
        float(s)
        return True
    except (TypeError, ValueError):  # TypeError: None, lists, ...
        pass
    try:
        import unicodedata
        unicodedata.numeric(s)
        return True
    except (TypeError, ValueError, ImportError):
        pass
    return False


def ceil_floor(x):
    """Drop the fractional part of *x*, rounding towards zero."""
    return math.ceil(x) if x < 0 else math.floor(x)


def round_n_digits(x, n):
    """Truncate (not round) *x* to *n* decimal places."""
    return ceil_floor(x * math.pow(10, n)) / math.pow(10, n)


round_2_digits = partial(round_n_digits, n=2)


def with_metaclass(meta, *bases):
    """
    Function from jinja2/_compat.py. License: BSD.
    Use it like this::
        class BaseForm(object):
            pass
        class FormType(type):
            pass
        class Form(with_metaclass(FormType, BaseForm)):
            pass
    This requires a bit of explanation: the basic idea is to make a
    dummy metaclass for one level of class instantiation that replaces
    itself with the actual metaclass.  Because of internal type checks
    we also need to make sure that we downgrade the custom metaclass
    for one level to something closer to type (that's why __call__ and
    __init__ comes back from type etc.).
    This has the advantage over six.with_metaclass of not introducing
    dummy classes into the final MRO.
    """

    class Metaclass(meta):
        __call__ = type.__call__
        __init__ = type.__init__

        def __new__(cls, name, this_bases, d):
            if this_bases is None:
                return type.__new__(cls, name, (), d)
            return meta(name, bases, d)

    return Metaclass('temporary_class', None, {})


def load_pickle(filepath):
    """Load and return the object pickled in *filepath*.

    SECURITY: unpickling can execute arbitrary code; only use this on files
    from a trusted source.
    """
    with open(filepath, 'rb') as f:  # pickle data is binary
        return pickle.load(f)


def dump_pickle(obj, filepath):
    """Pickle *obj* into *filepath*, replacing any existing file."""
    with open(filepath, 'wb') as f:  # pickle data is binary
        pickle.dump(obj, f)


def split_addr(addr):
    # type: (unicode) -> list
    """Split one address string with ``addressparser.transform``.

    Missing values in the result are replaced by ``""`` and the first row is
    returned via ``.values.tolist()`` (presumably a list of the parsed
    address parts -- confirm against addressparser).

    :raises ImportError: if the ``addressparser`` package is not installed.
    """
    df = _require(addressparser, 'addressparser').transform([addr])
    df.fillna("", inplace=True)
    return df.values.tolist()[0]


def fix_dict(mydict, check_empty_string=None):
    # type: (dict, list) -> dict
    """Return a copy of *mydict* without its ``None`` values.

    For keys listed in *check_empty_string*, empty strings are dropped too.
    A falsy *mydict* yields an empty dict.
    """
    result = {}
    if not mydict:
        return result
    strict_keys = check_empty_string or ()
    for key, value in mydict.items():
        if value is None:
            continue
        if value == '' and key in strict_keys:
            continue
        result[key] = value
    return result