influxdb_instance.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # -*- coding: utf-8 -*-
  2. """Define the test module for an influxdb instance."""
  3. from __future__ import absolute_import
  4. from __future__ import division
  5. from __future__ import print_function
  6. from __future__ import unicode_literals
  7. import datetime
  8. import distutils.spawn
  9. import os
  10. import tempfile
  11. import shutil
  12. import subprocess
  13. import sys
  14. import time
  15. import unittest
  16. from influxdb.tests.misc import is_port_open, get_free_ports
  17. # hack in check_output if it's not defined, like for python 2.6
  18. if "check_output" not in dir(subprocess):
  19. def f(*popenargs, **kwargs):
  20. """Check for output."""
  21. if 'stdout' in kwargs:
  22. raise ValueError(
  23. 'stdout argument not allowed, it will be overridden.'
  24. )
  25. process = subprocess.Popen(stdout=subprocess.PIPE,
  26. *popenargs,
  27. **kwargs)
  28. output, unused_err = process.communicate()
  29. retcode = process.poll()
  30. if retcode:
  31. cmd = kwargs.get("args")
  32. if cmd is None:
  33. cmd = popenargs[0]
  34. raise subprocess.CalledProcessError(retcode, cmd)
  35. return output
  36. subprocess.check_output = f
  37. class InfluxDbInstance(object):
  38. """Define an instance of InfluxDB.
  39. A class to launch of fresh influxdb server instance
  40. in a temporary place, using a config file template.
  41. """
  42. def __init__(self, conf_template, udp_enabled=False):
  43. """Initialize an instance of InfluxDbInstance."""
  44. if os.environ.get("INFLUXDB_PYTHON_SKIP_SERVER_TESTS", None) == 'True':
  45. raise unittest.SkipTest(
  46. "Skipping server test (INFLUXDB_PYTHON_SKIP_SERVER_TESTS)"
  47. )
  48. self.influxd_path = self.find_influxd_path()
  49. errors = 0
  50. while True:
  51. try:
  52. self._start_server(conf_template, udp_enabled)
  53. break
  54. # Happens when the ports are already in use.
  55. except RuntimeError as e:
  56. errors += 1
  57. if errors > 2:
  58. raise e
  59. def _start_server(self, conf_template, udp_enabled):
  60. # create a temporary dir to store all needed files
  61. # for the influxdb server instance :
  62. self.temp_dir_base = tempfile.mkdtemp()
  63. # "temp_dir_base" will be used for conf file and logs,
  64. # while "temp_dir_influxdb" is for the databases files/dirs :
  65. tempdir = self.temp_dir_influxdb = tempfile.mkdtemp(
  66. dir=self.temp_dir_base)
  67. # find a couple free ports :
  68. free_ports = get_free_ports(4)
  69. ports = {}
  70. for service in 'http', 'global', 'meta', 'udp':
  71. ports[service + '_port'] = free_ports.pop()
  72. if not udp_enabled:
  73. ports['udp_port'] = -1
  74. conf_data = dict(
  75. meta_dir=os.path.join(tempdir, 'meta'),
  76. data_dir=os.path.join(tempdir, 'data'),
  77. wal_dir=os.path.join(tempdir, 'wal'),
  78. cluster_dir=os.path.join(tempdir, 'state'),
  79. handoff_dir=os.path.join(tempdir, 'handoff'),
  80. logs_file=os.path.join(self.temp_dir_base, 'logs.txt'),
  81. udp_enabled='true' if udp_enabled else 'false',
  82. )
  83. conf_data.update(ports)
  84. self.__dict__.update(conf_data)
  85. conf_file = os.path.join(self.temp_dir_base, 'influxdb.conf')
  86. with open(conf_file, "w") as fh:
  87. with open(conf_template) as fh_template:
  88. fh.write(fh_template.read().format(**conf_data))
  89. # now start the server instance:
  90. self.proc = subprocess.Popen(
  91. [self.influxd_path, '-config', conf_file],
  92. stdout=subprocess.PIPE,
  93. stderr=subprocess.PIPE
  94. )
  95. print(
  96. "%s > Started influxdb bin in %r with ports %s and %s.." % (
  97. datetime.datetime.now(),
  98. self.temp_dir_base,
  99. self.global_port,
  100. self.http_port
  101. )
  102. )
  103. # wait for it to listen on the broker and admin ports:
  104. # usually a fresh instance is ready in less than 1 sec ..
  105. timeout = time.time() + 10 # so 10 secs should be enough,
  106. # otherwise either your system load is high,
  107. # or you run a 286 @ 1Mhz ?
  108. try:
  109. while time.time() < timeout:
  110. if (is_port_open(self.http_port) and
  111. is_port_open(self.global_port)):
  112. # it's hard to check if a UDP port is open..
  113. if udp_enabled:
  114. # so let's just sleep 0.5 sec in this case
  115. # to be sure that the server has open the port
  116. time.sleep(0.5)
  117. break
  118. time.sleep(0.5)
  119. if self.proc.poll() is not None:
  120. raise RuntimeError('influxdb prematurely exited')
  121. else:
  122. self.proc.terminate()
  123. self.proc.wait()
  124. raise RuntimeError('Timeout waiting for influxdb to listen'
  125. ' on its ports (%s)' % ports)
  126. except RuntimeError as err:
  127. data = self.get_logs_and_output()
  128. data['reason'] = str(err)
  129. data['now'] = datetime.datetime.now()
  130. raise RuntimeError("%(now)s > %(reason)s. RC=%(rc)s\n"
  131. "stdout=%(out)s\nstderr=%(err)s\nlogs=%(logs)r"
  132. % data)
  133. def find_influxd_path(self):
  134. """Find the path for InfluxDB."""
  135. influxdb_bin_path = os.environ.get(
  136. 'INFLUXDB_PYTHON_INFLUXD_PATH',
  137. None
  138. )
  139. if influxdb_bin_path is None:
  140. influxdb_bin_path = distutils.spawn.find_executable('influxd')
  141. if not influxdb_bin_path:
  142. try:
  143. influxdb_bin_path = subprocess.check_output(
  144. ['which', 'influxd']
  145. ).strip()
  146. except subprocess.CalledProcessError:
  147. # fallback on :
  148. influxdb_bin_path = '/opt/influxdb/influxd'
  149. if not os.path.isfile(influxdb_bin_path):
  150. raise unittest.SkipTest("Could not find influxd binary")
  151. version = subprocess.check_output([influxdb_bin_path, 'version'])
  152. print("InfluxDB version: %s" % version, file=sys.stderr)
  153. return influxdb_bin_path
  154. def get_logs_and_output(self):
  155. """Query for logs and output."""
  156. proc = self.proc
  157. try:
  158. with open(self.logs_file) as fh:
  159. logs = fh.read()
  160. except IOError as err:
  161. logs = "Couldn't read logs: %s" % err
  162. return {
  163. 'rc': proc.returncode,
  164. 'out': proc.stdout.read(),
  165. 'err': proc.stderr.read(),
  166. 'logs': logs
  167. }
  168. def close(self, remove_tree=True):
  169. """Close an instance of InfluxDB."""
  170. self.proc.terminate()
  171. self.proc.wait()
  172. if remove_tree:
  173. shutil.rmtree(self.temp_dir_base)