# -*- coding: utf-8 -*-
"""DataFrame client for InfluxDB v0.8."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import math
import warnings

from .client import InfluxDBClient


class DataFrameClient(InfluxDBClient):
    """Primary definition of the DataFrameClient for v0.8.

    The ``DataFrameClient`` object holds information necessary to connect
    to InfluxDB. Requests can be made to InfluxDB directly through the client.
    The client reads and writes from pandas DataFrames.
    """

    def __init__(self, ignore_nan=True, *args, **kwargs):
        """Initialize an instance of the DataFrameClient."""
        super(DataFrameClient, self).__init__(*args, **kwargs)
        try:
            global pd
            import pandas as pd
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Pandas, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        self.EPOCH = pd.Timestamp('1970-01-01 00:00:00.000+00:00')
        self.ignore_nan = ignore_nan
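
    # A minimal usage sketch, not part of the original module: connection
    # arguments (host, port, credentials, database) are inherited from
    # InfluxDBClient, and the values below are placeholder assumptions.
    #
    #   client = DataFrameClient(host='localhost', port=8086,
    #                            username='root', password='root',
    #                            database='example_db')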

    def write_points(self, data, *args, **kwargs):
        """Write to multiple time series names.

        :param data: A dictionary mapping series names to pandas DataFrames
        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param batch_size: [Optional] If set, write the points in batches of
            this size instead of all at once. Useful when dumping data from
            one database to another or performing a massive write operation.
        :type batch_size: int
        """
        batch_size = kwargs.get('batch_size')
        time_precision = kwargs.get('time_precision', 's')
        if batch_size:
            kwargs.pop('batch_size')  # don't hand over to InfluxDBClient
            for key, data_frame in data.items():
                number_batches = int(math.ceil(
                    len(data_frame) / float(batch_size)))
                for batch in range(number_batches):
                    start_index = batch * batch_size
                    end_index = (batch + 1) * batch_size
                    outdata = [
                        self._convert_dataframe_to_json(
                            name=key,
                            dataframe=data_frame
                            .iloc[start_index:end_index].copy(),
                            time_precision=time_precision)]
                    InfluxDBClient.write_points(self, outdata, *args, **kwargs)
            return True

        outdata = [
            self._convert_dataframe_to_json(name=key, dataframe=dataframe,
                                            time_precision=time_precision)
            for key, dataframe in data.items()]
        return InfluxDBClient.write_points(self, outdata, *args, **kwargs)
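
    # Example, a hedged sketch: ``data`` maps each series name to a DataFrame
    # indexed by time. The series name 'cpu_load' and the values are
    # illustrative assumptions, not part of the original module.
    #
    #   import pandas as pd
    #   frame = pd.DataFrame({'value': [0.64, 0.71]},
    #                        index=pd.to_datetime(['2015-01-01 00:00:00',
    #                                              '2015-01-01 00:00:10']))
    #   client.write_points({'cpu_load': frame},
    #                       time_precision='s', batch_size=1000)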

    def write_points_with_precision(self, data, time_precision='s'):
        """Write to multiple time series names.

        DEPRECATED
        """
        warnings.warn(
            "write_points_with_precision is deprecated, and will be removed "
            "in future versions. Please use "
            "``DataFrameClient.write_points(time_precision='..')`` instead.",
            FutureWarning)
        return self.write_points(data, time_precision=time_precision)

    def query(self, query, time_precision='s', chunked=False):
        """Query data into DataFrames.

        Returns a DataFrame for a single time series, or a dict mapping each
        series name to its DataFrame when the query returns multiple series.

        :param time_precision: [Optional, default 's'] Either 's', 'm', 'ms'
            or 'u'.
        :param chunked: [Optional, default=False] True if the data shall be
            retrieved in chunks, False otherwise.
        """
        result = InfluxDBClient.query(self, query=query,
                                      time_precision=time_precision,
                                      chunked=chunked)
        if len(result) == 0:
            return result
        elif len(result) == 1:
            return self._to_dataframe(result[0], time_precision)
        else:
            ret = {}
            for time_series in result:
                ret[time_series['name']] = self._to_dataframe(time_series,
                                                              time_precision)
            return ret
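
    # Example, a hedged sketch: a single-series query yields one DataFrame,
    # while a regex query touching several series yields a dict keyed by
    # series name. The query strings below are illustrative assumptions.
    #
    #   df = client.query('select * from cpu_load')
    #   frames = client.query('select * from /cpu.*/')  # dict of DataFrames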

    @staticmethod
    def _to_dataframe(json_result, time_precision):
        dataframe = pd.DataFrame(data=json_result['points'],
                                 columns=json_result['columns'])
        if 'sequence_number' in dataframe.keys():
            dataframe.sort_values(['time', 'sequence_number'], inplace=True)
        else:
            dataframe.sort_values(['time'], inplace=True)

        pandas_time_unit = time_precision
        if time_precision == 'm':
            pandas_time_unit = 'ms'
        elif time_precision == 'u':
            pandas_time_unit = 'us'

        dataframe.index = pd.to_datetime(list(dataframe['time']),
                                         unit=pandas_time_unit,
                                         utc=True)
        del dataframe['time']
        return dataframe
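
    # For reference, a hedged sketch of the v0.8 JSON a result arrives in
    # (series name, columns, and values are illustrative assumptions):
    #
    #   {'name': 'cpu_load',
    #    'columns': ['time', 'sequence_number', 'value'],
    #    'points': [[1420070400, 1, 0.64], [1420070410, 2, 0.71]]}
    #
    # _to_dataframe sorts by time and returns a frame indexed by UTC
    # timestamps, with the 'time' column dropped.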

    def _convert_dataframe_to_json(self, dataframe, name, time_precision='s'):
        if not isinstance(dataframe, pd.DataFrame):
            raise TypeError('Must be DataFrame, but type was: {0}.'
                            .format(type(dataframe)))
        if not (isinstance(dataframe.index, pd.PeriodIndex) or
                isinstance(dataframe.index, pd.DatetimeIndex)):
            raise TypeError('Must be DataFrame with DatetimeIndex or '
                            'PeriodIndex.')

        if isinstance(dataframe.index, pd.PeriodIndex):
            dataframe.index = dataframe.index.to_timestamp()
        else:
            dataframe.index = pd.to_datetime(dataframe.index)

        if dataframe.index.tzinfo is None:
            dataframe.index = dataframe.index.tz_localize('UTC')

        dataframe['time'] = [self._datetime_to_epoch(dt, time_precision)
                             for dt in dataframe.index]
        data = {'name': name,
                'columns': [str(column) for column in dataframe.columns],
                'points': [self._convert_array(x) for x in dataframe.values]}
        return data
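
    # Example, a hedged sketch of the structure this helper produces for a
    # one-column frame named 'cpu_load' (values are illustrative; the 'time'
    # column is appended last):
    #
    #   {'name': 'cpu_load',
    #    'columns': ['value', 'time'],
    #    'points': [[0.64, 1420070400.0], [0.71, 1420070410.0]]}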

    def _convert_array(self, array):
        try:
            global np
            import numpy as np
        except ImportError as ex:
            raise ImportError('DataFrameClient requires Numpy, '
                              '"{ex}" problem importing'.format(ex=str(ex)))

        if self.ignore_nan:
            number_types = (int, float, np.number)
            condition = (all(isinstance(el, number_types) for el in array) and
                         np.isnan(array))
            return list(np.where(condition, None, array))

        return list(array)
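
    # Example, a hedged sketch: with ignore_nan=True a purely numeric row has
    # its NaN entries replaced by None before serialisation, e.g.
    #
    #   self._convert_array(np.array([1.0, np.nan]))  ->  [1.0, None]
    #
    # rows containing non-numeric values are passed through unchanged.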

    def _datetime_to_epoch(self, datetime, time_precision='s'):
        seconds = (datetime - self.EPOCH).total_seconds()
        if time_precision == 's':
            return seconds
        elif time_precision == 'm' or time_precision == 'ms':
            return seconds * 1000
        elif time_precision == 'u':
            return seconds * 1000000
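
    # Example, a hedged sketch: the epoch offset scales with the requested
    # precision, e.g. for ts = pd.Timestamp('1970-01-01 00:00:01+00:00')
    #
    #   self._datetime_to_epoch(ts, 's')   ->  1.0
    #   self._datetime_to_epoch(ts, 'ms')  ->  1000.0
    #   self._datetime_to_epoch(ts, 'u')   ->  1000000.0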