delete_squashed_migrations.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import inspect
  4. import re
  5. import six
  6. from django.core.management.base import BaseCommand, CommandError
  7. from django.db import DEFAULT_DB_ALIAS, connections
  8. from django.db.migrations.loader import AmbiguityError, MigrationLoader
  9. REPLACES_REGEX = re.compile(r'\s+replaces\s*=\s*\[[^\]]+\]\s*')
  10. PYC = '.pyc'
  11. def py_from_pyc(pyc_fn):
  12. return pyc_fn[:-len(PYC)] + '.py'
  13. class Command(BaseCommand):
  14. help = "Deletes left over migrations that have been replaced by a "
  15. "squashed migration and converts squashed migration into a normal "
  16. "migration. Modifies your source tree! Use with care!"
  17. def add_arguments(self, parser):
  18. parser.add_argument(
  19. 'app_label',
  20. help='App label of the application to delete replaced migrations from.',
  21. )
  22. parser.add_argument(
  23. 'squashed_migration_name', default=None, nargs='?',
  24. help='The squashed migration to replace. '
  25. 'If not specified defaults to the first found.'
  26. )
  27. parser.add_argument(
  28. '--noinput', '--no-input', action='store_false', dest='interactive', default=True,
  29. help='Tells Django to NOT prompt the user for input of any kind.',
  30. )
  31. parser.add_argument(
  32. '--dry-run', action='store_true', default=False,
  33. help='Do not actually delete or change any files')
  34. def handle(self, **options):
  35. self.verbosity = options['verbosity']
  36. self.interactive = options['interactive']
  37. self.dry_run = options['dry_run']
  38. app_label = options['app_label']
  39. squashed_migration_name = options['squashed_migration_name']
  40. # Load the current graph state, check the app and migration they asked for exists
  41. loader = MigrationLoader(connections[DEFAULT_DB_ALIAS])
  42. if app_label not in loader.migrated_apps:
  43. raise CommandError(
  44. "App '%s' does not have migrations (so delete_squashed_migrations on "
  45. "it makes no sense)" % app_label
  46. )
  47. squashed_migration = None
  48. if squashed_migration_name:
  49. squashed_migration = self.find_migration(loader, app_label, squashed_migration_name)
  50. if not squashed_migration.replaces:
  51. raise CommandError(
  52. "The migration %s %s is not a squashed migration." %
  53. (squashed_migration.app_label, squashed_migration.name)
  54. )
  55. else:
  56. leaf_nodes = loader.graph.leaf_nodes(app=app_label)
  57. migration = loader.get_migration(*leaf_nodes[0])
  58. previous_migrations = [
  59. loader.get_migration(al, mn)
  60. for al, mn in loader.graph.forwards_plan((migration.app_label, migration.name))
  61. if al == migration.app_label
  62. ]
  63. migrations = previous_migrations + [migration]
  64. for migration in migrations:
  65. if migration.replaces:
  66. squashed_migration = migration
  67. break
  68. if not squashed_migration:
  69. raise CommandError(
  70. "Cannot find a squashed migration in app '%s'." %
  71. (app_label)
  72. )
  73. files_to_delete = []
  74. for al, mn in squashed_migration.replaces:
  75. try:
  76. migration = loader.disk_migrations[al, mn]
  77. except KeyError:
  78. if self.verbosity > 0:
  79. self.stderr.write("Couldn't find migration file for %s %s\n"
  80. % (al, mn))
  81. else:
  82. pyc_file = inspect.getfile(migration.__class__)
  83. files_to_delete.append(pyc_file)
  84. if pyc_file.endswith(PYC):
  85. py_file = py_from_pyc(pyc_file)
  86. files_to_delete.append(py_file)
  87. # Tell them what we're doing and optionally ask if we should proceed
  88. if self.verbosity > 0 or self.interactive:
  89. self.stdout.write(self.style.MIGRATE_HEADING("Will delete the following files:"))
  90. for fn in files_to_delete:
  91. self.stdout.write(" - %s" % fn)
  92. if not self.confirm():
  93. return
  94. for fn in files_to_delete:
  95. try:
  96. if not self.dry_run:
  97. os.remove(fn)
  98. except OSError:
  99. if self.verbosity > 0:
  100. self.stderr.write("Couldn't delete %s\n" % (fn,))
  101. # Try and delete replaces only if it's all on one line
  102. squashed_migration_fn = inspect.getfile(squashed_migration.__class__)
  103. if squashed_migration_fn.endswith(PYC):
  104. squashed_migration_fn = py_from_pyc(squashed_migration_fn)
  105. with open(squashed_migration_fn) as fp:
  106. squashed_migration_lines = list(fp)
  107. delete_lines = []
  108. for i, line in enumerate(squashed_migration_lines):
  109. if REPLACES_REGEX.match(line):
  110. delete_lines.append(i)
  111. if i > 0 and squashed_migration_lines[i - 1].strip() == '':
  112. delete_lines.insert(0, i - 1)
  113. break
  114. if not delete_lines:
  115. raise CommandError(
  116. ("Couldn't find 'replaces =' line in file %s. "
  117. "Please finish cleaning up manually.") % (squashed_migration_fn,)
  118. )
  119. if self.verbosity > 0 or self.interactive:
  120. self.stdout.write(self.style.MIGRATE_HEADING(
  121. "Will delete line %s%s from file %s" %
  122. (delete_lines[0],
  123. ' and ' + str(delete_lines[1]) if len(delete_lines) > 1 else "",
  124. squashed_migration_fn)))
  125. if not self.confirm():
  126. return
  127. for line_num in sorted(delete_lines, reverse=True):
  128. del squashed_migration_lines[line_num]
  129. with open(squashed_migration_fn, 'w') as fp:
  130. if not self.dry_run:
  131. fp.write("".join(squashed_migration_lines))
  132. def confirm(self):
  133. if self.interactive:
  134. answer = None
  135. while not answer or answer not in "yn":
  136. answer = six.moves.input("Do you wish to proceed? [yN] ")
  137. if not answer:
  138. answer = "n"
  139. break
  140. else:
  141. answer = answer[0].lower()
  142. return answer == "y"
  143. return True
  144. def find_migration(self, loader, app_label, name):
  145. try:
  146. return loader.get_migration_by_prefix(app_label, name)
  147. except AmbiguityError:
  148. raise CommandError(
  149. "More than one migration matches '%s' in app '%s'. Please be "
  150. "more specific." % (name, app_label)
  151. )
  152. except KeyError:
  153. raise CommandError(
  154. "Cannot find a migration matching '%s' from app '%s'." %
  155. (name, app_label)
  156. )