# -*- coding: utf-8 -*-
import logging
import os
import time

try:
    # Python 2 module layout.
    from urllib import urlencode
    from urlparse import urlparse
except ImportError:
    # Python 3 module layout.
    from urllib.parse import urlencode, urlparse

from Tea.vendored.requests import Request, Session, status_codes

from Tea.converter import TeaConverter
from Tea.exceptions import TeaException, RequiredArgumentException
from Tea.model import TeaModel
from Tea.response import TeaResponse

# Fallback timeouts used by TeaCore.do_action when the runtime options do not
# supply one. They are divided by 1000 before being handed to the HTTP session.
DEFAULT_CONNECT_TIMEOUT = 5000
DEFAULT_READ_TIMEOUT = 10000

logger = logging.getLogger('alibabacloud-tea')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
logger.addHandler(ch)


class TeaCore(object):
    """Static helpers for composing URLs, sending requests and retry logic."""

    @staticmethod
    def _prepare_http_debug(request, symbol):
        """Render the headers of ``request`` as debug lines.

        :param request: any object exposing a ``headers`` mapping (used for
            both requests and responses).
        :param symbol: prefix placed at the start of every line.
        :return: one ``'\\n<symbol> <key> : <value>'`` fragment per header,
            concatenated; an empty string when there are no headers.
        """
        return ''.join(
            '\n%s %s : %s' % (symbol, key, value)
            for key, value in request.headers.items()
        )

    @staticmethod
    def _do_http_debug(request, response):
        """Log the request line/headers and the status line/headers at DEBUG.

        :param request: object exposing ``url``, ``method`` and ``headers``.
        :param response: object exposing ``status_code`` and ``headers``.
        """
        # Log the request.
        url = urlparse(request.url)
        target = url.path
        if url.query:
            # The parsed query does not carry its leading '?'; put it back so
            # the logged request line is a readable path-plus-query.
            target += '?' + url.query
        request_base = '\n> %s %s HTTP/1.1' % (request.method.upper(), target)
        logger.debug(request_base + TeaCore._prepare_http_debug(request, '>'))

        # Log the response. The lookup yields None for a status code that is
        # not in the table, so fall back to an empty reason instead of
        # indexing into None.
        names = status_codes._codes.get(response.status_code)
        reason = names[0].upper() if names else ''
        response_base = '\n< HTTP/1.1 %s %s' % (response.status_code, reason)
        logger.debug(response_base + TeaCore._prepare_http_debug(response, '<'))

    @staticmethod
    def compose_url(request):
        """Build the full request URL from the parts carried by ``request``.

        Reads ``request.headers['host']``, ``request.protocol``,
        ``request.port``, ``request.pathname`` and ``request.query``.

        :param request: the request description to turn into a URL.
        :return: the composed URL with any trailing ``?``/``&`` removed.
        :raises RequiredArgumentException: when no host header is present.
        """
        host = TeaConverter.to_str(request.headers.get('host'))
        if not host:
            raise RequiredArgumentException('endpoint')
        host = host.rstrip('/')

        protocol = '%s://' % request.protocol.lower()
        if host.startswith(('http://', 'https://')):
            # The host already carries its own scheme; do not add another.
            protocol = ''

        # Port 80 is left implicit; any other value is appended verbatim.
        if request.port == 80:
            port = ''
        else:
            port = ':%s' % request.port

        url = protocol + host + port + request.pathname

        if request.query:
            # Pick the separator depending on whether the path already
            # contains a query string.
            if "?" in url:
                if not url.endswith("&"):
                    url += "&"
            else:
                url += "?"

            # Entries whose value is None are dropped from the query string.
            encode_query = {}
            for key in request.query:
                value = request.query[key]
                if value is not None:
                    encode_query[key] = TeaConverter.to_str(value)
            url += urlencode(encode_query)

        return url.rstrip("?&")

    @staticmethod
    def do_action(request, runtime_option=None):
        """Send ``request`` over HTTP and wrap the result in a TeaResponse.

        Keys read from ``runtime_option``: ``ignoreSSL``, ``ca``, ``cert``,
        ``connectTimeout``, ``readTimeout``, ``httpProxy``, ``httpsProxy``,
        ``noProxy`` and ``debug``.

        :param request: request description; see :meth:`compose_url` for the
            attributes used to build the URL. ``method``, ``body`` and
            ``headers`` are forwarded to the HTTP request.
        :param runtime_option: optional dict of runtime settings.
        :return: a TeaResponse with status, lower-cased header names, raw
            body bytes and the underlying response object.
        """
        url = TeaCore.compose_url(request)

        runtime_option = runtime_option or {}

        # 'ignoreSSL' switches verification off entirely; otherwise 'ca' may
        # replace the default of True (a None value keeps the default).
        verify = not runtime_option.get('ignoreSSL', False)
        if verify:
            ca = runtime_option.get('ca', True)
            verify = ca if ca is not None else True
        cert = runtime_option.get('cert', None)

        connect_timeout = runtime_option.get('connectTimeout') or DEFAULT_CONNECT_TIMEOUT
        read_timeout = runtime_option.get('readTimeout') or DEFAULT_READ_TIMEOUT

        # Divide by a float so sub-second precision survives on Python 2,
        # where int / int truncates (e.g. 1500 / 1000 == 1).
        timeout = (int(connect_timeout) / 1000.0, int(read_timeout) / 1000.0)

        with Session() as s:
            req = Request(method=request.method, url=url,
                          data=request.body, headers=request.headers)
            prepped = s.prepare_request(req)

            # Explicit runtime options win; the environment variables are only
            # consulted when the corresponding option is not set.
            http_proxy = runtime_option.get('httpProxy')
            https_proxy = runtime_option.get('httpsProxy')
            no_proxy = runtime_option.get('noProxy')

            if not http_proxy:
                http_proxy = os.environ.get('HTTP_PROXY') or os.environ.get('http_proxy')
            if not https_proxy:
                https_proxy = os.environ.get('HTTPS_PROXY') or os.environ.get('https_proxy')

            proxies = {}
            if http_proxy:
                proxies['http'] = http_proxy
            if https_proxy:
                proxies['https'] = https_proxy
            if no_proxy:
                proxies['no_proxy'] = no_proxy

            resp = s.send(
                prepped,
                proxies=proxies,
                timeout=timeout,
                verify=verify,
                cert=cert,
            )

            # Only the (case-insensitive) value 'sdk' enables HTTP debugging.
            # Non-string values such as True are ignored instead of raising
            # AttributeError on .lower().
            debug = runtime_option.get('debug') or os.getenv('DEBUG')
            if debug and hasattr(debug, 'lower') and debug.lower() == 'sdk':
                TeaCore._do_http_debug(req, resp)

            response = TeaResponse()
            response.status_message = resp.reason
            response.status_code = resp.status_code
            response.headers = {k.lower(): v for k, v in resp.headers.items()}
            response.body = resp.content
            response.response = resp
            return response

    @staticmethod
    def get_response_body(resp):
        """Return ``resp.content`` decoded as UTF-8 text."""
        return resp.content.decode("utf-8")

    @staticmethod
    def allow_retry(dic, retry_times, now=None):
        """Tell whether another attempt is permitted by the retry settings.

        :param dic: retry settings; only the ``maxAttempts`` key is read.
        :param retry_times: number of attempts compared against the limit.
        :param now: unused; kept so existing callers keep working.
        :return: False when ``dic`` is None or has no ``maxAttempts`` key,
            otherwise ``maxAttempts >= retry_times`` (None counts as 0).
        """
        if dic is None or "maxAttempts" not in dic:
            return False
        max_attempts = dic.get("maxAttempts")
        retry = 0 if max_attempts is None else int(max_attempts)
        return retry >= retry_times

    @staticmethod
    def get_backoff_time(dic, retry_times):
        """Return the back-off time configured in ``dic``.

        :param dic: back-off settings; the ``policy`` and ``period`` keys are
            read.
        :param retry_times: returned as the back-off time when the configured
            period is negative.
        :return: 0 when there is no policy, the policy is ``"no"`` or the
            period is neither an int nor a digit-only string; otherwise the
            period as an int, or ``retry_times`` if that period is negative.
        """
        default_back_off_time = 0
        if dic is None or not dic.get("policy") or dic.get("policy") == "no":
            return default_back_off_time

        back_off_time = dic.get('period', default_back_off_time)
        if not isinstance(back_off_time, int) and \
                not (isinstance(back_off_time, str) and back_off_time.isdigit()):
            return default_back_off_time

        back_off_time = int(back_off_time)
        if back_off_time < 0:
            return retry_times
        return back_off_time

    @staticmethod
    def sleep(t):
        """Block the calling thread by delegating to ``time.sleep(t)``."""
        time.sleep(t)

    @staticmethod
    def is_retryable(ex):
        """Return True when ``ex`` is a TeaException instance."""
        return isinstance(ex, TeaException)

    @staticmethod
    def bytes_readable(body):
        """Return ``body`` unchanged."""
        return body

    @staticmethod
    def merge(*dic_list):
        """Merge dicts and TeaModel instances into one new dict.

        Arguments are applied left to right, so later keys override earlier
        ones. TeaModel instances contribute their ``to_map()`` result; values
        of any other type are ignored.
        """
        dic_result = {}
        for item in dic_list:
            if isinstance(item, dict):
                dic_result.update(item)
            elif isinstance(item, TeaModel):
                dic_result.update(item.to_map())
        return dic_result

    @staticmethod
    def to_map(model):
        """Return ``model.to_map()`` for a TeaModel, otherwise an empty dict."""
        if isinstance(model, TeaModel):
            return model.to_map()
        return dict()

    @staticmethod
    def from_map(model, dic):
        """Return ``model.from_map(dic)`` for a TeaModel, otherwise ``model``."""
        if isinstance(model, TeaModel):
            return model.from_map(dic)
        return model