client.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. # -*- coding: utf-8 -*-
  2. """Python client for InfluxDB."""
  3. from __future__ import absolute_import
  4. from __future__ import division
  5. from __future__ import print_function
  6. from __future__ import unicode_literals
  7. import datetime
  8. import gzip
  9. import itertools
  10. import io
  11. import json
  12. import random
  13. import socket
  14. import struct
  15. import time
  16. from itertools import chain, islice
  17. import msgpack
  18. import requests
  19. import requests.exceptions
  20. from six.moves.urllib.parse import urlparse
  21. from influxdb.line_protocol import make_lines, quote_ident, quote_literal
  22. from influxdb.resultset import ResultSet
  23. from .exceptions import InfluxDBClientError
  24. from .exceptions import InfluxDBServerError
  25. class InfluxDBClient(object):
  26. """InfluxDBClient primary client object to connect InfluxDB.
  27. The :class:`~.InfluxDBClient` object holds information necessary to
  28. connect to InfluxDB. Requests can be made to InfluxDB directly through
  29. the client.
  30. The client supports the use as a `context manager
  31. <https://docs.python.org/3/reference/datamodel.html#context-managers>`_.
  32. :param host: hostname to connect to InfluxDB, defaults to 'localhost'
  33. :type host: str
  34. :param port: port to connect to InfluxDB, defaults to 8086
  35. :type port: int
  36. :param username: user to connect, defaults to 'root'
  37. :type username: str
  38. :param password: password of the user, defaults to 'root'
  39. :type password: str
  40. :param pool_size: urllib3 connection pool size, defaults to 10.
  41. :type pool_size: int
  42. :param database: database name to connect to, defaults to None
  43. :type database: str
  44. :param ssl: use https instead of http to connect to InfluxDB, defaults to
  45. False
  46. :type ssl: bool
  47. :param verify_ssl: verify SSL certificates for HTTPS requests, defaults to
  48. False
  49. :type verify_ssl: bool
  50. :param timeout: number of seconds Requests will wait for your client to
  51. establish a connection, defaults to None
  52. :type timeout: int
  53. :param retries: number of retries your client will try before aborting,
  54. defaults to 3. 0 indicates try until success
  55. :type retries: int
  56. :param use_udp: use UDP to connect to InfluxDB, defaults to False
  57. :type use_udp: bool
  58. :param udp_port: UDP port to connect to InfluxDB, defaults to 4444
  59. :type udp_port: int
  60. :param proxies: HTTP(S) proxy to use for Requests, defaults to {}
  61. :type proxies: dict
  62. :param path: path of InfluxDB on the server to connect, defaults to ''
  63. :type path: str
  64. :param cert: Path to client certificate information to use for mutual TLS
  65. authentication. You can specify a local cert to use
  66. as a single file containing the private key and the certificate, or as
  67. a tuple of both files’ paths, defaults to None
  68. :type cert: str
  69. :param gzip: use gzip content encoding to compress requests
  70. :type gzip: bool
  71. :param session: allow for the new client request to use an existing
  72. requests Session, defaults to None
  73. :type session: requests.Session
  74. :raises ValueError: if cert is provided but ssl is disabled (set to False)
  75. """
  def __init__(self,
               host='localhost',
               port=8086,
               username='root',
               password='root',
               database=None,
               ssl=False,
               verify_ssl=False,
               timeout=None,
               retries=3,
               use_udp=False,
               udp_port=4444,
               proxies=None,
               pool_size=10,
               path='',
               cert=None,
               gzip=False,
               session=None,
               ):
      """Construct a new InfluxDBClient object.

      Stores the connection settings, prepares the ``requests`` session
      (creating one when none is supplied), opens a UDP socket when
      ``use_udp`` is set and precomputes the base URL used by requests.

      :raises ValueError: if ``cert`` is given while ``ssl`` is falsy
      """
      self.__host = host
      self.__port = int(port)
      self._username = username
      self._password = password
      self._database = database
      self._timeout = timeout
      self._retries = retries
      self._verify_ssl = verify_ssl
      self.__use_udp = use_udp
      self.__udp_port = int(udp_port)

      # Reuse the caller's session when one is supplied.
      self._session = session if session else requests.Session()

      max_connections = int(pool_size)
      adapter = requests.adapters.HTTPAdapter(
          pool_connections=max_connections,
          pool_maxsize=max_connections,
      )

      if use_udp:
          self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

      # A non-empty path is normalised so that it starts with '/'.
      if path and path[0] != '/':
          self.__path = '/' + path
      else:
          self.__path = path if path else ''

      # Only the literal ``True`` switches the scheme to https.
      self._scheme = "https" if ssl is True else "http"
      self._session.mount(self._scheme + '://', adapter)

      self._proxies = {} if proxies is None else proxies

      if cert:
          if not ssl:
              raise ValueError(
                  "Client certificate provided but ssl is disabled."
              )
          self._session.cert = cert

      self.__baseurl = "{0}://{1}:{2}{3}".format(
          self._scheme, self._host, self._port, self._path)

      self._headers = {
          'Content-Type': 'application/json',
          'Accept': 'application/x-msgpack'
      }

      self._gzip = gzip
  146. def __enter__(self):
  147. """Enter function as used by context manager."""
  148. pass
  149. def __exit__(self, _exc_type, _exc_value, _traceback):
  150. """Exit function as used by context manager."""
  151. self.close()
  152. @property
  153. def _baseurl(self):
  154. return self.__baseurl
  155. @property
  156. def _host(self):
  157. return self.__host
  158. @property
  159. def _port(self):
  160. return self.__port
  161. @property
  162. def _path(self):
  163. return self.__path
  164. @property
  165. def _udp_port(self):
  166. return self.__udp_port
  167. @property
  168. def _use_udp(self):
  169. return self.__use_udp
  @classmethod
  def from_dsn(cls, dsn, **kwargs):
      r"""Generate an instance of InfluxDBClient from given data source name.

      Return an instance of :class:`~.InfluxDBClient` from the provided
      data source name. Supported schemes are "influxdb", "https+influxdb"
      and "udp+influxdb". Parameters for the :class:`~.InfluxDBClient`
      constructor may also be passed to this method.

      :param dsn: data source name
      :type dsn: string
      :param kwargs: additional parameters for `InfluxDBClient`
      :type kwargs: dict
      :raises ValueError: if the provided DSN has any unexpected values

      :Example:

      ::

          >> cli = InfluxDBClient.from_dsn('influxdb://username:password@\
          localhost:8086/databasename', timeout=5)
          >> type(cli)
          <class 'influxdb.client.InfluxDBClient'>
          >> cli = InfluxDBClient.from_dsn('udp+influxdb://username:pass@\
          localhost:8086/databasename', timeout=5, udp_port=159)
          >> print('{0._baseurl} - {0._use_udp} {0._udp_port}'.format(cli))
          http://localhost:8086 - True 159

      .. note:: parameters provided in `**kwargs` may override dsn parameters
      .. note:: when using "udp+influxdb" the specified port (if any) will
          be used for the TCP connection; specify the UDP port with the
          additional `udp_port` parameter (cf. examples).
      """
      options = _parse_dsn(dsn)

      # Only the first (host, port) pair of the parsed DSN is used.
      first_host = options.pop('hosts')[0]
      options['host'], options['port'] = first_host

      # Explicit keyword arguments win over values taken from the DSN.
      options.update(kwargs)
      return cls(**options)
  203. def switch_database(self, database):
  204. """Change the client's database.
  205. :param database: the name of the database to switch to
  206. :type database: str
  207. """
  208. self._database = database
  209. def switch_user(self, username, password):
  210. """Change the client's username.
  211. :param username: the username to switch to
  212. :type username: str
  213. :param password: the password for the username
  214. :type password: str
  215. """
  216. self._username = username
  217. self._password = password
  218. def request(self, url, method='GET', params=None, data=None, stream=False,
  219. expected_response_code=200, headers=None):
  220. """Make a HTTP request to the InfluxDB API.
  221. :param url: the path of the HTTP request, e.g. write, query, etc.
  222. :type url: str
  223. :param method: the HTTP method for the request, defaults to GET
  224. :type method: str
  225. :param params: additional parameters for the request, defaults to None
  226. :type params: dict
  227. :param data: the data of the request, defaults to None
  228. :type data: str
  229. :param stream: True if a query uses chunked responses
  230. :type stream: bool
  231. :param expected_response_code: the expected response code of
  232. the request, defaults to 200
  233. :type expected_response_code: int
  234. :param headers: headers to add to the request
  235. :type headers: dict
  236. :returns: the response from the request
  237. :rtype: :class:`requests.Response`
  238. :raises InfluxDBServerError: if the response code is any server error
  239. code (5xx)
  240. :raises InfluxDBClientError: if the response code is not the
  241. same as `expected_response_code` and is not a server error code
  242. """
  243. url = "{0}/{1}".format(self._baseurl, url)
  244. if headers is None:
  245. headers = self._headers
  246. if params is None:
  247. params = {}
  248. if isinstance(data, (dict, list)):
  249. data = json.dumps(data)
  250. if self._gzip:
  251. # Receive and send compressed data
  252. headers.update({
  253. 'Accept-Encoding': 'gzip',
  254. 'Content-Encoding': 'gzip',
  255. })
  256. if data is not None:
  257. # For Py 2.7 compatability use Gzipfile
  258. compressed = io.BytesIO()
  259. with gzip.GzipFile(
  260. compresslevel=9,
  261. fileobj=compressed,
  262. mode='w'
  263. ) as f:
  264. f.write(data)
  265. data = compressed.getvalue()
  266. # Try to send the request more than once by default (see #103)
  267. retry = True
  268. _try = 0
  269. while retry:
  270. try:
  271. response = self._session.request(
  272. method=method,
  273. url=url,
  274. auth=(self._username, self._password),
  275. params=params,
  276. data=data,
  277. stream=stream,
  278. headers=headers,
  279. proxies=self._proxies,
  280. verify=self._verify_ssl,
  281. timeout=self._timeout
  282. )
  283. break
  284. except (requests.exceptions.ConnectionError,
  285. requests.exceptions.HTTPError,
  286. requests.exceptions.Timeout):
  287. _try += 1
  288. if self._retries != 0:
  289. retry = _try < self._retries
  290. if method == "POST":
  291. time.sleep((2 ** _try) * random.random() / 100.0)
  292. if not retry:
  293. raise
  294. type_header = response.headers and response.headers.get("Content-Type")
  295. if type_header == "application/x-msgpack" and response.content:
  296. response._msgpack = msgpack.unpackb(
  297. packed=response.content,
  298. ext_hook=_msgpack_parse_hook,
  299. raw=False)
  300. else:
  301. response._msgpack = None
  302. def reformat_error(response):
  303. if response._msgpack:
  304. return json.dumps(response._msgpack, separators=(',', ':'))
  305. else:
  306. return response.content
  307. # if there's not an error, there must have been a successful response
  308. if 500 <= response.status_code < 600:
  309. raise InfluxDBServerError(reformat_error(response))
  310. elif response.status_code == expected_response_code:
  311. return response
  312. else:
  313. err_msg = reformat_error(response)
  314. raise InfluxDBClientError(err_msg, response.status_code)
  315. def write(self, data, params=None, expected_response_code=204,
  316. protocol='json'):
  317. """Write data to InfluxDB.
  318. :param data: the data to be written
  319. :type data: (if protocol is 'json') dict
  320. (if protocol is 'line') sequence of line protocol strings
  321. or single string
  322. :param params: additional parameters for the request, defaults to None
  323. :type params: dict
  324. :param expected_response_code: the expected response code of the write
  325. operation, defaults to 204
  326. :type expected_response_code: int
  327. :param protocol: protocol of input data, either 'json' or 'line'
  328. :type protocol: str
  329. :returns: True, if the write operation is successful
  330. :rtype: bool
  331. """
  332. headers = self._headers
  333. headers['Content-Type'] = 'application/octet-stream'
  334. if params:
  335. precision = params.get('precision')
  336. else:
  337. precision = None
  338. if protocol == 'json':
  339. data = make_lines(data, precision).encode('utf-8')
  340. elif protocol == 'line':
  341. if isinstance(data, str):
  342. data = [data]
  343. data = ('\n'.join(data) + '\n').encode('utf-8')
  344. self.request(
  345. url="write",
  346. method='POST',
  347. params=params,
  348. data=data,
  349. expected_response_code=expected_response_code,
  350. headers=headers
  351. )
  352. return True
  353. @staticmethod
  354. def _read_chunked_response(response, raise_errors=True):
  355. for line in response.iter_lines():
  356. if isinstance(line, bytes):
  357. line = line.decode('utf-8')
  358. data = json.loads(line)
  359. result_set = {}
  360. for result in data.get('results', []):
  361. for _key in result:
  362. if isinstance(result[_key], list):
  363. result_set.setdefault(
  364. _key, []).extend(result[_key])
  365. yield ResultSet(result_set, raise_errors=raise_errors)
  366. def query(self,
  367. query,
  368. params=None,
  369. bind_params=None,
  370. epoch=None,
  371. expected_response_code=200,
  372. database=None,
  373. raise_errors=True,
  374. chunked=False,
  375. chunk_size=0,
  376. method="GET"):
  377. """Send a query to InfluxDB.
  378. .. danger::
  379. In order to avoid injection vulnerabilities (similar to `SQL
  380. injection <https://www.owasp.org/index.php/SQL_Injection>`_
  381. vulnerabilities), do not directly include untrusted data into the
  382. ``query`` parameter, use ``bind_params`` instead.
  383. :param query: the actual query string
  384. :type query: str
  385. :param params: additional parameters for the request,
  386. defaults to {}
  387. :type params: dict
  388. :param bind_params: bind parameters for the query:
  389. any variable in the query written as ``'$var_name'`` will be
  390. replaced with ``bind_params['var_name']``. Only works in the
  391. ``WHERE`` clause and takes precedence over ``params['params']``
  392. :type bind_params: dict
  393. :param epoch: response timestamps to be in epoch format either 'h',
  394. 'm', 's', 'ms', 'u', or 'ns',defaults to `None` which is
  395. RFC3339 UTC format with nanosecond precision
  396. :type epoch: str
  397. :param expected_response_code: the expected status code of response,
  398. defaults to 200
  399. :type expected_response_code: int
  400. :param database: database to query, defaults to None
  401. :type database: str
  402. :param raise_errors: Whether or not to raise exceptions when InfluxDB
  403. returns errors, defaults to True
  404. :type raise_errors: bool
  405. :param chunked: Enable to use chunked responses from InfluxDB.
  406. With ``chunked`` enabled, one ResultSet is returned per chunk
  407. containing all results within that chunk
  408. :type chunked: bool
  409. :param chunk_size: Size of each chunk to tell InfluxDB to use.
  410. :type chunk_size: int
  411. :param method: the HTTP method for the request, defaults to GET
  412. :type method: str
  413. :returns: the queried data
  414. :rtype: :class:`~.ResultSet`
  415. """
  416. if params is None:
  417. params = {}
  418. if bind_params is not None:
  419. params_dict = json.loads(params.get('params', '{}'))
  420. params_dict.update(bind_params)
  421. params['params'] = json.dumps(params_dict)
  422. params['q'] = query
  423. params['db'] = database or self._database
  424. if epoch is not None:
  425. params['epoch'] = epoch
  426. if chunked:
  427. params['chunked'] = 'true'
  428. if chunk_size > 0:
  429. params['chunk_size'] = chunk_size
  430. if query.lower().startswith("select ") and " into " in query.lower():
  431. method = "POST"
  432. response = self.request(
  433. url="query",
  434. method=method,
  435. params=params,
  436. data=None,
  437. stream=chunked,
  438. expected_response_code=expected_response_code
  439. )
  440. data = response._msgpack
  441. if not data:
  442. if chunked:
  443. return self._read_chunked_response(response)
  444. data = response.json()
  445. results = [
  446. ResultSet(result, raise_errors=raise_errors)
  447. for result
  448. in data.get('results', [])
  449. ]
  450. # TODO(aviau): Always return a list. (This would be a breaking change)
  451. if len(results) == 1:
  452. return results[0]
  453. return results
  454. def write_points(self,
  455. points,
  456. time_precision=None,
  457. database=None,
  458. retention_policy=None,
  459. tags=None,
  460. batch_size=None,
  461. protocol='json',
  462. consistency=None
  463. ):
  464. """Write to multiple time series names.
  465. :param points: the list of points to be written in the database
  466. :type points: list of dictionaries, each dictionary represents a point
  467. :type points: (if protocol is 'json') list of dicts, where each dict
  468. represents a point.
  469. (if protocol is 'line') sequence of line protocol strings.
  470. :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None
  471. :type time_precision: str
  472. :param database: the database to write the points to. Defaults to
  473. the client's current database
  474. :type database: str
  475. :param tags: a set of key-value pairs associated with each point. Both
  476. keys and values must be strings. These are shared tags and will be
  477. merged with point-specific tags, defaults to None
  478. :type tags: dict
  479. :param retention_policy: the retention policy for the points. Defaults
  480. to None
  481. :type retention_policy: str
  482. :param batch_size: value to write the points in batches
  483. instead of all at one time. Useful for when doing data dumps from
  484. one database to another or when doing a massive write operation,
  485. defaults to None
  486. :type batch_size: int
  487. :param protocol: Protocol for writing data. Either 'line' or 'json'.
  488. :type protocol: str
  489. :param consistency: Consistency for the points.
  490. One of {'any','one','quorum','all'}.
  491. :type consistency: str
  492. :returns: True, if the operation is successful
  493. :rtype: bool
  494. .. note:: if no retention policy is specified, the default retention
  495. policy for the database is used
  496. """
  497. if batch_size and batch_size > 0:
  498. for batch in self._batches(points, batch_size):
  499. self._write_points(points=batch,
  500. time_precision=time_precision,
  501. database=database,
  502. retention_policy=retention_policy,
  503. tags=tags, protocol=protocol,
  504. consistency=consistency)
  505. return True
  506. return self._write_points(points=points,
  507. time_precision=time_precision,
  508. database=database,
  509. retention_policy=retention_policy,
  510. tags=tags, protocol=protocol,
  511. consistency=consistency)
  512. def ping(self):
  513. """Check connectivity to InfluxDB.
  514. :returns: The version of the InfluxDB the client is connected to
  515. """
  516. response = self.request(
  517. url="ping",
  518. method='GET',
  519. expected_response_code=204
  520. )
  521. return response.headers['X-Influxdb-Version']
  522. @staticmethod
  523. def _batches(iterable, size):
  524. # Iterate over an iterable producing iterables of batches. Based on:
  525. # http://code.activestate.com/recipes/303279-getting-items-in-batches/
  526. iterator = iter(iterable)
  527. while True:
  528. try: # Try get the first element in the iterator...
  529. head = (next(iterator),)
  530. except StopIteration:
  531. return # ...so that we can stop if there isn't one
  532. # Otherwise, lazily slice the rest of the batch
  533. rest = islice(iterator, size - 1)
  534. yield chain(head, rest)
  535. def _write_points(self,
  536. points,
  537. time_precision,
  538. database,
  539. retention_policy,
  540. tags,
  541. protocol='json',
  542. consistency=None):
  543. if time_precision not in ['n', 'u', 'ms', 's', 'm', 'h', None]:
  544. raise ValueError(
  545. "Invalid time precision is given. "
  546. "(use 'n', 'u', 'ms', 's', 'm' or 'h')")
  547. if consistency not in ['any', 'one', 'quorum', 'all', None]:
  548. raise ValueError('Invalid consistency: {}'.format(consistency))
  549. if protocol == 'json':
  550. data = {
  551. 'points': points
  552. }
  553. if tags is not None:
  554. data['tags'] = tags
  555. else:
  556. data = points
  557. params = {
  558. 'db': database or self._database
  559. }
  560. if consistency is not None:
  561. params['consistency'] = consistency
  562. if time_precision is not None:
  563. params['precision'] = time_precision
  564. if retention_policy is not None:
  565. params['rp'] = retention_policy
  566. if self._use_udp:
  567. self.send_packet(
  568. data, protocol=protocol, time_precision=time_precision
  569. )
  570. else:
  571. self.write(
  572. data=data,
  573. params=params,
  574. expected_response_code=204,
  575. protocol=protocol
  576. )
  577. return True
  578. def get_list_database(self):
  579. """Get the list of databases in InfluxDB.
  580. :returns: all databases in InfluxDB
  581. :rtype: list of dictionaries
  582. :Example:
  583. ::
  584. >> dbs = client.get_list_database()
  585. >> dbs
  586. [{u'name': u'db1'}, {u'name': u'db2'}, {u'name': u'db3'}]
  587. """
  588. return list(self.query("SHOW DATABASES").get_points())
  def get_list_series(self, database=None, measurement=None, tags=None):
      """
      Query SHOW SERIES returns the distinct series in your database.

      FROM and WHERE clauses are optional. The measurement name, the tag
      keys and the tag values are quoted before being placed in the query.

      :param measurement: Show all series from a measurement
      :type measurement: string
      :param tags: Show all series that match given tags
      :type tags: dict
      :param database: the database from which the series should be
          shows, defaults to client's current database
      :type database: str
      :returns: the values of every returned point, flattened into one list
      :rtype: list
      """
      database = database or self._database
      query_str = 'SHOW SERIES'

      if measurement:
          query_str += ' FROM {0}'.format(quote_ident(measurement))

      if tags:
          # Values are rendered as text first so that non-string values
          # (e.g. numbers) can be quoted as string literals too.
          conditions = [
              "{0}={1}".format(quote_ident(k),
                               quote_literal("{0}".format(v)))
              for k, v in tags.items()
          ]
          query_str += ' WHERE ' + ' and '.join(conditions)

      points = self.query(query_str, database=database).get_points()
      return list(
          itertools.chain.from_iterable(point.values() for point in points)
      )
  def create_database(self, dbname):
      """Create a new database in InfluxDB.

      The name is quoted as an identifier and the statement is sent
      as a POST query.

      :param dbname: the name of the database to create
      :type dbname: str
      """
      statement = "CREATE DATABASE {0}".format(quote_ident(dbname))
      self.query(statement, method="POST")
  def drop_database(self, dbname):
      """Drop a database from InfluxDB.

      The name is quoted as an identifier and the statement is sent
      as a POST query.

      :param dbname: the name of the database to drop
      :type dbname: str
      """
      statement = "DROP DATABASE {0}".format(quote_ident(dbname))
      self.query(statement, method="POST")
  631. def get_list_measurements(self):
  632. """Get the list of measurements in InfluxDB.
  633. :returns: all measurements in InfluxDB
  634. :rtype: list of dictionaries
  635. :Example:
  636. ::
  637. >> dbs = client.get_list_measurements()
  638. >> dbs
  639. [{u'name': u'measurements1'},
  640. {u'name': u'measurements2'},
  641. {u'name': u'measurements3'}]
  642. """
  643. return list(self.query("SHOW MEASUREMENTS").get_points())
  def drop_measurement(self, measurement):
      """Drop a measurement from InfluxDB.

      The name is quoted as an identifier and the statement is sent
      as a POST query.

      :param measurement: the name of the measurement to drop
      :type measurement: str
      """
      statement = "DROP MEASUREMENT {0}".format(quote_ident(measurement))
      self.query(statement, method="POST")
  def create_retention_policy(self, name, duration, replication,
                              database=None,
                              default=False, shard_duration="0s"):
      """Create a retention policy for a database.

      :param name: the name of the new retention policy
      :type name: str
      :param duration: the duration of the new retention policy.
          Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported
          and mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks,
          respectively. For infinite retention - meaning the data will
          never be deleted - use 'INF' for duration.
          The minimum retention period is 1 hour.
      :type duration: str
      :param replication: the replication of the retention policy
      :type replication: str
      :param database: the database for which the retention policy is
          created. Defaults to current client's database
      :type database: str
      :param default: whether or not to set the policy as default
      :type default: bool
      :param shard_duration: the shard duration of the retention policy.
          Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported and
          mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks,
          respectively. Infinite retention is not supported. As a workaround,
          specify a "1000w" duration to achieve an extremely long shard group
          duration. Defaults to "0s", which is interpreted by the database
          to mean the default value given the duration.
          The minimum shard group duration is 1 hour.
      :type shard_duration: str
      """
      target_db = database or self._database
      template = (
          "CREATE RETENTION POLICY {0} ON {1} "
          "DURATION {2} REPLICATION {3} SHARD DURATION {4}"
      )
      statement = template.format(
          quote_ident(name), quote_ident(target_db),
          duration, replication, shard_duration)

      # Only the literal ``True`` marks the policy as the default one.
      if default is True:
          statement += " DEFAULT"

      self.query(statement, method="POST")
  def alter_retention_policy(self, name, database=None,
                             duration=None, replication=None,
                             default=None, shard_duration=None):
      """Modify an existing retention policy for a database.

      :param name: the name of the retention policy to modify
      :type name: str
      :param database: the database for which the retention policy is
          modified. Defaults to current client's database
      :type database: str
      :param duration: the new duration of the existing retention policy.
          Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported
          and mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks,
          respectively. For infinite retention, meaning the data will
          never be deleted, use 'INF' for duration.
          The minimum retention period is 1 hour.
      :type duration: str
      :param replication: the new replication of the existing
          retention policy
      :type replication: int
      :param default: whether or not to set the modified policy as default
      :type default: bool
      :param shard_duration: the shard duration of the retention policy.
          Durations such as 1h, 90m, 12h, 7d, and 4w, are all supported and
          mean 1 hour, 90 minutes, 12 hours, 7 day, and 4 weeks,
          respectively. Infinite retention is not supported. As a workaround,
          specify a "1000w" duration to achieve an extremely long shard group
          duration.
          The minimum shard group duration is 1 hour.
      :type shard_duration: str

      .. note:: at least one of duration, replication, or default flag
          should be set. Otherwise the operation will fail.
      """
      clauses = [
          "ALTER RETENTION POLICY {0} ON {1}".format(
              quote_ident(name),
              quote_ident(database or self._database))
      ]

      # Falsy options are omitted from the statement.
      if duration:
          clauses.append("DURATION {0}".format(duration))
      if shard_duration:
          clauses.append("SHARD DURATION {0}".format(shard_duration))
      if replication:
          clauses.append("REPLICATION {0}".format(replication))
      if default is True:
          clauses.append("DEFAULT")

      self.query(" ".join(clauses), method="POST")
  def drop_retention_policy(self, name, database=None):
      """Remove a retention policy from a database.

      :param name: name of the retention policy to remove
      :type name: str
      :param database: database that owns the retention policy; when
          omitted (or falsy) the client's current database is used
      :type database: str
      """
      quoted_policy = quote_ident(name)
      quoted_db = quote_ident(database or self._database)
      statement = "DROP RETENTION POLICY {0} ON {1}".format(
          quoted_policy, quoted_db)
      self.query(statement, method="POST")
  def get_list_retention_policies(self, database=None):
      """Return every retention policy defined on a database.

      :param database: database to inspect; falls back to the client's
          current database when not given
      :type database: str
      :returns: the retention policies of the database
      :rtype: list of dictionaries
      :raises InfluxDBClientError: if neither the argument nor the client
          provides a database name

      :Example:

      ::

          >> ret_policies = client.get_list_retention_policies('my_db')
          >> ret_policies
          [{u'default': True,
            u'duration': u'0',
            u'name': u'default',
            u'replicaN': 1}]
      """
      # Resolve the target once and reuse it for both the check and the
      # statement.
      target_db = database or self._database
      if not target_db:
          raise InfluxDBClientError(
              "get_list_retention_policies() requires a database as a "
              "parameter or the client to be using a database")

      statement = "SHOW RETENTION POLICIES ON {0}".format(
          quote_ident(target_db))
      response = self.query(statement)
      return list(response.get_points())
  def get_list_users(self):
      """Return all users known to InfluxDB.

      :returns: one entry per user, as yielded by the ``SHOW USERS`` query
      :rtype: list of dictionaries

      :Example:

      ::

          >> users = client.get_list_users()
          >> users
          [{u'admin': True, u'user': u'user1'},
           {u'admin': False, u'user': u'user2'},
           {u'admin': False, u'user': u'user3'}]
      """
      response = self.query("SHOW USERS")
      points = response.get_points()
      return list(points)
  def create_user(self, username, password, admin=False):
      """Create a new InfluxDB user.

      :param username: name of the user to create
      :type username: str
      :param password: password assigned to the new user
      :type password: str
      :param admin: when truthy, the user is created with all privileges
          (cluster administrator)
      :type admin: boolean
      """
      statement = "CREATE USER {0} WITH PASSWORD {1}".format(
          quote_ident(username), quote_literal(password))
      # The privileges clause is only appended for administrators.
      privileges_clause = ' WITH ALL PRIVILEGES' if admin else ''
      self.query(statement + privileges_clause, method="POST")
  def drop_user(self, username):
      """Drop a user from InfluxDB.

      :param username: the username to drop
      :type username: str
      """
      # ``method`` belongs to ``self.query``; it used to be passed to
      # ``str.format`` as well, where it was silently ignored.
      text = "DROP USER {0}".format(quote_ident(username))
      self.query(text, method="POST")
  def set_user_password(self, username, password):
      """Change the password of an existing user.

      :param username: the username whose password is being changed
      :type username: str
      :param password: the new password for the user
      :type password: str
      """
      text = "SET PASSWORD FOR {0} = {1}".format(
          quote_ident(username), quote_literal(password))
      # This statement modifies server state, so it is sent as a POST like
      # the other user-management statements of this client.
      self.query(text, method="POST")
  def delete_series(self, database=None, measurement=None, tags=None):
      """Drop series from a database.

      Series are selected by measurement, by tags, or by both. This method
      is not meant for deleting all series; use `drop_database` for that.

      :param database: database the series are deleted from; defaults to
          the client's current database
      :type database: str
      :param measurement: drop all series belonging to this measurement
      :type measurement: str
      :param tags: drop all series matching these tag key/value pairs
      :type tags: dict
      """
      target_db = database or self._database

      # Assemble the statement clause by clause and join with spaces.
      clauses = ['DROP SERIES']
      if measurement:
          clauses.append('FROM {0}'.format(quote_ident(measurement)))
      if tags:
          conditions = ' AND '.join(
              "{0}={1}".format(quote_ident(key), quote_literal(value))
              for key, value in tags.items())
          clauses.append('WHERE ' + conditions)

      self.query(' '.join(clauses), database=target_db, method="POST")
  def grant_admin_privileges(self, username):
      """Give a user cluster administration privileges.

      :param username: the user receiving the privileges
      :type username: str

      .. note:: Only a cluster administrator can create/drop databases
          and manage users.
      """
      quoted_user = quote_ident(username)
      self.query(
          "GRANT ALL PRIVILEGES TO {0}".format(quoted_user),
          method="POST",
      )
  def revoke_admin_privileges(self, username):
      """Take cluster administration privileges away from a user.

      :param username: the user losing the privileges
      :type username: str

      .. note:: Only a cluster administrator can create/drop databases
          and manage users.
      """
      quoted_user = quote_ident(username)
      self.query(
          "REVOKE ALL PRIVILEGES FROM {0}".format(quoted_user),
          method="POST",
      )
  def grant_privilege(self, privilege, database, username):
      """Grant a user a privilege on a database.

      :param privilege: privilege to grant: 'read', 'write' or 'all'
          (case-insensitive). It is inserted into the statement as given,
          without quoting.
      :type privilege: str
      :param database: database the privilege applies to
      :type database: str
      :param username: user receiving the privilege
      :type username: str
      """
      fields = (privilege, quote_ident(database), quote_ident(username))
      statement = "GRANT {0} ON {1} TO {2}".format(*fields)
      self.query(statement, method="POST")
  def revoke_privilege(self, privilege, database, username):
      """Revoke a user's privilege on a database.

      :param privilege: privilege to revoke: 'read', 'write' or 'all'
          (case-insensitive). It is inserted into the statement as given,
          without quoting.
      :type privilege: str
      :param database: database the privilege applies to
      :type database: str
      :param username: user losing the privilege
      :type username: str
      """
      fields = (privilege, quote_ident(database), quote_ident(username))
      statement = "REVOKE {0} ON {1} FROM {2}".format(*fields)
      self.query(statement, method="POST")
  def get_list_privileges(self, username):
      """Return the privileges granted to a user.

      :param username: the user whose privileges are listed
      :type username: str
      :returns: the grants of the given user
      :rtype: list of dictionaries

      :Example:

      ::

          >> privileges = client.get_list_privileges('user1')
          >> privileges
          [{u'privilege': u'WRITE', u'database': u'db1'},
           {u'privilege': u'ALL PRIVILEGES', u'database': u'db2'},
           {u'privilege': u'NO PRIVILEGES', u'database': u'db3'}]
      """
      statement = "SHOW GRANTS FOR {0}".format(quote_ident(username))
      grants = self.query(statement)
      return list(grants.get_points())
  def get_list_continuous_queries(self):
      """Return the continuous queries defined in InfluxDB.

      :return: one single-key dictionary per entry of the query result,
          mapping the first element of the entry's key to its list of
          continuous queries
      :rtype: list of dictionaries

      :Example:

      ::

          >> cqs = client.get_list_continuous_queries()
          >> cqs
          [
              {
                  u'db1': []
              },
              {
                  u'db2': [
                      {
                          u'name': u'vampire',
                          u'query': u'CREATE CONTINUOUS QUERY vampire ON '
                                    'mydb BEGIN SELECT count(dracula) INTO '
                                    'mydb.autogen.all_of_them FROM '
                                    'mydb.autogen.one GROUP BY time(5m) END'
                      }
                  ]
              }
          ]
      """
      response = self.query("SHOW CONTINUOUS QUERIES")
      grouped = []
      for series_key, points in response.items():
          # Only the first element of the key is used as the dictionary key.
          grouped.append({series_key[0]: list(points)})
      return grouped
  def create_continuous_query(self, name, select, database=None,
                              resample_opts=None):
      r"""Create a continuous query for a database.

      :param name: the name of continuous query to create
      :type name: str
      :param select: select statement for the continuous query
      :type select: str
      :param database: the database for which the continuous query is
          created. Defaults to current client's database
      :type database: str
      :param resample_opts: resample options; when given they are inserted
          after a ``RESAMPLE`` keyword, otherwise no such clause is emitted
      :type resample_opts: str

      :Example:

      ::

          >> select_clause = 'SELECT mean("value") INTO "cpu_mean" ' \
          ...                 'FROM "cpu" GROUP BY time(1m)'
          >> client.create_continuous_query(
          ...     'cpu_mean', select_clause, 'db_name', 'EVERY 10s FOR 2m'
          ... )
          >> client.get_list_continuous_queries()
          [
              {
                  'db_name': [
                      {
                          'name': 'cpu_mean',
                          'query': 'CREATE CONTINUOUS QUERY "cpu_mean" '
                                  'ON "db_name" '
                                  'RESAMPLE EVERY 10s FOR 2m '
                                  'BEGIN SELECT mean("value") '
                                  'INTO "cpu_mean" FROM "cpu" '
                                  'GROUP BY time(1m) END'
                      }
                  ]
              }
          ]
      """
      resample_clause = ' RESAMPLE ' + resample_opts if resample_opts else ''
      query_string = (
          "CREATE CONTINUOUS QUERY {0} ON {1}{2} BEGIN {3} END"
      ).format(quote_ident(name), quote_ident(database or self._database),
               resample_clause, select)
      # CREATE changes server state, so it is sent as a POST like the
      # other CREATE/DROP statements issued by this client.
      self.query(query_string, method="POST")
  def drop_continuous_query(self, name, database=None):
      """Drop an existing continuous query for a database.

      :param name: the name of continuous query to drop
      :type name: str
      :param database: the database for which the continuous query is
          dropped. Defaults to current client's database
      :type database: str
      """
      query_string = (
          "DROP CONTINUOUS QUERY {0} ON {1}"
      ).format(quote_ident(name), quote_ident(database or self._database))
      # DROP changes server state, so it is sent as a POST like the other
      # DROP statements issued by this client.
      self.query(query_string, method="POST")
  def send_packet(self, packet, protocol='json', time_precision=None):
      """Send an UDP packet.

      :param packet: the packet to be sent
      :type packet: (if protocol is 'json') dict
                    (if protocol is 'line') list of line protocol strings
      :param protocol: protocol of input data, either 'json' or 'line'
      :type protocol: str
      :param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None.
          Only used with the 'json' protocol.
      :type time_precision: str
      :raises ValueError: if ``protocol`` is neither 'json' nor 'line'
      """
      if protocol == 'json':
          data = make_lines(packet, time_precision).encode('utf-8')
      elif protocol == 'line':
          # Each line is newline-terminated, including the last one.
          data = ('\n'.join(packet) + '\n').encode('utf-8')
      else:
          # Without this branch ``data`` would be unbound and the call
          # would fail with an UnboundLocalError.
          raise ValueError('Unknown protocol "{0}".'.format(protocol))
      self.udp_socket.sendto(data, (self._host, self._udp_port))
  def close(self):
      """Close the HTTP session if it is a ``requests.Session``."""
      session = self._session
      if not isinstance(session, requests.Session):
          return
      session.close()
  999. def _parse_dsn(dsn):
  1000. """Parse data source name.
  1001. This is a helper function to split the data source name provided in
  1002. the from_dsn classmethod
  1003. """
  1004. conn_params = urlparse(dsn)
  1005. init_args = {}
  1006. scheme_info = conn_params.scheme.split('+')
  1007. if len(scheme_info) == 1:
  1008. scheme = scheme_info[0]
  1009. modifier = None
  1010. else:
  1011. modifier, scheme = scheme_info
  1012. if scheme != 'influxdb':
  1013. raise ValueError('Unknown scheme "{0}".'.format(scheme))
  1014. if modifier:
  1015. if modifier == 'udp':
  1016. init_args['use_udp'] = True
  1017. elif modifier == 'https':
  1018. init_args['ssl'] = True
  1019. else:
  1020. raise ValueError('Unknown modifier "{0}".'.format(modifier))
  1021. netlocs = conn_params.netloc.split(',')
  1022. init_args['hosts'] = []
  1023. for netloc in netlocs:
  1024. parsed = _parse_netloc(netloc)
  1025. init_args['hosts'].append((parsed['host'], int(parsed['port'])))
  1026. init_args['username'] = parsed['username']
  1027. init_args['password'] = parsed['password']
  1028. if conn_params.path and len(conn_params.path) > 1:
  1029. init_args['database'] = conn_params.path[1:]
  1030. return init_args
  1031. def _parse_netloc(netloc):
  1032. info = urlparse("http://{0}".format(netloc))
  1033. return {'username': info.username or None,
  1034. 'password': info.password or None,
  1035. 'host': info.hostname or 'localhost',
  1036. 'port': info.port or 8086}
  1037. def _msgpack_parse_hook(code, data):
  1038. if code == 5:
  1039. (epoch_s, epoch_ns) = struct.unpack(">QI", data)
  1040. timestamp = datetime.datetime.utcfromtimestamp(epoch_s)
  1041. timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
  1042. return timestamp.isoformat() + 'Z'
  1043. return msgpack.ExtType(code, data)