line_protocol.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # -*- coding: utf-8 -*-
  2. """Define the line_protocol handler."""
  3. from __future__ import absolute_import
  4. from __future__ import division
  5. from __future__ import print_function
  6. from __future__ import unicode_literals
  7. from datetime import datetime
  8. from numbers import Integral
  9. from pytz import UTC
  10. from dateutil.parser import parse
  11. from six import binary_type, text_type, integer_types, PY2
  12. EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
  13. def _to_nanos(timestamp):
  14. delta = timestamp - EPOCH
  15. nanos_in_days = delta.days * 86400 * 10 ** 9
  16. nanos_in_seconds = delta.seconds * 10 ** 9
  17. nanos_in_micros = delta.microseconds * 10 ** 3
  18. return nanos_in_days + nanos_in_seconds + nanos_in_micros
  19. def _convert_timestamp(timestamp, precision=None):
  20. if isinstance(timestamp, Integral):
  21. return timestamp # assume precision is correct if timestamp is int
  22. if isinstance(_get_unicode(timestamp), text_type):
  23. timestamp = parse(timestamp)
  24. if isinstance(timestamp, datetime):
  25. if not timestamp.tzinfo:
  26. timestamp = UTC.localize(timestamp)
  27. ns = _to_nanos(timestamp)
  28. if precision is None or precision == 'n':
  29. return ns
  30. if precision == 'u':
  31. return ns / 10**3
  32. if precision == 'ms':
  33. return ns / 10**6
  34. if precision == 's':
  35. return ns / 10**9
  36. if precision == 'm':
  37. return ns / 10**9 / 60
  38. if precision == 'h':
  39. return ns / 10**9 / 3600
  40. raise ValueError(timestamp)
  41. def _escape_tag(tag):
  42. tag = _get_unicode(tag, force=True)
  43. return tag.replace(
  44. "\\", "\\\\"
  45. ).replace(
  46. " ", "\\ "
  47. ).replace(
  48. ",", "\\,"
  49. ).replace(
  50. "=", "\\="
  51. ).replace(
  52. "\n", "\\n"
  53. )
  54. def _escape_tag_value(value):
  55. ret = _escape_tag(value)
  56. if ret.endswith('\\'):
  57. ret += ' '
  58. return ret
  59. def quote_ident(value):
  60. """Indent the quotes."""
  61. return "\"{}\"".format(value
  62. .replace("\\", "\\\\")
  63. .replace("\"", "\\\"")
  64. .replace("\n", "\\n"))
  65. def quote_literal(value):
  66. """Quote provided literal."""
  67. return "'{}'".format(value
  68. .replace("\\", "\\\\")
  69. .replace("'", "\\'"))
  70. def _is_float(value):
  71. try:
  72. float(value)
  73. except (TypeError, ValueError):
  74. return False
  75. return True
  76. def _escape_value(value):
  77. if value is None:
  78. return ''
  79. value = _get_unicode(value)
  80. if isinstance(value, text_type):
  81. return quote_ident(value)
  82. if isinstance(value, integer_types) and not isinstance(value, bool):
  83. return str(value) + 'i'
  84. if isinstance(value, bool):
  85. return str(value)
  86. if _is_float(value):
  87. return repr(float(value))
  88. return str(value)
  89. def _get_unicode(data, force=False):
  90. """Try to return a text aka unicode object from the given data."""
  91. if isinstance(data, binary_type):
  92. return data.decode('utf-8')
  93. if data is None:
  94. return ''
  95. if force:
  96. if PY2:
  97. return unicode(data)
  98. return str(data)
  99. return data
  100. def make_line(measurement, tags=None, fields=None, time=None, precision=None):
  101. """Extract the actual point from a given measurement line."""
  102. tags = tags or {}
  103. fields = fields or {}
  104. line = _escape_tag(_get_unicode(measurement))
  105. # tags should be sorted client-side to take load off server
  106. tag_list = []
  107. for tag_key in sorted(tags.keys()):
  108. key = _escape_tag(tag_key)
  109. value = _escape_tag(tags[tag_key])
  110. if key != '' and value != '':
  111. tag_list.append(
  112. "{key}={value}".format(key=key, value=value)
  113. )
  114. if tag_list:
  115. line += ',' + ','.join(tag_list)
  116. field_list = []
  117. for field_key in sorted(fields.keys()):
  118. key = _escape_tag(field_key)
  119. value = _escape_value(fields[field_key])
  120. if key != '' and value != '':
  121. field_list.append("{key}={value}".format(
  122. key=key,
  123. value=value
  124. ))
  125. if field_list:
  126. line += ' ' + ','.join(field_list)
  127. if time is not None:
  128. timestamp = _get_unicode(str(int(
  129. _convert_timestamp(time, precision)
  130. )))
  131. line += ' ' + timestamp
  132. return line
  133. def make_lines(data, precision=None):
  134. """Extract points from given dict.
  135. Extracts the points from the given dict and returns a Unicode string
  136. matching the line protocol introduced in InfluxDB 0.9.0.
  137. """
  138. lines = []
  139. static_tags = data.get('tags')
  140. for point in data['points']:
  141. if static_tags:
  142. tags = dict(static_tags) # make a copy, since we'll modify
  143. tags.update(point.get('tags') or {})
  144. else:
  145. tags = point.get('tags') or {}
  146. line = make_line(
  147. point.get('measurement', data.get('measurement')),
  148. tags=tags,
  149. fields=point.get('fields'),
  150. precision=precision,
  151. time=point.get('time')
  152. )
  153. lines.append(line)
  154. return '\n'.join(lines) + '\n'