# handler.py -- GELF (Graylog Extended Log Format) logging handlers.
  1. import datetime
  2. import sys
  3. import logging
  4. import json
  5. import zlib
  6. import traceback
  7. import struct
  8. import random
  9. import socket
  10. import ssl
  11. import math
  12. from logging.handlers import DatagramHandler, SocketHandler
  13. PY3 = sys.version_info[0] == 3
  14. WAN_CHUNK, LAN_CHUNK = 1420, 8154
  15. if PY3:
  16. data, text = bytes, str
  17. else:
  18. data, text = str, unicode
  19. class BaseGELFHandler(object):
  20. def __init__(self, host, port=12201, chunk_size=WAN_CHUNK,
  21. debugging_fields=True, extra_fields=True, fqdn=False,
  22. localname=None, facility=None, level_names=False, compress=True):
  23. self.debugging_fields = debugging_fields
  24. self.extra_fields = extra_fields
  25. self.chunk_size = chunk_size
  26. self.fqdn = fqdn
  27. self.localname = localname
  28. self.facility = facility
  29. self.level_names = level_names
  30. self.compress = compress
  31. def makePickle(self, record):
  32. message_dict = make_message_dict(
  33. record, self.debugging_fields, self.extra_fields, self.fqdn,
  34. self.localname, self.level_names, self.facility)
  35. packed = message_to_pickle(message_dict)
  36. frame = zlib.compress(packed) if self.compress else packed
  37. return frame
  38. class GELFHandler(BaseGELFHandler, DatagramHandler):
  39. """Graylog Extended Log Format UDP handler
  40. :param host: The host of the graylog server.
  41. :param port: The port of the graylog server (default 12201).
  42. :param chunk_size: Message chunk size. Messages larger than this
  43. size will be sent to graylog in multiple chunks. Defaults to
  44. `WAN_CHUNK=1420`.
  45. :param debugging_fields: Send debug fields if true (the default).
  46. :param extra_fields: Send extra fields on the log record to graylog
  47. if true (the default).
  48. :param fqdn: Use fully qualified domain name of localhost as source
  49. host (socket.getfqdn()).
  50. :param localname: Use specified hostname as source host.
  51. :param facility: Replace facility with specified value. If specified,
  52. record.name will be passed as `logger` parameter.
  53. :param level_names: Allows the use of string error level names instead
  54. of numerical values. Defaults to False
  55. :param compress: Use message compression. Defaults to True
  56. """
  57. def __init__(self, host, port=12201, chunk_size=WAN_CHUNK,
  58. debugging_fields=True, extra_fields=True, fqdn=False,
  59. localname=None, facility=None, level_names=False, compress=True):
  60. BaseGELFHandler.__init__(self, host, port, chunk_size,
  61. debugging_fields, extra_fields, fqdn,
  62. localname, facility, level_names, compress)
  63. DatagramHandler.__init__(self, host, int(port))
  64. def send(self, s):
  65. if len(s) < self.chunk_size:
  66. DatagramHandler.send(self, s)
  67. else:
  68. for chunk in ChunkedGELF(s, self.chunk_size):
  69. DatagramHandler.send(self, chunk)
  70. # TODO: Write tests
  71. class GELFTcpHandler(BaseGELFHandler, SocketHandler):
  72. """Graylog Extended Log Format TCP handler
  73. :param host: The host of the graylog server.
  74. :param port: The port of the graylog server (default 12201).
  75. :param chunk_size: Message chunk size. Messages larger than this
  76. size will be sent to graylog in multiple chunks. Defaults to
  77. `WAN_CHUNK=1420`.
  78. :param debugging_fields: Send debug fields if true (the default).
  79. :param extra_fields: Send extra fields on the log record to graylog
  80. if true (the default).
  81. :param fqdn: Use fully qualified domain name of localhost as source
  82. host (socket.getfqdn()).
  83. :param localname: Use specified hostname as source host.
  84. :param facility: Replace facility with specified value. If specified,
  85. record.name will be passed as `logger` parameter.
  86. :param level_names: Allows the use of string error level names instead
  87. of numerical values. Defaults to False
  88. :param tls: Use transport layer security on connection to graylog
  89. if true (not the default)
  90. :param tls_server_name: If using TLS, specify the name of the host
  91. to which the connection is being made. If not specified, hostname
  92. checking will not be performed.
  93. :param tls_cafile: If using TLS, optionally specify a file with a set
  94. of certificate authority certificates to use in certificate
  95. validation.
  96. :param tls_capath: If using TLS, optionally specify a path to files
  97. with a set of certificate authority certificates to use in
  98. certificate validation.
  99. :param tls_cadata: If using TLS, optionally specify an object with
  100. a set of certificate authority certificates to use in certificate
  101. validation.
  102. :param tls_client_cert: If using TLS, optionally specify a certificate
  103. to authenticate the client to the graylog server.
  104. :param tls_client_key: If using TLS, optionally specify a key file
  105. corresponding to the client certificate.
  106. :param tls_client_password: If using TLS, optionally specify a
  107. password corresponding to the client key file.
  108. """
  109. def __init__(self, host, port=12201, chunk_size=WAN_CHUNK,
  110. debugging_fields=True, extra_fields=True, fqdn=False,
  111. localname=None, facility=None, level_names=False,
  112. tls=False, tls_server_name=None, tls_cafile=None,
  113. tls_capath=None, tls_cadata=None, tls_client_cert=None,
  114. tls_client_key=None, tls_client_password=None):
  115. BaseGELFHandler.__init__(self, host, port, chunk_size,
  116. debugging_fields, extra_fields, fqdn,
  117. localname, facility, level_names, False)
  118. SocketHandler.__init__(self, host, int(port))
  119. self.tls = tls
  120. if self.tls:
  121. self.tls_cafile = tls_cafile
  122. self.tls_capath = tls_capath
  123. self.tls_cadata = tls_cadata
  124. self.tls_client_cert = tls_client_cert
  125. self.tls_client_key = tls_client_key
  126. self.tls_client_password = tls_client_password
  127. self.ssl_context = ssl.create_default_context(
  128. purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tls_cafile,
  129. capath=self.tls_capath, cadata=self.tls_cadata
  130. )
  131. self.tls_server_name = tls_server_name
  132. self.ssl_context.check_hostname = (self.tls_server_name
  133. is not None)
  134. if self.tls_client_cert is not None:
  135. self.ssl_context.load_cert_chain(self.tls_client_cert,
  136. self.tls_client_key,
  137. self.tls_client_password)
  138. def makeSocket(self, timeout=None):
  139. """Override SocketHandler.makeSocket, to allow creating
  140. wrapped TLS sockets.
  141. """
  142. sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
  143. if self.tls:
  144. sock = self.ssl_context.wrap_socket(sock=sock, server_side=False,
  145. server_hostname=
  146. self.tls_server_name)
  147. sock.connect((self.host, self.port))
  148. return sock
  149. def makePickle(self, record):
  150. # TCP frame object needs to be null terminated
  151. return BaseGELFHandler.makePickle(self, record) + b'\x00'
  152. class ChunkedGELF(object):
  153. def __init__(self, message, size):
  154. self.message = message
  155. self.size = size
  156. self.pieces = struct.pack('B', int(math.ceil(len(message) * 1.0/size)))
  157. self.id = struct.pack('Q', random.randint(0, 0xFFFFFFFFFFFFFFFF))
  158. def message_chunks(self):
  159. return (self.message[i:i + self.size] for i
  160. in range(0, len(self.message), self.size))
  161. def encode(self, sequence, chunk):
  162. return b''.join([
  163. b'\x1e\x0f',
  164. self.id,
  165. struct.pack('B', sequence),
  166. self.pieces,
  167. chunk
  168. ])
  169. def __iter__(self):
  170. for sequence, chunk in enumerate(self.message_chunks()):
  171. yield self.encode(sequence, chunk)
  172. def make_message_dict(record, debugging_fields, extra_fields, fqdn, localname,
  173. level_names, facility=None):
  174. if fqdn:
  175. host = socket.getfqdn()
  176. elif localname:
  177. host = localname
  178. else:
  179. host = socket.gethostname()
  180. fields = {'version': "1.0",
  181. 'host': host,
  182. 'short_message': record.getMessage(),
  183. 'full_message': get_full_message(record),
  184. 'timestamp': record.created,
  185. 'level': SYSLOG_LEVELS.get(record.levelno, record.levelno),
  186. 'facility': facility or record.name,
  187. }
  188. if level_names:
  189. fields['level_name'] = logging.getLevelName(record.levelno)
  190. if facility is not None:
  191. fields.update({
  192. '_logger': record.name
  193. })
  194. if debugging_fields:
  195. fields.update({
  196. 'file': record.pathname,
  197. 'line': record.lineno,
  198. '_function': record.funcName,
  199. '_pid': record.process,
  200. '_thread_name': record.threadName,
  201. })
  202. # record.processName was added in Python 2.6.2
  203. pn = getattr(record, 'processName', None)
  204. if pn is not None:
  205. fields['_process_name'] = pn
  206. if extra_fields:
  207. fields = add_extra_fields(fields, record)
  208. return fields
# Map from Python logging levels to syslog severity numbers
# (critical=2, error=3, warning=4, info=6, debug=7), used for the
# GELF ``level`` field in make_message_dict.
SYSLOG_LEVELS = {
    logging.CRITICAL: 2,
    logging.ERROR: 3,
    logging.WARNING: 4,
    logging.INFO: 6,
    logging.DEBUG: 7,
}
  216. def get_full_message(record):
  217. # format exception information if present
  218. if record.exc_info:
  219. return '\n'.join(traceback.format_exception(*record.exc_info))
  220. # use pre-formatted exception information in cases where the primary
  221. # exception information was removed, eg. for LogRecord serialization
  222. if record.exc_text:
  223. return record.exc_text
  224. return record.getMessage()
  225. def add_extra_fields(message_dict, record):
  226. # skip_list is used to filter additional fields in a log message.
  227. # It contains all attributes listed in
  228. # http://docs.python.org/library/logging.html#logrecord-attributes
  229. # plus exc_text, which is only found in the logging module source,
  230. # and id, which is prohibited by the GELF format.
  231. skip_list = (
  232. 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
  233. 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
  234. 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
  235. 'processName', 'relativeCreated', 'thread', 'threadName')
  236. for key, value in record.__dict__.items():
  237. if key not in skip_list and not key.startswith('_'):
  238. message_dict['_%s' % key] = value
  239. return message_dict
  240. def smarter_repr(obj):
  241. """ convert JSON incompatible object to string"""
  242. if isinstance(obj, datetime.datetime):
  243. return obj.isoformat()
  244. return repr(obj)
  245. def message_to_pickle(obj):
  246. """ convert object to a JSON-encoded string"""
  247. obj = sanitize(obj)
  248. serialized = json.dumps(obj, separators=',:', default=smarter_repr)
  249. return serialized.encode('utf-8')
  250. def sanitize(obj):
  251. """ convert all strings records of the object to unicode """
  252. if isinstance(obj, dict):
  253. return dict((sanitize(k), sanitize(v)) for k, v in obj.items())
  254. if isinstance(obj, (list, tuple)):
  255. return obj.__class__([sanitize(i) for i in obj])
  256. if isinstance(obj, data):
  257. obj = obj.decode('utf-8', errors='replace')
  258. return obj