123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- from __future__ import absolute_import
- from __future__ import print_function
- import atexit
- import logging
- import signal
- import sys
- import os
- from pprint import pformat
- from tornado.options import define, options
- from tornado.log import enable_pretty_logging
- from tornado.options import parse_command_line, parse_config_file
- from tornado.auth import GoogleOAuth2Mixin
- from celery.bin.base import Command
- from . import settings
- from . import __version__
- from .app import Flower
- define("port", default=5555, help="run on the given port", type=int)
- define("address", default='', help="run on the given address", type=str)
- define("debug", default=False, help="run in debug mode", type=bool)
- define("inspect", default=True, help="inspect workers", type=bool)
- define("inspect_timeout", default=1000, type=float,
- help="inspect timeout (in milliseconds)")
- define("auth", default='', type=str,
- help="regexp of emails to grant access")
- define("basic_auth", type=str, default=None, multiple=True,
- help="enable http basic authentication")
- define("oauth2_key", type=str, default=None, help="Google oauth2 key (requires --auth)")
- define("oauth2_secret", type=str, default=None, help="Google oauth2 secret (requires --auth)")
- define("oauth2_redirect_uri", type=str, default=None, help="Google oauth2 redirect uri (requires --auth)")
- define("url_prefix", type=str, help="base url prefix")
- define("max_tasks", type=int, default=10000,
- help="maximum number of tasks to keep in memory (default 10000)")
- define("db", type=str, default='flower', help="flower database file")
- define("persistent", type=bool, default=False, help="enable persistent mode")
- define("broker_api", type=str, default=None,
- help="inspect broker e.g. http://guest:guest@localhost:15672/api/")
- define("certfile", type=str, default=None, help="path to SSL certificate file")
- define("keyfile", type=str, default=None, help="path to SSL key file")
- define("xheaders", type=bool, default=False,
- help="enable support for the 'X-Real-Ip' and 'X-Scheme' headers.")
- define("auto_refresh", default=True, help="refresh dashboards", type=bool)
- define("cookie_secret", type=str, default=None, help="secure cookie secret")
- logger = logging.getLogger(__name__)
- class FlowerCommand(Command):
- def run_from_argv(self, prog_name, argv=None, **_kwargs):
- app_settings = settings.APP_SETTINGS
- argv = list(filter(self.flower_option, argv))
- try:
- parse_config_file('flowerconfig.py', final=False)
- except IOError:
- pass
- parse_command_line([prog_name] + argv)
- app_settings['debug'] = options.debug
- if options.cookie_secret:
- app_settings['cookie_secret'] = options.cookie_secret
- if options.url_prefix:
- prefix = options.url_prefix.strip('/')
- app_settings['static_url_prefix'] = '/{0}/static/'.format(prefix)
- app_settings['login_url'] = '/{0}/login'.format(prefix)
- settings.URL_PREFIX = prefix
- settings.CELERY_INSPECT_TIMEOUT = options.inspect_timeout
- settings.AUTO_REFRESH = options.auto_refresh
- if options.debug and options.logging == 'info':
- options.logging = 'debug'
- enable_pretty_logging()
- if options.auth:
- app_settings[GoogleOAuth2Mixin._OAUTH_SETTINGS_KEY] = {
- 'key': options.oauth2_key or os.environ.get('GOOGLE_OAUTH2_KEY'),
- 'secret': options.oauth2_secret or os.environ.get('GOOGLE_OAUTH2_SECRET'),
- 'redirect_uri': options.oauth2_redirect_uri or os.environ.get('GOOGLE_OAUTH2_REDIRECT_URI'),
- }
- # Monkey-patch to support Celery 2.5.5
- self.app.connection = self.app.broker_connection
- self.app.loader.import_default_modules()
- flower = Flower(celery_app=self.app, options=options,
- **app_settings)
- atexit.register(flower.stop)
- # graceful shutdown on SIGTERM
- def signal_handler(signal, frame):
- logger.info('SIGTERM detected, shutting down')
- sys.exit(0)
- signal.signal(signal.SIGTERM, signal_handler)
- logger.info('Visit me at http%s://%s:%s',
- 's' if flower.ssl else '',
- options.address or 'localhost',
- options.port)
- logger.info('Broker: %s', self.app.connection().as_uri())
- logger.debug('Registered tasks: \n%s',
- pformat(sorted(self.app.tasks.keys())))
- logger.debug('Settings: %s', pformat(app_settings))
- try:
- flower.start()
- except (KeyboardInterrupt, SystemExit):
- pass
- def handle_argv(self, prog_name, argv=None):
- return self.run_from_argv(prog_name, argv)
- def early_version(self, argv):
- if '--version' in argv:
- print(__version__, file=self.stdout)
- super(FlowerCommand, self).early_version(argv)
- @staticmethod
- def flower_option(arg):
- name, _, value = arg.lstrip('-').partition("=")
- name = name.replace('-', '_')
- return hasattr(options, name)
|