client.py 19 KB


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# coding=utf-8
import time
import warnings
import json
import logging
import copy
import platform

import jmespath

import aliyunsdkcore
from aliyunsdkcore.vendored.six.moves.urllib.parse import urlencode
from aliyunsdkcore.vendored.requests import codes
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.acs_exception import error_code, error_msg
from aliyunsdkcore.http.http_response import HttpResponse
from aliyunsdkcore.request import AcsRequest
from aliyunsdkcore.http import format_type
from aliyunsdkcore.auth.signers.signer_factory import SignerFactory
from aliyunsdkcore.request import CommonRequest
from aliyunsdkcore.endpoint.resolver_endpoint_request import ResolveEndpointRequest
from aliyunsdkcore.endpoint.default_endpoint_resolver import DefaultEndpointResolver
import aliyunsdkcore.retry.retry_policy as retry_policy
from aliyunsdkcore.retry.retry_condition import RetryCondition
from aliyunsdkcore.retry.retry_policy_context import RetryPolicyContext
import aliyunsdkcore.utils
import aliyunsdkcore.utils.parameter_helper
import aliyunsdkcore.utils.validation
from aliyunsdkcore.vendored.requests.structures import CaseInsensitiveDict
from aliyunsdkcore.vendored.requests.structures import OrderedDict
  48. """
  49. Acs default client module.
  50. """
  51. DEFAULT_READ_TIMEOUT = 10
  52. DEFAULT_CONNECTION_TIMEOUT = 5
  53. # TODO: replace it with TimeoutHandler
  54. _api_timeout_config_data = aliyunsdkcore.utils._load_json_from_data_dir("timeout_config.json")
  55. logger = logging.getLogger(__name__)
  56. class AcsClient:
  57. LOG_FORMAT = '%(thread)d %(asctime)s %(name)s %(levelname)s %(message)s'
  58. def __init__(
  59. self,
  60. ak=None,
  61. secret=None,
  62. region_id="cn-hangzhou",
  63. auto_retry=True,
  64. max_retry_time=None,
  65. user_agent=None,
  66. port=80,
  67. connect_timeout=None,
  68. timeout=None,
  69. public_key_id=None,
  70. private_key=None,
  71. session_period=3600,
  72. credential=None,
  73. debug=False,
  74. verify=None):
  75. """
  76. constructor for AcsClient
  77. :param ak: String, access key id
  78. :param secret: String, access key secret
  79. :param region_id: String, region id
  80. :param auto_retry: Boolean
  81. :param max_retry_time: Number
  82. :return:
  83. """
  84. self._max_retry_num = max_retry_time
  85. self._auto_retry = auto_retry
  86. self._ak = ak
  87. self._secret = secret
  88. self._region_id = region_id
  89. self._user_agent = user_agent
  90. self._port = port
  91. self._connect_timeout = connect_timeout
  92. self._read_timeout = timeout
  93. self._extra_user_agent = {}
  94. self._verify = verify
  95. credential = {
  96. 'ak': ak,
  97. 'secret': secret,
  98. 'public_key_id': public_key_id,
  99. 'private_key': private_key,
  100. 'session_period': session_period,
  101. 'credential': credential,
  102. }
  103. self._signer = SignerFactory.get_signer(
  104. credential, region_id, self._implementation_of_do_action, debug)
  105. self._endpoint_resolver = DefaultEndpointResolver(self)
  106. if self._auto_retry:
  107. self._retry_policy = retry_policy.get_default_retry_policy(
  108. max_retry_times=self._max_retry_num)
  109. else:
  110. self._retry_policy = retry_policy.NO_RETRY_POLICY
  111. def get_region_id(self):
  112. return self._region_id
  113. def get_access_key(self):
  114. return self._ak
  115. def get_access_secret(self):
  116. return self._secret
  117. def is_auto_retry(self):
  118. return self._auto_retry
  119. def get_max_retry_num(self):
  120. return self._max_retry_num
  121. def get_user_agent(self):
  122. return self._user_agent
  123. def get_verify(self):
  124. return self._verify
  125. def set_region_id(self, region):
  126. self._region_id = region
  127. def set_max_retry_num(self, num):
  128. self._max_retry_num = num
  129. def set_auto_retry(self, flag):
  130. """
  131. set whether or not the client perform auto-retry
  132. :param flag: Booleans
  133. :return: None
  134. """
  135. self._auto_retry = flag
  136. def set_user_agent(self, agent):
  137. """
  138. User agent set to client will overwrite the request setting.
  139. :param agent:
  140. :return:
  141. """
  142. self._user_agent = agent
  143. def set_verify(self, verify):
  144. self._verify = verify
  145. def append_user_agent(self, key, value):
  146. self._extra_user_agent.update({key: value})
  147. @staticmethod
  148. def user_agent_header():
  149. base = '%s (%s %s;%s)' \
  150. % ('AlibabaCloud',
  151. platform.system(),
  152. platform.release(),
  153. platform.machine()
  154. )
  155. return base
  156. @staticmethod
  157. def default_user_agent():
  158. default_agent = OrderedDict()
  159. default_agent['Python'] = platform.python_version()
  160. default_agent['Core'] = __import__('aliyunsdkcore').__version__
  161. default_agent['python-requests'] = __import__(
  162. 'aliyunsdkcore.vendored.requests.__version__', globals(), locals(),
  163. ['vendored', 'requests', '__version__'], 0).__version__
  164. return CaseInsensitiveDict(default_agent)
  165. def client_user_agent(self):
  166. client_user_agent = {}
  167. if self.get_user_agent() is not None:
  168. client_user_agent.update({'client': self.get_user_agent()})
  169. else:
  170. client_user_agent.update(self._extra_user_agent)
  171. return CaseInsensitiveDict(client_user_agent)
  172. def get_port(self):
  173. return self._port
  174. def get_location_service(self):
  175. return None
  176. @staticmethod
  177. def merge_user_agent(default_agent, extra_agent):
  178. if default_agent is None:
  179. return extra_agent
  180. if extra_agent is None:
  181. return default_agent
  182. user_agent = default_agent.copy()
  183. for key, value in extra_agent.items():
  184. if key not in default_agent:
  185. user_agent[key] = value
  186. return user_agent
  187. def handle_extra_agent(self, request):
  188. client_agent = self.client_user_agent()
  189. request_agent = request.request_user_agent()
  190. if client_agent is None:
  191. return request_agent
  192. if request_agent is None:
  193. return client_agent
  194. for key in request_agent:
  195. if key in client_agent:
  196. client_agent.pop(key)
  197. client_agent.update(request_agent)
  198. return client_agent
  199. def _make_http_response(self, endpoint, request, read_timeout, connect_timeout,
  200. specific_signer=None):
  201. body_params = request.get_body_params()
  202. if body_params:
  203. body = urlencode(body_params)
  204. request.set_content(body)
  205. request.set_content_type(format_type.APPLICATION_FORM)
  206. elif request.get_content() and "Content-Type" not in request.get_headers():
  207. request.set_content_type(format_type.APPLICATION_OCTET_STREAM)
  208. method = request.get_method()
  209. signer = self._signer if specific_signer is None else specific_signer
  210. header, url = signer.sign(self._region_id, request)
  211. base = self.user_agent_header()
  212. extra_agent = self.handle_extra_agent(request)
  213. default_agent = self.default_user_agent()
  214. user_agent = self.merge_user_agent(default_agent, extra_agent)
  215. for key, value in user_agent.items():
  216. base += ' %s/%s' % (key, value)
  217. header['User-Agent'] = base
  218. header['x-sdk-client'] = 'python/2.0.0'
  219. protocol = request.get_protocol_type()
  220. response = HttpResponse(
  221. endpoint,
  222. url,
  223. method,
  224. header,
  225. protocol,
  226. request.get_content(),
  227. self._port,
  228. read_timeout=read_timeout,
  229. connect_timeout=connect_timeout,
  230. verify=self.get_verify())
  231. if body_params:
  232. body = urlencode(request.get_body_params())
  233. response.set_content(body, "utf-8", format_type.APPLICATION_FORM)
  234. return response
  235. def _implementation_of_do_action(self, request, signer=None):
  236. if not isinstance(request, AcsRequest):
  237. raise ClientException(
  238. error_code.SDK_INVALID_REQUEST,
  239. error_msg.get_msg('SDK_INVALID_REQUEST'))
  240. # modify Accept-Encoding
  241. request.add_header('Accept-Encoding', 'identity')
  242. if isinstance(request, CommonRequest):
  243. request.trans_to_acs_request()
  244. if request.endpoint:
  245. endpoint = request.endpoint
  246. else:
  247. endpoint = self._resolve_endpoint(request)
  248. return self._handle_retry_and_timeout(endpoint, request, signer)
  249. def implementation_of_do_action(self, request, signer=None):
  250. # keep compatible
  251. warnings.warn(
  252. "implementation_of_do_action() method is deprecated",
  253. DeprecationWarning)
  254. status, headers, body, exception = self._implementation_of_do_action(request, signer)
  255. return status, headers, body
  256. def _add_request_client_token(self, request):
  257. if hasattr(request, "set_ClientToken") and hasattr(request, "get_ClientToken"):
  258. client_token = request.get_ClientToken()
  259. if not client_token:
  260. # ClientToken has not been set
  261. client_token = aliyunsdkcore.utils.parameter_helper.get_uuid() # up to 60 chars
  262. request.set_ClientToken(client_token)
  263. def _get_request_read_timeout(self, request):
  264. # TODO: replace it with a timeout_handler
  265. if request._request_read_timeout:
  266. return request._request_read_timeout
  267. # if self._timeout:
  268. # return self._timeout
  269. if self._read_timeout:
  270. return self._read_timeout
  271. if request.get_product() is None:
  272. return DEFAULT_READ_TIMEOUT
  273. path = '"{0}"."{1}"."{2}"'.format(request.get_product().lower(), request.get_version(),
  274. request.get_action_name())
  275. timeout = jmespath.search(path, _api_timeout_config_data)
  276. if timeout is None:
  277. return DEFAULT_READ_TIMEOUT
  278. else:
  279. aliyunsdkcore.utils.validation.assert_integer_positive(timeout, "timeout")
  280. return max(timeout, DEFAULT_READ_TIMEOUT)
  281. def _get_request_connect_timeout(self, request):
  282. if request._request_connect_timeout:
  283. return request._request_connect_timeout
  284. if self._connect_timeout:
  285. return self._connect_timeout
  286. return DEFAULT_CONNECTION_TIMEOUT
  287. def _handle_retry_and_timeout(self, endpoint, request, signer):
  288. # TODO: replace it with a retry_handler
  289. # it's a temporary implementation. the long-term plan will be a group a normalized handlers
  290. # which contains retry_handler and timeout_handler
  291. # decide whether we should initialize a ClientToken for the request
  292. retry_policy_context = RetryPolicyContext(request, None, 0, None)
  293. if self._retry_policy.should_retry(retry_policy_context) & \
  294. RetryCondition.SHOULD_RETRY_WITH_CLIENT_TOKEN:
  295. self._add_request_client_token(request)
  296. request_read_timeout = self._get_request_read_timeout(request)
  297. request_connect_timeout = self._get_request_connect_timeout(request)
  298. retries = 0
  299. while True:
  300. status, headers, body, exception = self._handle_single_request(endpoint,
  301. request,
  302. request_read_timeout,
  303. request_connect_timeout,
  304. signer)
  305. retry_policy_context = RetryPolicyContext(request, exception, retries, status)
  306. retryable = self._retry_policy.should_retry(retry_policy_context)
  307. if retryable & RetryCondition.NO_RETRY:
  308. break
  309. logger.debug("Retry needed. Request:%s Retries :%d",
  310. request.get_action_name(), retries)
  311. retry_policy_context.retryable = retryable
  312. time_to_sleep = self._retry_policy.compute_delay_before_next_retry(retry_policy_context)
  313. time.sleep(time_to_sleep / 1000.0)
  314. retries += 1
  315. if isinstance(exception, ClientException):
  316. raise exception
  317. return status, headers, body, exception
  318. def _handle_single_request(self, endpoint, request, read_timeout, connect_timeout, signer):
  319. http_response = self._make_http_response(endpoint, request, read_timeout, connect_timeout,
  320. signer)
  321. params = copy.deepcopy(request.get_query_params())
  322. params.pop('AccessKeyId', None)
  323. logger.debug('Request received. Product:%s Endpoint:%s Params: %s',
  324. request.get_product(), endpoint, str(params))
  325. # Do the actual network thing
  326. try:
  327. status, headers, body = http_response.get_response_object()
  328. except IOError as e:
  329. exception = ClientException(error_code.SDK_HTTP_ERROR, str(e))
  330. logger.error("HttpError occurred. Host:%s SDK-Version:%s ClientException:%s",
  331. endpoint, aliyunsdkcore.__version__, str(exception))
  332. return None, None, None, exception
  333. exception = self._get_server_exception(status, body, endpoint, request.string_to_sign)
  334. return status, headers, body, exception
  335. @staticmethod
  336. def _parse_error_info_from_response_body(response_body):
  337. error_code_to_return = error_code.SDK_UNKNOWN_SERVER_ERROR
  338. # TODO handle if response_body is too big
  339. error_message_to_return = "ServerResponseBody: " + str(response_body)
  340. try:
  341. body_obj = json.loads(response_body)
  342. if 'Code' in body_obj:
  343. error_code_to_return = body_obj['Code']
  344. if 'Message' in body_obj:
  345. error_message_to_return = body_obj['Message']
  346. except ValueError:
  347. # failed to parse body as json format
  348. logger.warning('Failed to parse response as json format. Response:%s', response_body)
  349. return error_code_to_return, error_message_to_return
  350. def _get_server_exception(self, http_status, response_body, endpoint, string_to_sign):
  351. request_id = None
  352. try:
  353. body_obj = json.loads(response_body.decode('utf-8'))
  354. request_id = body_obj.get('RequestId')
  355. except (ValueError, TypeError, AttributeError):
  356. # in case the response body is not a json string, return the raw
  357. # data instead
  358. logger.warning('Failed to parse response as json format. Response:%s', response_body)
  359. if http_status < codes.OK or http_status >= codes.MULTIPLE_CHOICES:
  360. server_error_code, server_error_message = self._parse_error_info_from_response_body(
  361. response_body.decode('utf-8'))
  362. if http_status == codes.BAD_REQUEST and server_error_code == 'SignatureDoesNotMatch':
  363. if string_to_sign == server_error_message.split(':')[1]:
  364. server_error_code = 'InvalidAccessKeySecret'
  365. server_error_message = 'The AccessKeySecret is incorrect. ' \
  366. 'Please check your AccessKeyId and AccessKeySecret.'
  367. exception = ServerException(
  368. server_error_code,
  369. server_error_message,
  370. http_status=http_status,
  371. request_id=request_id)
  372. logger.error("ServerException occurred. Host:%s SDK-Version:%s ServerException:%s",
  373. endpoint, aliyunsdkcore.__version__, str(exception))
  374. return exception
  375. def do_action_with_exception(self, acs_request):
  376. # set server response format as json, because this function will
  377. # parse the response so which format doesn't matter
  378. acs_request.set_accept_format('JSON')
  379. status, headers, body, exception = self._implementation_of_do_action(acs_request)
  380. if exception:
  381. raise exception
  382. logger.debug('Response received. Product:%s Response-body: %s',
  383. acs_request.get_product(), body)
  384. return body
  385. def _resolve_endpoint(self, request):
  386. resolve_request = ResolveEndpointRequest(
  387. self._region_id,
  388. request.get_product(),
  389. request.get_location_service_code(),
  390. request.get_location_endpoint_type(),
  391. )
  392. resolve_request.request_network = request.request_network
  393. resolve_request.product_suffix = request.product_suffix
  394. resolve_request.endpoint_map = request.endpoint_map
  395. resolve_request.endpoint_regional = request.endpoint_regional
  396. return self._endpoint_resolver.resolve(resolve_request)
  397. def do_action(self, acs_request):
  398. warnings.warn(
  399. "do_action() method is deprecated, please use do_action_with_exception() instead.",
  400. DeprecationWarning)
  401. status, headers, body, exception = self._implementation_of_do_action(acs_request)
  402. return body
  403. def get_response(self, acs_request):
  404. return self.implementation_of_do_action(acs_request)
  405. def add_endpoint(self, region_id, product_code, endpoint):
  406. self._endpoint_resolver.put_endpoint_entry(
  407. region_id, product_code, endpoint)
  408. def set_stream_logger(self, log_level=logging.DEBUG, logger_name='aliyunsdkcore', stream=None,
  409. format_string=None):
  410. log = logging.getLogger(logger_name)
  411. log.setLevel(log_level)
  412. ch = logging.StreamHandler(stream)
  413. ch.setLevel(log_level)
  414. if format_string is None:
  415. format_string = self.LOG_FORMAT
  416. formatter = logging.Formatter(format_string)
  417. ch.setFormatter(formatter)
  418. log.addHandler(ch)
  419. def set_file_logger(self, path, log_level=logging.DEBUG, logger_name='aliyunsdkcore'):
  420. log = logging.getLogger(logger_name)
  421. log.setLevel(log_level)
  422. fh = logging.FileHandler(path)
  423. fh.setLevel(log_level)
  424. formatter = logging.Formatter(self.LOG_FORMAT)
  425. fh.setFormatter(formatter)
  426. log.addHandler(fh)