runscript.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. # -*- coding: utf-8 -*-
  2. import importlib
  3. import inspect
  4. import os
  5. import traceback
  6. from argparse import ArgumentTypeError
  7. from django.apps import apps
  8. from django.core.management.base import CommandError
  9. from django_extensions.management.email_notifications import EmailNotificationCommand
  10. from django_extensions.management.utils import signalcommand
  11. class DirPolicyChoices:
  12. NONE = 'none'
  13. EACH = 'each'
  14. ROOT = 'root'
  15. def check_is_directory(value):
  16. if value is None or not os.path.isdir(value):
  17. raise ArgumentTypeError("%s is not a directory!" % value)
  18. return value
  19. class BadCustomDirectoryException(Exception):
  20. def __init__(self, value):
  21. self.message = value + ' If --dir-policy is custom than you must set correct directory in ' \
  22. '--dir option or in settings.RUNSCRIPT_CHDIR'
  23. def __str__(self):
  24. return self.message
  25. class Command(EmailNotificationCommand):
  26. help = 'Runs a script in django context.'
  27. def __init__(self, *args, **kwargs):
  28. super().__init__(*args, **kwargs)
  29. self.current_directory = os.getcwd()
  30. def add_arguments(self, parser):
  31. super().add_arguments(parser)
  32. parser.add_argument('script', nargs='+')
  33. parser.add_argument(
  34. '--fixtures', action='store_true', dest='infixtures', default=False,
  35. help='Also look in app.fixtures subdir',
  36. )
  37. parser.add_argument(
  38. '--noscripts', action='store_true', dest='noscripts', default=False,
  39. help='Do not look in app.scripts subdir',
  40. )
  41. parser.add_argument(
  42. '-s', '--silent', action='store_true', dest='silent', default=False,
  43. help='Run silently, do not show errors and tracebacks',
  44. )
  45. parser.add_argument(
  46. '--no-traceback', action='store_true', dest='no_traceback', default=False,
  47. help='Do not show tracebacks',
  48. )
  49. parser.add_argument(
  50. '--script-args', nargs='*', type=str,
  51. help='Space-separated argument list to be passed to the scripts. Note that the '
  52. 'same arguments will be passed to all named scripts.',
  53. )
  54. parser.add_argument(
  55. '--dir-policy', type=str,
  56. choices=[DirPolicyChoices.NONE, DirPolicyChoices.EACH, DirPolicyChoices.ROOT],
  57. help='Policy of selecting scripts execution directory: '
  58. 'none - start all scripts in current directory '
  59. 'each - start all scripts in their directories '
  60. 'root - start all scripts in BASE_DIR directory ',
  61. )
  62. parser.add_argument(
  63. '--chdir', type=check_is_directory,
  64. help='If dir-policy option is set to custom, than this option determines script execution directory.',
  65. )
  66. @signalcommand
  67. def handle(self, *args, **options):
  68. from django.conf import settings
  69. NOTICE = self.style.SQL_TABLE
  70. NOTICE2 = self.style.SQL_FIELD
  71. ERROR = self.style.ERROR
  72. ERROR2 = self.style.NOTICE
  73. subdirs = []
  74. scripts = options['script']
  75. if not options['noscripts']:
  76. subdirs.append(getattr(settings, 'RUNSCRIPT_SCRIPT_DIR', 'scripts'))
  77. if options['infixtures']:
  78. subdirs.append('fixtures')
  79. verbosity = options["verbosity"]
  80. show_traceback = options['traceback']
  81. no_traceback = options['no_traceback']
  82. if no_traceback:
  83. show_traceback = False
  84. else:
  85. show_traceback = True
  86. silent = options['silent']
  87. if silent:
  88. verbosity = 0
  89. email_notifications = options['email_notifications']
  90. if len(subdirs) < 1:
  91. print(NOTICE("No subdirs to run left."))
  92. return
  93. if len(scripts) < 1:
  94. print(ERROR("Script name required."))
  95. return
  96. def get_directory_from_chdir():
  97. directory = options['chdir'] or getattr(settings, 'RUNSCRIPT_CHDIR', None)
  98. try:
  99. check_is_directory(directory)
  100. except ArgumentTypeError as e:
  101. raise BadCustomDirectoryException(str(e))
  102. return directory
  103. def get_directory_basing_on_policy(script_module):
  104. policy = options['dir_policy'] or getattr(settings, 'RUNSCRIPT_CHDIR_POLICY', DirPolicyChoices.NONE)
  105. if policy == DirPolicyChoices.ROOT:
  106. return settings.BASE_DIR
  107. elif policy == DirPolicyChoices.EACH:
  108. return os.path.dirname(inspect.getfile(script_module))
  109. else:
  110. return self.current_directory
  111. def set_directory(script_module):
  112. if options['chdir']:
  113. directory = get_directory_from_chdir()
  114. elif options['dir_policy']:
  115. directory = get_directory_basing_on_policy(script_module)
  116. elif getattr(settings, 'RUNSCRIPT_CHDIR', None):
  117. directory = get_directory_from_chdir()
  118. else:
  119. directory = get_directory_basing_on_policy(script_module)
  120. os.chdir(os.path.abspath(directory))
  121. def run_script(mod, *script_args):
  122. try:
  123. set_directory(mod)
  124. mod.run(*script_args)
  125. if email_notifications:
  126. self.send_email_notification(notification_id=mod.__name__)
  127. except Exception as e:
  128. if silent:
  129. return
  130. if verbosity > 0:
  131. print(ERROR("Exception while running run() in '%s'" % mod.__name__))
  132. if email_notifications:
  133. self.send_email_notification(
  134. notification_id=mod.__name__, include_traceback=True)
  135. if show_traceback:
  136. if not isinstance(e, CommandError):
  137. raise
  138. def my_import(parent_package, module_name):
  139. full_module_path = "%s.%s" % (parent_package, module_name)
  140. if verbosity > 1:
  141. print(NOTICE("Check for %s" % full_module_path))
  142. # Try importing the parent package first
  143. try:
  144. importlib.import_module(parent_package)
  145. except ImportError as e:
  146. if str(e).startswith('No module named'):
  147. # No need to proceed if the parent package doesn't exist
  148. return False
  149. try:
  150. t = importlib.import_module(full_module_path)
  151. except ImportError as e:
  152. # The parent package exists, but the module doesn't
  153. try:
  154. if importlib.util.find_spec(full_module_path) is None:
  155. return False
  156. except Exception:
  157. module_file = os.path.join(settings.BASE_DIR, *full_module_path.split('.')) + '.py'
  158. if not os.path.isfile(module_file):
  159. return False
  160. if silent:
  161. return False
  162. if show_traceback:
  163. traceback.print_exc()
  164. if verbosity > 0:
  165. print(ERROR("Cannot import module '%s': %s." % (full_module_path, e)))
  166. return False
  167. if hasattr(t, "run"):
  168. if verbosity > 1:
  169. print(NOTICE2("Found script '%s' ..." % full_module_path))
  170. return t
  171. else:
  172. if verbosity > 1:
  173. print(ERROR2("Found script '%s' but no run() function found." % full_module_path))
  174. def find_modules_for_script(script):
  175. """ Find script module which contains 'run' attribute """
  176. modules = []
  177. # first look in apps
  178. for app in apps.get_app_configs():
  179. for subdir in subdirs:
  180. mod = my_import("%s.%s" % (app.name, subdir), script)
  181. if mod:
  182. modules.append(mod)
  183. # try direct import
  184. if script.find(".") != -1:
  185. parent, mod_name = script.rsplit(".", 1)
  186. mod = my_import(parent, mod_name)
  187. if mod:
  188. modules.append(mod)
  189. else:
  190. # try app.DIR.script import
  191. for subdir in subdirs:
  192. mod = my_import(subdir, script)
  193. if mod:
  194. modules.append(mod)
  195. return modules
  196. if options['script_args']:
  197. script_args = options['script_args']
  198. else:
  199. script_args = []
  200. for script in scripts:
  201. modules = find_modules_for_script(script)
  202. if not modules:
  203. if verbosity > 0 and not silent:
  204. print(ERROR("No (valid) module for script '%s' found" % script))
  205. if verbosity < 2:
  206. print(ERROR("Try running with a higher verbosity level like: -v2 or -v3"))
  207. for mod in modules:
  208. if verbosity > 1:
  209. print(NOTICE2("Running script '%s' ..." % mod.__name__))
  210. run_script(mod, *script_args)