123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- #
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- # coding=utf-8
- import time
- import warnings
- import json
- import logging
- import jmespath
- import copy
- import platform
- import aliyunsdkcore
- from aliyunsdkcore.vendored.six.moves.urllib.parse import urlencode
- from aliyunsdkcore.vendored.requests import codes
- from aliyunsdkcore.acs_exception.exceptions import ClientException
- from aliyunsdkcore.acs_exception.exceptions import ServerException
- from aliyunsdkcore.acs_exception import error_code, error_msg
- from aliyunsdkcore.http.http_response import HttpResponse
- from aliyunsdkcore.request import AcsRequest
- from aliyunsdkcore.http import format_type
- from aliyunsdkcore.auth.signers.signer_factory import SignerFactory
- from aliyunsdkcore.request import CommonRequest
- from aliyunsdkcore.endpoint.resolver_endpoint_request import ResolveEndpointRequest
- from aliyunsdkcore.endpoint.default_endpoint_resolver import DefaultEndpointResolver
- import aliyunsdkcore.retry.retry_policy as retry_policy
- from aliyunsdkcore.retry.retry_condition import RetryCondition
- from aliyunsdkcore.retry.retry_policy_context import RetryPolicyContext
- import aliyunsdkcore.utils
- import aliyunsdkcore.utils.parameter_helper
- import aliyunsdkcore.utils.validation
- from aliyunsdkcore.vendored.requests.structures import CaseInsensitiveDict
- from aliyunsdkcore.vendored.requests.structures import OrderedDict
- """
- Acs default client module.
- """
- DEFAULT_READ_TIMEOUT = 10
- DEFAULT_CONNECTION_TIMEOUT = 5
- # TODO: replace it with TimeoutHandler
- _api_timeout_config_data = aliyunsdkcore.utils._load_json_from_data_dir("timeout_config.json")
- logger = logging.getLogger(__name__)
- class AcsClient:
- LOG_FORMAT = '%(thread)d %(asctime)s %(name)s %(levelname)s %(message)s'
- def __init__(
- self,
- ak=None,
- secret=None,
- region_id="cn-hangzhou",
- auto_retry=True,
- max_retry_time=None,
- user_agent=None,
- port=80,
- connect_timeout=None,
- timeout=None,
- public_key_id=None,
- private_key=None,
- session_period=3600,
- credential=None,
- debug=False,
- verify=None):
- """
- constructor for AcsClient
- :param ak: String, access key id
- :param secret: String, access key secret
- :param region_id: String, region id
- :param auto_retry: Boolean
- :param max_retry_time: Number
- :return:
- """
- self._max_retry_num = max_retry_time
- self._auto_retry = auto_retry
- self._ak = ak
- self._secret = secret
- self._region_id = region_id
- self._user_agent = user_agent
- self._port = port
- self._connect_timeout = connect_timeout
- self._read_timeout = timeout
- self._extra_user_agent = {}
- self._verify = verify
- credential = {
- 'ak': ak,
- 'secret': secret,
- 'public_key_id': public_key_id,
- 'private_key': private_key,
- 'session_period': session_period,
- 'credential': credential,
- }
- self._signer = SignerFactory.get_signer(
- credential, region_id, self._implementation_of_do_action, debug)
- self._endpoint_resolver = DefaultEndpointResolver(self)
- if self._auto_retry:
- self._retry_policy = retry_policy.get_default_retry_policy(
- max_retry_times=self._max_retry_num)
- else:
- self._retry_policy = retry_policy.NO_RETRY_POLICY
- def get_region_id(self):
- return self._region_id
- def get_access_key(self):
- return self._ak
- def get_access_secret(self):
- return self._secret
- def is_auto_retry(self):
- return self._auto_retry
- def get_max_retry_num(self):
- return self._max_retry_num
- def get_user_agent(self):
- return self._user_agent
- def get_verify(self):
- return self._verify
- def set_region_id(self, region):
- self._region_id = region
- def set_max_retry_num(self, num):
- self._max_retry_num = num
- def set_auto_retry(self, flag):
- """
- set whether or not the client perform auto-retry
- :param flag: Booleans
- :return: None
- """
- self._auto_retry = flag
- def set_user_agent(self, agent):
- """
- User agent set to client will overwrite the request setting.
- :param agent:
- :return:
- """
- self._user_agent = agent
- def set_verify(self, verify):
- self._verify = verify
- def append_user_agent(self, key, value):
- self._extra_user_agent.update({key: value})
- @staticmethod
- def user_agent_header():
- base = '%s (%s %s;%s)' \
- % ('AlibabaCloud',
- platform.system(),
- platform.release(),
- platform.machine()
- )
- return base
- @staticmethod
- def default_user_agent():
- default_agent = OrderedDict()
- default_agent['Python'] = platform.python_version()
- default_agent['Core'] = __import__('aliyunsdkcore').__version__
- default_agent['python-requests'] = __import__(
- 'aliyunsdkcore.vendored.requests.__version__', globals(), locals(),
- ['vendored', 'requests', '__version__'], 0).__version__
- return CaseInsensitiveDict(default_agent)
- def client_user_agent(self):
- client_user_agent = {}
- if self.get_user_agent() is not None:
- client_user_agent.update({'client': self.get_user_agent()})
- else:
- client_user_agent.update(self._extra_user_agent)
- return CaseInsensitiveDict(client_user_agent)
- def get_port(self):
- return self._port
- def get_location_service(self):
- return None
- @staticmethod
- def merge_user_agent(default_agent, extra_agent):
- if default_agent is None:
- return extra_agent
- if extra_agent is None:
- return default_agent
- user_agent = default_agent.copy()
- for key, value in extra_agent.items():
- if key not in default_agent:
- user_agent[key] = value
- return user_agent
- def handle_extra_agent(self, request):
- client_agent = self.client_user_agent()
- request_agent = request.request_user_agent()
- if client_agent is None:
- return request_agent
- if request_agent is None:
- return client_agent
- for key in request_agent:
- if key in client_agent:
- client_agent.pop(key)
- client_agent.update(request_agent)
- return client_agent
- def _make_http_response(self, endpoint, request, read_timeout, connect_timeout,
- specific_signer=None):
- body_params = request.get_body_params()
- if body_params:
- body = urlencode(body_params)
- request.set_content(body)
- request.set_content_type(format_type.APPLICATION_FORM)
- elif request.get_content() and "Content-Type" not in request.get_headers():
- request.set_content_type(format_type.APPLICATION_OCTET_STREAM)
- method = request.get_method()
- signer = self._signer if specific_signer is None else specific_signer
- header, url = signer.sign(self._region_id, request)
- base = self.user_agent_header()
- extra_agent = self.handle_extra_agent(request)
- default_agent = self.default_user_agent()
- user_agent = self.merge_user_agent(default_agent, extra_agent)
- for key, value in user_agent.items():
- base += ' %s/%s' % (key, value)
- header['User-Agent'] = base
- header['x-sdk-client'] = 'python/2.0.0'
- protocol = request.get_protocol_type()
- response = HttpResponse(
- endpoint,
- url,
- method,
- header,
- protocol,
- request.get_content(),
- self._port,
- read_timeout=read_timeout,
- connect_timeout=connect_timeout,
- verify=self.get_verify())
- if body_params:
- body = urlencode(request.get_body_params())
- response.set_content(body, "utf-8", format_type.APPLICATION_FORM)
- return response
- def _implementation_of_do_action(self, request, signer=None):
- if not isinstance(request, AcsRequest):
- raise ClientException(
- error_code.SDK_INVALID_REQUEST,
- error_msg.get_msg('SDK_INVALID_REQUEST'))
- # modify Accept-Encoding
- request.add_header('Accept-Encoding', 'identity')
- if isinstance(request, CommonRequest):
- request.trans_to_acs_request()
- if request.endpoint:
- endpoint = request.endpoint
- else:
- endpoint = self._resolve_endpoint(request)
- return self._handle_retry_and_timeout(endpoint, request, signer)
- def implementation_of_do_action(self, request, signer=None):
- # keep compatible
- warnings.warn(
- "implementation_of_do_action() method is deprecated",
- DeprecationWarning)
- status, headers, body, exception = self._implementation_of_do_action(request, signer)
- return status, headers, body
- def _add_request_client_token(self, request):
- if hasattr(request, "set_ClientToken") and hasattr(request, "get_ClientToken"):
- client_token = request.get_ClientToken()
- if not client_token:
- # ClientToken has not been set
- client_token = aliyunsdkcore.utils.parameter_helper.get_uuid() # up to 60 chars
- request.set_ClientToken(client_token)
- def _get_request_read_timeout(self, request):
- # TODO: replace it with a timeout_handler
- if request._request_read_timeout:
- return request._request_read_timeout
- # if self._timeout:
- # return self._timeout
- if self._read_timeout:
- return self._read_timeout
- if request.get_product() is None:
- return DEFAULT_READ_TIMEOUT
- path = '"{0}"."{1}"."{2}"'.format(request.get_product().lower(), request.get_version(),
- request.get_action_name())
- timeout = jmespath.search(path, _api_timeout_config_data)
- if timeout is None:
- return DEFAULT_READ_TIMEOUT
- else:
- aliyunsdkcore.utils.validation.assert_integer_positive(timeout, "timeout")
- return max(timeout, DEFAULT_READ_TIMEOUT)
- def _get_request_connect_timeout(self, request):
- if request._request_connect_timeout:
- return request._request_connect_timeout
- if self._connect_timeout:
- return self._connect_timeout
- return DEFAULT_CONNECTION_TIMEOUT
- def _handle_retry_and_timeout(self, endpoint, request, signer):
- # TODO: replace it with a retry_handler
- # it's a temporary implementation. the long-term plan will be a group a normalized handlers
- # which contains retry_handler and timeout_handler
- # decide whether we should initialize a ClientToken for the request
- retry_policy_context = RetryPolicyContext(request, None, 0, None)
- if self._retry_policy.should_retry(retry_policy_context) & \
- RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN:
- self._add_request_client_token(request)
- request_read_timeout = self._get_request_read_timeout(request)
- request_connect_timeout = self._get_request_connect_timeout(request)
- retries = 0
- while True:
- status, headers, body, exception = self._handle_single_request(endpoint,
- request,
- request_read_timeout,
- request_connect_timeout,
- signer)
- retry_policy_context = RetryPolicyContext(request, exception, retries, status)
- retryable = self._retry_policy.should_retry(retry_policy_context)
- if retryable & RetryCondition.NO_RETRY:
- break
- logger.debug("Retry needed. Request:%s Retries :%d",
- request.get_action_name(), retries)
- retry_policy_context.retryable = retryable
- time_to_sleep = self._retry_policy.compute_delay_before_next_retry(retry_policy_context)
- time.sleep(time_to_sleep / 1000.0)
- retries += 1
- if isinstance(exception, ClientException):
- raise exception
- return status, headers, body, exception
- def _handle_single_request(self, endpoint, request, read_timeout, connect_timeout, signer):
- http_response = self._make_http_response(endpoint, request, read_timeout, connect_timeout,
- signer)
- params = copy.deepcopy(request.get_query_params())
- params.pop('AccessKeyId', None)
- logger.debug('Request received. Product:%s Endpoint:%s Params: %s',
- request.get_product(), endpoint, str(params))
- # Do the actual network thing
- try:
- status, headers, body = http_response.get_response_object()
- except IOError as e:
- exception = ClientException(error_code.SDK_HTTP_ERROR, str(e))
- logger.error("HttpError occurred. Host:%s SDK-Version:%s ClientException:%s",
- endpoint, aliyunsdkcore.__version__, str(exception))
- return None, None, None, exception
- exception = self._get_server_exception(status, body, endpoint, request.string_to_sign)
- return status, headers, body, exception
- @staticmethod
- def _parse_error_info_from_response_body(response_body):
- error_code_to_return = error_code.SDK_UNKNOWN_SERVER_ERROR
- # TODO handle if response_body is too big
- error_message_to_return = "ServerResponseBody: " + str(response_body)
- try:
- body_obj = json.loads(response_body)
- if 'Code' in body_obj:
- error_code_to_return = body_obj['Code']
- if 'Message' in body_obj:
- error_message_to_return = body_obj['Message']
- except ValueError:
- # failed to parse body as json format
- logger.warning('Failed to parse response as json format. Response:%s', response_body)
- return error_code_to_return, error_message_to_return
- def _get_server_exception(self, http_status, response_body, endpoint, string_to_sign):
- request_id = None
- try:
- body_obj = json.loads(response_body.decode('utf-8'))
- request_id = body_obj.get('RequestId')
- except (ValueError, TypeError, AttributeError):
- # in case the response body is not a json string, return the raw
- # data instead
- logger.warning('Failed to parse response as json format. Response:%s', response_body)
- if http_status < codes.OK or http_status >= codes.MULTIPLE_CHOICES:
- server_error_code, server_error_message = self._parse_error_info_from_response_body(
- response_body.decode('utf-8'))
- if http_status == codes.BAD_REQUEST and server_error_code == 'SignatureDoesNotMatch':
- if string_to_sign == server_error_message.split(':')[1]:
- server_error_code = 'InvalidAccessKeySecret'
- server_error_message = 'The AccessKeySecret is incorrect. ' \
- 'Please check your AccessKeyId and AccessKeySecret.'
- exception = ServerException(
- server_error_code,
- server_error_message,
- http_status=http_status,
- request_id=request_id)
- logger.error("ServerException occurred. Host:%s SDK-Version:%s ServerException:%s",
- endpoint, aliyunsdkcore.__version__, str(exception))
- return exception
- def do_action_with_exception(self, acs_request):
- # set server response format as json, because this function will
- # parse the response so which format doesn't matter
- acs_request.set_accept_format('JSON')
- status, headers, body, exception = self._implementation_of_do_action(acs_request)
- if exception:
- raise exception
- logger.debug('Response received. Product:%s Response-body: %s',
- acs_request.get_product(), body)
- return body
- def _resolve_endpoint(self, request):
- resolve_request = ResolveEndpointRequest(
- self._region_id,
- request.get_product(),
- request.get_location_service_code(),
- request.get_location_endpoint_type(),
- )
- resolve_request.request_network = request.request_network
- resolve_request.product_suffix = request.product_suffix
- resolve_request.endpoint_map = request.endpoint_map
- resolve_request.endpoint_regional = request.endpoint_regional
- return self._endpoint_resolver.resolve(resolve_request)
- def do_action(self, acs_request):
- warnings.warn(
- "do_action() method is deprecated, please use do_action_with_exception() instead.",
- DeprecationWarning)
- status, headers, body, exception = self._implementation_of_do_action(acs_request)
- return body
- def get_response(self, acs_request):
- return self.implementation_of_do_action(acs_request)
- def add_endpoint(self, region_id, product_code, endpoint):
- self._endpoint_resolver.put_endpoint_entry(
- region_id, product_code, endpoint)
- def set_stream_logger(self, log_level=logging.DEBUG, logger_name='aliyunsdkcore', stream=None,
- format_string=None):
- log = logging.getLogger(logger_name)
- log.setLevel(log_level)
- ch = logging.StreamHandler(stream)
- ch.setLevel(log_level)
- if format_string is None:
- format_string = self.LOG_FORMAT
- formatter = logging.Formatter(format_string)
- ch.setFormatter(formatter)
- log.addHandler(ch)
- def set_file_logger(self, path, log_level=logging.DEBUG, logger_name='aliyunsdkcore'):
- log = logging.getLogger(logger_name)
- log.setLevel(log_level)
- fh = logging.FileHandler(path)
- fh.setLevel(log_level)
- formatter = logging.Formatter(self.LOG_FORMAT)
- fh.setFormatter(formatter)
- log.addHandler(fh)
|