command.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. from __future__ import absolute_import
  2. from __future__ import print_function
  3. import atexit
  4. import logging
  5. import signal
  6. import sys
  7. import os
  8. from pprint import pformat
  9. from tornado.options import define, options
  10. from tornado.log import enable_pretty_logging
  11. from tornado.options import parse_command_line, parse_config_file
  12. from tornado.auth import GoogleOAuth2Mixin
  13. from celery.bin.base import Command
  14. from . import settings
  15. from . import __version__
  16. from .app import Flower
  17. define("port", default=5555, help="run on the given port", type=int)
  18. define("address", default='', help="run on the given address", type=str)
  19. define("debug", default=False, help="run in debug mode", type=bool)
  20. define("inspect", default=True, help="inspect workers", type=bool)
  21. define("inspect_timeout", default=1000, type=float,
  22. help="inspect timeout (in milliseconds)")
  23. define("auth", default='', type=str,
  24. help="regexp of emails to grant access")
  25. define("basic_auth", type=str, default=None, multiple=True,
  26. help="enable http basic authentication")
  27. define("oauth2_key", type=str, default=None, help="Google oauth2 key (requires --auth)")
  28. define("oauth2_secret", type=str, default=None, help="Google oauth2 secret (requires --auth)")
  29. define("oauth2_redirect_uri", type=str, default=None, help="Google oauth2 redirect uri (requires --auth)")
  30. define("url_prefix", type=str, help="base url prefix")
  31. define("max_tasks", type=int, default=10000,
  32. help="maximum number of tasks to keep in memory (default 10000)")
  33. define("db", type=str, default='flower', help="flower database file")
  34. define("persistent", type=bool, default=False, help="enable persistent mode")
  35. define("broker_api", type=str, default=None,
  36. help="inspect broker e.g. http://guest:guest@localhost:15672/api/")
  37. define("certfile", type=str, default=None, help="path to SSL certificate file")
  38. define("keyfile", type=str, default=None, help="path to SSL key file")
  39. define("xheaders", type=bool, default=False,
  40. help="enable support for the 'X-Real-Ip' and 'X-Scheme' headers.")
  41. define("auto_refresh", default=True, help="refresh dashboards", type=bool)
  42. define("cookie_secret", type=str, default=None, help="secure cookie secret")
  43. logger = logging.getLogger(__name__)
  44. class FlowerCommand(Command):
  45. def run_from_argv(self, prog_name, argv=None, **_kwargs):
  46. app_settings = settings.APP_SETTINGS
  47. argv = list(filter(self.flower_option, argv))
  48. try:
  49. parse_config_file('flowerconfig.py', final=False)
  50. except IOError:
  51. pass
  52. parse_command_line([prog_name] + argv)
  53. app_settings['debug'] = options.debug
  54. if options.cookie_secret:
  55. app_settings['cookie_secret'] = options.cookie_secret
  56. if options.url_prefix:
  57. prefix = options.url_prefix.strip('/')
  58. app_settings['static_url_prefix'] = '/{0}/static/'.format(prefix)
  59. app_settings['login_url'] = '/{0}/login'.format(prefix)
  60. settings.URL_PREFIX = prefix
  61. settings.CELERY_INSPECT_TIMEOUT = options.inspect_timeout
  62. settings.AUTO_REFRESH = options.auto_refresh
  63. if options.debug and options.logging == 'info':
  64. options.logging = 'debug'
  65. enable_pretty_logging()
  66. if options.auth:
  67. app_settings[GoogleOAuth2Mixin._OAUTH_SETTINGS_KEY] = {
  68. 'key': options.oauth2_key or os.environ.get('GOOGLE_OAUTH2_KEY'),
  69. 'secret': options.oauth2_secret or os.environ.get('GOOGLE_OAUTH2_SECRET'),
  70. 'redirect_uri': options.oauth2_redirect_uri or os.environ.get('GOOGLE_OAUTH2_REDIRECT_URI'),
  71. }
  72. # Monkey-patch to support Celery 2.5.5
  73. self.app.connection = self.app.broker_connection
  74. self.app.loader.import_default_modules()
  75. flower = Flower(celery_app=self.app, options=options,
  76. **app_settings)
  77. atexit.register(flower.stop)
  78. # graceful shutdown on SIGTERM
  79. def signal_handler(signal, frame):
  80. logger.info('SIGTERM detected, shutting down')
  81. sys.exit(0)
  82. signal.signal(signal.SIGTERM, signal_handler)
  83. logger.info('Visit me at http%s://%s:%s',
  84. 's' if flower.ssl else '',
  85. options.address or 'localhost',
  86. options.port)
  87. logger.info('Broker: %s', self.app.connection().as_uri())
  88. logger.debug('Registered tasks: \n%s',
  89. pformat(sorted(self.app.tasks.keys())))
  90. logger.debug('Settings: %s', pformat(app_settings))
  91. try:
  92. flower.start()
  93. except (KeyboardInterrupt, SystemExit):
  94. pass
  95. def handle_argv(self, prog_name, argv=None):
  96. return self.run_from_argv(prog_name, argv)
  97. def early_version(self, argv):
  98. if '--version' in argv:
  99. print(__version__, file=self.stdout)
  100. super(FlowerCommand, self).early_version(argv)
  101. @staticmethod
  102. def flower_option(arg):
  103. name, _, value = arg.lstrip('-').partition("=")
  104. name = name.replace('-', '_')
  105. return hasattr(options, name)