log.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.log
  4. ~~~~~~~~~~~~~~
  5. The Celery instance's logging section: ``Celery.log``.
  6. Sets up logging for the worker and other programs,
  7. redirects stdouts, colors log output, patches logging
  8. related compatibility fixes, and so on.
  9. """
  10. from __future__ import absolute_import
  11. import logging
  12. import os
  13. import sys
  14. from logging.handlers import WatchedFileHandler
  15. from kombu.log import NullHandler
  16. from kombu.utils.encoding import set_default_encoding_file
  17. from celery import signals
  18. from celery._state import get_current_task
  19. from celery.five import class_property, string_t
  20. from celery.utils import isatty
  21. from celery.utils.log import (
  22. get_logger, mlevel,
  23. ColorFormatter, ensure_process_aware_logger,
  24. LoggingProxy, get_multiprocessing_logger,
  25. reset_multiprocessing_logger,
  26. )
  27. from celery.utils.term import colored
#: Public names exported by ``from celery.app.log import *``.
__all__ = ['TaskFormatter', 'Logging']

#: Read once, at import time, from the ``MP_LOG`` environment variable.
#: Environment values are strings, so any non-empty value (even ``'0'``)
#: is truthy; when the variable is unset this is ``False``.
#: When truthy, the multiprocessing logger is configured with the
#: requested log level instead of ``logging.ERROR``.
MP_LOG = os.environ.get('MP_LOG', False)
  30. class TaskFormatter(ColorFormatter):
  31. def format(self, record):
  32. task = get_current_task()
  33. if task and task.request:
  34. record.__dict__.update(task_id=task.request.id,
  35. task_name=task.name)
  36. else:
  37. record.__dict__.setdefault('task_name', '???')
  38. record.__dict__.setdefault('task_id', '???')
  39. return ColorFormatter.format(self, record)
  40. class Logging(object):
  41. #: The logging subsystem is only configured once per process.
  42. #: setup_logging_subsystem sets this flag, and subsequent calls
  43. #: will do nothing.
  44. _setup = False
  45. def __init__(self, app):
  46. self.app = app
  47. self.loglevel = mlevel(self.app.conf.CELERYD_LOG_LEVEL)
  48. self.format = self.app.conf.CELERYD_LOG_FORMAT
  49. self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
  50. self.colorize = self.app.conf.CELERYD_LOG_COLOR
  51. def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
  52. redirect_level='WARNING', colorize=None):
  53. handled = self.setup_logging_subsystem(
  54. loglevel, logfile, colorize=colorize,
  55. )
  56. if not handled:
  57. if redirect_stdouts:
  58. self.redirect_stdouts(redirect_level)
  59. os.environ.update(
  60. CELERY_LOG_LEVEL=str(loglevel) if loglevel else '',
  61. CELERY_LOG_FILE=str(logfile) if logfile else '',
  62. )
  63. return handled
  64. def redirect_stdouts(self, loglevel=None, name='celery.redirected'):
  65. self.redirect_stdouts_to_logger(
  66. get_logger(name), loglevel=loglevel
  67. )
  68. os.environ.update(
  69. CELERY_LOG_REDIRECT='1',
  70. CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
  71. )
  72. def setup_logging_subsystem(self, loglevel=None, logfile=None,
  73. format=None, colorize=None, **kwargs):
  74. if self.already_setup:
  75. return
  76. self.already_setup = True
  77. loglevel = mlevel(loglevel or self.loglevel)
  78. format = format or self.format
  79. colorize = self.supports_color(colorize, logfile)
  80. reset_multiprocessing_logger()
  81. ensure_process_aware_logger()
  82. receivers = signals.setup_logging.send(
  83. sender=None, loglevel=loglevel, logfile=logfile,
  84. format=format, colorize=colorize,
  85. )
  86. if not receivers:
  87. root = logging.getLogger()
  88. if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  89. root.handlers = []
  90. # Configure root logger
  91. self._configure_logger(
  92. root, logfile, loglevel, format, colorize, **kwargs
  93. )
  94. # Configure the multiprocessing logger
  95. self._configure_logger(
  96. get_multiprocessing_logger(),
  97. logfile, loglevel if MP_LOG else logging.ERROR,
  98. format, colorize, **kwargs
  99. )
  100. signals.after_setup_logger.send(
  101. sender=None, logger=root,
  102. loglevel=loglevel, logfile=logfile,
  103. format=format, colorize=colorize,
  104. )
  105. # then setup the root task logger.
  106. self.setup_task_loggers(loglevel, logfile, colorize=colorize)
  107. try:
  108. stream = logging.getLogger().handlers[0].stream
  109. except (AttributeError, IndexError):
  110. pass
  111. else:
  112. set_default_encoding_file(stream)
  113. # This is a hack for multiprocessing's fork+exec, so that
  114. # logging before Process.run works.
  115. logfile_name = logfile if isinstance(logfile, string_t) else ''
  116. os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
  117. _MP_FORK_LOGFILE_=logfile_name,
  118. _MP_FORK_LOGFORMAT_=format)
  119. return receivers
  120. def _configure_logger(self, logger, logfile, loglevel,
  121. format, colorize, **kwargs):
  122. if logger is not None:
  123. self.setup_handlers(logger, logfile, format,
  124. colorize, **kwargs)
  125. if loglevel:
  126. logger.setLevel(loglevel)
  127. def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
  128. colorize=None, propagate=False, **kwargs):
  129. """Setup the task logger.
  130. If `logfile` is not specified, then `sys.stderr` is used.
  131. Will return the base task logger object.
  132. """
  133. loglevel = mlevel(loglevel or self.loglevel)
  134. format = format or self.task_format
  135. colorize = self.supports_color(colorize, logfile)
  136. logger = self.setup_handlers(
  137. get_logger('celery.task'),
  138. logfile, format, colorize,
  139. formatter=TaskFormatter, **kwargs
  140. )
  141. logger.setLevel(loglevel)
  142. logger.propagate = int(propagate) # this is an int for some reason.
  143. # better to not question why.
  144. signals.after_setup_task_logger.send(
  145. sender=None, logger=logger,
  146. loglevel=loglevel, logfile=logfile,
  147. format=format, colorize=colorize,
  148. )
  149. return logger
  150. def redirect_stdouts_to_logger(self, logger, loglevel=None,
  151. stdout=True, stderr=True):
  152. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  153. logging instance.
  154. :param logger: The :class:`logging.Logger` instance to redirect to.
  155. :param loglevel: The loglevel redirected messages will be logged as.
  156. """
  157. proxy = LoggingProxy(logger, loglevel)
  158. if stdout:
  159. sys.stdout = proxy
  160. if stderr:
  161. sys.stderr = proxy
  162. return proxy
  163. def supports_color(self, colorize=None, logfile=None):
  164. colorize = self.colorize if colorize is None else colorize
  165. if self.app.IS_WINDOWS:
  166. # Windows does not support ANSI color codes.
  167. return False
  168. if colorize or colorize is None:
  169. # Only use color if there is no active log file
  170. # and stderr is an actual terminal.
  171. return logfile is None and isatty(sys.stderr)
  172. return colorize
  173. def colored(self, logfile=None, enabled=None):
  174. return colored(enabled=self.supports_color(enabled, logfile))
  175. def setup_handlers(self, logger, logfile, format, colorize,
  176. formatter=ColorFormatter, **kwargs):
  177. if self._is_configured(logger):
  178. return logger
  179. handler = self._detect_handler(logfile)
  180. handler.setFormatter(formatter(format, use_color=colorize))
  181. logger.addHandler(handler)
  182. return logger
  183. def _detect_handler(self, logfile=None):
  184. """Create log handler with either a filename, an open stream
  185. or :const:`None` (stderr)."""
  186. logfile = sys.__stderr__ if logfile is None else logfile
  187. if hasattr(logfile, 'write'):
  188. return logging.StreamHandler(logfile)
  189. return WatchedFileHandler(logfile)
  190. def _has_handler(self, logger):
  191. return (logger.handlers and
  192. not isinstance(logger.handlers[0], NullHandler))
  193. def _is_configured(self, logger):
  194. return self._has_handler(logger) and not getattr(
  195. logger, '_rudimentary_setup', False)
  196. def setup_logger(self, name='celery', *args, **kwargs):
  197. """Deprecated: No longer used."""
  198. self.setup_logging_subsystem(*args, **kwargs)
  199. return logging.root
  200. def get_default_logger(self, name='celery', **kwargs):
  201. return get_logger(name)
  202. @class_property
  203. def already_setup(cls):
  204. return cls._setup
  205. @already_setup.setter # noqa
  206. def already_setup(cls, was_setup):
  207. cls._setup = was_setup