rabbitmq.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import json
  2. from amqplib import client_0_8 as amqp
  3. from graypy.handler import make_message_dict
  4. from logging import Filter
  5. from logging.handlers import SocketHandler
  6. try:
  7. from urllib.parse import urlparse, unquote
  8. except ImportError:
  9. from urlparse import urlparse
  10. from urllib import unquote
  11. _ifnone = lambda v, x: x if v is None else v
  12. class GELFRabbitHandler(SocketHandler):
  13. """RabbitMQ / Graylog Extended Log Format handler
  14. NOTE: this handler ingores all messages logged by amqplib.
  15. :param url: RabbitMQ URL (ex: amqp://guest:guest@localhost:5672/).
  16. :param exchange: RabbitMQ exchange. Default 'logging.gelf'.
  17. A queue binding must be defined on the server to prevent
  18. log messages from being dropped.
  19. :param debugging_fields: Send debug fields if true (the default).
  20. :param extra_fields: Send extra fields on the log record to graylog
  21. if true (the default).
  22. :param fqdn: Use fully qualified domain name of localhost as source
  23. host (socket.getfqdn()).
  24. :param exchange_type: RabbitMQ exchange type (default 'fanout').
  25. :param localname: Use specified hostname as source host.
  26. :param facility: Replace facility with specified value. If specified,
  27. record.name will be passed as `logger` parameter.
  28. """
  29. def __init__(self, url, exchange='logging.gelf', debugging_fields=True,
  30. extra_fields=True, fqdn=False, exchange_type='fanout', localname=None,
  31. facility=None, virtual_host='/', routing_key=''):
  32. self.url = url
  33. parsed = urlparse(url)
  34. if parsed.scheme != 'amqp':
  35. raise ValueError('invalid URL scheme (expected "amqp"): %s' % url)
  36. host = parsed.hostname or 'localhost'
  37. port = _ifnone(parsed.port, 5672)
  38. virtual_host = virtual_host if not unquote(parsed.path[1:]) else unquote(parsed.path[1:])
  39. self.cn_args = {
  40. 'host': '%s:%s' % (host, port),
  41. 'userid': _ifnone(parsed.username, 'guest'),
  42. 'password': _ifnone(parsed.password, 'guest'),
  43. 'virtual_host': virtual_host,
  44. 'insist': False,
  45. }
  46. self.exchange = exchange
  47. self.debugging_fields = debugging_fields
  48. self.extra_fields = extra_fields
  49. self.fqdn = fqdn
  50. self.exchange_type = exchange_type
  51. self.localname = localname
  52. self.facility = facility
  53. self.virtual_host = virtual_host
  54. self.routing_key = routing_key
  55. SocketHandler.__init__(self, host, port)
  56. self.addFilter(ExcludeFilter('amqplib'))
  57. def makeSocket(self, timeout=1):
  58. return RabbitSocket(self.cn_args, timeout, self.exchange,
  59. self.exchange_type, self.routing_key)
  60. def makePickle(self, record):
  61. message_dict = make_message_dict(
  62. record, self.debugging_fields, self.extra_fields, self.fqdn, self.localname,
  63. self.facility)
  64. return json.dumps(message_dict)
  65. class RabbitSocket(object):
  66. def __init__(self, cn_args, timeout, exchange, exchange_type, routing_key):
  67. self.cn_args = cn_args
  68. self.timeout = timeout
  69. self.exchange = exchange
  70. self.exchange_type = exchange_type
  71. self.routing_key = routing_key
  72. self.connection = amqp.Connection(
  73. connection_timeout=timeout, **self.cn_args)
  74. self.channel = self.connection.channel()
  75. self.channel.exchange_declare(
  76. exchange=self.exchange,
  77. type=self.exchange_type,
  78. durable=True,
  79. auto_delete=False,
  80. )
  81. def sendall(self, data):
  82. msg = amqp.Message(data, delivery_mode=2)
  83. self.channel.basic_publish(msg, exchange=self.exchange, routing_key=self.routing_key)
  84. def close(self):
  85. try:
  86. self.connection.close()
  87. except Exception:
  88. pass
  89. class ExcludeFilter(Filter):
  90. def __init__(self, name):
  91. """Initialize filter.
  92. Initialize with the name of the logger which, together with its
  93. children, will have its events excluded (filtered out).
  94. """
  95. if not name:
  96. raise ValueError('ExcludeFilter requires a non-empty name')
  97. self.name = name
  98. self.nlen = len(name)
  99. def filter(self, record):
  100. return not (record.name.startswith(self.name) and (
  101. len(record.name) == self.nlen or record.name[self.nlen] == "."))