123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- # -*- coding: utf-8 -*-
- import binascii
- import datetime
- import hashlib
- import hmac
- import base64
- import copy
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import hashes
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.primitives.serialization import load_pem_private_key
- try:
- from urllib import quote_plus, quote
- except ImportError:
- from urllib.parse import quote_plus, quote
- from .sm3 import hash_sm3, Sm3
- from alibabacloud_tea_util.client import Client as Util
- from Tea.stream import STREAM_CLASS
- from Tea.model import TeaModel
- from Tea.converter import TeaConverter
- def prepare_headers(headers):
- canon_keys = []
- tmp_headers = {}
- for k, v in headers.items():
- if v is not None:
- if k.lower() not in canon_keys:
- canon_keys.append(k.lower())
- tmp_headers[k.lower()] = [TeaConverter.to_string(v).strip()]
- else:
- tmp_headers[k.lower()].append(TeaConverter.to_string(v).strip())
- canon_keys.sort()
- return {key: ','.join(sorted(tmp_headers[key])) for key in canon_keys}
- def rsa_sign(plaintext, secret):
- if not secret.startswith('-----BEGIN RSA PRIVATE KEY-----'):
- secret = '-----BEGIN RSA PRIVATE KEY-----\n%s' % secret
- if not secret.endswith('-----END RSA PRIVATE KEY-----'):
- secret = '%s\n-----END RSA PRIVATE KEY-----' % secret
- key = load_pem_private_key(secret, password=None, backend=default_backend())
- return key.sign(plaintext, padding.PKCS1v15(), hashes.SHA256())
- def signature_method(secret, source, sign_type):
- source = source.encode('utf-8')
- secret = secret.encode('utf-8')
- if sign_type == 'ACS3-HMAC-SHA256':
- return hmac.new(secret, source, hashlib.sha256).digest()
- elif sign_type == 'ACS3-HMAC-SM3':
- return hmac.new(secret, source, Sm3).digest()
- elif sign_type == 'ACS3-RSA-SHA256':
- return rsa_sign(source, secret)
- def get_canonical_query_string(query):
- if query is None or len(query) <= 0:
- return ''
- canon_keys = []
- for k, v in query.items():
- if v is not None:
- canon_keys.append(k)
- canon_keys.sort()
- query_string = ''
- for key in canon_keys:
- value = quote(TeaConverter.to_str(query[key]), safe='~')
- if value is None:
- s = '%s&' % key
- else:
- s = '%s=%s&' % (key, value)
- query_string += s
- return query_string[:-1]
- def get_canonicalized_headers(headers):
- canon_keys = []
- tmp_headers = {}
- for k, v in headers.items():
- if v is not None:
- if k.lower() not in canon_keys:
- canon_keys.append(k.lower())
- tmp_headers[k.lower()] = [TeaConverter.to_string(v).strip()]
- else:
- tmp_headers[k.lower()].append(TeaConverter.to_string(v).strip())
- canon_keys.sort()
- canonical_headers = ''
- for key in canon_keys:
- header_entry = ','.join(sorted(tmp_headers[key]))
- s = '%s:%s\n' % (key, header_entry)
- canonical_headers += s
- return canonical_headers, ';'.join(canon_keys)
- class Client(object):
- """
- This is for OpenApi Util
- """
- @staticmethod
- def convert(body, content):
- """
- Convert all params of body other than type of readable into content
- @param body: source Model
- @param content: target Model
- @return: void
- """
- pros = {}
- body_map = body.to_map()
- for k, v in body_map.items():
- if not isinstance(v, STREAM_CLASS):
- pros[k] = copy.deepcopy(v)
- content.from_map(pros)
- @staticmethod
- def _get_canonicalized_headers(headers):
- canon_keys = []
- for k in headers:
- if k.startswith('x-acs-'):
- canon_keys.append(k)
- canon_keys = sorted(canon_keys)
- canon_header = ''
- for k in canon_keys:
- canon_header += '%s:%s\n' % (k, headers[k])
- return canon_header
- @staticmethod
- def _get_canonicalized_resource(pathname, query):
- if len(query) <= 0:
- return pathname
- resource = '%s?' % pathname
- query_list = sorted(list(query))
- for key in query_list:
- if query[key] is not None:
- if query[key] == '':
- s = '%s&' % key
- else:
- s = '%s=%s&' % (key, TeaConverter.to_string(query[key]))
- resource += s
- return resource[:-1]
- @staticmethod
- def get_string_to_sign(request):
- """
- Get the string to be signed according to request
- @param request: which contains signed messages
- @return: the signed string
- """
- method, pathname, headers, query = request.method, request.pathname, request.headers, request.query
- accept = '' if headers.get('accept') is None else headers.get('accept')
- content_md5 = '' if headers.get('content-md5') is None else headers.get('content-md5')
- content_type = '' if headers.get('content-type') is None else headers.get('content-type')
- date = '' if headers.get('date') is None else headers.get('date')
- header = '%s\n%s\n%s\n%s\n%s\n' % (method, accept, content_md5, content_type, date)
- canon_headers = Client._get_canonicalized_headers(headers)
- canon_resource = Client._get_canonicalized_resource(pathname, query)
- sign_str = header + canon_headers + canon_resource
- return sign_str
- @staticmethod
- def get_roasignature(string_to_sign, secret):
- """
- Get signature according to stringToSign, secret
- @type string_to_sign: str
- @param string_to_sign: the signed string
- @type secret: str
- @param secret: accesskey secret
- @return: the signature
- """
- hash_val = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha1).digest()
- signature = base64.b64encode(hash_val).decode('utf-8')
- return signature
- @staticmethod
- def _object_handler(key, value, out):
- if value is None:
- return
- if isinstance(value, dict):
- for k, v in value.items():
- Client._object_handler('%s.%s' % (key, k), v, out)
- elif isinstance(value, TeaModel):
- for k, v in value.to_map().items():
- Client._object_handler('%s.%s' % (key, k), v, out)
- elif isinstance(value, (list, tuple)):
- for index, val in enumerate(value):
- Client._object_handler('%s.%s' % (key, index + 1), val, out)
- else:
- if key.startswith('.'):
- key = key[1:]
- if not isinstance(value, STREAM_CLASS):
- out[key] = TeaConverter.to_string(value)
- @staticmethod
- def to_form(filter):
- """
- Parse filter into a form string
- @type filter: dict
- @param filter: object
- @return: the string
- """
- result = {}
- if filter:
- Client._object_handler('', filter, result)
- return Util.to_form_string(
- Util.anyify_map_value(result)
- )
- @staticmethod
- def get_timestamp():
- """
- Get timestamp
- @return: the timestamp string
- """
- return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
- @staticmethod
- def query(filter):
- """
- Parse filter into a object which's type is map[string]string
- @type filter: dict
- @param filter: query param
- @return: the object
- """
- out_dict = {}
- if filter:
- Client._object_handler('', filter, out_dict)
- return out_dict
- @staticmethod
- def get_rpcsignature(signed_params, method, secret):
- """
- Get signature according to signedParams, method and secret
- @type signed_params: dict
- @param signed_params: params which need to be signed
- @type method: str
- @param method: http method e.g. GET
- @type secret: str
- @param secret: AccessKeySecret
- @return: the signature
- """
- queries = signed_params.copy()
- keys = list(queries.keys())
- keys.sort()
- canonicalized_query_string = ""
- for k in keys:
- if queries[k] is not None:
- canonicalized_query_string += "&"
- canonicalized_query_string += quote(TeaConverter.to_str(k), safe='')
- canonicalized_query_string += "="
- canonicalized_query_string += quote(TeaConverter.to_str(queries[k]), safe='')
- string_to_sign = ""
- string_to_sign += method
- string_to_sign += '&'
- string_to_sign += quote_plus("/")
- string_to_sign += '&'
- string_to_sign += quote_plus(
- canonicalized_query_string[1:])
- digest_maker = hmac.new(TeaConverter.to_bytes(secret + '&'),
- TeaConverter.to_bytes(string_to_sign),
- digestmod=hashlib.sha1)
- hash_bytes = digest_maker.digest()
- signed_str = TeaConverter.to_string(base64.b64encode(hash_bytes))
- return signed_str
- @staticmethod
- def array_to_string_with_specified_style(array, prefix, style):
- """
- Parse array into a string with specified style
- @type array: any
- @param array: the array
- @type prefix: str
- @param prefix: the prefix string
- @param style: specified style e.g. repeatList
- @return: the string
- """
- if array is None:
- return ''
- if style == 'repeatList':
- return Client._flat_repeat_list({prefix: array})
- elif style == 'simple':
- return ','.join(map(str, array))
- elif style == 'spaceDelimited':
- return ' '.join(map(str, array))
- elif style == 'pipeDelimited':
- return '|'.join(map(str, array))
- elif style == 'json':
- return Util.to_jsonstring(array)
- else:
- return ''
- @staticmethod
- def _flat_repeat_list(dic):
- query = {}
- if dic:
- Client._object_handler('', dic, query)
- l = []
- q = sorted(query)
- for i in q:
- k = quote_plus(TeaConverter.to_str(i))
- v = quote_plus(TeaConverter.to_str(query[i]))
- l.append(k + '=' + v)
- return '&&'.join(l)
- @staticmethod
- def parse_to_map(inp):
- """
- Transform input as map.
- """
- try:
- result = Client._parse_to_dict(inp)
- return copy.deepcopy(result)
- except TypeError:
- return
- @staticmethod
- def _parse_to_dict(val):
- if isinstance(val, dict):
- result = {}
- for k, v in val.items():
- if isinstance(v, (list, dict, TeaModel)):
- result[k] = Client._parse_to_dict(v)
- else:
- result[k] = v
- return result
- elif isinstance(val, list):
- result = []
- for i in val:
- if isinstance(i, (list, dict, TeaModel)):
- result.append(Client._parse_to_dict(i))
- else:
- result.append(i)
- return result
- elif isinstance(val, TeaModel):
- return val.to_map()
- @staticmethod
- def get_endpoint(endpoint, server_use, endpoint_type):
- """
- If endpointType is internal, use internal endpoint
- If serverUse is true and endpointType is accelerate, use accelerate endpoint
- Default return endpoint
- @param server_use whether use accelerate endpoint
- @param endpoint_type value must be internal or accelerate
- @return the final endpoint
- """
- if endpoint_type == "internal":
- str_split = endpoint.split('.')
- str_split[0] += "-internal"
- endpoint = ".".join(str_split)
- if server_use and endpoint_type == "accelerate":
- return "oss-accelerate.aliyuncs.com"
- return endpoint
- @staticmethod
- def hash(raw, sign_type):
- if sign_type == 'ACS3-HMAC-SHA256' or sign_type == 'ACS3-RSA-SHA256':
- return hashlib.sha256(raw).digest()
- elif sign_type == 'ACS3-HMAC-SM3':
- return hash_sm3(raw)
- @staticmethod
- def hex_encode(raw):
- if raw:
- return binascii.b2a_hex(raw).decode('utf-8')
- @staticmethod
- def get_authorization(request, sign_type, payload, ak, secret):
- canonical_uri = request.pathname if request.pathname else '/'
- canonicalized_query = get_canonical_query_string(request.query)
- canonicalized_headers, signed_headers = get_canonicalized_headers(request.headers)
- request.headers = prepare_headers(request.headers)
- canonical_request = '%s\n%s\n%s\n%s\n%s\n%s' % (
- request.method,
- canonical_uri,
- canonicalized_query,
- canonicalized_headers,
- signed_headers, payload
- )
- str_to_sign = '%s\n%s' % (
- sign_type,
- Client.hex_encode(Client.hash(canonical_request.encode("utf-8"), sign_type))
- )
- signature = Client.hex_encode(signature_method(secret, str_to_sign, sign_type))
- auth = '%s Credential=%s,SignedHeaders=%s,Signature=%s' % (
- sign_type, ak, signed_headers, signature
- )
- return auth
- @staticmethod
- def get_encode_path(path):
- return quote(TeaConverter.to_str(path), safe='/~')
- @staticmethod
- def get_encode_param(param):
- return quote(TeaConverter.to_str(param), safe='~')
|