auth_test.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. # These tests do not currently do much to verify the correct implementation
  2. # of the openid/oauth protocols, they just exercise the major code paths
  3. # and ensure that it doesn't blow up (e.g. with unicode/bytes issues in
  4. # python 3)
  5. from __future__ import absolute_import, division, print_function
  6. import unittest
  7. import warnings
  8. from tornado.auth import (
  9. AuthError, OpenIdMixin, OAuthMixin, OAuth2Mixin,
  10. GoogleOAuth2Mixin, FacebookGraphMixin, TwitterMixin,
  11. )
  12. from tornado.concurrent import Future
  13. from tornado.escape import json_decode
  14. from tornado import gen
  15. from tornado.httputil import url_concat
  16. from tornado.log import gen_log, app_log
  17. from tornado.testing import AsyncHTTPTestCase, ExpectLog
  18. from tornado.test.util import ignore_deprecation
  19. from tornado.web import RequestHandler, Application, asynchronous, HTTPError
  20. try:
  21. from unittest import mock
  22. except ImportError:
  23. mock = None
  24. class OpenIdClientLoginHandlerLegacy(RequestHandler, OpenIdMixin):
  25. def initialize(self, test):
  26. self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
  27. with ignore_deprecation():
  28. @asynchronous
  29. def get(self):
  30. if self.get_argument('openid.mode', None):
  31. with warnings.catch_warnings():
  32. warnings.simplefilter('ignore', DeprecationWarning)
  33. self.get_authenticated_user(
  34. self.on_user, http_client=self.settings['http_client'])
  35. return
  36. res = self.authenticate_redirect()
  37. assert isinstance(res, Future)
  38. assert res.done()
  39. def on_user(self, user):
  40. if user is None:
  41. raise Exception("user is None")
  42. self.finish(user)
  43. class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
  44. def initialize(self, test):
  45. self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
  46. @gen.coroutine
  47. def get(self):
  48. if self.get_argument('openid.mode', None):
  49. user = yield self.get_authenticated_user(http_client=self.settings['http_client'])
  50. if user is None:
  51. raise Exception("user is None")
  52. self.finish(user)
  53. return
  54. res = self.authenticate_redirect()
  55. assert isinstance(res, Future)
  56. assert res.done()
  57. class OpenIdServerAuthenticateHandler(RequestHandler):
  58. def post(self):
  59. if self.get_argument('openid.mode') != 'check_authentication':
  60. raise Exception("incorrect openid.mode %r")
  61. self.write('is_valid:true')
  62. class OAuth1ClientLoginHandlerLegacy(RequestHandler, OAuthMixin):
  63. def initialize(self, test, version):
  64. self._OAUTH_VERSION = version
  65. self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
  66. self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
  67. self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/oauth1/server/access_token')
  68. def _oauth_consumer_token(self):
  69. return dict(key='asdf', secret='qwer')
  70. with ignore_deprecation():
  71. @asynchronous
  72. def get(self):
  73. if self.get_argument('oauth_token', None):
  74. with warnings.catch_warnings():
  75. warnings.simplefilter('ignore', DeprecationWarning)
  76. self.get_authenticated_user(
  77. self.on_user, http_client=self.settings['http_client'])
  78. return
  79. res = self.authorize_redirect(http_client=self.settings['http_client'])
  80. assert isinstance(res, Future)
  81. def on_user(self, user):
  82. if user is None:
  83. raise Exception("user is None")
  84. self.finish(user)
  85. def _oauth_get_user(self, access_token, callback):
  86. if self.get_argument('fail_in_get_user', None):
  87. raise Exception("failing in get_user")
  88. if access_token != dict(key='uiop', secret='5678'):
  89. raise Exception("incorrect access token %r" % access_token)
  90. callback(dict(email='foo@example.com'))
  91. class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
  92. def initialize(self, test, version):
  93. self._OAUTH_VERSION = version
  94. self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
  95. self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
  96. self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/oauth1/server/access_token')
  97. def _oauth_consumer_token(self):
  98. return dict(key='asdf', secret='qwer')
  99. @gen.coroutine
  100. def get(self):
  101. if self.get_argument('oauth_token', None):
  102. user = yield self.get_authenticated_user(http_client=self.settings['http_client'])
  103. if user is None:
  104. raise Exception("user is None")
  105. self.finish(user)
  106. return
  107. yield self.authorize_redirect(http_client=self.settings['http_client'])
  108. @gen.coroutine
  109. def _oauth_get_user_future(self, access_token):
  110. if self.get_argument('fail_in_get_user', None):
  111. raise Exception("failing in get_user")
  112. if access_token != dict(key='uiop', secret='5678'):
  113. raise Exception("incorrect access token %r" % access_token)
  114. return dict(email='foo@example.com')
  115. class OAuth1ClientLoginCoroutineHandler(OAuth1ClientLoginHandler):
  116. """Replaces OAuth1ClientLoginCoroutineHandler's get() with a coroutine."""
  117. @gen.coroutine
  118. def get(self):
  119. if self.get_argument('oauth_token', None):
  120. # Ensure that any exceptions are set on the returned Future,
  121. # not simply thrown into the surrounding StackContext.
  122. try:
  123. yield self.get_authenticated_user()
  124. except Exception as e:
  125. self.set_status(503)
  126. self.write("got exception: %s" % e)
  127. else:
  128. yield self.authorize_redirect()
  129. class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
  130. def initialize(self, version):
  131. self._OAUTH_VERSION = version
  132. def _oauth_consumer_token(self):
  133. return dict(key='asdf', secret='qwer')
  134. def get(self):
  135. params = self._oauth_request_parameters(
  136. 'http://www.example.com/api/asdf',
  137. dict(key='uiop', secret='5678'),
  138. parameters=dict(foo='bar'))
  139. self.write(params)
  140. class OAuth1ServerRequestTokenHandler(RequestHandler):
  141. def get(self):
  142. self.write('oauth_token=zxcv&oauth_token_secret=1234')
  143. class OAuth1ServerAccessTokenHandler(RequestHandler):
  144. def get(self):
  145. self.write('oauth_token=uiop&oauth_token_secret=5678')
  146. class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
  147. def initialize(self, test):
  148. self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth2/server/authorize')
  149. def get(self):
  150. res = self.authorize_redirect()
  151. assert isinstance(res, Future)
  152. assert res.done()
  153. class FacebookClientLoginHandler(RequestHandler, FacebookGraphMixin):
  154. def initialize(self, test):
  155. self._OAUTH_AUTHORIZE_URL = test.get_url('/facebook/server/authorize')
  156. self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/facebook/server/access_token')
  157. self._FACEBOOK_BASE_URL = test.get_url('/facebook/server')
  158. @gen.coroutine
  159. def get(self):
  160. if self.get_argument("code", None):
  161. user = yield self.get_authenticated_user(
  162. redirect_uri=self.request.full_url(),
  163. client_id=self.settings["facebook_api_key"],
  164. client_secret=self.settings["facebook_secret"],
  165. code=self.get_argument("code"))
  166. self.write(user)
  167. else:
  168. yield self.authorize_redirect(
  169. redirect_uri=self.request.full_url(),
  170. client_id=self.settings["facebook_api_key"],
  171. extra_params={"scope": "read_stream,offline_access"})
  172. class FacebookServerAccessTokenHandler(RequestHandler):
  173. def get(self):
  174. self.write(dict(access_token="asdf", expires_in=3600))
  175. class FacebookServerMeHandler(RequestHandler):
  176. def get(self):
  177. self.write('{}')
  178. class TwitterClientHandler(RequestHandler, TwitterMixin):
  179. def initialize(self, test):
  180. self._OAUTH_REQUEST_TOKEN_URL = test.get_url('/oauth1/server/request_token')
  181. self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/twitter/server/access_token')
  182. self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth1/server/authorize')
  183. self._OAUTH_AUTHENTICATE_URL = test.get_url('/twitter/server/authenticate')
  184. self._TWITTER_BASE_URL = test.get_url('/twitter/api')
  185. def get_auth_http_client(self):
  186. return self.settings['http_client']
  187. class TwitterClientLoginHandlerLegacy(TwitterClientHandler):
  188. with ignore_deprecation():
  189. @asynchronous
  190. def get(self):
  191. if self.get_argument("oauth_token", None):
  192. self.get_authenticated_user(self.on_user)
  193. return
  194. self.authorize_redirect()
  195. def on_user(self, user):
  196. if user is None:
  197. raise Exception("user is None")
  198. self.finish(user)
  199. class TwitterClientLoginHandler(TwitterClientHandler):
  200. @gen.coroutine
  201. def get(self):
  202. if self.get_argument("oauth_token", None):
  203. user = yield self.get_authenticated_user()
  204. if user is None:
  205. raise Exception("user is None")
  206. self.finish(user)
  207. return
  208. yield self.authorize_redirect()
  209. class TwitterClientAuthenticateHandler(TwitterClientHandler):
  210. # Like TwitterClientLoginHandler, but uses authenticate_redirect
  211. # instead of authorize_redirect.
  212. @gen.coroutine
  213. def get(self):
  214. if self.get_argument("oauth_token", None):
  215. user = yield self.get_authenticated_user()
  216. if user is None:
  217. raise Exception("user is None")
  218. self.finish(user)
  219. return
  220. yield self.authenticate_redirect()
  221. class TwitterClientLoginGenEngineHandler(TwitterClientHandler):
  222. with ignore_deprecation():
  223. @asynchronous
  224. @gen.engine
  225. def get(self):
  226. if self.get_argument("oauth_token", None):
  227. user = yield self.get_authenticated_user()
  228. self.finish(user)
  229. else:
  230. # Old style: with @gen.engine we can ignore the Future from
  231. # authorize_redirect.
  232. self.authorize_redirect()
  233. class TwitterClientLoginGenCoroutineHandler(TwitterClientHandler):
  234. @gen.coroutine
  235. def get(self):
  236. if self.get_argument("oauth_token", None):
  237. user = yield self.get_authenticated_user()
  238. self.finish(user)
  239. else:
  240. # New style: with @gen.coroutine the result must be yielded
  241. # or else the request will be auto-finished too soon.
  242. yield self.authorize_redirect()
  243. class TwitterClientShowUserHandlerLegacy(TwitterClientHandler):
  244. with ignore_deprecation():
  245. @asynchronous
  246. @gen.engine
  247. def get(self):
  248. # TODO: would be nice to go through the login flow instead of
  249. # cheating with a hard-coded access token.
  250. with warnings.catch_warnings():
  251. warnings.simplefilter('ignore', DeprecationWarning)
  252. response = yield gen.Task(self.twitter_request,
  253. '/users/show/%s' % self.get_argument('name'),
  254. access_token=dict(key='hjkl', secret='vbnm'))
  255. if response is None:
  256. self.set_status(500)
  257. self.finish('error from twitter request')
  258. else:
  259. self.finish(response)
  260. class TwitterClientShowUserHandler(TwitterClientHandler):
  261. @gen.coroutine
  262. def get(self):
  263. # TODO: would be nice to go through the login flow instead of
  264. # cheating with a hard-coded access token.
  265. try:
  266. response = yield self.twitter_request(
  267. '/users/show/%s' % self.get_argument('name'),
  268. access_token=dict(key='hjkl', secret='vbnm'))
  269. except AuthError:
  270. self.set_status(500)
  271. self.finish('error from twitter request')
  272. else:
  273. self.finish(response)
  274. class TwitterServerAccessTokenHandler(RequestHandler):
  275. def get(self):
  276. self.write('oauth_token=hjkl&oauth_token_secret=vbnm&screen_name=foo')
  277. class TwitterServerShowUserHandler(RequestHandler):
  278. def get(self, screen_name):
  279. if screen_name == 'error':
  280. raise HTTPError(500)
  281. assert 'oauth_nonce' in self.request.arguments
  282. assert 'oauth_timestamp' in self.request.arguments
  283. assert 'oauth_signature' in self.request.arguments
  284. assert self.get_argument('oauth_consumer_key') == 'test_twitter_consumer_key'
  285. assert self.get_argument('oauth_signature_method') == 'HMAC-SHA1'
  286. assert self.get_argument('oauth_version') == '1.0'
  287. assert self.get_argument('oauth_token') == 'hjkl'
  288. self.write(dict(screen_name=screen_name, name=screen_name.capitalize()))
  289. class TwitterServerVerifyCredentialsHandler(RequestHandler):
  290. def get(self):
  291. assert 'oauth_nonce' in self.request.arguments
  292. assert 'oauth_timestamp' in self.request.arguments
  293. assert 'oauth_signature' in self.request.arguments
  294. assert self.get_argument('oauth_consumer_key') == 'test_twitter_consumer_key'
  295. assert self.get_argument('oauth_signature_method') == 'HMAC-SHA1'
  296. assert self.get_argument('oauth_version') == '1.0'
  297. assert self.get_argument('oauth_token') == 'hjkl'
  298. self.write(dict(screen_name='foo', name='Foo'))
  299. class AuthTest(AsyncHTTPTestCase):
  300. def get_app(self):
  301. return Application(
  302. [
  303. # test endpoints
  304. ('/legacy/openid/client/login', OpenIdClientLoginHandlerLegacy, dict(test=self)),
  305. ('/openid/client/login', OpenIdClientLoginHandler, dict(test=self)),
  306. ('/legacy/oauth10/client/login', OAuth1ClientLoginHandlerLegacy,
  307. dict(test=self, version='1.0')),
  308. ('/oauth10/client/login', OAuth1ClientLoginHandler,
  309. dict(test=self, version='1.0')),
  310. ('/oauth10/client/request_params',
  311. OAuth1ClientRequestParametersHandler,
  312. dict(version='1.0')),
  313. ('/legacy/oauth10a/client/login', OAuth1ClientLoginHandlerLegacy,
  314. dict(test=self, version='1.0a')),
  315. ('/oauth10a/client/login', OAuth1ClientLoginHandler,
  316. dict(test=self, version='1.0a')),
  317. ('/oauth10a/client/login_coroutine',
  318. OAuth1ClientLoginCoroutineHandler,
  319. dict(test=self, version='1.0a')),
  320. ('/oauth10a/client/request_params',
  321. OAuth1ClientRequestParametersHandler,
  322. dict(version='1.0a')),
  323. ('/oauth2/client/login', OAuth2ClientLoginHandler, dict(test=self)),
  324. ('/facebook/client/login', FacebookClientLoginHandler, dict(test=self)),
  325. ('/legacy/twitter/client/login', TwitterClientLoginHandlerLegacy, dict(test=self)),
  326. ('/twitter/client/login', TwitterClientLoginHandler, dict(test=self)),
  327. ('/twitter/client/authenticate', TwitterClientAuthenticateHandler, dict(test=self)),
  328. ('/twitter/client/login_gen_engine',
  329. TwitterClientLoginGenEngineHandler, dict(test=self)),
  330. ('/twitter/client/login_gen_coroutine',
  331. TwitterClientLoginGenCoroutineHandler, dict(test=self)),
  332. ('/legacy/twitter/client/show_user',
  333. TwitterClientShowUserHandlerLegacy, dict(test=self)),
  334. ('/twitter/client/show_user',
  335. TwitterClientShowUserHandler, dict(test=self)),
  336. # simulated servers
  337. ('/openid/server/authenticate', OpenIdServerAuthenticateHandler),
  338. ('/oauth1/server/request_token', OAuth1ServerRequestTokenHandler),
  339. ('/oauth1/server/access_token', OAuth1ServerAccessTokenHandler),
  340. ('/facebook/server/access_token', FacebookServerAccessTokenHandler),
  341. ('/facebook/server/me', FacebookServerMeHandler),
  342. ('/twitter/server/access_token', TwitterServerAccessTokenHandler),
  343. (r'/twitter/api/users/show/(.*)\.json', TwitterServerShowUserHandler),
  344. (r'/twitter/api/account/verify_credentials\.json',
  345. TwitterServerVerifyCredentialsHandler),
  346. ],
  347. http_client=self.http_client,
  348. twitter_consumer_key='test_twitter_consumer_key',
  349. twitter_consumer_secret='test_twitter_consumer_secret',
  350. facebook_api_key='test_facebook_api_key',
  351. facebook_secret='test_facebook_secret')
  352. def test_openid_redirect_legacy(self):
  353. response = self.fetch('/legacy/openid/client/login', follow_redirects=False)
  354. self.assertEqual(response.code, 302)
  355. self.assertTrue(
  356. '/openid/server/authenticate?' in response.headers['Location'])
  357. def test_openid_get_user_legacy(self):
  358. response = self.fetch('/legacy/openid/client/login?openid.mode=blah'
  359. '&openid.ns.ax=http://openid.net/srv/ax/1.0'
  360. '&openid.ax.type.email=http://axschema.org/contact/email'
  361. '&openid.ax.value.email=foo@example.com')
  362. response.rethrow()
  363. parsed = json_decode(response.body)
  364. self.assertEqual(parsed["email"], "foo@example.com")
  365. def test_openid_redirect(self):
  366. response = self.fetch('/openid/client/login', follow_redirects=False)
  367. self.assertEqual(response.code, 302)
  368. self.assertTrue(
  369. '/openid/server/authenticate?' in response.headers['Location'])
  370. def test_openid_get_user(self):
  371. response = self.fetch('/openid/client/login?openid.mode=blah'
  372. '&openid.ns.ax=http://openid.net/srv/ax/1.0'
  373. '&openid.ax.type.email=http://axschema.org/contact/email'
  374. '&openid.ax.value.email=foo@example.com')
  375. response.rethrow()
  376. parsed = json_decode(response.body)
  377. self.assertEqual(parsed["email"], "foo@example.com")
  378. def test_oauth10_redirect_legacy(self):
  379. response = self.fetch('/legacy/oauth10/client/login', follow_redirects=False)
  380. self.assertEqual(response.code, 302)
  381. self.assertTrue(response.headers['Location'].endswith(
  382. '/oauth1/server/authorize?oauth_token=zxcv'))
  383. # the cookie is base64('zxcv')|base64('1234')
  384. self.assertTrue(
  385. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  386. response.headers['Set-Cookie'])
  387. def test_oauth10_redirect(self):
  388. response = self.fetch('/oauth10/client/login', follow_redirects=False)
  389. self.assertEqual(response.code, 302)
  390. self.assertTrue(response.headers['Location'].endswith(
  391. '/oauth1/server/authorize?oauth_token=zxcv'))
  392. # the cookie is base64('zxcv')|base64('1234')
  393. self.assertTrue(
  394. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  395. response.headers['Set-Cookie'])
  396. def test_oauth10_get_user_legacy(self):
  397. with ignore_deprecation():
  398. response = self.fetch(
  399. '/legacy/oauth10/client/login?oauth_token=zxcv',
  400. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  401. response.rethrow()
  402. parsed = json_decode(response.body)
  403. self.assertEqual(parsed['email'], 'foo@example.com')
  404. self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
  405. def test_oauth10_get_user(self):
  406. response = self.fetch(
  407. '/oauth10/client/login?oauth_token=zxcv',
  408. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  409. response.rethrow()
  410. parsed = json_decode(response.body)
  411. self.assertEqual(parsed['email'], 'foo@example.com')
  412. self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
  413. def test_oauth10_request_parameters(self):
  414. response = self.fetch('/oauth10/client/request_params')
  415. response.rethrow()
  416. parsed = json_decode(response.body)
  417. self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
  418. self.assertEqual(parsed['oauth_token'], 'uiop')
  419. self.assertTrue('oauth_nonce' in parsed)
  420. self.assertTrue('oauth_signature' in parsed)
  421. def test_oauth10a_redirect_legacy(self):
  422. response = self.fetch('/legacy/oauth10a/client/login', follow_redirects=False)
  423. self.assertEqual(response.code, 302)
  424. self.assertTrue(response.headers['Location'].endswith(
  425. '/oauth1/server/authorize?oauth_token=zxcv'))
  426. # the cookie is base64('zxcv')|base64('1234')
  427. self.assertTrue(
  428. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  429. response.headers['Set-Cookie'])
  430. def test_oauth10a_get_user_legacy(self):
  431. with ignore_deprecation():
  432. response = self.fetch(
  433. '/legacy/oauth10a/client/login?oauth_token=zxcv',
  434. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  435. response.rethrow()
  436. parsed = json_decode(response.body)
  437. self.assertEqual(parsed['email'], 'foo@example.com')
  438. self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
  439. def test_oauth10a_redirect(self):
  440. response = self.fetch('/oauth10a/client/login', follow_redirects=False)
  441. self.assertEqual(response.code, 302)
  442. self.assertTrue(response.headers['Location'].endswith(
  443. '/oauth1/server/authorize?oauth_token=zxcv'))
  444. # the cookie is base64('zxcv')|base64('1234')
  445. self.assertTrue(
  446. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  447. response.headers['Set-Cookie'])
  448. @unittest.skipIf(mock is None, 'mock package not present')
  449. def test_oauth10a_redirect_error(self):
  450. with mock.patch.object(OAuth1ServerRequestTokenHandler, 'get') as get:
  451. get.side_effect = Exception("boom")
  452. with ExpectLog(app_log, "Uncaught exception"):
  453. response = self.fetch('/oauth10a/client/login', follow_redirects=False)
  454. self.assertEqual(response.code, 500)
  455. def test_oauth10a_get_user(self):
  456. response = self.fetch(
  457. '/oauth10a/client/login?oauth_token=zxcv',
  458. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  459. response.rethrow()
  460. parsed = json_decode(response.body)
  461. self.assertEqual(parsed['email'], 'foo@example.com')
  462. self.assertEqual(parsed['access_token'], dict(key='uiop', secret='5678'))
  463. def test_oauth10a_request_parameters(self):
  464. response = self.fetch('/oauth10a/client/request_params')
  465. response.rethrow()
  466. parsed = json_decode(response.body)
  467. self.assertEqual(parsed['oauth_consumer_key'], 'asdf')
  468. self.assertEqual(parsed['oauth_token'], 'uiop')
  469. self.assertTrue('oauth_nonce' in parsed)
  470. self.assertTrue('oauth_signature' in parsed)
  471. def test_oauth10a_get_user_coroutine_exception(self):
  472. response = self.fetch(
  473. '/oauth10a/client/login_coroutine?oauth_token=zxcv&fail_in_get_user=true',
  474. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  475. self.assertEqual(response.code, 503)
  476. def test_oauth2_redirect(self):
  477. response = self.fetch('/oauth2/client/login', follow_redirects=False)
  478. self.assertEqual(response.code, 302)
  479. self.assertTrue('/oauth2/server/authorize?' in response.headers['Location'])
  480. def test_facebook_login(self):
  481. response = self.fetch('/facebook/client/login', follow_redirects=False)
  482. self.assertEqual(response.code, 302)
  483. self.assertTrue('/facebook/server/authorize?' in response.headers['Location'])
  484. response = self.fetch('/facebook/client/login?code=1234', follow_redirects=False)
  485. self.assertEqual(response.code, 200)
  486. user = json_decode(response.body)
  487. self.assertEqual(user['access_token'], 'asdf')
  488. self.assertEqual(user['session_expires'], '3600')
  489. def base_twitter_redirect(self, url):
  490. # Same as test_oauth10a_redirect
  491. response = self.fetch(url, follow_redirects=False)
  492. self.assertEqual(response.code, 302)
  493. self.assertTrue(response.headers['Location'].endswith(
  494. '/oauth1/server/authorize?oauth_token=zxcv'))
  495. # the cookie is base64('zxcv')|base64('1234')
  496. self.assertTrue(
  497. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  498. response.headers['Set-Cookie'])
  499. def test_twitter_redirect_legacy(self):
  500. self.base_twitter_redirect('/legacy/twitter/client/login')
  501. def test_twitter_redirect(self):
  502. self.base_twitter_redirect('/twitter/client/login')
  503. def test_twitter_redirect_gen_engine(self):
  504. self.base_twitter_redirect('/twitter/client/login_gen_engine')
  505. def test_twitter_redirect_gen_coroutine(self):
  506. self.base_twitter_redirect('/twitter/client/login_gen_coroutine')
  507. def test_twitter_authenticate_redirect(self):
  508. response = self.fetch('/twitter/client/authenticate', follow_redirects=False)
  509. self.assertEqual(response.code, 302)
  510. self.assertTrue(response.headers['Location'].endswith(
  511. '/twitter/server/authenticate?oauth_token=zxcv'), response.headers['Location'])
  512. # the cookie is base64('zxcv')|base64('1234')
  513. self.assertTrue(
  514. '_oauth_request_token="enhjdg==|MTIzNA=="' in response.headers['Set-Cookie'],
  515. response.headers['Set-Cookie'])
  516. def test_twitter_get_user(self):
  517. response = self.fetch(
  518. '/twitter/client/login?oauth_token=zxcv',
  519. headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
  520. response.rethrow()
  521. parsed = json_decode(response.body)
  522. self.assertEqual(parsed,
  523. {u'access_token': {u'key': u'hjkl',
  524. u'screen_name': u'foo',
  525. u'secret': u'vbnm'},
  526. u'name': u'Foo',
  527. u'screen_name': u'foo',
  528. u'username': u'foo'})
  529. def test_twitter_show_user_legacy(self):
  530. response = self.fetch('/legacy/twitter/client/show_user?name=somebody')
  531. response.rethrow()
  532. self.assertEqual(json_decode(response.body),
  533. {'name': 'Somebody', 'screen_name': 'somebody'})
  534. def test_twitter_show_user_error_legacy(self):
  535. with ExpectLog(gen_log, 'Error response HTTP 500'):
  536. response = self.fetch('/legacy/twitter/client/show_user?name=error')
  537. self.assertEqual(response.code, 500)
  538. self.assertEqual(response.body, b'error from twitter request')
  539. def test_twitter_show_user(self):
  540. response = self.fetch('/twitter/client/show_user?name=somebody')
  541. response.rethrow()
  542. self.assertEqual(json_decode(response.body),
  543. {'name': 'Somebody', 'screen_name': 'somebody'})
  544. def test_twitter_show_user_error(self):
  545. response = self.fetch('/twitter/client/show_user?name=error')
  546. self.assertEqual(response.code, 500)
  547. self.assertEqual(response.body, b'error from twitter request')
  548. class GoogleLoginHandler(RequestHandler, GoogleOAuth2Mixin):
  549. def initialize(self, test):
  550. self.test = test
  551. self._OAUTH_REDIRECT_URI = test.get_url('/client/login')
  552. self._OAUTH_AUTHORIZE_URL = test.get_url('/google/oauth2/authorize')
  553. self._OAUTH_ACCESS_TOKEN_URL = test.get_url('/google/oauth2/token')
  554. @gen.coroutine
  555. def get(self):
  556. code = self.get_argument('code', None)
  557. if code is not None:
  558. # retrieve authenticate google user
  559. access = yield self.get_authenticated_user(self._OAUTH_REDIRECT_URI,
  560. code)
  561. user = yield self.oauth2_request(
  562. self.test.get_url("/google/oauth2/userinfo"),
  563. access_token=access["access_token"])
  564. # return the user and access token as json
  565. user["access_token"] = access["access_token"]
  566. self.write(user)
  567. else:
  568. yield self.authorize_redirect(
  569. redirect_uri=self._OAUTH_REDIRECT_URI,
  570. client_id=self.settings['google_oauth']['key'],
  571. client_secret=self.settings['google_oauth']['secret'],
  572. scope=['profile', 'email'],
  573. response_type='code',
  574. extra_params={'prompt': 'select_account'})
  575. class GoogleOAuth2AuthorizeHandler(RequestHandler):
  576. def get(self):
  577. # issue a fake auth code and redirect to redirect_uri
  578. code = 'fake-authorization-code'
  579. self.redirect(url_concat(self.get_argument('redirect_uri'),
  580. dict(code=code)))
  581. class GoogleOAuth2TokenHandler(RequestHandler):
  582. def post(self):
  583. assert self.get_argument('code') == 'fake-authorization-code'
  584. # issue a fake token
  585. self.finish({
  586. 'access_token': 'fake-access-token',
  587. 'expires_in': 'never-expires'
  588. })
  589. class GoogleOAuth2UserinfoHandler(RequestHandler):
  590. def get(self):
  591. assert self.get_argument('access_token') == 'fake-access-token'
  592. # return a fake user
  593. self.finish({
  594. 'name': 'Foo',
  595. 'email': 'foo@example.com'
  596. })
  597. class GoogleOAuth2Test(AsyncHTTPTestCase):
  598. def get_app(self):
  599. return Application(
  600. [
  601. # test endpoints
  602. ('/client/login', GoogleLoginHandler, dict(test=self)),
  603. # simulated google authorization server endpoints
  604. ('/google/oauth2/authorize', GoogleOAuth2AuthorizeHandler),
  605. ('/google/oauth2/token', GoogleOAuth2TokenHandler),
  606. ('/google/oauth2/userinfo', GoogleOAuth2UserinfoHandler),
  607. ],
  608. google_oauth={
  609. "key": 'fake_google_client_id',
  610. "secret": 'fake_google_client_secret'
  611. })
  612. def test_google_login(self):
  613. response = self.fetch('/client/login')
  614. self.assertDictEqual({
  615. u'name': u'Foo',
  616. u'email': u'foo@example.com',
  617. u'access_token': u'fake-access-token',
  618. }, json_decode(response.body))