123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642 |
- # coding: utf-8
- """Tests for the notebook manager."""
- from __future__ import print_function
- import os
- import sys
- import time
- from contextlib import contextmanager
- from itertools import combinations
- from nose import SkipTest
- from tornado.web import HTTPError
- from unittest import TestCase
- from tempfile import NamedTemporaryFile
- from nbformat import v4 as nbformat
- from ipython_genutils.tempdir import TemporaryDirectory
- from traitlets import TraitError
- from ipython_genutils.testing import decorators as dec
- from ..filemanager import FileContentsManager
def _make_dir(contents_manager, api_path):
    """
    Make a directory (and any missing parents) for ``api_path``.

    contents_manager: object exposing ``_get_os_path(api_path)``, used to
        translate the API path into a filesystem path.
    api_path: API-style path of the directory to create.

    An already-existing directory is tolerated (a note is printed).  Any
    other ``OSError`` -- e.g. permission denied, or a regular file occupying
    the path -- is re-raised so the calling test fails at the real cause
    instead of continuing without the directory.
    """
    os_path = contents_manager._get_os_path(api_path)
    try:
        os.makedirs(os_path)
    except OSError:
        # Only "the directory is already there" is benign; everything else
        # means the directory could not be created.
        if not os.path.isdir(os_path):
            raise
        print("Directory already exists: %r" % os_path)
class TestFileContentsManager(TestCase):
    """Filesystem-level tests for ``FileContentsManager``.

    Every test builds its own manager on a throwaway temporary directory
    (or temporary file), so the tests share no state.
    """

    @contextmanager
    def assertRaisesHTTPError(self, status, msg=None):
        """Assert that the ``with`` body raises ``HTTPError`` with ``status``.

        status: expected ``status_code`` of the raised ``HTTPError``.
        msg: failure message used when no ``HTTPError`` is raised; defaults
            to ``"Should have raised HTTPError(<status>)"``.
        """
        msg = msg or "Should have raised HTTPError(%i)" % status
        try:
            yield
        except HTTPError as e:
            self.assertEqual(e.status_code, status)
        else:
            self.fail(msg)

    def symlink(self, contents_manager, src, dst):
        """Make a symlink to src from dst

        src and dst are api_paths; both are translated to filesystem paths
        through ``contents_manager._get_os_path`` before linking.
        """
        src_os_path = contents_manager._get_os_path(src)
        dst_os_path = contents_manager._get_os_path(dst)
        # Debug aid: shows both paths and whether the target is a real file.
        print(src_os_path, dst_os_path, os.path.isfile(src_os_path))
        os.symlink(src_os_path, dst_os_path)

    def test_root_dir(self):
        # The manager reports the root_dir it was constructed with.
        with TemporaryDirectory() as td:
            fm = FileContentsManager(root_dir=td)
            self.assertEqual(fm.root_dir, td)

    def test_missing_root_dir(self):
        # A root_dir that does not exist is rejected with a TraitError.
        with TemporaryDirectory() as td:
            root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
            self.assertRaises(TraitError, FileContentsManager, root_dir=root)

    def test_invalid_root_dir(self):
        # A root_dir that is a regular file is rejected with a TraitError.
        with NamedTemporaryFile() as tf:
            self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)

    def test_get_os_path(self):
        # full filesystem path should be returned with correct operating system
        # separators.
        cases = [
            # (api path, path components expected below root_dir)
            ('/path/to/notebook/test.ipynb',
             '/path/to/notebook/test.ipynb'.split('/')),
            ('test.ipynb', ['test.ipynb']),
            ('////test.ipynb', ['test.ipynb']),
        ]
        with TemporaryDirectory() as td:
            for api_path, components in cases:
                fm = FileContentsManager(root_dir=td)
                path = fm._get_os_path(api_path)
                fs_path = os.path.join(fm.root_dir, *components)
                self.assertEqual(path, fs_path)

    def test_checkpoint_subdir(self):
        # Checkpoint paths for a file in the root and for a same-named file
        # in a subdirectory must differ, each living in the checkpoint dir
        # next to its own file.
        subd = u'sub ∂ir'
        cp_name = 'test-cp.ipynb'
        with TemporaryDirectory() as td:
            root = td
            os.mkdir(os.path.join(td, subd))
            fm = FileContentsManager(root_dir=root)
            cpm = fm.checkpoints
            cp_dir = cpm.checkpoint_path(
                'cp', 'test.ipynb'
            )
            cp_subdir = cpm.checkpoint_path(
                'cp', '/%s/test.ipynb' % subd
            )
            self.assertNotEqual(cp_dir, cp_subdir)
            self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
            self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))

    @dec.skipif(sys.platform == 'win32' and sys.version_info[0] < 3)
    def test_bad_symlink(self):
        # A directory listing must include a symlink whose target is missing.
        with TemporaryDirectory() as td:
            cm = FileContentsManager(root_dir=td)
            path = 'test bad symlink'
            _make_dir(cm, path)

            file_model = cm.new_untitled(path=path, ext='.txt')

            # create a broken symlink
            self.symlink(cm, "target", '%s/%s' % (path, 'bad symlink'))
            model = cm.get(path)

            contents = {
                content['name']: content for content in model['content']
            }
            self.assertIn('untitled.txt', contents)
            self.assertEqual(contents['untitled.txt'], file_model)
            # broken symlinks should still be shown in the contents manager
            self.assertIn('bad symlink', contents)

    @dec.skipif(sys.platform == 'win32' and sys.version_info[0] < 3)
    def test_good_symlink(self):
        # A valid symlink is listed alongside the file it points to.
        with TemporaryDirectory() as td:
            cm = FileContentsManager(root_dir=td)
            parent = 'test good symlink'
            name = 'good symlink'
            path = '{0}/{1}'.format(parent, name)
            _make_dir(cm, parent)

            file_model = cm.new(path=parent + '/zfoo.txt')

            # create a good symlink
            self.symlink(cm, file_model['path'], path)
            symlink_model = cm.get(path, content=False)
            dir_model = cm.get(parent)
            # Sorted by name: 'good symlink' sorts before 'zfoo.txt'.
            self.assertEqual(
                sorted(dir_model['content'], key=lambda x: x['name']),
                [symlink_model, file_model],
            )

    def test_403(self):
        # Opening a read-only file for writing surfaces as HTTPError(403).
        if hasattr(os, 'getuid') and os.getuid() == 0:
            raise SkipTest("Can't test permissions as root")
        if sys.platform.startswith('win'):
            raise SkipTest("Can't test permissions on Windows")

        with TemporaryDirectory() as td:
            cm = FileContentsManager(root_dir=td)
            model = cm.new_untitled(type='file')
            os_path = cm._get_os_path(model['path'])

            os.chmod(os_path, 0o400)
            with self.assertRaisesHTTPError(403):
                with cm.open(os_path, 'w') as f:
                    f.write(u"don't care")

    def test_escape_root(self):
        # API paths that resolve outside root_dir must be answered with 404.
        with TemporaryDirectory() as td:
            # Nest the root one level inside the temporary directory so the
            # sibling files created below live in (and are removed with) td
            # rather than being left behind in td's parent directory.
            root = os.path.join(td, 'root')
            os.mkdir(root)
            cm = FileContentsManager(root_dir=root)
            # make foo, bar next to root
            for sibling in ('foo', 'bar'):
                with open(os.path.join(cm.root_dir, '..', sibling), 'w') as f:
                    f.write(sibling)

            with self.assertRaisesHTTPError(404):
                cm.get('..')
            with self.assertRaisesHTTPError(404):
                cm.get('foo/../../../bar')
            with self.assertRaisesHTTPError(404):
                cm.delete('../foo')
            with self.assertRaisesHTTPError(404):
                cm.rename('../foo', '../bar')
            with self.assertRaisesHTTPError(404):
                cm.save(model={
                    'type': 'file',
                    'content': u'',
                    'format': 'text',
                }, path='../foo')
class TestContentsManager(TestCase):
    """API-level tests for the contents manager.

    ``setUp`` creates a ``FileContentsManager`` rooted at a fresh temporary
    directory and ``tearDown`` removes that directory.  Subclasses can reuse
    the tests with a different manager by overriding ``setUp`` (and
    ``make_dir`` when contents are not stored on the filesystem).
    """

    @contextmanager
    def assertRaisesHTTPError(self, status, msg=None):
        """Assert that the ``with`` body raises ``HTTPError`` with ``status``.

        status: expected ``status_code`` of the raised ``HTTPError``.
        msg: failure message used when no ``HTTPError`` is raised; defaults
            to ``"Should have raised HTTPError(<status>)"``.
        """
        msg = msg or "Should have raised HTTPError(%i)" % status
        try:
            yield
        except HTTPError as e:
            self.assertEqual(e.status_code, status)
        else:
            self.fail(msg)

    def make_populated_dir(self, api_path):
        """Create directory ``api_path`` holding ``nb.ipynb`` and ``file.txt``."""
        cm = self.contents_manager

        self.make_dir(api_path)

        cm.new(path="/".join([api_path, "nb.ipynb"]))
        cm.new(path="/".join([api_path, "file.txt"]))

    def check_populated_dir_files(self, api_path):
        """Check the listing of a directory made by ``make_populated_dir``.

        Verifies the directory model's own path and type, then the name and
        path of every file and notebook entry.  Sub-directory entries are
        skipped.
        """
        dir_model = self.contents_manager.get(api_path)

        self.assertEqual(dir_model['path'], api_path)
        self.assertEqual(dir_model['type'], "directory")

        for entry in dir_model['content']:
            if entry['type'] == "directory":
                continue
            elif entry['type'] == "file":
                self.assertEqual(entry['name'], "file.txt")
                complete_path = "/".join([api_path, "file.txt"])
                self.assertEqual(entry["path"], complete_path)
            elif entry['type'] == "notebook":
                self.assertEqual(entry['name'], "nb.ipynb")
                complete_path = "/".join([api_path, "nb.ipynb"])
                self.assertEqual(entry["path"], complete_path)

    def setUp(self):
        """Create a temporary directory and a manager rooted at it."""
        self._temp_dir = TemporaryDirectory()
        self.td = self._temp_dir.name
        self.contents_manager = FileContentsManager(
            root_dir=self.td,
        )

    def tearDown(self):
        """Remove the temporary directory created by ``setUp``."""
        self._temp_dir.cleanup()

    def make_dir(self, api_path):
        """make a subdirectory at api_path

        override in subclasses if contents are not on the filesystem.
        """
        _make_dir(self.contents_manager, api_path)

    def add_code_cell(self, nb):
        """Append a code cell with one JavaScript display output to ``nb``."""
        output = nbformat.new_output("display_data", {'application/javascript': "alert('hi');"})
        cell = nbformat.new_code_cell("print('hi')", outputs=[output])
        nb.cells.append(cell)

    def new_notebook(self):
        """Create, populate and save an untitled notebook.

        The notebook gets a time-based ``counter`` metadata entry and one
        code cell before being saved.

        Returns a ``(nb, name, path)`` tuple: the notebook content object
        and the name and path reported by the manager.
        """
        cm = self.contents_manager
        model = cm.new_untitled(type='notebook')
        name = model['name']
        path = model['path']

        full_model = cm.get(path)
        nb = full_model['content']
        nb['metadata']['counter'] = int(1e6 * time.time())
        self.add_code_cell(nb)

        cm.save(full_model, path)
        return nb, name, path

    def test_new_untitled(self):
        # new_untitled picks the default name for each type and numbers
        # subsequent files that would collide.
        cm = self.contents_manager
        # Test in root directory
        model = cm.new_untitled(type='notebook')
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertIn('type', model)
        self.assertEqual(model['type'], 'notebook')
        self.assertEqual(model['name'], 'Untitled.ipynb')
        self.assertEqual(model['path'], 'Untitled.ipynb')

        # Create the sub-directory used by the checks below
        model = cm.new_untitled(type='directory')
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertIn('type', model)
        self.assertEqual(model['type'], 'directory')
        self.assertEqual(model['name'], 'Untitled Folder')
        self.assertEqual(model['path'], 'Untitled Folder')
        sub_dir = model['path']

        # Test in sub-directory
        model = cm.new_untitled(path=sub_dir)
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertIn('type', model)
        self.assertEqual(model['type'], 'file')
        self.assertEqual(model['name'], 'untitled')
        self.assertEqual(model['path'], '%s/untitled' % sub_dir)

        # Test with a compound extension
        model = cm.new_untitled(path=sub_dir, ext='.foo.bar')
        self.assertEqual(model['name'], 'untitled.foo.bar')
        model = cm.new_untitled(path=sub_dir, ext='.foo.bar')
        self.assertEqual(model['name'], 'untitled1.foo.bar')

    def test_modified_date(self):
        # last_modified must never move backwards across save and rename.
        cm = self.contents_manager

        # Create a new notebook.
        nb, name, path = self.new_notebook()
        model = cm.get(path)

        # Add a cell and save.
        self.add_code_cell(model['content'])
        cm.save(model, path)

        # Reload notebook and verify that last_modified did not go backwards.
        saved = cm.get(path)
        self.assertGreaterEqual(saved['last_modified'], model['last_modified'])

        # Move the notebook and verify that last_modified is not earlier
        # than before the rename.
        # (The frontend fires a warning if last_modified increases on the
        # renamed file.)
        new_path = 'renamed.ipynb'
        cm.rename(path, new_path)
        renamed = cm.get(new_path)
        self.assertGreaterEqual(
            renamed['last_modified'],
            saved['last_modified'],
        )

    def test_get(self):
        # get() for notebooks, notebooks-as-files, plain files, directories.
        cm = self.contents_manager
        # Create a notebook
        model = cm.new_untitled(type='notebook')
        name = model['name']
        path = model['path']

        # Check that we 'get' on the notebook we just created
        model2 = cm.get(path)
        self.assertIsInstance(model2, dict)
        self.assertIn('name', model2)
        self.assertIn('path', model2)
        # Compare the *fetched* model against what new_untitled reported.
        self.assertEqual(model2['name'], name)
        self.assertEqual(model2['path'], path)

        nb_as_file = cm.get(path, content=True, type='file')
        self.assertEqual(nb_as_file['path'], path)
        self.assertEqual(nb_as_file['type'], 'file')
        self.assertEqual(nb_as_file['format'], 'text')
        self.assertNotIsInstance(nb_as_file['content'], dict)

        nb_as_bin_file = cm.get(path, content=True, type='file', format='base64')
        self.assertEqual(nb_as_bin_file['format'], 'base64')

        # Test in sub-directory
        sub_dir = '/foo/'
        self.make_dir('foo')
        cm.new_untitled(path=sub_dir, ext='.ipynb')
        model2 = cm.get(sub_dir + name)
        self.assertIsInstance(model2, dict)
        self.assertIn('name', model2)
        self.assertIn('path', model2)
        self.assertIn('content', model2)
        self.assertEqual(model2['name'], 'Untitled.ipynb')
        self.assertEqual(model2['path'], '{0}/{1}'.format(sub_dir.strip('/'), name))

        # Test with a regular file.
        file_model_path = cm.new_untitled(path=sub_dir, ext='.txt')['path']
        file_model = cm.get(file_model_path)
        expected_fields = {
            'content': u'',
            'format': u'text',
            'mimetype': u'text/plain',
            'name': u'untitled.txt',
            'path': u'foo/untitled.txt',
            'type': u'file',
            'writable': True,
        }
        # Per-key check instead of assertDictContainsSubset, which is
        # deprecated and no longer available on recent Python versions.
        for key, value in expected_fields.items():
            self.assertIn(key, file_model)
            self.assertEqual(file_model[key], value)
        self.assertIn('created', file_model)
        self.assertIn('last_modified', file_model)

        # Test getting directory model

        # Create a sub-sub directory to test getting directory contents with a
        # subdir.
        self.make_dir('foo/bar')
        dirmodel = cm.get('foo')
        self.assertEqual(dirmodel['type'], 'directory')
        self.assertIsInstance(dirmodel['content'], list)
        self.assertEqual(len(dirmodel['content']), 3)
        self.assertEqual(dirmodel['path'], 'foo')
        self.assertEqual(dirmodel['name'], 'foo')

        # Directory contents should match the contents of each individual entry
        # when requested with content=False.
        model2_no_content = cm.get(sub_dir + name, content=False)
        file_model_no_content = cm.get(u'foo/untitled.txt', content=False)
        sub_sub_dir_no_content = cm.get('foo/bar', content=False)
        self.assertEqual(sub_sub_dir_no_content['path'], 'foo/bar')
        self.assertEqual(sub_sub_dir_no_content['name'], 'bar')

        for entry in dirmodel['content']:
            # Order isn't guaranteed by the spec, so this is a hacky way of
            # verifying that all entries are matched.
            if entry['path'] == sub_sub_dir_no_content['path']:
                self.assertEqual(entry, sub_sub_dir_no_content)
            elif entry['path'] == model2_no_content['path']:
                self.assertEqual(entry, model2_no_content)
            elif entry['path'] == file_model_no_content['path']:
                self.assertEqual(entry, file_model_no_content)
            else:
                # entry is a dict: format it, do not call it.
                self.fail("Unexpected directory entry: %s" % (entry,))

        with self.assertRaises(HTTPError):
            cm.get('foo', type='file')

    def test_update(self):
        # update() with a changed model path renames the notebook.
        cm = self.contents_manager
        # Create a notebook
        model = cm.new_untitled(type='notebook')
        path = model['path']

        # Change the name in the model for rename
        model['path'] = 'test.ipynb'
        model = cm.update(model, path)
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertEqual(model['name'], 'test.ipynb')

        # Make sure the old name is gone
        self.assertRaises(HTTPError, cm.get, path)

        # Test in sub-directory
        # Create a directory and notebook in that directory
        sub_dir = '/foo/'
        self.make_dir('foo')
        model = cm.new_untitled(path=sub_dir, type='notebook')
        path = model['path']

        # Change the name in the model for rename
        d = path.rsplit('/', 1)[0]
        new_path = model['path'] = d + '/test_in_sub.ipynb'
        model = cm.update(model, path)
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertEqual(model['name'], 'test_in_sub.ipynb')
        self.assertEqual(model['path'], new_path)

        # Make sure the old name is gone
        self.assertRaises(HTTPError, cm.get, path)

    def test_save(self):
        # save() returns a model carrying the saved notebook's name and path.
        cm = self.contents_manager
        # Create a notebook
        model = cm.new_untitled(type='notebook')
        name = model['name']
        path = model['path']

        # Get the model with 'content'
        full_model = cm.get(path)

        # Save the notebook
        model = cm.save(full_model, path)
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertEqual(model['name'], name)
        self.assertEqual(model['path'], path)

        # Test in sub-directory
        # Create a directory and notebook in that directory
        sub_dir = '/foo/'
        self.make_dir('foo')
        model = cm.new_untitled(path=sub_dir, type='notebook')
        name = model['name']
        path = model['path']
        model = cm.get(path)

        # Save the notebook back to the same path
        model = cm.save(model, path)
        self.assertIsInstance(model, dict)
        self.assertIn('name', model)
        self.assertIn('path', model)
        self.assertEqual(model['name'], 'Untitled.ipynb')
        self.assertEqual(model['path'], 'foo/Untitled.ipynb')

    def test_delete(self):
        # A deleted notebook can be neither deleted again nor fetched.
        cm = self.contents_manager
        # Create a notebook
        nb, name, path = self.new_notebook()

        # Delete the notebook
        cm.delete(path)

        # Check that deleting a non-existent path raises an error.
        self.assertRaises(HTTPError, cm.delete, path)

        # Check that a 'get' on the deleted notebook raises an error
        self.assertRaises(HTTPError, cm.get, path)

    def test_rename(self):
        # rename() for a single notebook and for nested directories.
        cm = self.contents_manager
        # Create a new notebook
        nb, name, path = self.new_notebook()

        # Rename the notebook
        cm.rename(path, "changed_path")

        # Attempting to get the notebook under the old name raises an error
        self.assertRaises(HTTPError, cm.get, path)
        # Fetching the notebook under the new name is successful
        self.assertIsInstance(cm.get("changed_path"), dict)

        # Ported tests on nested directory renaming from pgcontents
        all_dirs = ['foo', 'bar', 'foo/bar', 'foo/bar/foo', 'foo/bar/foo/bar']
        unchanged_dirs = all_dirs[:2]
        changed_dirs = all_dirs[2:]

        for _dir in all_dirs:
            self.make_populated_dir(_dir)
            self.check_populated_dir_files(_dir)

        # Renaming to an existing directory should fail
        for src, dest in combinations(all_dirs, 2):
            with self.assertRaisesHTTPError(409):
                cm.rename(src, dest)

        # Creating a notebook in a non-existent directory should fail
        with self.assertRaisesHTTPError(404):
            cm.new_untitled("foo/bar_diff", ext=".ipynb")

        cm.rename("foo/bar", "foo/bar_diff")

        # Assert that unchanged directories remain so
        for unchanged in unchanged_dirs:
            self.check_populated_dir_files(unchanged)

        # Assert changed directories can no longer be accessed under old names
        for changed_dirname in changed_dirs:
            with self.assertRaisesHTTPError(404):
                cm.get(changed_dirname)

            new_dirname = changed_dirname.replace("foo/bar", "foo/bar_diff", 1)

            self.check_populated_dir_files(new_dirname)

        # Creating a notebook in the renamed directory should work
        cm.new_untitled("foo/bar_diff", ext=".ipynb")

    def test_delete_root(self):
        # Deleting the root ('') is refused with HTTPError(400).
        cm = self.contents_manager
        with self.assertRaises(HTTPError) as err:
            cm.delete('')
        self.assertEqual(err.exception.status_code, 400)

    def test_copy(self):
        # copy() with no target, with a target file name, and with a
        # target directory.
        cm = self.contents_manager
        parent = u'å b'
        name = u'nb √.ipynb'
        path = u'{0}/{1}'.format(parent, name)
        self.make_dir(parent)

        orig = cm.new(path=path)
        # copy with unspecified name
        copy = cm.copy(path)
        self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy1.ipynb'))

        # copy with specified name
        copy2 = cm.copy(path, u'å b/copy 2.ipynb')
        self.assertEqual(copy2['name'], u'copy 2.ipynb')
        self.assertEqual(copy2['path'], u'å b/copy 2.ipynb')
        # copy with specified path
        copy3 = cm.copy(path, u'/')
        self.assertEqual(copy3['name'], name)
        self.assertEqual(copy3['path'], name)

    def test_trust_notebook(self):
        # A saved notebook's cells fail the notary check until
        # trust_notebook() is called.
        cm = self.contents_manager
        nb, name, path = self.new_notebook()

        untrusted = cm.get(path)['content']
        self.assertFalse(cm.notary.check_cells(untrusted))

        cm.trust_notebook(path)
        trusted = cm.get(path)['content']
        self.assertTrue(cm.notary.check_cells(trusted))

    def test_mark_trusted_cells(self):
        # Code cells are flagged untrusted before trust_notebook() and
        # trusted once the notebook is re-read afterwards.
        cm = self.contents_manager
        nb, name, path = self.new_notebook()

        cm.mark_trusted_cells(nb, path)
        for cell in nb.cells:
            if cell.cell_type == 'code':
                self.assertFalse(cell.metadata.trusted)

        cm.trust_notebook(path)
        nb = cm.get(path)['content']
        for cell in nb.cells:
            if cell.cell_type == 'code':
                self.assertTrue(cell.metadata.trusted)

    def test_check_and_sign(self):
        # check_and_sign() yields a valid signature only after the notebook
        # has been trusted.
        cm = self.contents_manager
        nb, name, path = self.new_notebook()

        cm.mark_trusted_cells(nb, path)
        cm.check_and_sign(nb, path)
        self.assertFalse(cm.notary.check_signature(nb))

        cm.trust_notebook(path)
        nb = cm.get(path)['content']
        cm.mark_trusted_cells(nb, path)
        cm.check_and_sign(nb, path)
        self.assertTrue(cm.notary.check_signature(nb))
class TestContentsManagerNoAtomic(TestContentsManager):
    """
    Run the tests inherited from TestContentsManager against a manager whose
    ``use_atomic_writing`` flag is switched off.
    """

    def setUp(self):
        """Create a temp-dir backed manager, then disable atomic writing."""
        tmp = TemporaryDirectory()
        self._temp_dir = tmp
        self.td = tmp.name

        manager = FileContentsManager(root_dir=self.td)
        manager.use_atomic_writing = False
        self.contents_manager = manager
|