dataframe_client.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # -*- coding: utf-8 -*-
  2. """DataFrame client for InfluxDB v0.8."""
  3. from __future__ import absolute_import
  4. from __future__ import division
  5. from __future__ import print_function
  6. from __future__ import unicode_literals
  7. import math
  8. import warnings
  9. from .client import InfluxDBClient
  10. class DataFrameClient(InfluxDBClient):
  11. """Primary defintion of the DataFrameClient for v0.8.
  12. The ``DataFrameClient`` object holds information necessary to connect
  13. to InfluxDB. Requests can be made to InfluxDB directly through the client.
  14. The client reads and writes from pandas DataFrames.
  15. """
  16. def __init__(self, ignore_nan=True, *args, **kwargs):
  17. """Initialize an instance of the DataFrameClient."""
  18. super(DataFrameClient, self).__init__(*args, **kwargs)
  19. try:
  20. global pd
  21. import pandas as pd
  22. except ImportError as ex:
  23. raise ImportError('DataFrameClient requires Pandas, '
  24. '"{ex}" problem importing'.format(ex=str(ex)))
  25. self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
  26. self.ignore_nan = ignore_nan
  27. def write_points(self, data, *args, **kwargs):
  28. """Write to multiple time series names.
  29. :param data: A dictionary mapping series names to pandas DataFrames
  30. :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
  31. or 'u'.
  32. :param batch_size: [Optional] Value to write the points in batches
  33. instead of all at one time. Useful for when doing data dumps from
  34. one database to another or when doing a massive write operation
  35. :type batch_size: int
  36. """
  37. batch_size = kwargs.get('batch_size')
  38. time_precision = kwargs.get('time_precision', 's')
  39. if batch_size:
  40. kwargs.pop('batch_size') # don't hand over to InfluxDBClient
  41. for key, data_frame in data.items():
  42. number_batches = int(math.ceil(
  43. len(data_frame) / float(batch_size)))
  44. for batch in range(number_batches):
  45. start_index = batch * batch_size
  46. end_index = (batch + 1) * batch_size
  47. outdata = [
  48. self._convert_dataframe_to_json(
  49. name=key,
  50. dataframe=data_frame
  51. .iloc[start_index:end_index].copy(),
  52. time_precision=time_precision)]
  53. InfluxDBClient.write_points(self, outdata, *args, **kwargs)
  54. return True
  55. outdata = [
  56. self._convert_dataframe_to_json(name=key, dataframe=dataframe,
  57. time_precision=time_precision)
  58. for key, dataframe in data.items()]
  59. return InfluxDBClient.write_points(self, outdata, *args, **kwargs)
  60. def write_points_with_precision(self, data, time_precision='s'):
  61. """Write to multiple time series names.
  62. DEPRECATED
  63. """
  64. warnings.warn(
  65. "write_points_with_precision is deprecated, and will be removed "
  66. "in future versions. Please use "
  67. "``DataFrameClient.write_points(time_precision='..')`` instead.",
  68. FutureWarning)
  69. return self.write_points(data, time_precision='s')
  70. def query(self, query, time_precision='s', chunked=False):
  71. """Query data into DataFrames.
  72. Returns a DataFrame for a single time series and a map for multiple
  73. time series with the time series as value and its name as key.
  74. :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
  75. or 'u'.
  76. :param chunked: [Optional, default=False] True if the data shall be
  77. retrieved in chunks, False otherwise.
  78. """
  79. result = InfluxDBClient.query(self, query=query,
  80. time_precision=time_precision,
  81. chunked=chunked)
  82. if len(result) == 0:
  83. return result
  84. elif len(result) == 1:
  85. return self._to_dataframe(result[0], time_precision)
  86. else:
  87. ret = {}
  88. for time_series in result:
  89. ret[time_series['name']] = self._to_dataframe(time_series,
  90. time_precision)
  91. return ret
  92. @staticmethod
  93. def _to_dataframe(json_result, time_precision):
  94. dataframe = pd.DataFrame(data=json_result['points'],
  95. columns=json_result['columns'])
  96. if 'sequence_number' in dataframe.keys():
  97. dataframe.sort_values(['time', 'sequence_number'], inplace=True)
  98. else:
  99. dataframe.sort_values(['time'], inplace=True)
  100. pandas_time_unit = time_precision
  101. if time_precision == 'm':
  102. pandas_time_unit = 'ms'
  103. elif time_precision == 'u':
  104. pandas_time_unit = 'us'
  105. dataframe.index = pd.to_datetime(list(dataframe['time']),
  106. unit=pandas_time_unit,
  107. utc=True)
  108. del dataframe['time']
  109. return dataframe
  110. def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
  111. if not isinstance(dataframe, pd.DataFrame):
  112. raise TypeError('Must be DataFrame, but type was: {0}.'
  113. .format(type(dataframe)))
  114. if not (isinstance(dataframe.index, pd.PeriodIndex) or
  115. isinstance(dataframe.index, pd.DatetimeIndex)):
  116. raise TypeError('Must be DataFrame with DatetimeIndex or \
  117. PeriodIndex.')
  118. if isinstance(dataframe.index, pd.PeriodIndex):
  119. dataframe.index = dataframe.index.to_timestamp()
  120. else:
  121. dataframe.index = pd.to_datetime(dataframe.index)
  122. if dataframe.index.tzinfo is None:
  123. dataframe.index = dataframe.index.tz_localize('UTC')
  124. dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
  125. for dt in dataframe.index]
  126. data = {'name': name,
  127. 'columns': [str(column) for column in dataframe.columns],
  128. 'points': [self._convert_array(x) for x in dataframe.values]}
  129. return data
  130. def _convert_array(self, array):
  131. try:
  132. global np
  133. import numpy as np
  134. except ImportError as ex:
  135. raise ImportError('DataFrameClient requires Numpy, '
  136. '"{ex}" problem importing'.format(ex=str(ex)))
  137. if self.ignore_nan:
  138. number_types = (int, float, np.number)
  139. condition = (all(isinstance(el, number_types) for el in array) and
  140. np.isnan(array))
  141. return list(np.where(condition, None, array))
  142. return list(array)
  143. def _datetime_to_epoch(self, datetime, time_precision='s'):
  144. seconds = (datetime - self.EPOCH).total_seconds()
  145. if time_precision == 's':
  146. return seconds
  147. elif time_precision == 'm' or time_precision == 'ms':
  148. return seconds * 1000
  149. elif time_precision == 'u':
  150. return seconds * 1000000