123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- import logging
- import os
- import sys
- import json
- from django.conf import settings
- from oss2.headers import OSS_OBJECT_TAGGING
- from typing import List, Any, Callable, Union, TYPE_CHECKING
- from threading import Thread, Timer
- from apilib.utils_json import JsonResponse
- from apilib.utils_sys import MyStringIO
- from apps.thirdparties import AliOSS
- from apps.web.core.exceptions import EmptyInput
- if TYPE_CHECKING:
- from collections import OrderedDict
# Interpreter-version compatibility aliases.
PY3 = sys.version_info[0] == 3

if not PY3:
    # Python 2: the text and "any string" types are the legacy builtins.
    unicode_type = unicode  # noqa: F821
    basestring_type = basestring  # noqa: F821
else:
    # Python 3: ``str`` is the text type; ``str`` and ``bytes`` together
    # stand in for the removed ``basestring``.
    unicode_type = str
    basestring_type = (str, bytes)

# Logger shared by the helpers in this module.
logger = logging.getLogger(__name__)
def encode(s):
    """
    Return ``s`` encoded as UTF-8 when it is a text string.

    Any value that is not an instance of ``unicode_type`` is returned unchanged.
    """
    if isinstance(s, unicode_type):
        return s.encode('utf-8')
    return s
def decode(s):
    """
    Return ``s`` decoded from UTF-8 when it is a ``bytes`` instance.

    Any other value is returned unchanged.
    """
    if isinstance(s, bytes):
        return s.decode('utf-8')
    return s
class _Missing(object):
    """Type of the ``_missing`` sentinel; its repr is the fixed string ``'_Missing'``."""

    def __repr__(self):
        return '_Missing'


# Single sentinel instance, used as the default for ``payload`` arguments so
# that "argument not supplied" can be detected with an identity check.
_missing = _Missing()

# Alias used in the type comments of the response helpers.
MaybePayload = Union[dict, _Missing]
def responder(result, description='', payload=_missing):
    # type:(int, unicode, MaybePayload)->JsonResponse
    """
    Build a ``JsonResponse`` with the keys ``result``, ``description`` and ``payload``.

    :param result: value stored under ``'result'``
    :param description: value stored under ``'description'``
    :param payload: value stored under ``'payload'``; an empty dict is used
        when the argument is left at its ``_missing`` default
    :return: the ``JsonResponse`` wrapping the three-key dict
    """
    if payload is _missing:
        # A fresh dict per call, so callers never share a default payload.
        payload = {}
    body = {'result': result, 'description': description, 'payload': payload}
    return JsonResponse(body)
def JsonOkResponse(description='', payload=_missing):
    # type:(unicode, MaybePayload)->JsonResponse
    """
    Return the ``responder`` envelope with ``result`` set to 1.

    :param description: forwarded to ``responder``
    :param payload: forwarded to ``responder``
    """
    return responder(1, description, payload)
def JsonErrorResponse(description='', payload=_missing):
    # type:(unicode, MaybePayload)->JsonResponse
    """
    Return the ``responder`` envelope with ``result`` set to 0.

    :param description: forwarded to ``responder``
    :param payload: forwarded to ``responder``
    """
    return responder(0, description, payload)
def NoCommissionErrorResponse(description='', payload=_missing):
    # type:(unicode, MaybePayload)->JsonResponse
    """
    Return the ``responder`` envelope with ``result`` set to 2.

    Per the original note: for result == 2 the frontend's ``fail`` handler
    does not take over.

    :param description: forwarded to ``responder``
    :param payload: forwarded to ``responder``
    """
    return responder(2, description, payload)
def DefaultJsonErrorResponse():
    """
    Return a fixed ``JsonResponse`` with ``result`` 0 and an empty payload.

    The description literal is Chinese for "system error".
    """
    body = {'result': 0, 'description': u'系统错误', 'payload': {}}
    return JsonResponse(body)
def validate_class_type_arguments(operator):
    """
    Build a method decorator that type-checks every argument against the
    class of the calling instance.

    Borrowed from maya. Each positional and keyword argument passed to the
    decorated method must be an instance of ``self.__class__``; otherwise a
    ``TypeError`` is raised whose message names ``operator``.

    :param operator: operator symbol (e.g. ``'<'``) interpolated into the
        error message
    :return: a decorator to apply to a method
    :raises TypeError: from the decorated method, when an argument is not an
        instance of the calling object's class
    """
    # Local import so this block does not rely on a module-level import.
    import functools

    def inner(function):
        # ``wraps`` keeps the decorated method's __name__ and __doc__ intact.
        @functools.wraps(function)
        def wrapper(self, *args, **kwargs):
            expected = self.__class__
            for arg in args + tuple(kwargs.values()):
                if not isinstance(arg, expected):
                    raise TypeError(
                        'unorderable types: {}() {} {}()'.format(
                            type(self).__name__, operator, type(arg).__name__
                        )
                    )
            return function(self, *args, **kwargs)
        return wrapper
    return inner
def validate_arguments_type_of_function(param_type=None):
    """
    Build a method decorator that type-checks every argument against
    ``param_type``.

    Borrowed from maya. Each positional and keyword argument passed to the
    decorated method must be an instance of ``param_type``. If ``param_type``
    is None (or otherwise falsy), the class of the instance the method is
    called on is used instead.

    Note: use this decorator on methods of a class; the wrapper expects
    ``self`` as its first argument.

    :param param_type: class that all arguments must be instances of, or
        None to use ``type(self)``
    :return: a decorator to apply to a method
    :raises TypeError: from the decorated method, when an argument is not an
        instance of the expected class
    """
    # Local import so this block does not rely on a module-level import.
    import functools

    def inner(function):
        # ``wraps`` keeps the decorated method's __name__ and __doc__ intact.
        @functools.wraps(function)
        def wrapper(self, *args, **kwargs):
            type_ = param_type or type(self)
            for arg in args + tuple(kwargs.values()):
                if not isinstance(arg, type_):
                    raise TypeError(
                        (
                            'Invalid Type: {}.{}() accepts only the '
                            'arguments of type "<{}>"'
                        ).format(
                            type(self).__name__,
                            function.__name__,
                            type_.__name__,
                        )
                    )
            return function(self, *args, **kwargs)
        return wrapper
    return inner
def generate_excel_report(filePath, records, localSave = False):
    # type: (str, List[OrderedDict], bool)->None
    """
    Write ``records`` to a single-sheet Excel workbook (sheet ``sheet1``,
    no index column).

    When ``localSave`` is false and ``settings.UPLOAD_REPORT_TO_OSS`` is
    truthy, the workbook is rendered into an in-memory buffer and uploaded
    through ``AliOSS().put_attachment`` under the key ``filePath``.
    Otherwise it is written to ``filePath`` joined onto the parent directory
    of ``settings.MEDIA_ROOT``.

    :param filePath: OSS object key, or path relative to the parent of
        ``MEDIA_ROOT`` for local saves
    :param records: rows passed to ``pandas.DataFrame``
    :param localSave: force the local-file branch even when OSS upload is
        enabled
    :return: None
    """
    import pandas as pd  # imported lazily, as in the original

    df = pd.DataFrame(records)

    if (not localSave) and settings.UPLOAD_REPORT_TO_OSS:
        with MyStringIO() as fp:
            df.to_excel(fp, sheet_name='sheet1', index=False)
            fp.seek(0)  # rewind so the upload reads from the start
            AliOSS().put_attachment(key=filePath, data=fp, headers={
                'Content-Disposition': 'attachment',
                # NOTE(review): presumably matched by an OSS lifecycle rule - confirm.
                OSS_OBJECT_TAGGING: 'lifecycle=7'
            })
        return

    dest_path = os.path.join(os.path.dirname(settings.MEDIA_ROOT), filePath)
    parent_dir = os.path.dirname(dest_path)
    # Create the PARENT directory only. The previous code called makedirs on
    # the file path itself, leaving a directory where the workbook belongs.
    if parent_dir and not os.path.isdir(parent_dir):
        try:
            os.makedirs(parent_dir)
        except OSError:
            # Tolerate another worker creating the directory concurrently.
            if not os.path.isdir(parent_dir):
                raise
    df.to_excel(dest_path, sheet_name='sheet1', index=False)
def gernerate_excel_report_for_sheet(filePath, sheets, localSave = False):
    """
    Write several sheets into one Excel workbook.

    Each item of ``sheets`` must expose ``.name`` (used as the sheet name)
    and ``.data`` (passed to ``pandas.DataFrame``). Sheets are written
    without an index column.

    When ``localSave`` is false and ``settings.UPLOAD_REPORT_TO_OSS`` is
    truthy, the workbook is rendered into an in-memory buffer and its bytes
    are uploaded through ``AliOSS().put_attachment`` under the key
    ``filePath``. Otherwise it is written to ``filePath`` joined onto the
    parent directory of ``settings.MEDIA_ROOT``.

    :param filePath: OSS object key, or path relative to the parent of
        ``MEDIA_ROOT`` for local saves
    :param sheets: iterable of objects with ``name`` and ``data`` attributes
    :param localSave: force the local-file branch even when OSS upload is
        enabled
    :return: None
    """
    import pandas as pd  # imported lazily, as in the original

    def _write_sheets(target):
        # Leaving the ExcelWriter context saves the workbook. The explicit
        # ``writer.save()`` was redundant and is removed in pandas 2.x.
        with pd.ExcelWriter(target) as writer:
            for _sheet in sheets:
                pd.DataFrame(_sheet.data).to_excel(
                    writer, sheet_name=_sheet.name, index=False)

    if (not localSave) and settings.UPLOAD_REPORT_TO_OSS:
        with MyStringIO() as fp:
            _write_sheets(fp)
            fp.seek(0)  # rewind only after the writer has flushed the workbook
            AliOSS().put_attachment(key=filePath, data=fp.read(), headers={
                'Content-Disposition': 'attachment',
                # NOTE(review): presumably matched by an OSS lifecycle rule - confirm.
                OSS_OBJECT_TAGGING: 'lifecycle=7'
            })
        return

    dest_path = os.path.join(os.path.dirname(settings.MEDIA_ROOT), filePath)
    parent_dir = os.path.dirname(dest_path)
    # Create the PARENT directory only. The previous code called makedirs on
    # the file path itself, leaving a directory where the workbook belongs.
    if parent_dir and not os.path.isdir(parent_dir):
        try:
            os.makedirs(parent_dir)
        except OSError:
            # Tolerate another worker creating the directory concurrently.
            if not os.path.isdir(parent_dir):
                raise
    _write_sheets(dest_path)
def async_operation(function, *args, **kwargs):
    # type:(Callable, *Any, **Any)->None
    """
    Run ``function(*args, **kwargs)`` on a new non-daemon thread named
    ``AsyncOperation``.

    Threads are used for now; a message queue or stream processing may be
    introduced later.

    Exceptions raised while creating or starting the thread are logged and
    swallowed. The ``try`` only covers thread start-up, so exceptions raised
    by ``function`` on the worker thread are not handled here.

    :param function: callable to execute on the worker thread
    :param args: positional arguments forwarded to ``function``
    :param kwargs: keyword arguments forwarded to ``function``
    :return: None
    """
    # Lazy %-style arguments: nothing is formatted unless DEBUG is enabled.
    logger.debug(
        'to start async thread worker. module = %s, function = %s, args = %s, kwargs = %s',
        getattr(function, '__module__', None), getattr(function, '__name__', None), args, kwargs)

    try:
        t = Thread(target=function, args=args, kwargs=kwargs)
        # Attribute assignment replaces setName()/setDaemon(), which are
        # deprecated since Python 3.10.
        t.name = 'AsyncOperation'
        t.daemon = False
        t.start()
    except Exception as e:
        logger.exception(e)
def async_operation_no_catch(function, *args, **kwargs):
    # type:(Callable, *Any, **Any)->None
    """
    Run ``function(*args, **kwargs)`` on a new non-daemon thread named
    ``AsyncOperationNoCatch``.

    Threads are used for now; a message queue or stream processing may be
    introduced later.

    Unlike ``async_operation``, errors raised while creating or starting the
    thread propagate to the caller.

    :param function: callable to execute on the worker thread
    :param args: positional arguments forwarded to ``function``
    :param kwargs: keyword arguments forwarded to ``function``
    :return: None
    """
    # Lazy %-style arguments: nothing is formatted unless DEBUG is enabled.
    logger.debug(
        'to start async thread worker. module = %s, function = %s, args = %s, kwargs = %s',
        getattr(function, '__module__', None), getattr(function, '__name__', None), args, kwargs)

    t = Thread(target=function, args=args, kwargs=kwargs)
    # Attribute assignment replaces setName()/setDaemon(), which are
    # deprecated since Python 3.10.
    t.name = 'AsyncOperationNoCatch'
    t.daemon = False
    t.start()
def delay_async_operation(func, delay, *args, **kwargs):
    """
    Run ``func(*args, **kwargs)`` after ``delay`` on a non-daemon
    ``threading.Timer`` thread named ``DelayAsyncOperation``.

    Exceptions raised while creating or starting the timer are logged and
    swallowed. The ``try`` only covers timer start-up, so exceptions raised
    by ``func`` on the timer thread are not handled here.

    :param func: callable to execute once the delay has elapsed
    :param delay: interval passed to ``Timer`` (seconds)
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    :return: None
    """
    # Lazy %-style arguments: nothing is formatted unless DEBUG is enabled.
    logger.debug(
        'to start async thread worker. module = %s, func = %s, args = %s, kwargs = %s',
        getattr(func, '__module__', None), getattr(func, '__name__', None), args, kwargs)

    try:
        t = Timer(interval=delay, function=func, args=args, kwargs=kwargs)
        # Attribute assignment replaces setName()/setDaemon(), which are
        # deprecated since Python 3.10.
        t.name = 'DelayAsyncOperation'
        t.daemon = False
        t.start()
    except Exception as e:
        logger.exception(e)
def parse_json_payload(string, allow_empty=True):
    # type:(str, bool)->dict
    """
    Parse ``string`` as JSON.

    :param string: JSON text; any falsy value counts as empty input
    :param allow_empty: when true, empty input yields ``{}``; when false,
        empty input raises ``EmptyInput``
    :return: the result of ``json.loads(string)``, or ``{}`` for permitted
        empty input
    :raises EmptyInput: if the input is empty and ``allow_empty`` is false
    """
    if string:
        return json.loads(string)
    if allow_empty:
        return {}
    raise EmptyInput('empty input is not allowed')
|