12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944 |
- """Base Tornado handlers for the notebook server."""
- # Copyright (c) Jupyter Development Team.
- # Distributed under the terms of the Modified BSD License.
- import datetime
- import functools
- import ipaddress
- import json
- import mimetypes
- import os
- import re
- import sys
- import traceback
- import types
- import warnings
- try:
- # py3
- from http.client import responses
- from http.cookies import Morsel
- except ImportError:
- from httplib import responses
- from Cookie import Morsel
- try:
- from urllib.parse import urlparse # Py 3
- except ImportError:
- from urlparse import urlparse # Py 2
- from jinja2 import TemplateNotFound
- from tornado import web, gen, escape, httputil
- from tornado.log import app_log
- import prometheus_client
- from notebook._sysinfo import get_sys_info
- from traitlets.config import Application
- from ipython_genutils.path import filefind
- from ipython_genutils.py3compat import string_types, PY3
- import notebook
- from notebook._tz import utcnow
- from notebook.i18n import combine_translations
- from notebook.utils import is_hidden, url_path_join, url_is_absolute, url_escape
- from notebook.services.security import csp_report_uri
- #-----------------------------------------------------------------------------
- # Top-level handlers
- #-----------------------------------------------------------------------------
# Matches any single character that is not an ASCII letter or digit.
non_alphanum = re.compile(r'[^A-Za-z0-9]')
# Cache for the JSON string built by json_sys_info(); None until first use.
_sys_info_cache = None
def json_sys_info():
    """Return get_sys_info() serialized as JSON, computing it only once.

    The serialized string is stored in the module-level ``_sys_info_cache``
    and reused on every later call.
    """
    global _sys_info_cache
    if _sys_info_cache is not None:
        return _sys_info_cache
    _sys_info_cache = json.dumps(get_sys_info())
    return _sys_info_cache
def log():
    """Return the Application's logger if one is initialized, else tornado's app_log."""
    if not Application.initialized():
        return app_log
    return Application.instance().log
class AuthenticatedHandler(web.RequestHandler):
    """A RequestHandler with an authenticated user."""

    @property
    def content_security_policy(self):
        """The default Content-Security-Policy header

        A Content-Security-Policy entry in settings['headers'] takes
        precedence over the generated default.
        """
        custom_headers = self.settings.get('headers', {})
        if 'Content-Security-Policy' in custom_headers:
            # user-specified, don't override
            return custom_headers['Content-Security-Policy']

        # Make sure the report-uri is relative to the base_url
        report_uri = self.settings.get(
            'csp_report_uri',
            url_path_join(self.base_url, csp_report_uri),
        )
        directives = [
            "frame-ancestors 'self'",
            "report-uri " + report_uri,
        ]
        return '; '.join(directives)

    def set_default_headers(self):
        """Apply nosniff, the configured headers and the CSP header to the response."""
        headers = {"X-Content-Type-Options": "nosniff"}
        # Configured headers may override the default above...
        headers.update(self.settings.get('headers', {}))
        # ...but the CSP always comes from the property (which itself
        # honours a user-specified value).
        headers["Content-Security-Policy"] = self.content_security_policy

        for header_name, value in headers.items():
            try:
                self.set_header(header_name, value)
            except Exception as e:
                # tornado raise Exception (not a subclass)
                # if method is unsupported (websocket and Access-Control-Allow-Origin
                # for example, so just ignore)
                self.log.debug(e)

    def force_clear_cookie(self, name, path="/", domain=None):
        """Deletes the cookie with the given name.

        Tornado's cookie handling currently (Jan 2018) stores cookies in a dict
        keyed by name, so it can only modify one cookie with a given name per
        response. The browser can store multiple cookies with the same name
        but different domains and/or paths. This method lets us clear multiple
        cookies with the same name.

        Due to limitations of the cookie protocol, you must pass the same
        path and domain to clear a cookie as were used when that cookie
        was set (but there is no way to find out on the server side
        which values were used for a given cookie).
        """
        name = escape.native_str(name)
        # An expiry date one year in the past.
        one_year_ago = datetime.datetime.utcnow() - datetime.timedelta(days=365)

        cookie = Morsel()
        cookie.set(name, '', '""')
        cookie['expires'] = httputil.format_timestamp(one_year_ago)
        cookie['path'] = path
        if domain:
            cookie['domain'] = domain
        # add_header (not set_header) so several Set-Cookie headers can coexist.
        self.add_header("Set-Cookie", cookie.OutputString())

    def clear_login_cookie(self):
        """Clear the login cookie on the configured path, and on '/' as well if different."""
        options = self.settings.get('cookie_options', {})
        cookie_path = options.setdefault('path', self.base_url)
        self.clear_cookie(self.cookie_name, path=cookie_path)
        if cookie_path and cookie_path != '/':
            # also clear cookie on / to ensure old cookies are cleared
            # after the change in path behavior (changed in notebook 5.2.2).
            # N.B. This bypasses the normal cookie handling, which can't update
            # two cookies with the same name. See force_clear_cookie.
            self.force_clear_cookie(self.cookie_name)

    def get_current_user(self):
        """Return the login handler's user, or 'anonymous' when there is no login handler."""
        handler = self.login_handler
        if handler is None:
            return 'anonymous'
        return handler.get_user(self)

    def skip_check_origin(self):
        """Ask my login_handler if I should skip the origin_check

        For example: in the default LoginHandler, if a request is token-authenticated,
        origin checking should be skipped.
        """
        if self.request.method == 'OPTIONS':
            # no origin-check on options requests, which are used to check origins!
            return True
        handler = self.login_handler
        if handler is None or not hasattr(handler, 'should_check_origin'):
            return False
        return not handler.should_check_origin(self)

    @property
    def token_authenticated(self):
        """Have I been authenticated with a token?"""
        handler = self.login_handler
        if handler is None or not hasattr(handler, 'is_token_authenticated'):
            return False
        return handler.is_token_authenticated(self)

    @property
    def cookie_name(self):
        """The 'cookie_name' setting, defaulting to a name derived from the request host."""
        host_based_name = 'username-{}'.format(self.request.host)
        # Replace every non-alphanumeric character with '-'.
        default_cookie_name = non_alphanum.sub('-', host_based_name)
        return self.settings.get('cookie_name', default_cookie_name)

    @property
    def logged_in(self):
        """Is a user currently logged in?"""
        current = self.get_current_user()
        return (current and not current == 'anonymous')

    @property
    def login_handler(self):
        """Return the login handler for this application, if any."""
        return self.settings.get('login_handler_class', None)

    @property
    def token(self):
        """Return the login token for this application, if any."""
        return self.settings.get('token', None)

    @property
    def login_available(self):
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.
        """
        handler = self.login_handler
        if handler is None:
            return False
        return bool(handler.get_login_available(self.settings))
class IPythonHandler(AuthenticatedHandler):
    """IPython-specific extensions to authenticated handling

    Mostly property shortcuts to IPython-specific settings.
    """

    @property
    def ignore_minified_js(self):
        """The 'ignore_minified_js' setting (False when unset).

        Mainly used for development, to avoid file recompilation.
        """
        return self.settings.get('ignore_minified_js', False)

    @property
    def config(self):
        """The 'config' setting, or None when unset."""
        return self.settings.get('config', None)

    @property
    def log(self):
        """use the IPython log by default, falling back on tornado's logger"""
        return log()

    @property
    def jinja_template_vars(self):
        """User-supplied values to supply to jinja templates."""
        return self.settings.get('jinja_template_vars', {})

    #---------------------------------------------------------------
    # URLs
    #---------------------------------------------------------------

    @property
    def version_hash(self):
        """The version hash to use for cache hints for static files"""
        return self.settings.get('version_hash', '')

    @property
    def mathjax_url(self):
        """The 'mathjax_url' setting, joined onto base_url unless empty or absolute."""
        configured = self.settings.get('mathjax_url', '')
        if configured and not url_is_absolute(configured):
            return url_path_join(self.base_url, configured)
        return configured

    @property
    def mathjax_config(self):
        """The 'mathjax_config' setting, with a built-in default."""
        return self.settings.get('mathjax_config', 'TeX-AMS-MML_HTMLorMML-full,Safe')

    @property
    def base_url(self):
        """The 'base_url' setting, defaulting to '/'."""
        return self.settings.get('base_url', '/')

    @property
    def default_url(self):
        """The 'default_url' setting, defaulting to ''."""
        return self.settings.get('default_url', '')

    @property
    def ws_url(self):
        """The 'websocket_url' setting, defaulting to ''."""
        return self.settings.get('websocket_url', '')

    @property
    def contents_js_source(self):
        """The 'contents_js_source' setting (logged at debug level on each access)."""
        source = self.settings.get('contents_js_source', 'services/contents')
        self.log.debug("Using contents: %s", source)
        return source

    #---------------------------------------------------------------
    # Manager objects
    #---------------------------------------------------------------

    @property
    def kernel_manager(self):
        """The required 'kernel_manager' setting."""
        return self.settings['kernel_manager']

    @property
    def contents_manager(self):
        """The required 'contents_manager' setting."""
        return self.settings['contents_manager']

    @property
    def session_manager(self):
        """The required 'session_manager' setting."""
        return self.settings['session_manager']

    @property
    def terminal_manager(self):
        """The required 'terminal_manager' setting."""
        return self.settings['terminal_manager']

    @property
    def kernel_spec_manager(self):
        """The required 'kernel_spec_manager' setting."""
        return self.settings['kernel_spec_manager']

    @property
    def config_manager(self):
        """The required 'config_manager' setting."""
        return self.settings['config_manager']

    #---------------------------------------------------------------
    # CORS
    #---------------------------------------------------------------

    @property
    def allow_origin(self):
        """Normal Access-Control-Allow-Origin"""
        return self.settings.get('allow_origin', '')

    @property
    def allow_origin_pat(self):
        """Regular expression version of allow_origin"""
        return self.settings.get('allow_origin_pat', None)

    @property
    def allow_credentials(self):
        """Whether to set Access-Control-Allow-Credentials"""
        return self.settings.get('allow_credentials', False)

    def set_default_headers(self):
        """Add CORS headers, if defined"""
        super(IPythonHandler, self).set_default_headers()

        cors_header = "Access-Control-Allow-Origin"
        if self.allow_origin:
            self.set_header(cors_header, self.allow_origin)
        elif self.allow_origin_pat:
            request_origin = self.get_origin()
            if request_origin and self.allow_origin_pat.match(request_origin):
                self.set_header(cors_header, request_origin)
        elif (
            self.token_authenticated
            and cors_header not in self.settings.get('headers', {})
        ):
            # allow token-authenticated requests cross-origin by default.
            # only apply this exception if allow-origin has not been specified.
            self.set_header(cors_header, self.request.headers.get('Origin', ''))

        if self.allow_credentials:
            self.set_header("Access-Control-Allow-Credentials", 'true')

    def set_attachment_header(self, filename):
        """Set Content-Disposition: attachment header

        As a method to ensure handling of filename encoding
        """
        disposition = "attachment; filename*=utf-8''{utf8}".format(
            utf8=url_escape(filename),
        )
        self.set_header('Content-Disposition', disposition)

    def get_origin(self):
        """Return the request's Origin, falling back to Sec-Websocket-Origin."""
        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        request_headers = self.request.headers
        if "Origin" in request_headers:
            return request_headers.get("Origin")
        return request_headers.get("Sec-Websocket-Origin", None)

    # origin_to_satisfy_tornado is present because tornado requires
    # check_origin to take an origin argument, but we don't use it
    def check_origin(self, origin_to_satisfy_tornado=""):
        """Check Origin for cross-site API requests, including websockets

        Copied from WebSocket with changes:

        - allow unspecified host/origin (e.g. scripts)
        - allow token-authenticated requests
        """
        if self.allow_origin == '*' or self.skip_check_origin():
            return True

        request_headers = self.request.headers
        host = request_headers.get("Host")
        origin = request_headers.get("Origin")

        # If no header is provided, let the request through.
        # Origin can be None for:
        # - same-origin (IE, Firefox)
        # - Cross-site POST form (IE, Firefox)
        # - Scripts
        # The cross-site POST (XSRF) case is handled by tornado's xsrf_token
        if origin is None or host is None:
            return True

        origin = origin.lower()

        # OK if origin matches host
        if urlparse(origin).netloc == host:
            return True

        # Check CORS headers
        if self.allow_origin:
            allowed = self.allow_origin == origin
        elif self.allow_origin_pat:
            allowed = bool(self.allow_origin_pat.match(origin))
        else:
            # No CORS headers deny the request
            allowed = False

        if not allowed:
            self.log.warning(
                "Blocking Cross Origin API request for %s. Origin: %s, Host: %s",
                self.request.path, origin, host,
            )
        return allowed

    def check_referer(self):
        """Check Referer for cross-site requests.

        Disables requests to certain endpoints with
        external or missing Referer.
        If set, allow_origin settings are applied to the Referer
        to whitelist specific cross-origin sites.
        Used on GET for api endpoints and /files/
        to block cross-site inclusion (XSSI).
        """
        request_headers = self.request.headers
        host = request_headers.get("Host")
        referer = request_headers.get("Referer")

        if not host:
            self.log.warning("Blocking request with no host")
            return False
        if not referer:
            self.log.warning("Blocking request with no referer")
            return False

        parsed_referer = urlparse(referer)
        if parsed_referer.netloc == host:
            return True

        # apply cross-origin checks to Referer:
        origin = "{}://{}".format(parsed_referer.scheme, parsed_referer.netloc)
        if self.allow_origin:
            allowed = self.allow_origin == origin
        elif self.allow_origin_pat:
            allowed = bool(self.allow_origin_pat.match(origin))
        else:
            # No CORS settings, deny the request
            allowed = False

        if not allowed:
            self.log.warning(
                "Blocking Cross Origin request for %s. Referer: %s, Host: %s",
                self.request.path, origin, host,
            )
        return allowed

    def check_xsrf_cookie(self):
        """Bypass xsrf cookie checks when token-authenticated"""
        if self.token_authenticated or self.settings.get('disable_check_xsrf', False):
            # Token-authenticated requests do not need additional XSRF-check
            # Servers without authentication are vulnerable to XSRF
            return
        try:
            return super(IPythonHandler, self).check_xsrf_cookie()
        except web.HTTPError:
            if self.request.method not in {'GET', 'HEAD'}:
                raise
            # Consider Referer a sufficient cross-origin check for GET requests
            if self.check_referer():
                return
            referer = self.request.headers.get('Referer')
            if referer:
                msg = "Blocking Cross Origin request from {}.".format(referer)
            else:
                msg = "Blocking request from unknown origin"
            raise web.HTTPError(403, msg)

    def check_host(self):
        """Check the host header if remote access disallowed.

        Returns True if the request should continue, False otherwise.
        """
        if self.settings.get('allow_remote_access', False):
            return True

        raw_host = self.request.host
        # Remove port (e.g. ':8888') from host
        host = re.match(r'^(.*?)(:\d+)?$', raw_host).group(1)

        # Browsers format IPv6 addresses like [::1]; we need to remove the []
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]

        if not PY3:
            # ip_address only accepts unicode on Python 2
            host = host.decode('utf8', 'replace')

        try:
            address = ipaddress.ip_address(host)
        except ValueError:
            # Not an IP address: check against hostnames
            is_local = host in self.settings.get('local_hostnames', ['localhost'])
        else:
            is_local = address.is_loopback

        if not is_local:
            self.log.warning(
                ("Blocking request with non-local 'Host' %s (%s). "
                 "If the notebook should be accessible at that name, "
                 "set NotebookApp.allow_remote_access to disable the check."),
                host, raw_host
            )
        return is_local

    def prepare(self):
        """Reject the request with 403 unless check_host() passes."""
        if not self.check_host():
            raise web.HTTPError(403)
        return super(IPythonHandler, self).prepare()

    #---------------------------------------------------------------
    # template rendering
    #---------------------------------------------------------------

    def get_template(self, name):
        """Return the jinja template object for a given name"""
        jinja_env = self.settings['jinja2_env']
        return jinja_env.get_template(name)

    def render_template(self, name, **ns):
        """Render the named template with ``ns`` overlaid by template_namespace."""
        # template_namespace values win over caller-supplied ones.
        ns.update(self.template_namespace)
        return self.get_template(name).render(**ns)

    @property
    def template_namespace(self):
        """The variables made available to every rendered template."""
        accept_language = self.request.headers.get('Accept-Language', '')
        return dict(
            base_url=self.base_url,
            default_url=self.default_url,
            ws_url=self.ws_url,
            logged_in=self.logged_in,
            allow_password_change=self.settings.get('allow_password_change'),
            login_available=self.login_available,
            token_available=bool(self.token),
            static_url=self.static_url,
            sys_info=json_sys_info(),
            contents_js_source=self.contents_js_source,
            version_hash=self.version_hash,
            ignore_minified_js=self.ignore_minified_js,
            xsrf_form_html=self.xsrf_form_html,
            token=self.token,
            xsrf_token=self.xsrf_token.decode('utf8'),
            nbjs_translations=json.dumps(combine_translations(accept_language)),
            **self.jinja_template_vars
        )

    def get_json_body(self):
        """Return the body of the request as JSON data."""
        raw_body = self.request.body
        if not raw_body:
            return None
        # Do we need to call body.decode('utf-8') here?
        body = raw_body.strip().decode(u'utf-8')
        try:
            return json.loads(body)
        except Exception:
            self.log.debug("Bad JSON: %r", body)
            self.log.error("Couldn't parse JSON", exc_info=True)
            raise web.HTTPError(400, u'Invalid JSON in body of request')

    def write_error(self, status_code, **kwargs):
        """render custom error pages"""
        exc_info = kwargs.get('exc_info')
        message = ''
        status_message = responses.get(status_code, 'Unknown HTTP Error')
        exception = '(unknown)'
        if exc_info:
            exception = exc_info[1]
            # get the custom message, if defined
            try:
                message = exception.log_message % exception.args
            except Exception:
                # best effort: keep the empty message
                pass

            # construct the custom reason, if defined
            custom_reason = getattr(exception, 'reason', '')
            if custom_reason:
                status_message = custom_reason

        # build template namespace
        ns = {
            'status_code': status_code,
            'status_message': status_message,
            'message': message,
            'exception': exception,
        }

        self.set_header('Content-Type', 'text/html')
        # render the status-specific template, else the generic error page
        try:
            html = self.render_template('%s.html' % status_code, **ns)
        except TemplateNotFound:
            html = self.render_template('error.html', **ns)

        self.write(html)
class APIHandler(IPythonHandler):
    """Base class for API handlers"""

    def prepare(self):
        """Reject the request with 404 unless check_origin() passes."""
        if not self.check_origin():
            raise web.HTTPError(404)
        return super(APIHandler, self).prepare()

    def write_error(self, status_code, **kwargs):
        """APIHandler errors are JSON, not human pages"""
        self.set_header('Content-Type', 'application/json')
        default_message = responses.get(status_code, 'Unknown HTTP Error')
        reply = {'message': default_message}

        exc_info = kwargs.get('exc_info')
        if exc_info:
            error = exc_info[1]
            if isinstance(error, HTTPError):
                reply['message'] = error.log_message or default_message
                reply['reason'] = error.reason
            else:
                reply['message'] = 'Unhandled error'
                reply['reason'] = None
                reply['traceback'] = ''.join(traceback.format_exception(*exc_info))

        self.log.warning(reply['message'])
        self.finish(json.dumps(reply))

    def get_current_user(self):
        """Raise 403 on API handlers instead of redirecting to human login page"""
        # preserve _user_cache so we don't raise more than once
        if not hasattr(self, '_user_cache'):
            self._user_cache = super(APIHandler, self).get_current_user()
            return self._user_cache
        return self._user_cache

    def get_login_url(self):
        """Raise 403 for unauthenticated requests; otherwise defer to the parent."""
        # if get_login_url is invoked in an API handler,
        # that means @web.authenticated is trying to trigger a redirect.
        # instead of redirecting, raise 403 instead.
        if self.current_user:
            return super(APIHandler, self).get_login_url()
        raise web.HTTPError(403)

    @property
    def content_security_policy(self):
        """The parent policy with ``default-src 'none'`` appended."""
        parent_policy = super(APIHandler, self).content_security_policy
        return '; '.join([parent_policy, "default-src 'none'"])

    # set _track_activity = False on API handlers that shouldn't track activity
    _track_activity = True

    def update_api_activity(self):
        """Update last_activity of API requests"""
        # record activity of authenticated requests
        if not self._track_activity:
            return
        if getattr(self, '_user_cache', None):
            self.settings['api_last_activity'] = utcnow()

    def finish(self, *args, **kwargs):
        """Record API activity, force a JSON content type, then finish the response."""
        self.update_api_activity()
        self.set_header('Content-Type', 'application/json')
        return super(APIHandler, self).finish(*args, **kwargs)

    def options(self, *args, **kwargs):
        """Answer a CORS preflight request with the allowed headers and methods."""
        configured_headers = self.settings.get('headers', {})
        if 'Access-Control-Allow-Headers' in configured_headers:
            allowed_headers = configured_headers['Access-Control-Allow-Headers']
        else:
            allowed_headers = 'accept, content-type, authorization, x-xsrftoken'
        self.set_header('Access-Control-Allow-Headers', allowed_headers)
        self.set_header('Access-Control-Allow-Methods',
                        'GET, PUT, POST, PATCH, DELETE, OPTIONS')

        # if authorization header is requested,
        # that means the request is token-authenticated.
        # avoid browser-side rejection of the preflight request.
        # only allow this exception if allow_origin has not been specified
        # and notebook authentication is enabled.
        # If the token is not valid, the 'real' request will still be rejected.
        requested_headers = self.request.headers.get(
            'Access-Control-Request-Headers', '').split(',')
        wants_authorization = any(
            name.strip().lower() == 'authorization'
            for name in requested_headers
        )
        if not (requested_headers and wants_authorization):
            return

        # FIXME: it would be even better to check specifically for token-auth,
        # but there is currently no API for this.
        if not self.login_available:
            return

        # NOTE(review): the comment above says "allow_origin has not been
        # specified", but this condition requires that an origin setting IS
        # present - confirm which is intended before changing it.
        if (
            self.allow_origin
            or self.allow_origin_pat
            or 'Access-Control-Allow-Origin' in self.settings.get('headers', {})
        ):
            self.set_header('Access-Control-Allow-Origin',
                            self.request.headers.get('Origin', ''))
class Template404(IPythonHandler):
    """Render our 404 template"""

    def prepare(self):
        """Always fail with a 404 HTTPError."""
        raise web.HTTPError(404)
class AuthenticatedFileHandler(IPythonHandler, web.StaticFileHandler):
    """static files should only be accessible when logged in"""

    @property
    def content_security_policy(self):
        """The parent policy with a ``sandbox allow-scripts`` directive appended."""
        # In case we're serving HTML/SVG, confine any Javascript to a unique
        # origin so it can't interact with the notebook server.
        parent_policy = super(AuthenticatedFileHandler, self).content_security_policy
        return parent_policy + "; sandbox allow-scripts"

    @web.authenticated
    def head(self, path):
        """Serve HEAD after the XSRF check."""
        self.check_xsrf_cookie()
        return super(AuthenticatedFileHandler, self).head(path)

    @web.authenticated
    def get(self, path):
        """Serve the file; notebooks and ?download requests are sent as attachments."""
        self.check_xsrf_cookie()

        _, extension = os.path.splitext(path)
        if extension == '.ipynb' or self.get_argument("download", False):
            # attachment filename is the last path component
            self.set_attachment_header(path.rsplit('/', 1)[-1])

        return web.StaticFileHandler.get(self, path)

    def get_content_type(self):
        """Return the content type, special-casing notebooks and plain text."""
        trimmed = self.absolute_path.strip('/')
        # last path component (the whole string when there is no '/')
        name = trimmed.rsplit('/', 1)[-1]

        if name.endswith('.ipynb'):
            return 'application/x-ipynb+json'
        if mimetypes.guess_type(name)[0] == 'text/plain':
            return 'text/plain; charset=UTF-8'
        return super(AuthenticatedFileHandler, self).get_content_type()

    def set_headers(self):
        """Set the parent's headers, adding no-cache unless a "v" argument is present."""
        super(AuthenticatedFileHandler, self).set_headers()
        # disable browser caching, rely on 304 replies for savings
        if "v" not in self.request.arguments:
            self.add_header("Cache-Control", "no-cache")

    def compute_etag(self):
        """Return None (no ETag is computed here)."""
        return None

    def validate_absolute_path(self, root, absolute_path):
        """Validate and return the absolute path.

        Requires tornado 3.1

        Adding to tornado's own handling, forbids the serving of hidden files.
        """
        validated = super(AuthenticatedFileHandler, self).validate_absolute_path(
            root, absolute_path)
        root_dir = os.path.abspath(root)
        hidden = is_hidden(validated, root_dir)
        if hidden and not self.contents_manager.allow_hidden:
            self.log.info("Refusing to serve hidden file, via 404 Error, use flag 'ContentsManager.allow_hidden' to enable")
            raise web.HTTPError(404)
        return validated
def json_errors(method):
    """Decorate methods with this to return GitHub style JSON errors.

    Deprecated: a DeprecationWarning is issued when the decorator is applied.

    The wrapped method runs with the handler's ``write_error`` replaced by
    ``APIHandler.write_error`` bound to that handler.
    """
    warnings.warn(
        '@json_errors is deprecated in notebook 5.2.0. Subclass APIHandler instead.',
        DeprecationWarning,
        stacklevel=2,
    )

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Bind APIHandler's JSON error writer onto this handler instance.
        json_write_error = types.MethodType(APIHandler.write_error, self)
        self.write_error = json_write_error
        return method(self, *args, **kwargs)

    return wrapper
- #-----------------------------------------------------------------------------
- # File handler
- #-----------------------------------------------------------------------------
# to minimize subclass changes:
# module-level alias for tornado's web.HTTPError
HTTPError = web.HTTPError
class FileFindHandler(IPythonHandler, web.StaticFileHandler):
    """subclass of StaticFileHandler for serving files from a search path"""

    # cache search results, don't search for files more than once
    _static_paths = {}

    def set_headers(self):
        """Set the parent's headers, adding no-cache where caching is not wanted."""
        super(FileFindHandler, self).set_headers()
        # disable browser caching, rely on 304 replies for savings
        unversioned = "v" not in self.request.arguments
        if unversioned or any(
            self.request.path.startswith(prefix) for prefix in self.no_cache_paths
        ):
            self.set_header("Cache-Control", "no-cache")

    def initialize(self, path, default_filename=None, no_cache_paths=None):
        """Store the search roots, default filename and no-cache path prefixes.

        ``path`` may be a single string or an iterable of strings.
        """
        self.no_cache_paths = no_cache_paths or []

        search_path = [path] if isinstance(path, string_types) else path

        # Each root is expanded, made absolute and given a trailing separator.
        self.root = tuple(
            os.path.abspath(os.path.expanduser(entry)) + os.sep
            for entry in search_path
        )
        self.default_filename = default_filename

    def compute_etag(self):
        """Return None (no ETag is computed here)."""
        return None

    @classmethod
    def get_absolute_path(cls, roots, path):
        """locate a file to serve on our static file search path"""
        with cls._lock:
            try:
                return cls._static_paths[path]
            except KeyError:
                # not cached yet: search for it below
                pass

            try:
                located = os.path.abspath(filefind(path, roots))
            except IOError:
                # IOError means not found
                return ''

            cls._static_paths[path] = located

            log().debug("Path %s served from %s"%(path, located))
            return located

    def validate_absolute_path(self, root, absolute_path):
        """check if the file should be served (raises 404, 403, etc.)"""
        if absolute_path == '':
            raise web.HTTPError(404)

        # Pick the first search root containing the path; when none matches
        # the last root tried (or the argument, if there are no roots) is used.
        candidate = absolute_path + os.sep
        chosen_root = root
        for search_root in self.root:
            chosen_root = search_root
            if candidate.startswith(search_root):
                break

        return super(FileFindHandler, self).validate_absolute_path(
            chosen_root, absolute_path)
class APIVersionHandler(APIHandler):
    """Report the notebook version as JSON."""

    def get(self):
        """Reply with ``{"version": ...}`` and nothing else."""
        # not authenticated, so give as few info as possible
        payload = {"version": notebook.__version__}
        self.finish(json.dumps(payload))
class TrailingSlashHandler(web.RequestHandler):
    """Simple redirect handler that strips trailing slashes

    This should be the first, highest priority handler.
    """

    def get(self):
        """Redirect to the request URI with the path's trailing slashes removed.

        Only the path component is trimmed, so a query string is preserved
        and a URI such as ``/tree/?x=1`` no longer redirects to itself.
        Leading slashes are collapsed to a single one as well, so a request
        for ``//example.com/`` cannot yield a protocol-relative redirect
        to another host.
        """
        path, sep, query = self.request.uri.partition('?')
        # trim trailing *and* leading '/' to avoid misinterpreting repeated '//'
        path = '/' + path.strip('/')
        self.redirect(path + sep + query)

    post = put = get
class FilesRedirectHandler(IPythonHandler):
    """Handler for redirecting relative URLs to the /files/ handler"""

    @staticmethod
    def redirect_to_files(self, path):
        """make redirect logic a reusable static method

        so it can be called from other handlers.

        ``self`` is the handler performing the redirect; it is passed
        explicitly because this is a staticmethod.
        """
        contents = self.contents_manager
        if contents.dir_exists(path):
            # it's a *directory*, redirect to /tree
            target = url_path_join(self.base_url, 'tree', url_escape(path))
        else:
            requested_path = path
            # otherwise, redirect to /files
            segments = path.split('/')

            if not contents.file_exists(path=path) and 'files' in segments:
                # redirect without files/ iff it would 404
                # this preserves pre-2.0-style 'files/' links
                self.log.warning("Deprecated files/ URL: %s", requested_path)
                segments.remove('files')
                path = '/'.join(segments)

            if not contents.file_exists(path=path):
                raise web.HTTPError(404)

            target = url_path_join(self.base_url, 'files', url_escape(path))

        self.log.debug("Redirecting %s to %s", self.request.path, target)
        self.redirect(target)

    def get(self, path=''):
        """Redirect ``path`` via redirect_to_files."""
        return self.redirect_to_files(self, path)
class RedirectWithParams(web.RequestHandler):
    """Same as web.RedirectHandler, but preserves URL parameters"""

    def initialize(self, url, permanent=True):
        """Store the redirect target and whether the redirect is permanent."""
        self._url = url
        self._permanent = permanent

    def get(self):
        """Redirect to the configured URL, carrying over the request's query string.

        The query is appended with '&' when the target already contains a
        '?', otherwise with '?'. Nothing is appended when the request has no
        query string, so the target does not end in a dangling separator.
        """
        url = self._url
        query = self.request.query
        if query:
            sep = '&' if '?' in url else '?'
            url = sep.join([url, query])
        self.redirect(url, permanent=self._permanent)
class PrometheusMetricsHandler(IPythonHandler):
    """
    Return prometheus metrics for this notebook server
    """

    @web.authenticated
    def get(self):
        """Write the latest metrics from the default prometheus registry."""
        metrics = prometheus_client.generate_latest(prometheus_client.REGISTRY)
        self.set_header('Content-Type', prometheus_client.CONTENT_TYPE_LATEST)
        self.write(metrics)
- #-----------------------------------------------------------------------------
- # URL pattern fragments for re-use
- #-----------------------------------------------------------------------------
# path matches any number of `/foo[/bar...]` or just `/` or ''
# the match is captured in the named group "path"
path_regex = r"(?P<path>(?:(?:/[^/]+)+|/?))"
- #-----------------------------------------------------------------------------
- # URL to handler mappings
- #-----------------------------------------------------------------------------
# (URL pattern, handler class) pairs defined by this module.
default_handlers = [
    (r".*/", TrailingSlashHandler),
    (r"api", APIVersionHandler),
    (r'/(robots\.txt|favicon\.ico)', web.StaticFileHandler),
    (r'/metrics', PrometheusMetricsHandler)
]
|