dataframe_client_test.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. # -*- coding: utf-8 -*-
  2. """Unit tests for the influxdb08 DataFrameClient."""
  3. from datetime import timedelta
  4. import copy
  5. import json
  6. import unittest
  7. import warnings
  8. import requests_mock
  9. from nose.tools import raises
  10. from influxdb.tests import skip_if_pypy, using_pypy
  11. from .client_test import _mocked_session
  12. if not using_pypy:
  13. import pandas as pd
  14. from pandas.util.testing import assert_frame_equal
  15. from influxdb.influxdb08 import DataFrameClient
  16. @skip_if_pypy
  17. class TestDataFrameClient(unittest.TestCase):
  18. """Define the DataFramClient test object."""
  19. def setUp(self):
  20. """Set up an instance of TestDataFrameClient object."""
  21. # By default, raise exceptions on warnings
  22. warnings.simplefilter('error', FutureWarning)
  23. def test_write_points_from_dataframe(self):
  24. """Test write points from dataframe."""
  25. now = pd.Timestamp('1970-01-01 00:00+00:00')
  26. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  27. index=[now, now + timedelta(hours=1)],
  28. columns=["column_one", "column_two",
  29. "column_three"])
  30. points = [
  31. {
  32. "points": [
  33. ["1", 1, 1.0, 0],
  34. ["2", 2, 2.0, 3600]
  35. ],
  36. "name": "foo",
  37. "columns": ["column_one", "column_two", "column_three", "time"]
  38. }
  39. ]
  40. with requests_mock.Mocker() as m:
  41. m.register_uri(requests_mock.POST,
  42. "http://localhost:8086/db/db/series")
  43. cli = DataFrameClient(database='db')
  44. cli.write_points({"foo": dataframe})
  45. self.assertListEqual(json.loads(m.last_request.body), points)
  46. def test_write_points_from_dataframe_with_float_nan(self):
  47. """Test write points from dataframe with NaN float."""
  48. now = pd.Timestamp('1970-01-01 00:00+00:00')
  49. dataframe = pd.DataFrame(data=[[1, float("NaN"), 1.0], [2, 2, 2.0]],
  50. index=[now, now + timedelta(hours=1)],
  51. columns=["column_one", "column_two",
  52. "column_three"])
  53. points = [
  54. {
  55. "points": [
  56. [1, None, 1.0, 0],
  57. [2, 2, 2.0, 3600]
  58. ],
  59. "name": "foo",
  60. "columns": ["column_one", "column_two", "column_three", "time"]
  61. }
  62. ]
  63. with requests_mock.Mocker() as m:
  64. m.register_uri(requests_mock.POST,
  65. "http://localhost:8086/db/db/series")
  66. cli = DataFrameClient(database='db')
  67. cli.write_points({"foo": dataframe})
  68. self.assertListEqual(json.loads(m.last_request.body), points)
  69. def test_write_points_from_dataframe_in_batches(self):
  70. """Test write points from dataframe in batches."""
  71. now = pd.Timestamp('1970-01-01 00:00+00:00')
  72. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  73. index=[now, now + timedelta(hours=1)],
  74. columns=["column_one", "column_two",
  75. "column_three"])
  76. with requests_mock.Mocker() as m:
  77. m.register_uri(requests_mock.POST,
  78. "http://localhost:8086/db/db/series")
  79. cli = DataFrameClient(database='db')
  80. self.assertTrue(cli.write_points({"foo": dataframe}, batch_size=1))
  81. def test_write_points_from_dataframe_with_numeric_column_names(self):
  82. """Test write points from dataframe with numeric columns."""
  83. now = pd.Timestamp('1970-01-01 00:00+00:00')
  84. # df with numeric column names
  85. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  86. index=[now, now + timedelta(hours=1)])
  87. points = [
  88. {
  89. "points": [
  90. ["1", 1, 1.0, 0],
  91. ["2", 2, 2.0, 3600]
  92. ],
  93. "name": "foo",
  94. "columns": ['0', '1', '2', "time"]
  95. }
  96. ]
  97. with requests_mock.Mocker() as m:
  98. m.register_uri(requests_mock.POST,
  99. "http://localhost:8086/db/db/series")
  100. cli = DataFrameClient(database='db')
  101. cli.write_points({"foo": dataframe})
  102. self.assertListEqual(json.loads(m.last_request.body), points)
  103. def test_write_points_from_dataframe_with_period_index(self):
  104. """Test write points from dataframe with period index."""
  105. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  106. index=[pd.Period('1970-01-01'),
  107. pd.Period('1970-01-02')],
  108. columns=["column_one", "column_two",
  109. "column_three"])
  110. points = [
  111. {
  112. "points": [
  113. ["1", 1, 1.0, 0],
  114. ["2", 2, 2.0, 86400]
  115. ],
  116. "name": "foo",
  117. "columns": ["column_one", "column_two", "column_three", "time"]
  118. }
  119. ]
  120. with requests_mock.Mocker() as m:
  121. m.register_uri(requests_mock.POST,
  122. "http://localhost:8086/db/db/series")
  123. cli = DataFrameClient(database='db')
  124. cli.write_points({"foo": dataframe})
  125. self.assertListEqual(json.loads(m.last_request.body), points)
  126. def test_write_points_from_dataframe_with_time_precision(self):
  127. """Test write points from dataframe with time precision."""
  128. now = pd.Timestamp('1970-01-01 00:00+00:00')
  129. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  130. index=[now, now + timedelta(hours=1)],
  131. columns=["column_one", "column_two",
  132. "column_three"])
  133. points = [
  134. {
  135. "points": [
  136. ["1", 1, 1.0, 0],
  137. ["2", 2, 2.0, 3600]
  138. ],
  139. "name": "foo",
  140. "columns": ["column_one", "column_two", "column_three", "time"]
  141. }
  142. ]
  143. points_ms = copy.deepcopy(points)
  144. points_ms[0]["points"][1][-1] = 3600 * 1000
  145. points_us = copy.deepcopy(points)
  146. points_us[0]["points"][1][-1] = 3600 * 1000000
  147. with requests_mock.Mocker() as m:
  148. m.register_uri(requests_mock.POST,
  149. "http://localhost:8086/db/db/series")
  150. cli = DataFrameClient(database='db')
  151. cli.write_points({"foo": dataframe}, time_precision='s')
  152. self.assertListEqual(json.loads(m.last_request.body), points)
  153. cli.write_points({"foo": dataframe}, time_precision='m')
  154. self.assertListEqual(json.loads(m.last_request.body), points_ms)
  155. cli.write_points({"foo": dataframe}, time_precision='u')
  156. self.assertListEqual(json.loads(m.last_request.body), points_us)
  157. @raises(TypeError)
  158. def test_write_points_from_dataframe_fails_without_time_index(self):
  159. """Test write points from dataframe that fails without time index."""
  160. dataframe = pd.DataFrame(data=[["1", 1, 1.0], ["2", 2, 2.0]],
  161. columns=["column_one", "column_two",
  162. "column_three"])
  163. with requests_mock.Mocker() as m:
  164. m.register_uri(requests_mock.POST,
  165. "http://localhost:8086/db/db/series")
  166. cli = DataFrameClient(database='db')
  167. cli.write_points({"foo": dataframe})
  168. @raises(TypeError)
  169. def test_write_points_from_dataframe_fails_with_series(self):
  170. """Test failed write points from dataframe with series."""
  171. now = pd.Timestamp('1970-01-01 00:00+00:00')
  172. dataframe = pd.Series(data=[1.0, 2.0],
  173. index=[now, now + timedelta(hours=1)])
  174. with requests_mock.Mocker() as m:
  175. m.register_uri(requests_mock.POST,
  176. "http://localhost:8086/db/db/series")
  177. cli = DataFrameClient(database='db')
  178. cli.write_points({"foo": dataframe})
  179. def test_query_into_dataframe(self):
  180. """Test query into a dataframe."""
  181. data = [
  182. {
  183. "name": "foo",
  184. "columns": ["time", "sequence_number", "column_one"],
  185. "points": [
  186. [3600, 16, 2], [3600, 15, 1],
  187. [0, 14, 2], [0, 13, 1]
  188. ]
  189. }
  190. ]
  191. # dataframe sorted ascending by time first, then sequence_number
  192. dataframe = pd.DataFrame(data=[[13, 1], [14, 2], [15, 1], [16, 2]],
  193. index=pd.to_datetime([0, 0,
  194. 3600, 3600],
  195. unit='s', utc=True),
  196. columns=['sequence_number', 'column_one'])
  197. with _mocked_session('get', 200, data):
  198. cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
  199. result = cli.query('select column_one from foo;')
  200. assert_frame_equal(dataframe, result)
  201. def test_query_multiple_time_series(self):
  202. """Test query for multiple time series."""
  203. data = [
  204. {
  205. "name": "series1",
  206. "columns": ["time", "mean", "min", "max", "stddev"],
  207. "points": [[0, 323048, 323048, 323048, 0]]
  208. },
  209. {
  210. "name": "series2",
  211. "columns": ["time", "mean", "min", "max", "stddev"],
  212. "points": [[0, -2.8233, -2.8503, -2.7832, 0.0173]]
  213. },
  214. {
  215. "name": "series3",
  216. "columns": ["time", "mean", "min", "max", "stddev"],
  217. "points": [[0, -0.01220, -0.01220, -0.01220, 0]]
  218. }
  219. ]
  220. dataframes = {
  221. 'series1': pd.DataFrame(data=[[323048, 323048, 323048, 0]],
  222. index=pd.to_datetime([0], unit='s',
  223. utc=True),
  224. columns=['mean', 'min', 'max', 'stddev']),
  225. 'series2': pd.DataFrame(data=[[-2.8233, -2.8503, -2.7832, 0.0173]],
  226. index=pd.to_datetime([0], unit='s',
  227. utc=True),
  228. columns=['mean', 'min', 'max', 'stddev']),
  229. 'series3': pd.DataFrame(data=[[-0.01220, -0.01220, -0.01220, 0]],
  230. index=pd.to_datetime([0], unit='s',
  231. utc=True),
  232. columns=['mean', 'min', 'max', 'stddev'])
  233. }
  234. with _mocked_session('get', 200, data):
  235. cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
  236. result = cli.query("""select mean(value), min(value), max(value),
  237. stddev(value) from series1, series2, series3""")
  238. self.assertEqual(dataframes.keys(), result.keys())
  239. for key in dataframes.keys():
  240. assert_frame_equal(dataframes[key], result[key])
  241. def test_query_with_empty_result(self):
  242. """Test query with empty results."""
  243. with _mocked_session('get', 200, []):
  244. cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
  245. result = cli.query('select column_one from foo;')
  246. self.assertEqual(result, [])
  247. def test_list_series(self):
  248. """Test list of series for dataframe object."""
  249. response = [
  250. {
  251. 'columns': ['time', 'name'],
  252. 'name': 'list_series_result',
  253. 'points': [[0, 'seriesA'], [0, 'seriesB']]
  254. }
  255. ]
  256. with _mocked_session('get', 200, response):
  257. cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
  258. series_list = cli.get_list_series()
  259. self.assertEqual(series_list, ['seriesA', 'seriesB'])
  260. def test_datetime_to_epoch(self):
  261. """Test convert datetime to epoch."""
  262. timestamp = pd.Timestamp('2013-01-01 00:00:00.000+00:00')
  263. cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
  264. self.assertEqual(
  265. cli._datetime_to_epoch(timestamp),
  266. 1356998400.0
  267. )
  268. self.assertEqual(
  269. cli._datetime_to_epoch(timestamp, time_precision='s'),
  270. 1356998400.0
  271. )
  272. self.assertEqual(
  273. cli._datetime_to_epoch(timestamp, time_precision='m'),
  274. 1356998400000.0
  275. )
  276. self.assertEqual(
  277. cli._datetime_to_epoch(timestamp, time_precision='ms'),
  278. 1356998400000.0
  279. )
  280. self.assertEqual(
  281. cli._datetime_to_epoch(timestamp, time_precision='u'),
  282. 1356998400000000.0
  283. )