credentials.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. # -*- coding: utf-8 -*-
  2. import time
  3. from alibabacloud_credentials.utils import auth_constant as ac
  class _AutomaticallyRefreshCredentials(object):
      """Base for credentials that are renewed shortly before they expire.

      Stores an expiration timestamp and a provider. Subclasses call
      ``_refresh_credential`` before handing out a credential field and copy
      the returned values onto themselves.
      """

      # Renewal is triggered this many seconds ahead of ``expiration``.
      _REFRESH_LEAD_SECONDS = 180

      def __init__(self, expiration, provider):
          """Remember the expiry time and the provider used for renewal.

          :param expiration: expiry time in seconds since the epoch; it is
              compared numerically against the current epoch time.
          :param provider: object exposing ``get_credentials()``.
          """
          self.expiration = expiration
          self.provider = provider

      def _with_should_refresh(self):
          """Return True once "now" is within the lead window before expiry."""
          # time.time() already yields epoch seconds, so the former
          # localtime()/mktime() round trip is unnecessary.
          now = int(time.time())
          return now >= (self.expiration - self._REFRESH_LEAD_SECONDS)

      def _get_new_credential(self):
          """Ask the provider for a new credential object and return it."""
          return self.provider.get_credentials()

      def _refresh_credential(self):
          """Return a new credential when renewal is due, otherwise ``None``."""
          if self._with_should_refresh():
              return self._get_new_credential()
          return None
  class AccessKeyCredential(object):
      """Static access-key pair, tagged with ``ac.ACCESS_KEY`` as its type."""

      def __init__(self, access_key_id, access_key_secret):
          """Store the key id and secret exactly as given.

          :param access_key_id: the access key id.
          :param access_key_secret: the access key secret.
          """
          self.access_key_id, self.access_key_secret = (
              access_key_id,
              access_key_secret,
          )
          self.credential_type = ac.ACCESS_KEY

      def get_access_key_id(self):
          """Return the stored access key id."""
          key_id = self.access_key_id
          return key_id

      def get_access_key_secret(self):
          """Return the stored access key secret."""
          key_secret = self.access_key_secret
          return key_secret
  class BearerTokenCredential(object):
      """Holder for a bearer token, tagged with ``ac.BEARER`` as its type."""

      def __init__(self, bearer_token):
          """Store the token unchanged.

          :param bearer_token: the bearer token value.
          """
          self.bearer_token = bearer_token
          # The type tag is a constant from the auth_constant module.
          self.credential_type = ac.BEARER
  class EcsRamRoleCredential(_AutomaticallyRefreshCredentials):
      """Self-renewing credential tagged with ``ac.ECS_RAM_ROLE``.

      Each getter first gives the base class a chance to fetch a new
      credential from the provider, then returns the current field.
      """

      def __init__(self, access_key_id, access_key_secret, security_token, expiration, provider):
          """Store the credential fields and hand expiry/provider to the base.

          :param access_key_id: the access key id.
          :param access_key_secret: the access key secret.
          :param security_token: the security token.
          :param expiration: expiry time, passed through to the base class.
          :param provider: renewal source, passed through to the base class.
          """
          super(EcsRamRoleCredential, self).__init__(expiration, provider)
          self.access_key_id = access_key_id
          self.access_key_secret = access_key_secret
          self.security_token = security_token
          self.credential_type = ac.ECS_RAM_ROLE

      def _refresh_credential(self):
          """Copy the fields of a renewed credential onto this object, if any."""
          renewed = super(EcsRamRoleCredential, self)._refresh_credential()
          if not renewed:
              # Base class returned nothing: keep the current values.
              return
          self.access_key_id = renewed.access_key_id
          self.access_key_secret = renewed.access_key_secret
          self.expiration = renewed.expiration
          self.security_token = renewed.security_token

      def get_access_key_id(self):
          """Return the access key id after a possible renewal."""
          self._refresh_credential()
          return self.access_key_id

      def get_access_key_secret(self):
          """Return the access key secret after a possible renewal."""
          self._refresh_credential()
          return self.access_key_secret

      def get_security_token(self):
          """Return the security token after a possible renewal."""
          self._refresh_credential()
          return self.security_token
  class RamRoleArnCredential(_AutomaticallyRefreshCredentials):
      """Self-renewing credential tagged with ``ac.RAM_ROLE_ARN``.

      Each getter first gives the base class a chance to fetch a new
      credential from the provider, then returns the current field.
      """

      def __init__(self, access_key_id, access_key_secret, security_token, expiration, provider):
          """Store the credential fields and hand expiry/provider to the base.

          :param access_key_id: the access key id.
          :param access_key_secret: the access key secret.
          :param security_token: the security token.
          :param expiration: expiry time, passed through to the base class.
          :param provider: renewal source, passed through to the base class.
          """
          super(RamRoleArnCredential, self).__init__(expiration, provider)
          self.access_key_id = access_key_id
          self.access_key_secret = access_key_secret
          self.security_token = security_token
          self.credential_type = ac.RAM_ROLE_ARN

      def _refresh_credential(self):
          """Copy the fields of a renewed credential onto this object, if any."""
          renewed = super(RamRoleArnCredential, self)._refresh_credential()
          if not renewed:
              # Base class returned nothing: keep the current values.
              return
          self.access_key_id = renewed.access_key_id
          self.access_key_secret = renewed.access_key_secret
          self.expiration = renewed.expiration
          self.security_token = renewed.security_token

      def get_access_key_id(self):
          """Return the access key id after a possible renewal."""
          self._refresh_credential()
          return self.access_key_id

      def get_access_key_secret(self):
          """Return the access key secret after a possible renewal."""
          self._refresh_credential()
          return self.access_key_secret

      def get_security_token(self):
          """Return the security token after a possible renewal."""
          self._refresh_credential()
          return self.security_token
  class RsaKeyPairCredential(_AutomaticallyRefreshCredentials):
      """Self-renewing key pair tagged with ``ac.RSA_KEY_PAIR``.

      Carries only an access key id and secret (no security token). Both
      getters give the base class a chance to renew before returning.
      """

      def __init__(self, access_key_id, access_key_secret, expiration, provider):
          """Store the key pair and hand expiry/provider to the base class.

          :param access_key_id: the access key id.
          :param access_key_secret: the access key secret.
          :param expiration: expiry time, passed through to the base class.
          :param provider: renewal source, passed through to the base class.
          """
          super(RsaKeyPairCredential, self).__init__(expiration, provider)
          self.access_key_id = access_key_id
          self.access_key_secret = access_key_secret
          self.credential_type = ac.RSA_KEY_PAIR

      def _refresh_credential(self):
          """Copy the fields of a renewed credential onto this object, if any."""
          renewed = super(RsaKeyPairCredential, self)._refresh_credential()
          if not renewed:
              # Base class returned nothing: keep the current values.
              return
          self.access_key_id = renewed.access_key_id
          self.access_key_secret = renewed.access_key_secret
          self.expiration = renewed.expiration

      def get_access_key_id(self):
          """Return the access key id after a possible renewal."""
          self._refresh_credential()
          return self.access_key_id

      def get_access_key_secret(self):
          """Return the access key secret after a possible renewal."""
          self._refresh_credential()
          return self.access_key_secret
  class StsCredential(object):
      """Static key pair plus security token, tagged with ``ac.STS``."""

      def __init__(self, access_key_id, access_key_secret, security_token):
          """Store the three credential fields exactly as given.

          :param access_key_id: the access key id.
          :param access_key_secret: the access key secret.
          :param security_token: the security token.
          """
          self.access_key_id, self.access_key_secret = (
              access_key_id,
              access_key_secret,
          )
          self.security_token = security_token
          self.credential_type = ac.STS

      def get_access_key_id(self):
          """Return the stored access key id."""
          key_id = self.access_key_id
          return key_id

      def get_access_key_secret(self):
          """Return the stored access key secret."""
          key_secret = self.access_key_secret
          return key_secret

      def get_security_token(self):
          """Return the stored security token."""
          token = self.security_token
          return token