auth.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. from __future__ import print_function
  2. import six
  3. import logging
  4. from tweepy.error import TweepError
  5. from tweepy.api import API
  6. import requests
  7. from requests_oauthlib import OAuth1Session, OAuth1
  8. from requests.auth import AuthBase
  9. from six.moves.urllib.parse import parse_qs
  # Logged by OAuthHandler.get_authorization_url() when both
  # signin_with_twitter and access_type are supplied.
  WARNING_MESSAGE = (
      "Warning! Due to a Twitter API bug, signin_with_twitter\n"
      "and access_type don't always play nice together. Details\n"
      "https://dev.twitter.com/discussions/21281"
  )
  class AuthHandler(object):
      """Abstract interface shared by the authentication handlers.

      Both methods raise NotImplementedError; concrete handlers are
      expected to override the ones they support.
      """

      def apply_auth(self, url=None, method=None, headers=None,
                     parameters=None):
          """Apply authentication headers to request.

          The four parameters are optional so that the base method can be
          invoked with the same zero-argument form the handlers in this
          module use for their own ``apply_auth``; such a call then fails
          with NotImplementedError instead of an unrelated TypeError.

          Raises:
              NotImplementedError: always, in this base class.
          """
          raise NotImplementedError

      def get_username(self):
          """Return the username of the authenticated user.

          Raises:
              NotImplementedError: always, in this base class.
          """
          raise NotImplementedError
  class OAuthHandler(AuthHandler):
      """OAuth authentication handler.

      Wraps an ``OAuth1Session`` to obtain request and access tokens from
      the endpoints under ``https://OAUTH_HOST + OAUTH_ROOT`` and produces
      an ``OAuth1`` signer from the stored credentials.
      """

      OAUTH_HOST = 'api.twitter.com'
      OAUTH_ROOT = '/oauth/'

      def __init__(self, consumer_key, consumer_secret, callback=None):
          """Store the consumer credentials and open an OAuth1 session.

          Args:
              consumer_key: application key; text values are encoded to
                  ASCII bytes before being stored.
              consumer_secret: application secret; encoded the same way.
              callback: optional callback URI passed to the session.
          """
          # isinstance (rather than an exact type comparison) also accepts
          # subclasses of the text type.
          if isinstance(consumer_key, six.text_type):
              consumer_key = consumer_key.encode('ascii')
          if isinstance(consumer_secret, six.text_type):
              consumer_secret = consumer_secret.encode('ascii')

          self.consumer_key = consumer_key
          self.consumer_secret = consumer_secret
          self.access_token = None
          self.access_token_secret = None
          self.callback = callback
          self.username = None
          # Defined up front so get_access_token() never trips over a
          # missing attribute when no request token has been fetched yet.
          self.request_token = {}
          self.oauth = OAuth1Session(consumer_key,
                                     client_secret=consumer_secret,
                                     callback_uri=self.callback)

      def _get_oauth_url(self, endpoint):
          """Return the full https URL of an OAuth endpoint."""
          return 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint

      def apply_auth(self):
          """Return an ``OAuth1`` signer built from the stored credentials."""
          return OAuth1(self.consumer_key,
                        client_secret=self.consumer_secret,
                        resource_owner_key=self.access_token,
                        resource_owner_secret=self.access_token_secret,
                        decoding=None)

      def _get_request_token(self, access_type=None):
          """Fetch a request token, optionally asking for an access type.

          Raises:
              TweepError: wrapping any exception raised while fetching.
          """
          try:
              url = self._get_oauth_url('request_token')
              if access_type:
                  url += '?x_auth_access_type=%s' % access_type
              return self.oauth.fetch_request_token(url)
          except Exception as e:
              raise TweepError(e)

      def set_access_token(self, key, secret):
          """Store an already obtained access token and its secret."""
          self.access_token = key
          self.access_token_secret = secret

      def get_authorization_url(self,
                                signin_with_twitter=False,
                                access_type=None):
          """Get the authorization URL to redirect the user.

          Fetches a fresh request token, keeps it in ``self.request_token``
          and returns the session's authorization URL for the
          ``authenticate`` endpoint (when ``signin_with_twitter`` is true)
          or the ``authorize`` endpoint otherwise.

          Raises:
              TweepError: wrapping any exception raised along the way.
          """
          try:
              if signin_with_twitter:
                  url = self._get_oauth_url('authenticate')
                  if access_type:
                      logging.warning(WARNING_MESSAGE)
              else:
                  url = self._get_oauth_url('authorize')
              self.request_token = self._get_request_token(
                  access_type=access_type)
              return self.oauth.authorization_url(url)
          except Exception as e:
              raise TweepError(e)

      def get_access_token(self, verifier=None):
          """
          After user has authorized the request token, get access token
          with user supplied verifier.

          Replaces ``self.oauth`` with a session bound to the stored
          request token, stores the resulting access token and secret on
          the handler and returns them as a ``(token, secret)`` tuple.

          Raises:
              TweepError: wrapping any exception raised along the way,
                  including the lookup failure when no request token has
                  been stored yet.
          """
          try:
              url = self._get_oauth_url('access_token')
              self.oauth = OAuth1Session(
                  self.consumer_key,
                  client_secret=self.consumer_secret,
                  resource_owner_key=self.request_token['oauth_token'],
                  resource_owner_secret=self.request_token[
                      'oauth_token_secret'],
                  verifier=verifier, callback_uri=self.callback)
              resp = self.oauth.fetch_access_token(url)
              self.access_token = resp['oauth_token']
              self.access_token_secret = resp['oauth_token_secret']
              return self.access_token, self.access_token_secret
          except Exception as e:
              raise TweepError(e)

      def get_xauth_access_token(self, username, password):
          """
          Get an access token from an username and password combination.
          In order to get this working you need to create an app at
          http://twitter.com/apps, after that send a mail to api@twitter.com
          and request activation of xAuth for it.

          Returns:
              A ``(oauth_token, oauth_token_secret)`` tuple taken from the
              response body.

          Raises:
              TweepError: wrapping any exception raised along the way,
                  including a response that lacks either field.
          """
          try:
              url = self._get_oauth_url('access_token')
              oauth = OAuth1(self.consumer_key,
                             client_secret=self.consumer_secret)
              # NOTE(review): the x_auth_* values are sent as HTTP headers
              # here; confirm against the xAuth specification whether they
              # should be POST parameters instead.
              r = requests.post(url=url,
                                auth=oauth,
                                headers={'x_auth_mode': 'client_auth',
                                         'x_auth_username': username,
                                         'x_auth_password': password})
              # Parse the decoded text: parse_qs() on the raw bytes body
              # yields bytes keys on Python 3, so the str lookups below
              # would never match.
              credentials = parse_qs(r.text)
              return (credentials.get('oauth_token')[0],
                      credentials.get('oauth_token_secret')[0])
          except Exception as e:
              raise TweepError(e)

      def get_username(self):
          """Return the authenticated user's screen name.

          The name is looked up through ``API(self).verify_credentials()``
          on first use and cached in ``self.username`` afterwards.

          Raises:
              TweepError: if verify_credentials() returns a falsy result.
          """
          if self.username is None:
              api = API(self)
              user = api.verify_credentials()
              if user:
                  self.username = user.screen_name
              else:
                  raise TweepError('Unable to get username,'
                                   ' invalid oauth token!')
          return self.username
  class OAuth2Bearer(AuthBase):
      """Request hook that attaches a bearer token to outgoing requests."""

      def __init__(self, bearer_token):
          """Remember the bearer token to send with each request."""
          self.bearer_token = bearer_token

      def __call__(self, request):
          """Set the ``Authorization`` header on *request* and return it."""
          header_value = 'Bearer ' + self.bearer_token
          request.headers['Authorization'] = header_value
          return request
  class AppAuthHandler(AuthHandler):
      """Application-only authentication handler.

      On construction, posts the consumer credentials to the ``token``
      endpoint and keeps the returned bearer token; ``apply_auth`` wraps
      that token in an ``OAuth2Bearer``.
      """

      OAUTH_HOST = 'api.twitter.com'
      OAUTH_ROOT = '/oauth2/'

      def __init__(self, consumer_key, consumer_secret):
          """Request a bearer token for the given consumer credentials.

          Raises:
              TweepError: if the response's ``token_type`` is not
                  ``bearer``.
          """
          self.consumer_key = consumer_key
          self.consumer_secret = consumer_secret
          self._bearer_token = ''

          token_url = self._get_oauth_url('token')
          basic_credentials = (self.consumer_key, self.consumer_secret)
          response = requests.post(token_url,
                                   auth=basic_credentials,
                                   data={'grant_type': 'client_credentials'})
          payload = response.json()

          token_type = payload.get('token_type')
          if token_type != 'bearer':
              raise TweepError('Expected token_type to equal "bearer", '
                               'but got %s instead' % token_type)

          self._bearer_token = payload['access_token']

      def _get_oauth_url(self, endpoint):
          """Return the full https URL of an OAuth2 endpoint."""
          base_url = 'https://' + self.OAUTH_HOST + self.OAUTH_ROOT
          return base_url + endpoint

      def apply_auth(self):
          """Return an ``OAuth2Bearer`` carrying the stored bearer token."""
          bearer = OAuth2Bearer(self._bearer_token)
          return bearer