filecheckpoints.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. """
  2. File-based Checkpoints implementations.
  3. """
  4. import os
  5. import shutil
  6. from tornado.web import HTTPError
  7. from .checkpoints import (
  8. Checkpoints,
  9. GenericCheckpointsMixin,
  10. )
  11. from .fileio import FileManagerMixin
  12. from jupyter_core.utils import ensure_dir_exists
  13. from ipython_genutils.py3compat import getcwd
  14. from traitlets import Unicode
  15. from notebook import _tz as tz
  16. class FileCheckpoints(FileManagerMixin, Checkpoints):
  17. """
  18. A Checkpoints that caches checkpoints for files in adjacent
  19. directories.
  20. Only works with FileContentsManager. Use GenericFileCheckpoints if
  21. you want file-based checkpoints with another ContentsManager.
  22. """
  23. checkpoint_dir = Unicode(
  24. '.ipynb_checkpoints',
  25. config=True,
  26. help="""The directory name in which to keep file checkpoints
  27. This is a path relative to the file's own directory.
  28. By default, it is .ipynb_checkpoints
  29. """,
  30. )
  31. root_dir = Unicode(config=True)
  32. def _root_dir_default(self):
  33. try:
  34. return self.parent.root_dir
  35. except AttributeError:
  36. return getcwd()
  37. # ContentsManager-dependent checkpoint API
  38. def create_checkpoint(self, contents_mgr, path):
  39. """Create a checkpoint."""
  40. checkpoint_id = u'checkpoint'
  41. src_path = contents_mgr._get_os_path(path)
  42. dest_path = self.checkpoint_path(checkpoint_id, path)
  43. self._copy(src_path, dest_path)
  44. return self.checkpoint_model(checkpoint_id, dest_path)
  45. def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
  46. """Restore a checkpoint."""
  47. src_path = self.checkpoint_path(checkpoint_id, path)
  48. dest_path = contents_mgr._get_os_path(path)
  49. self._copy(src_path, dest_path)
  50. # ContentsManager-independent checkpoint API
  51. def rename_checkpoint(self, checkpoint_id, old_path, new_path):
  52. """Rename a checkpoint from old_path to new_path."""
  53. old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
  54. new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
  55. if os.path.isfile(old_cp_path):
  56. self.log.debug(
  57. "Renaming checkpoint %s -> %s",
  58. old_cp_path,
  59. new_cp_path,
  60. )
  61. with self.perm_to_403():
  62. shutil.move(old_cp_path, new_cp_path)
  63. def delete_checkpoint(self, checkpoint_id, path):
  64. """delete a file's checkpoint"""
  65. path = path.strip('/')
  66. cp_path = self.checkpoint_path(checkpoint_id, path)
  67. if not os.path.isfile(cp_path):
  68. self.no_such_checkpoint(path, checkpoint_id)
  69. self.log.debug("unlinking %s", cp_path)
  70. with self.perm_to_403():
  71. os.unlink(cp_path)
  72. def list_checkpoints(self, path):
  73. """list the checkpoints for a given file
  74. This contents manager currently only supports one checkpoint per file.
  75. """
  76. path = path.strip('/')
  77. checkpoint_id = "checkpoint"
  78. os_path = self.checkpoint_path(checkpoint_id, path)
  79. if not os.path.isfile(os_path):
  80. return []
  81. else:
  82. return [self.checkpoint_model(checkpoint_id, os_path)]
  83. # Checkpoint-related utilities
  84. def checkpoint_path(self, checkpoint_id, path):
  85. """find the path to a checkpoint"""
  86. path = path.strip('/')
  87. parent, name = ('/' + path).rsplit('/', 1)
  88. parent = parent.strip('/')
  89. basename, ext = os.path.splitext(name)
  90. filename = u"{name}-{checkpoint_id}{ext}".format(
  91. name=basename,
  92. checkpoint_id=checkpoint_id,
  93. ext=ext,
  94. )
  95. os_path = self._get_os_path(path=parent)
  96. cp_dir = os.path.join(os_path, self.checkpoint_dir)
  97. with self.perm_to_403():
  98. ensure_dir_exists(cp_dir)
  99. cp_path = os.path.join(cp_dir, filename)
  100. return cp_path
  101. def checkpoint_model(self, checkpoint_id, os_path):
  102. """construct the info dict for a given checkpoint"""
  103. stats = os.stat(os_path)
  104. last_modified = tz.utcfromtimestamp(stats.st_mtime)
  105. info = dict(
  106. id=checkpoint_id,
  107. last_modified=last_modified,
  108. )
  109. return info
  110. # Error Handling
  111. def no_such_checkpoint(self, path, checkpoint_id):
  112. raise HTTPError(
  113. 404,
  114. u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
  115. )
  116. class GenericFileCheckpoints(GenericCheckpointsMixin, FileCheckpoints):
  117. """
  118. Local filesystem Checkpoints that works with any conforming
  119. ContentsManager.
  120. """
  121. def create_file_checkpoint(self, content, format, path):
  122. """Create a checkpoint from the current content of a file."""
  123. path = path.strip('/')
  124. # only the one checkpoint ID:
  125. checkpoint_id = u"checkpoint"
  126. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  127. self.log.debug("creating checkpoint for %s", path)
  128. with self.perm_to_403():
  129. self._save_file(os_checkpoint_path, content, format=format)
  130. # return the checkpoint info
  131. return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  132. def create_notebook_checkpoint(self, nb, path):
  133. """Create a checkpoint from the current content of a notebook."""
  134. path = path.strip('/')
  135. # only the one checkpoint ID:
  136. checkpoint_id = u"checkpoint"
  137. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  138. self.log.debug("creating checkpoint for %s", path)
  139. with self.perm_to_403():
  140. self._save_notebook(os_checkpoint_path, nb)
  141. # return the checkpoint info
  142. return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
  143. def get_notebook_checkpoint(self, checkpoint_id, path):
  144. """Get a checkpoint for a notebook."""
  145. path = path.strip('/')
  146. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  147. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  148. if not os.path.isfile(os_checkpoint_path):
  149. self.no_such_checkpoint(path, checkpoint_id)
  150. return {
  151. 'type': 'notebook',
  152. 'content': self._read_notebook(
  153. os_checkpoint_path,
  154. as_version=4,
  155. ),
  156. }
  157. def get_file_checkpoint(self, checkpoint_id, path):
  158. """Get a checkpoint for a file."""
  159. path = path.strip('/')
  160. self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
  161. os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
  162. if not os.path.isfile(os_checkpoint_path):
  163. self.no_such_checkpoint(path, checkpoint_id)
  164. content, format = self._read_file(os_checkpoint_path, format=None)
  165. return {
  166. 'type': 'file',
  167. 'content': content,
  168. 'format': format,
  169. }