client_test.py 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451
  1. # -*- coding: utf-8 -*-
  2. """Unit tests for the InfluxDBClient.
  3. NB/WARNING:
  4. This module implements tests for the InfluxDBClient class
  5. but does so
  6. + without any server instance running
  7. + by mocking all the expected responses.
  8. So any change of (response format from) the server will **NOT** be
  9. detected by this module.
  10. See client_test_with_server.py for tests against a running server instance.
  11. """
  12. from __future__ import absolute_import
  13. from __future__ import division
  14. from __future__ import print_function
  15. from __future__ import unicode_literals
  16. import random
  17. import socket
  18. import unittest
  19. import warnings
  20. import io
  21. import gzip
  22. import json
  23. import mock
  24. import requests
  25. import requests.exceptions
  26. import requests_mock
  27. from nose.tools import raises
  28. from influxdb import InfluxDBClient
  29. from influxdb.resultset import ResultSet
  30. def _build_response_object(status_code=200, content=""):
  31. resp = requests.Response()
  32. resp.status_code = status_code
  33. resp._content = content.encode("utf8")
  34. return resp
  35. def _mocked_session(cli, method="GET", status_code=200, content=""):
  36. method = method.upper()
  37. def request(*args, **kwargs):
  38. """Request content from the mocked session."""
  39. c = content
  40. # Check method
  41. assert method == kwargs.get('method', 'GET')
  42. if method == 'POST':
  43. data = kwargs.get('data', None)
  44. if data is not None:
  45. # Data must be a string
  46. assert isinstance(data, str)
  47. # Data must be a JSON string
  48. assert c == json.loads(data, strict=True)
  49. c = data
  50. # Anyway, Content must be a JSON string (or empty string)
  51. if not isinstance(c, str):
  52. c = json.dumps(c)
  53. return _build_response_object(status_code=status_code, content=c)
  54. return mock.patch.object(cli._session, 'request', side_effect=request)
class TestInfluxDBClient(unittest.TestCase):
    """Test case exercising InfluxDBClient without a running server."""
  57. def setUp(self):
  58. """Initialize an instance of TestInfluxDBClient object."""
  59. # By default, raise exceptions on warnings
  60. warnings.simplefilter('error', FutureWarning)
  61. self.cli = InfluxDBClient('localhost', 8086, 'username', 'password')
  62. self.dummy_points = [
  63. {
  64. "measurement": "cpu_load_short",
  65. "tags": {
  66. "host": "server01",
  67. "region": "us-west"
  68. },
  69. "time": "2009-11-10T23:00:00.123456Z",
  70. "fields": {
  71. "value": 0.64
  72. }
  73. }
  74. ]
  75. self.dsn_string = 'influxdb://uSr:pWd@my.host.fr:1886/db'
  76. def test_scheme(self):
  77. """Set up the test schema for TestInfluxDBClient object."""
  78. cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')
  79. self.assertEqual('http://host:8086', cli._baseurl)
  80. cli = InfluxDBClient(
  81. 'host', 8086, 'username', 'password', 'database', ssl=True
  82. )
  83. self.assertEqual('https://host:8086', cli._baseurl)
  84. cli = InfluxDBClient(
  85. 'host', 8086, 'username', 'password', 'database', ssl=True,
  86. path="somepath"
  87. )
  88. self.assertEqual('https://host:8086/somepath', cli._baseurl)
  89. cli = InfluxDBClient(
  90. 'host', 8086, 'username', 'password', 'database', ssl=True,
  91. path=None
  92. )
  93. self.assertEqual('https://host:8086', cli._baseurl)
  94. cli = InfluxDBClient(
  95. 'host', 8086, 'username', 'password', 'database', ssl=True,
  96. path="/somepath"
  97. )
  98. self.assertEqual('https://host:8086/somepath', cli._baseurl)
  99. def test_dsn(self):
  100. """Set up the test datasource name for TestInfluxDBClient object."""
  101. cli = InfluxDBClient.from_dsn('influxdb://192.168.0.1:1886')
  102. self.assertEqual('http://192.168.0.1:1886', cli._baseurl)
  103. cli = InfluxDBClient.from_dsn(self.dsn_string)
  104. self.assertEqual('http://my.host.fr:1886', cli._baseurl)
  105. self.assertEqual('uSr', cli._username)
  106. self.assertEqual('pWd', cli._password)
  107. self.assertEqual('db', cli._database)
  108. self.assertFalse(cli._use_udp)
  109. cli = InfluxDBClient.from_dsn('udp+' + self.dsn_string)
  110. self.assertTrue(cli._use_udp)
  111. cli = InfluxDBClient.from_dsn('https+' + self.dsn_string)
  112. self.assertEqual('https://my.host.fr:1886', cli._baseurl)
  113. cli = InfluxDBClient.from_dsn('https+' + self.dsn_string,
  114. **{'ssl': False})
  115. self.assertEqual('http://my.host.fr:1886', cli._baseurl)
  116. def test_cert(self):
  117. """Test mutual TLS authentication for TestInfluxDBClient object."""
  118. cli = InfluxDBClient(ssl=True, cert='/etc/pki/tls/private/dummy.crt')
  119. self.assertEqual(cli._session.cert, '/etc/pki/tls/private/dummy.crt')
  120. with self.assertRaises(ValueError):
  121. cli = InfluxDBClient(cert='/etc/pki/tls/private/dummy.crt')
  122. def test_switch_database(self):
  123. """Test switch database in TestInfluxDBClient object."""
  124. cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')
  125. cli.switch_database('another_database')
  126. self.assertEqual('another_database', cli._database)
  127. def test_switch_user(self):
  128. """Test switch user in TestInfluxDBClient object."""
  129. cli = InfluxDBClient('host', 8086, 'username', 'password', 'database')
  130. cli.switch_user('another_username', 'another_password')
  131. self.assertEqual('another_username', cli._username)
  132. self.assertEqual('another_password', cli._password)
  133. def test_write(self):
  134. """Test write in TestInfluxDBClient object."""
  135. with requests_mock.Mocker() as m:
  136. m.register_uri(
  137. requests_mock.POST,
  138. "http://localhost:8086/write",
  139. status_code=204
  140. )
  141. cli = InfluxDBClient(database='db')
  142. cli.write(
  143. {"database": "mydb",
  144. "retentionPolicy": "mypolicy",
  145. "points": [{"measurement": "cpu_load_short",
  146. "tags": {"host": "server01",
  147. "region": "us-west"},
  148. "time": "2009-11-10T23:00:00Z",
  149. "fields": {"value": 0.64}}]}
  150. )
  151. self.assertEqual(
  152. m.last_request.body,
  153. b"cpu_load_short,host=server01,region=us-west "
  154. b"value=0.64 1257894000000000000\n",
  155. )
  156. def test_write_points(self):
  157. """Test write points for TestInfluxDBClient object."""
  158. with requests_mock.Mocker() as m:
  159. m.register_uri(
  160. requests_mock.POST,
  161. "http://localhost:8086/write",
  162. status_code=204
  163. )
  164. cli = InfluxDBClient(database='db')
  165. cli.write_points(
  166. self.dummy_points,
  167. )
  168. self.assertEqual(
  169. 'cpu_load_short,host=server01,region=us-west '
  170. 'value=0.64 1257894000123456000\n',
  171. m.last_request.body.decode('utf-8'),
  172. )
  173. def test_write_gzip(self):
  174. """Test write in TestInfluxDBClient object."""
  175. with requests_mock.Mocker() as m:
  176. m.register_uri(
  177. requests_mock.POST,
  178. "http://localhost:8086/write",
  179. status_code=204
  180. )
  181. cli = InfluxDBClient(database='db', gzip=True)
  182. cli.write(
  183. {"database": "mydb",
  184. "retentionPolicy": "mypolicy",
  185. "points": [{"measurement": "cpu_load_short",
  186. "tags": {"host": "server01",
  187. "region": "us-west"},
  188. "time": "2009-11-10T23:00:00Z",
  189. "fields": {"value": 0.64}}]}
  190. )
  191. compressed = io.BytesIO()
  192. with gzip.GzipFile(
  193. compresslevel=9,
  194. fileobj=compressed,
  195. mode='w'
  196. ) as f:
  197. f.write(
  198. b"cpu_load_short,host=server01,region=us-west "
  199. b"value=0.64 1257894000000000000\n"
  200. )
  201. self.assertEqual(
  202. m.last_request.body,
  203. compressed.getvalue(),
  204. )
  205. def test_write_points_gzip(self):
  206. """Test write points for TestInfluxDBClient object."""
  207. with requests_mock.Mocker() as m:
  208. m.register_uri(
  209. requests_mock.POST,
  210. "http://localhost:8086/write",
  211. status_code=204
  212. )
  213. cli = InfluxDBClient(database='db', gzip=True)
  214. cli.write_points(
  215. self.dummy_points,
  216. )
  217. compressed = io.BytesIO()
  218. with gzip.GzipFile(
  219. compresslevel=9,
  220. fileobj=compressed,
  221. mode='w'
  222. ) as f:
  223. f.write(
  224. b'cpu_load_short,host=server01,region=us-west '
  225. b'value=0.64 1257894000123456000\n'
  226. )
  227. self.assertEqual(
  228. m.last_request.body,
  229. compressed.getvalue(),
  230. )
  231. def test_write_points_toplevel_attributes(self):
  232. """Test write points attrs for TestInfluxDBClient object."""
  233. with requests_mock.Mocker() as m:
  234. m.register_uri(
  235. requests_mock.POST,
  236. "http://localhost:8086/write",
  237. status_code=204
  238. )
  239. cli = InfluxDBClient(database='db')
  240. cli.write_points(
  241. self.dummy_points,
  242. database='testdb',
  243. tags={"tag": "hello"},
  244. retention_policy="somepolicy"
  245. )
  246. self.assertEqual(
  247. 'cpu_load_short,host=server01,region=us-west,tag=hello '
  248. 'value=0.64 1257894000123456000\n',
  249. m.last_request.body.decode('utf-8'),
  250. )
  251. def test_write_points_batch(self):
  252. """Test write points batch for TestInfluxDBClient object."""
  253. dummy_points = [
  254. {"measurement": "cpu_usage", "tags": {"unit": "percent"},
  255. "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
  256. {"measurement": "network", "tags": {"direction": "in"},
  257. "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
  258. {"measurement": "network", "tags": {"direction": "out"},
  259. "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
  260. ]
  261. expected_last_body = (
  262. "network,direction=out,host=server01,region=us-west "
  263. "value=12.0 1257894000000000000\n"
  264. )
  265. with requests_mock.Mocker() as m:
  266. m.register_uri(requests_mock.POST,
  267. "http://localhost:8086/write",
  268. status_code=204)
  269. cli = InfluxDBClient(database='db')
  270. cli.write_points(points=dummy_points,
  271. database='db',
  272. tags={"host": "server01",
  273. "region": "us-west"},
  274. batch_size=2)
  275. self.assertEqual(m.call_count, 2)
  276. self.assertEqual(expected_last_body,
  277. m.last_request.body.decode('utf-8'))
  278. def test_write_points_batch_generator(self):
  279. """Test write points batch from a generator for TestInfluxDBClient."""
  280. dummy_points = [
  281. {"measurement": "cpu_usage", "tags": {"unit": "percent"},
  282. "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
  283. {"measurement": "network", "tags": {"direction": "in"},
  284. "time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
  285. {"measurement": "network", "tags": {"direction": "out"},
  286. "time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
  287. ]
  288. dummy_points_generator = (point for point in dummy_points)
  289. expected_last_body = (
  290. "network,direction=out,host=server01,region=us-west "
  291. "value=12.0 1257894000000000000\n"
  292. )
  293. with requests_mock.Mocker() as m:
  294. m.register_uri(requests_mock.POST,
  295. "http://localhost:8086/write",
  296. status_code=204)
  297. cli = InfluxDBClient(database='db')
  298. cli.write_points(points=dummy_points_generator,
  299. database='db',
  300. tags={"host": "server01",
  301. "region": "us-west"},
  302. batch_size=2)
  303. self.assertEqual(m.call_count, 2)
  304. self.assertEqual(expected_last_body,
  305. m.last_request.body.decode('utf-8'))
  306. def test_write_points_udp(self):
  307. """Test write points UDP for TestInfluxDBClient object."""
  308. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  309. port = random.randint(4000, 8000)
  310. s.bind(('0.0.0.0', port))
  311. cli = InfluxDBClient(
  312. 'localhost', 8086, 'root', 'root',
  313. 'test', use_udp=True, udp_port=port
  314. )
  315. cli.write_points(self.dummy_points)
  316. received_data, addr = s.recvfrom(1024)
  317. self.assertEqual(
  318. 'cpu_load_short,host=server01,region=us-west '
  319. 'value=0.64 1257894000123456000\n',
  320. received_data.decode()
  321. )
  322. @raises(Exception)
  323. def test_write_points_fails(self):
  324. """Test write points fail for TestInfluxDBClient object."""
  325. cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')
  326. with _mocked_session(cli, 'post', 500):
  327. cli.write_points([])
  328. def test_write_points_with_precision(self):
  329. """Test write points with precision for TestInfluxDBClient object."""
  330. with requests_mock.Mocker() as m:
  331. m.register_uri(
  332. requests_mock.POST,
  333. "http://localhost:8086/write",
  334. status_code=204
  335. )
  336. cli = InfluxDBClient(database='db')
  337. cli.write_points(self.dummy_points, time_precision='n')
  338. self.assertEqual(
  339. b'cpu_load_short,host=server01,region=us-west '
  340. b'value=0.64 1257894000123456000\n',
  341. m.last_request.body,
  342. )
  343. cli.write_points(self.dummy_points, time_precision='u')
  344. self.assertEqual(
  345. b'cpu_load_short,host=server01,region=us-west '
  346. b'value=0.64 1257894000123456\n',
  347. m.last_request.body,
  348. )
  349. cli.write_points(self.dummy_points, time_precision='ms')
  350. self.assertEqual(
  351. b'cpu_load_short,host=server01,region=us-west '
  352. b'value=0.64 1257894000123\n',
  353. m.last_request.body,
  354. )
  355. cli.write_points(self.dummy_points, time_precision='s')
  356. self.assertEqual(
  357. b"cpu_load_short,host=server01,region=us-west "
  358. b"value=0.64 1257894000\n",
  359. m.last_request.body,
  360. )
  361. cli.write_points(self.dummy_points, time_precision='m')
  362. self.assertEqual(
  363. b'cpu_load_short,host=server01,region=us-west '
  364. b'value=0.64 20964900\n',
  365. m.last_request.body,
  366. )
  367. cli.write_points(self.dummy_points, time_precision='h')
  368. self.assertEqual(
  369. b'cpu_load_short,host=server01,region=us-west '
  370. b'value=0.64 349415\n',
  371. m.last_request.body,
  372. )
  373. def test_write_points_with_consistency(self):
  374. """Test write points with consistency for TestInfluxDBClient object."""
  375. with requests_mock.Mocker() as m:
  376. m.register_uri(
  377. requests_mock.POST,
  378. 'http://localhost:8086/write',
  379. status_code=204
  380. )
  381. cli = InfluxDBClient(database='db')
  382. cli.write_points(self.dummy_points, consistency='any')
  383. self.assertEqual(
  384. m.last_request.qs,
  385. {'db': ['db'], 'consistency': ['any']}
  386. )
  387. def test_write_points_with_precision_udp(self):
  388. """Test write points with precision for TestInfluxDBClient object."""
  389. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  390. port = random.randint(4000, 8000)
  391. s.bind(('0.0.0.0', port))
  392. cli = InfluxDBClient(
  393. 'localhost', 8086, 'root', 'root',
  394. 'test', use_udp=True, udp_port=port
  395. )
  396. cli.write_points(self.dummy_points, time_precision='n')
  397. received_data, addr = s.recvfrom(1024)
  398. self.assertEqual(
  399. b'cpu_load_short,host=server01,region=us-west '
  400. b'value=0.64 1257894000123456000\n',
  401. received_data,
  402. )
  403. cli.write_points(self.dummy_points, time_precision='u')
  404. received_data, addr = s.recvfrom(1024)
  405. self.assertEqual(
  406. b'cpu_load_short,host=server01,region=us-west '
  407. b'value=0.64 1257894000123456\n',
  408. received_data,
  409. )
  410. cli.write_points(self.dummy_points, time_precision='ms')
  411. received_data, addr = s.recvfrom(1024)
  412. self.assertEqual(
  413. b'cpu_load_short,host=server01,region=us-west '
  414. b'value=0.64 1257894000123\n',
  415. received_data,
  416. )
  417. cli.write_points(self.dummy_points, time_precision='s')
  418. received_data, addr = s.recvfrom(1024)
  419. self.assertEqual(
  420. b"cpu_load_short,host=server01,region=us-west "
  421. b"value=0.64 1257894000\n",
  422. received_data,
  423. )
  424. cli.write_points(self.dummy_points, time_precision='m')
  425. received_data, addr = s.recvfrom(1024)
  426. self.assertEqual(
  427. b'cpu_load_short,host=server01,region=us-west '
  428. b'value=0.64 20964900\n',
  429. received_data,
  430. )
  431. cli.write_points(self.dummy_points, time_precision='h')
  432. received_data, addr = s.recvfrom(1024)
  433. self.assertEqual(
  434. b'cpu_load_short,host=server01,region=us-west '
  435. b'value=0.64 349415\n',
  436. received_data,
  437. )
  438. def test_write_points_bad_precision(self):
  439. """Test write points w/bad precision TestInfluxDBClient object."""
  440. cli = InfluxDBClient()
  441. with self.assertRaisesRegexp(
  442. Exception,
  443. "Invalid time precision is given. "
  444. "\(use 'n', 'u', 'ms', 's', 'm' or 'h'\)"
  445. ):
  446. cli.write_points(
  447. self.dummy_points,
  448. time_precision='g'
  449. )
  450. def test_write_points_bad_consistency(self):
  451. """Test write points w/bad consistency value."""
  452. cli = InfluxDBClient()
  453. with self.assertRaises(ValueError):
  454. cli.write_points(
  455. self.dummy_points,
  456. consistency='boo'
  457. )
  458. @raises(Exception)
  459. def test_write_points_with_precision_fails(self):
  460. """Test write points w/precision fail for TestInfluxDBClient object."""
  461. cli = InfluxDBClient('host', 8086, 'username', 'password', 'db')
  462. with _mocked_session(cli, 'post', 500):
  463. cli.write_points_with_precision([])
  464. def test_query(self):
  465. """Test query method for TestInfluxDBClient object."""
  466. example_response = (
  467. '{"results": [{"series": [{"measurement": "sdfsdfsdf", '
  468. '"columns": ["time", "value"], "values": '
  469. '[["2009-11-10T23:00:00Z", 0.64]]}]}, {"series": '
  470. '[{"measurement": "cpu_load_short", "columns": ["time", "value"], '
  471. '"values": [["2009-11-10T23:00:00Z", 0.64]]}]}]}'
  472. )
  473. with requests_mock.Mocker() as m:
  474. m.register_uri(
  475. requests_mock.GET,
  476. "http://localhost:8086/query",
  477. text=example_response
  478. )
  479. rs = self.cli.query('select * from foo')
  480. self.assertListEqual(
  481. list(rs[0].get_points()),
  482. [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
  483. )
  484. def test_query_msgpack(self):
  485. """Test query method with a messagepack response."""
  486. example_response = bytes(bytearray.fromhex(
  487. "81a7726573756c74739182ac73746174656d656e745f696400a673657269"
  488. "65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
  489. "6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
  490. ))
  491. with requests_mock.Mocker() as m:
  492. m.register_uri(
  493. requests_mock.GET,
  494. "http://localhost:8086/query",
  495. request_headers={"Accept": "application/x-msgpack"},
  496. headers={"Content-Type": "application/x-msgpack"},
  497. content=example_response
  498. )
  499. rs = self.cli.query('select * from a')
  500. self.assertListEqual(
  501. list(rs.get_points()),
  502. [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
  503. )
  504. def test_select_into_post(self):
  505. """Test SELECT.*INTO is POSTed."""
  506. example_response = (
  507. '{"results": [{"series": [{"measurement": "sdfsdfsdf", '
  508. '"columns": ["time", "value"], "values": '
  509. '[["2009-11-10T23:00:00Z", 0.64]]}]}, {"series": '
  510. '[{"measurement": "cpu_load_short", "columns": ["time", "value"], '
  511. '"values": [["2009-11-10T23:00:00Z", 0.64]]}]}]}'
  512. )
  513. with requests_mock.Mocker() as m:
  514. m.register_uri(
  515. requests_mock.POST,
  516. "http://localhost:8086/query",
  517. text=example_response
  518. )
  519. rs = self.cli.query('select * INTO newmeas from foo')
  520. self.assertListEqual(
  521. list(rs[0].get_points()),
  522. [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
  523. )
  524. @unittest.skip('Not implemented for 0.9')
  525. def test_query_chunked(self):
  526. """Test chunked query for TestInfluxDBClient object."""
  527. cli = InfluxDBClient(database='db')
  528. example_object = {
  529. 'points': [
  530. [1415206250119, 40001, 667],
  531. [1415206244555, 30001, 7],
  532. [1415206228241, 20001, 788],
  533. [1415206212980, 10001, 555],
  534. [1415197271586, 10001, 23]
  535. ],
  536. 'measurement': 'foo',
  537. 'columns': [
  538. 'time',
  539. 'sequence_number',
  540. 'val'
  541. ]
  542. }
  543. example_response = \
  544. json.dumps(example_object) + json.dumps(example_object)
  545. with requests_mock.Mocker() as m:
  546. m.register_uri(
  547. requests_mock.GET,
  548. "http://localhost:8086/db/db/series",
  549. text=example_response
  550. )
  551. self.assertListEqual(
  552. cli.query('select * from foo', chunked=True),
  553. [example_object, example_object]
  554. )
  555. @raises(Exception)
  556. def test_query_fail(self):
  557. """Test query failed for TestInfluxDBClient object."""
  558. with _mocked_session(self.cli, 'get', 401):
  559. self.cli.query('select column_one from foo;')
  560. def test_ping(self):
  561. """Test ping querying InfluxDB version."""
  562. with requests_mock.Mocker() as m:
  563. m.register_uri(
  564. requests_mock.GET,
  565. "http://localhost:8086/ping",
  566. status_code=204,
  567. headers={'X-Influxdb-Version': '1.2.3'}
  568. )
  569. version = self.cli.ping()
  570. self.assertEqual(version, '1.2.3')
  571. def test_create_database(self):
  572. """Test create database for TestInfluxDBClient object."""
  573. with requests_mock.Mocker() as m:
  574. m.register_uri(
  575. requests_mock.POST,
  576. "http://localhost:8086/query",
  577. text='{"results":[{}]}'
  578. )
  579. self.cli.create_database('new_db')
  580. self.assertEqual(
  581. m.last_request.qs['q'][0],
  582. 'create database "new_db"'
  583. )
  584. def test_create_numeric_named_database(self):
  585. """Test create db w/numeric name for TestInfluxDBClient object."""
  586. with requests_mock.Mocker() as m:
  587. m.register_uri(
  588. requests_mock.POST,
  589. "http://localhost:8086/query",
  590. text='{"results":[{}]}'
  591. )
  592. self.cli.create_database('123')
  593. self.assertEqual(
  594. m.last_request.qs['q'][0],
  595. 'create database "123"'
  596. )
  597. @raises(Exception)
  598. def test_create_database_fails(self):
  599. """Test create database fail for TestInfluxDBClient object."""
  600. with _mocked_session(self.cli, 'post', 401):
  601. self.cli.create_database('new_db')
  602. def test_drop_database(self):
  603. """Test drop database for TestInfluxDBClient object."""
  604. with requests_mock.Mocker() as m:
  605. m.register_uri(
  606. requests_mock.POST,
  607. "http://localhost:8086/query",
  608. text='{"results":[{}]}'
  609. )
  610. self.cli.drop_database('new_db')
  611. self.assertEqual(
  612. m.last_request.qs['q'][0],
  613. 'drop database "new_db"'
  614. )
  615. def test_drop_measurement(self):
  616. """Test drop measurement for TestInfluxDBClient object."""
  617. with requests_mock.Mocker() as m:
  618. m.register_uri(
  619. requests_mock.POST,
  620. "http://localhost:8086/query",
  621. text='{"results":[{}]}'
  622. )
  623. self.cli.drop_measurement('new_measurement')
  624. self.assertEqual(
  625. m.last_request.qs['q'][0],
  626. 'drop measurement "new_measurement"'
  627. )
  628. def test_drop_numeric_named_database(self):
  629. """Test drop numeric db for TestInfluxDBClient object."""
  630. with requests_mock.Mocker() as m:
  631. m.register_uri(
  632. requests_mock.POST,
  633. "http://localhost:8086/query",
  634. text='{"results":[{}]}'
  635. )
  636. self.cli.drop_database('123')
  637. self.assertEqual(
  638. m.last_request.qs['q'][0],
  639. 'drop database "123"'
  640. )
  641. def test_get_list_database(self):
  642. """Test get list of databases for TestInfluxDBClient object."""
  643. data = {'results': [
  644. {'series': [
  645. {'name': 'databases',
  646. 'values': [
  647. ['new_db_1'],
  648. ['new_db_2']],
  649. 'columns': ['name']}]}
  650. ]}
  651. with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
  652. self.assertListEqual(
  653. self.cli.get_list_database(),
  654. [{'name': 'new_db_1'}, {'name': 'new_db_2'}]
  655. )
  656. @raises(Exception)
  657. def test_get_list_database_fails(self):
  658. """Test get list of dbs fail for TestInfluxDBClient object."""
  659. cli = InfluxDBClient('host', 8086, 'username', 'password')
  660. with _mocked_session(cli, 'get', 401):
  661. cli.get_list_database()
  662. def test_get_list_measurements(self):
  663. """Test get list of measurements for TestInfluxDBClient object."""
  664. data = {
  665. "results": [{
  666. "series": [
  667. {"name": "measurements",
  668. "columns": ["name"],
  669. "values": [["cpu"], ["disk"]
  670. ]}]}
  671. ]
  672. }
  673. with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
  674. self.assertListEqual(
  675. self.cli.get_list_measurements(),
  676. [{'name': 'cpu'}, {'name': 'disk'}]
  677. )
  678. def test_get_list_series(self):
  679. """Test get a list of series from the database."""
  680. data = {'results': [
  681. {'series': [
  682. {
  683. 'values': [
  684. ['cpu_load_short,host=server01,region=us-west'],
  685. ['memory_usage,host=server02,region=us-east']],
  686. 'columns': ['key']
  687. }
  688. ]}
  689. ]}
  690. with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
  691. self.assertListEqual(
  692. self.cli.get_list_series(),
  693. ['cpu_load_short,host=server01,region=us-west',
  694. 'memory_usage,host=server02,region=us-east'])
  695. def test_get_list_series_with_measurement(self):
  696. """Test get a list of series from the database by filter."""
  697. data = {'results': [
  698. {'series': [
  699. {
  700. 'values': [
  701. ['cpu_load_short,host=server01,region=us-west']],
  702. 'columns': ['key']
  703. }
  704. ]}
  705. ]}
  706. with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
  707. self.assertListEqual(
  708. self.cli.get_list_series(measurement='cpu_load_short'),
  709. ['cpu_load_short,host=server01,region=us-west'])
  710. def test_get_list_series_with_tags(self):
  711. """Test get a list of series from the database by tags."""
  712. data = {'results': [
  713. {'series': [
  714. {
  715. 'values': [
  716. ['cpu_load_short,host=server01,region=us-west']],
  717. 'columns': ['key']
  718. }
  719. ]}
  720. ]}
  721. with _mocked_session(self.cli, 'get', 200, json.dumps(data)):
  722. self.assertListEqual(
  723. self.cli.get_list_series(tags={'region': 'us-west'}),
  724. ['cpu_load_short,host=server01,region=us-west'])
  725. @raises(Exception)
  726. def test_get_list_series_fails(self):
  727. """Test get a list of series from the database but fail."""
  728. cli = InfluxDBClient('host', 8086, 'username', 'password')
  729. with _mocked_session(cli, 'get', 401):
  730. cli.get_list_series()
  def test_create_retention_policy_default(self):
      """Check the query sent when creating a default retention policy."""
      expected_query = (
          'create retention policy "somename" on '
          '"db" duration 1d replication 4 shard duration 0s default'
      )
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.create_retention_policy(
              'somename', '1d', 4, default=True, database='db'
          )
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  def test_create_retention_policy(self):
      """Check the query sent when creating a non-default retention policy."""
      expected_query = (
          'create retention policy "somename" on '
          '"db" duration 1d replication 4 shard duration 0s'
      )
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.create_retention_policy(
              'somename', '1d', 4, database='db'
          )
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  def test_create_retention_policy_shard_duration(self):
      """Check the query sent when an explicit shard duration is given."""
      expected_query = (
          'create retention policy "somename2" on '
          '"db" duration 1d replication 4 shard duration 1h'
      )
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.create_retention_policy(
              'somename2', '1d', 4, database='db',
              shard_duration='1h'
          )
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  def test_create_retention_policy_shard_duration_default(self):
      """Check the query for a default policy with a shard duration."""
      expected_query = (
          'create retention policy "somename3" on '
          '"db" duration 1d replication 4 shard duration 1h '
          'default'
      )
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.create_retention_policy(
              'somename3', '1d', 4, database='db',
              shard_duration='1h', default=True
          )
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  def test_alter_retention_policy(self):
      """Check the query sent for each alterable retention policy field."""
      # Each entry: keyword arguments to alter, then the query expected.
      cases = [
          ({'duration': '4d'},
           'alter retention policy "somename" on "db" duration 4d'),
          ({'replication': 4},
           'alter retention policy "somename" on "db" replication 4'),
          ({'shard_duration': '1h'},
           'alter retention policy "somename" on "db" shard duration 1h'),
          ({'default': True},
           'alter retention policy "somename" on "db" default'),
      ]
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          for changes, expected_query in cases:
              self.cli.alter_retention_policy('somename', 'db', **changes)
              sent_query = mocker.last_request.qs['q'][0]
              self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_alter_retention_policy_invalid(self):
      """Test invalid alter ret policy for TestInfluxDBClient object.

      The call goes through the locally built client so that the mocked
      session (configured with status 400) is the one that answers,
      rather than the unmocked ``self.cli``.
      """
      cli = InfluxDBClient('host', 8086, 'username', 'password')
      # The passing alter test in this file serves this API via POST.
      with _mocked_session(cli, 'post', 400):
          cli.alter_retention_policy('somename', 'db')
  def test_drop_retention_policy(self):
      """Check the query sent when dropping a retention policy."""
      expected_query = 'drop retention policy "somename" on "db"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.drop_retention_policy('somename', 'db')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_drop_retention_policy_fails(self):
      """Expect dropping a policy to raise with a session mocked at 401."""
      failing_client = InfluxDBClient('host', 8086, 'username', 'password')
      session_patch = _mocked_session(failing_client, 'delete', 401)
      with session_patch:
          failing_client.drop_retention_policy('default', 'db')
  def test_get_list_retention_policies(self):
      """Check that retention policies come back as column-keyed dicts."""
      body = (
          '{"results": [{"series": [{"values": [["fsfdsdf", "24h0m0s", 2]],'
          ' "columns": ["name", "duration", "replicaN"]}]}]}'
      )
      expected = [
          {'duration': '24h0m0s', 'name': 'fsfdsdf', 'replicaN': 2}
      ]
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=body
          )
          policies = self.cli.get_list_retention_policies("db")
          self.assertListEqual(policies, expected)
  @mock.patch('requests.Session.request')
  def test_request_retry(self, mock_request):
      """Check that a write gets through after two connection errors."""
      calls = {'count': 0}

      def flaky_request(*args, **kwargs):
          """Raise ConnectionError on calls one and two, then answer 204."""
          calls['count'] += 1
          if calls['count'] >= 3:
              response = requests.Response()
              response.status_code = 204
              return response
          raise requests.exceptions.ConnectionError

      mock_request.side_effect = flaky_request

      client = InfluxDBClient(database='db')
      client.write_points(self.dummy_points)
  @mock.patch('requests.Session.request')
  def test_request_retry_raises(self, mock_request):
      """Check that three consecutive HTTP errors reach the caller."""
      calls = {'count': 0}

      def failing_request(*args, **kwargs):
          """Raise HTTPError on calls one to three, then answer 200."""
          calls['count'] += 1
          if calls['count'] >= 4:
              response = requests.Response()
              response.status_code = 200
              return response
          raise requests.exceptions.HTTPError

      mock_request.side_effect = failing_request

      client = InfluxDBClient(database='db')
      self.assertRaises(
          requests.exceptions.HTTPError,
          client.write_points, self.dummy_points
      )
  @mock.patch('requests.Session.request')
  def test_random_request_retry(self, mock_request):
      """Check a write succeeds with a randomly chosen retry budget."""
      retries = random.randint(1, 5)
      calls = {'count': 0}

      def flaky_request(*args, **kwargs):
          """Raise ConnectionError until call number ``retries``."""
          calls['count'] += 1
          if calls['count'] >= retries:
              response = requests.Response()
              response.status_code = 204
              return response
          raise requests.exceptions.ConnectionError

      mock_request.side_effect = flaky_request

      client = InfluxDBClient(database='db', retries=retries)
      client.write_points(self.dummy_points)
  @mock.patch('requests.Session.request')
  def test_random_request_retry_raises(self, mock_request):
      """Check that ``retries`` connection errors in a row are raised."""
      retries = random.randint(1, 5)
      calls = {'count': 0}

      def failing_request(*args, **kwargs):
          """Raise ConnectionError for the first ``retries`` calls."""
          calls['count'] += 1
          if calls['count'] >= retries + 1:
              response = requests.Response()
              response.status_code = 200
              return response
          raise requests.exceptions.ConnectionError

      mock_request.side_effect = failing_request

      client = InfluxDBClient(database='db', retries=retries)
      self.assertRaises(
          requests.exceptions.ConnectionError,
          client.write_points, self.dummy_points
      )
  def test_get_list_users(self):
      """Check that user rows are returned as column-keyed dicts."""
      body = (
          '{"results":[{"series":[{"columns":["user","admin"],'
          '"values":[["test",false]]}]}]}'
      )
      expected = [{'user': 'test', 'admin': False}]
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=body
          )
          users = self.cli.get_list_users()
          self.assertListEqual(users, expected)
  def test_get_list_users_empty(self):
      """Check that a series without values gives an empty user list."""
      body = '{"results":[{"series":[{"columns":["user","admin"]}]}]}'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=body
          )
          users = self.cli.get_list_users()
          self.assertListEqual(users, [])
  def test_grant_admin_privileges(self):
      """Check the query sent when granting admin privileges."""
      expected_query = 'grant all privileges to "test"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.grant_admin_privileges('test')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_grant_admin_privileges_invalid(self):
      """Test grant invalid admin privs for TestInfluxDBClient object.

      The call goes through the locally built client so that the mocked
      session (configured with status 400) is the one that answers,
      rather than the unmocked ``self.cli``.
      """
      cli = InfluxDBClient('host', 8086, 'username', 'password')
      # The passing grant test in this file serves this API via POST.
      with _mocked_session(cli, 'post', 400):
          cli.grant_admin_privileges('')
  def test_revoke_admin_privileges(self):
      """Check the query sent when revoking admin privileges."""
      expected_query = 'revoke all privileges from "test"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.revoke_admin_privileges('test')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_revoke_admin_privileges_invalid(self):
      """Test revoke invalid admin privs for TestInfluxDBClient object.

      The call goes through the locally built client so that the mocked
      session (configured with status 400) is the one that answers,
      rather than the unmocked ``self.cli``.
      """
      cli = InfluxDBClient('host', 8086, 'username', 'password')
      # The passing revoke test in this file serves this API via POST.
      with _mocked_session(cli, 'post', 400):
          cli.revoke_admin_privileges('')
  def test_grant_privilege(self):
      """Check the query sent when granting a database privilege."""
      expected_query = 'grant read on "testdb" to "test"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.grant_privilege('read', 'testdb', 'test')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_grant_privilege_invalid(self):
      """Test grant invalid privs for TestInfluxDBClient object.

      The call goes through the locally built client so that the mocked
      session (configured with status 400) is the one that answers,
      rather than the unmocked ``self.cli``.
      """
      cli = InfluxDBClient('host', 8086, 'username', 'password')
      # The passing grant test in this file serves this API via POST.
      with _mocked_session(cli, 'post', 400):
          cli.grant_privilege('', 'testdb', 'test')
  def test_revoke_privilege(self):
      """Check the query sent when revoking a database privilege."""
      expected_query = 'revoke read on "testdb" from "test"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.POST,
              "http://localhost:8086/query",
              text='{"results":[{}]}'
          )
          self.cli.revoke_privilege('read', 'testdb', 'test')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_revoke_privilege_invalid(self):
      """Test revoke invalid privs for TestInfluxDBClient object.

      The call goes through the locally built client so that the mocked
      session (configured with status 400) is the one that answers,
      rather than the unmocked ``self.cli``.
      """
      cli = InfluxDBClient('host', 8086, 'username', 'password')
      # The passing revoke test in this file serves this API via POST.
      with _mocked_session(cli, 'post', 400):
          cli.revoke_privilege('', 'testdb', 'test')
  def test_get_list_privileges(self):
      """Check that privilege rows are returned as column-keyed dicts."""
      rows = [
          ['db1', 'READ'],
          ['db2', 'ALL PRIVILEGES'],
          ['db3', 'NO PRIVILEGES'],
      ]
      payload = {'results': [
          {'series': [
              {'columns': ['database', 'privilege'], 'values': rows}
          ]}
      ]}
      expected = [
          {'database': 'db1', 'privilege': 'READ'},
          {'database': 'db2', 'privilege': 'ALL PRIVILEGES'},
          {'database': 'db3', 'privilege': 'NO PRIVILEGES'},
      ]
      body = json.dumps(payload)
      with _mocked_session(self.cli, 'get', 200, body):
          privileges = self.cli.get_list_privileges('test')
          self.assertListEqual(privileges, expected)
  @raises(Exception)
  def test_get_list_privileges_fails(self):
      """Expect listing privileges to raise with a session mocked at 401."""
      failing_client = InfluxDBClient('host', 8086, 'username', 'password')
      session_patch = _mocked_session(failing_client, 'get', 401)
      with session_patch:
          failing_client.get_list_privileges('test')
  def test_get_list_continuous_queries(self):
      """Check continuous queries are grouped under their series name."""
      columns = ["name", "query"]
      series = [
          {
              "name": "testdb01",
              "columns": columns,
              "values": [["testname01", "testquery01"],
                         ["testname02", "testquery02"]]
          },
          {
              "name": "testdb02",
              "columns": columns,
              "values": [["testname03", "testquery03"]]
          },
          # No "values" key: expected to map to an empty list below.
          {
              "name": "testdb03",
              "columns": columns
          },
      ]
      payload = {"results": [{"statement_id": 0, "series": series}]}
      expected = [
          {'testdb01': [
              {'name': 'testname01', 'query': 'testquery01'},
              {'name': 'testname02', 'query': 'testquery02'},
          ]},
          {'testdb02': [
              {'name': 'testname03', 'query': 'testquery03'},
          ]},
          {'testdb03': []},
      ]
      body = json.dumps(payload)
      with _mocked_session(self.cli, 'get', 200, body):
          listed = self.cli.get_list_continuous_queries()
          self.assertListEqual(listed, expected)
  @raises(Exception)
  def test_get_list_continuous_queries_fails(self):
      """Expect listing continuous queries to raise on a 400 mock."""
      session_patch = _mocked_session(self.cli, 'get', 400)
      with session_patch:
          self.cli.get_list_continuous_queries()
  def test_create_continuous_query(self):
      """Check the queries sent with and without a resample clause."""
      body = json.dumps({"results": [{}]})
      select_stmt = 'SELECT count("value") INTO "6_months"."events" FROM ' \
                    '"events" GROUP BY time(10m)'
      expected_plain = (
          'create continuous query "cq_name" on "db_name" begin select '
          'count("value") into "6_months"."events" from "events" group '
          'by time(10m) end'
      )
      expected_resample = (
          'create continuous query "cq_name" on "db_name" resample '
          'every 10s for 2m begin select count("value") into '
          '"6_months"."events" from "events" group by time(10m) end'
      )
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=body
          )

          self.cli.create_continuous_query('cq_name', select_stmt,
                                           'db_name')
          self.assertEqual(mocker.last_request.qs['q'][0], expected_plain)

          self.cli.create_continuous_query('cq_name', select_stmt,
                                           'db_name', 'EVERY 10s FOR 2m')
          self.assertEqual(mocker.last_request.qs['q'][0],
                           expected_resample)
  @raises(Exception)
  def test_create_continuous_query_fails(self):
      """Expect creating a continuous query to raise on a 400 mock."""
      session_patch = _mocked_session(self.cli, 'get', 400)
      with session_patch:
          self.cli.create_continuous_query('cq_name', 'select', 'db_name')
  def test_drop_continuous_query(self):
      """Check the query sent when dropping a continuous query."""
      body = json.dumps({"results": [{}]})
      expected_query = 'drop continuous query "cq_name" on "db_name"'
      with requests_mock.Mocker() as mocker:
          mocker.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=body
          )
          self.cli.drop_continuous_query('cq_name', 'db_name')
          sent_query = mocker.last_request.qs['q'][0]
          self.assertEqual(sent_query, expected_query)
  @raises(Exception)
  def test_drop_continuous_query_fails(self):
      """Expect dropping a continuous query to raise on a 400 mock."""
      session_patch = _mocked_session(self.cli, 'get', 400)
      with session_patch:
          self.cli.drop_continuous_query('cq_name', 'db_name')
  def test_invalid_port_fails(self):
      """Check that constructing a client with port '80/redir' raises."""
      bad_port = '80/redir'
      self.assertRaises(
          ValueError,
          InfluxDBClient, 'host', bad_port, 'username', 'password'
      )
  def test_chunked_response(self):
      """Test chunked response for TestInfluxDBClient object.

      The mocked body holds two newline-terminated JSON documents. The
      query is issued with ``chunked=True`` and the test expects two
      results whose reprs match ResultSets built from each document's
      series.
      """
      example_response = \
          u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
          '"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
          'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
          '"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
          '["df"],["mount"]]}]}]}\n'

      with requests_mock.Mocker() as m:
          m.register_uri(
              requests_mock.GET,
              "http://localhost:8086/query",
              text=example_response
          )
          response = self.cli.query('show series',
                                    chunked=True, chunk_size=4)
          res = list(response)
          # assertEqual reports both values on failure, unlike assertTrue.
          self.assertEqual(len(res), 2)
          self.assertEqual(repr(res[0]), repr(ResultSet(
              {'series': [{
                  'columns': ['key'],
                  'values': [['cpu'], ['memory'], ['iops'], ['network']]
              }]})))
          self.assertEqual(repr(res[1]), repr(ResultSet(
              {'series': [{
                  'columns': ['key'],
                  'values': [['qps'], ['uptime'], ['df'], ['mount']]
              }]})))
  class FakeClient(InfluxDBClient):
      """Set up a fake client instance of InfluxDBClient.

      ``query`` is overridden to raise or return a fixed string based only
      on the query text and on ``self._host``.
      """

      def __init__(self, *args, **kwargs):
          """Initialize an instance of the FakeClient object."""
          super(FakeClient, self).__init__(*args, **kwargs)

      def query(self,
                query,
                params=None,
                expected_response_code=200,
                database=None):
          """Query data from the FakeClient object.

          :param query: query text; 'Fail', 'Fail once' and 'Fail twice'
              select the failure modes below, anything else succeeds
          :param params: unused by this fake
          :param expected_response_code: unused by this fake
          :param database: unused by this fake
          :returns: the string "Success" when no failure mode applies
          :raises Exception: for 'Fail' always, for 'Fail once' when the
              host is 'host1', and for 'Fail twice' when the host is
              'host1' or 'host2'
          """
          if query == 'Fail':
              raise Exception("Fail")
          elif query == 'Fail once' and self._host == 'host1':
              raise Exception("Fail Once")
          # Match whole host names: a substring test against the string
          # 'host1 host2' would also accept '', 'host', '1 h', and so on.
          elif query == 'Fail twice' and self._host in ('host1', 'host2'):
              raise Exception("Fail Twice")
          else:
              return "Success"