# conn.py
  1. from __future__ import absolute_import
  2. import collections
  3. import copy
  4. import errno
  5. import logging
  6. from random import shuffle, uniform
  7. import socket
  8. import time
  9. import sys
  10. from kafka.vendor import six
  11. import kafka.errors as Errors
  12. from kafka.future import Future
  13. from kafka.metrics.stats import Avg, Count, Max, Rate
  14. from kafka.protocol.api import RequestHeader
  15. from kafka.protocol.admin import SaslHandShakeRequest
  16. from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
  17. from kafka.protocol.frame import KafkaBytes
  18. from kafka.protocol.metadata import MetadataRequest
  19. from kafka.protocol.types import Int32
  20. from kafka.version import __version__
# Python 2 compatibility: ConnectionError and BlockingIOError are builtins
# only on Python 3, so alias them to the closest Python 2 equivalents.
if six.PY2:
    ConnectionError = socket.error
    BlockingIOError = Exception

log = logging.getLogger(__name__)
DEFAULT_KAFKA_PORT = 9092
  26. try:
  27. import ssl
  28. ssl_available = True
  29. try:
  30. SSLEOFError = ssl.SSLEOFError
  31. SSLWantReadError = ssl.SSLWantReadError
  32. SSLWantWriteError = ssl.SSLWantWriteError
  33. SSLZeroReturnError = ssl.SSLZeroReturnError
  34. except:
  35. # support older ssl libraries
  36. log.warning('Old SSL module detected.'
  37. ' SSL error handling may not operate cleanly.'
  38. ' Consider upgrading to Python 3.3 or 2.7.9')
  39. SSLEOFError = ssl.SSLError
  40. SSLWantReadError = ssl.SSLError
  41. SSLWantWriteError = ssl.SSLError
  42. SSLZeroReturnError = ssl.SSLError
  43. except ImportError:
  44. # support Python without ssl libraries
  45. ssl_available = False
  46. class SSLWantReadError(Exception):
  47. pass
  48. class SSLWantWriteError(Exception):
  49. pass
# needed for SASL_GSSAPI authentication:
try:
    import gssapi
    from gssapi.raw.misc import GSSError
except ImportError:
    # no gssapi available, will disable gssapi mechanism
    # (enforced by an assertion in BrokerConnection.__init__)
    gssapi = None
    GSSError = None
class ConnectionStates(object):
    """Enumeration of BrokerConnection lifecycle states.

    Values are human-readable strings (also used directly in log output).
    """
    DISCONNECTING = '<disconnecting>'
    DISCONNECTED = '<disconnected>'
    CONNECTING = '<connecting>'
    HANDSHAKE = '<handshake>'            # SSL handshake in progress
    CONNECTED = '<connected>'
    AUTHENTICATING = '<authenticating>'  # SASL authentication in progress
# Bookkeeping record for a request that has been sent to the broker but
# whose response has not yet been received.
InFlightRequest = collections.namedtuple('InFlightRequest',
    ['request', 'response_type', 'correlation_id', 'future', 'timestamp'])
class BrokerConnection(object):
    """Initialize a Kafka broker connection

    Keyword Arguments:
        client_id (str): a name for this client. This string is passed in
            each request to servers and can be used to identify specific
            server-side log entries that correspond to this client. Also
            submitted to GroupCoordinator for logging with respect to
            consumer group administration. Default: 'kafka-python-{version}'
        reconnect_backoff_ms (int): The amount of time in milliseconds to
            wait before attempting to reconnect to a given host.
            Default: 50.
        reconnect_backoff_max_ms (int): The maximum amount of time in
            milliseconds to wait when reconnecting to a broker that has
            repeatedly failed to connect. If provided, the backoff per host
            will increase exponentially for each consecutive connection
            failure, up to this maximum. To avoid connection storms, a
            randomization factor of 0.2 will be applied to the backoff
            resulting in a random range between 20% below and 20% above
            the computed value. Default: 1000.
        request_timeout_ms (int): Client request timeout in milliseconds.
            Default: 40000.
        max_in_flight_requests_per_connection (int): Requests are pipelined
            to kafka brokers up to this number of maximum requests per
            broker connection. Default: 5.
        receive_buffer_bytes (int): The size of the TCP receive buffer
            (SO_RCVBUF) to use when reading data. Default: None (relies on
            system defaults). Java client defaults to 32768.
        send_buffer_bytes (int): The size of the TCP send buffer
            (SO_SNDBUF) to use when sending data. Default: None (relies on
            system defaults). Java client defaults to 131072.
        socket_options (list): List of tuple-arguments to socket.setsockopt
            to apply to broker connection sockets. Default:
            [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
        security_protocol (str): Protocol used to communicate with brokers.
            Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
            Default: PLAINTEXT.
        ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
            socket connections. If provided, all other ssl_* configurations
            will be ignored. Default: None.
        ssl_check_hostname (bool): flag to configure whether ssl handshake
            should verify that the certificate matches the brokers hostname.
            default: True.
        ssl_cafile (str): optional filename of ca file to use in certificate
            verification. default: None.
        ssl_certfile (str): optional filename of file in pem format containing
            the client certificate, as well as any ca certificates needed to
            establish the certificate's authenticity. default: None.
        ssl_keyfile (str): optional filename containing the client private key.
            default: None.
        ssl_password (callable, str, bytes, bytearray): optional password or
            callable function that returns a password, for decrypting the
            client private key. Default: None.
        ssl_crlfile (str): optional filename containing the CRL to check for
            certificate expiration. By default, no CRL check is done. When
            providing a file, only the leaf certificate will be checked against
            this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
            default: None.
        api_version (tuple): Specify which Kafka API version to use.
            Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9),
            (0, 10). Default: (0, 8, 2)
        api_version_auto_timeout_ms (int): number of milliseconds to throw a
            timeout exception from the constructor when checking the broker
            api version. Only applies if api_version is None
        state_change_callback (callable): function to be called when the
            connection state changes from CONNECTING to CONNECTED etc.
        metrics (kafka.metrics.Metrics): Optionally provide a metrics
            instance for capturing network IO stats. Default: None.
        metric_group_prefix (str): Prefix for metric names. Default: ''
        sasl_mechanism (str): Authentication mechanism when security_protocol
            is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
            PLAIN, GSSAPI. Default: PLAIN
        sasl_plain_username (str): username for sasl PLAIN authentication.
            Default: None
        sasl_plain_password (str): password for sasl PLAIN authentication.
            Default: None
        sasl_kerberos_service_name (str): Service name to include in GSSAPI
            sasl mechanism handshake. Default: 'kafka'
    """

    DEFAULT_CONFIG = {
        'client_id': 'kafka-python-' + __version__,
        'node_id': 0,
        'request_timeout_ms': 40000,
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'api_version': (0, 8, 2),  # default to most restrictive
        'state_change_callback': lambda conn: True,
        'metrics': None,
        'metric_group_prefix': '',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka'
    }
    SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
    SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
  174. def __init__(self, host, port, afi, **configs):
  175. self.hostname = host
  176. self.host = host
  177. self.port = port
  178. self.afi = afi
  179. self._init_host = host
  180. self._init_port = port
  181. self._init_afi = afi
  182. self.in_flight_requests = collections.deque()
  183. self._api_versions = None
  184. self.config = copy.copy(self.DEFAULT_CONFIG)
  185. for key in self.config:
  186. if key in configs:
  187. self.config[key] = configs[key]
  188. self.node_id = self.config.pop('node_id')
  189. if self.config['receive_buffer_bytes'] is not None:
  190. self.config['socket_options'].append(
  191. (socket.SOL_SOCKET, socket.SO_RCVBUF,
  192. self.config['receive_buffer_bytes']))
  193. if self.config['send_buffer_bytes'] is not None:
  194. self.config['socket_options'].append(
  195. (socket.SOL_SOCKET, socket.SO_SNDBUF,
  196. self.config['send_buffer_bytes']))
  197. assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
  198. 'security_protcol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))
  199. if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
  200. assert ssl_available, "Python wasn't built with SSL support"
  201. if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
  202. assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
  203. 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
  204. if self.config['sasl_mechanism'] == 'PLAIN':
  205. assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
  206. assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
  207. if self.config['sasl_mechanism'] == 'GSSAPI':
  208. assert gssapi is not None, 'GSSAPI lib not available'
  209. assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
  210. self.state = ConnectionStates.DISCONNECTED
  211. self._reset_reconnect_backoff()
  212. self._sock = None
  213. self._ssl_context = None
  214. if self.config['ssl_context'] is not None:
  215. self._ssl_context = self.config['ssl_context']
  216. self._sasl_auth_future = None
  217. self._header = KafkaBytes(4)
  218. self._rbuffer = None
  219. self._receiving = False
  220. self.last_attempt = 0
  221. self._processing = False
  222. self._correlation_id = 0
  223. self._gai = None
  224. self._gai_index = 0
  225. self._sensors = None
  226. if self.config['metrics']:
  227. self._sensors = BrokerConnectionMetrics(self.config['metrics'],
  228. self.config['metric_group_prefix'],
  229. self.node_id)
    def connect(self):
        """Attempt to connect and return ConnectionState.

        Non-blocking state machine: each call advances the connection at most
        one step (socket creation -> TCP connect -> optional SSL handshake ->
        optional SASL auth). Callers poll this until CONNECTED/DISCONNECTED.
        """
        if self.state is ConnectionStates.DISCONNECTED:
            log.debug('%s: creating new socket', self)
            # if self.afi is set to AF_UNSPEC, then we need to do a name
            # resolution and try all available address families
            if self._init_afi == socket.AF_UNSPEC:
                if self._gai is None:
                    # XXX: all DNS functions in Python are blocking. If we really
                    # want to be non-blocking here, we need to use a 3rd-party
                    # library like python-adns, or move resolution onto its
                    # own thread. This will be subject to the default libc
                    # name resolution timeout (5s on most Linux boxes)
                    try:
                        self._gai = socket.getaddrinfo(self._init_host,
                                                       self._init_port,
                                                       socket.AF_UNSPEC,
                                                       socket.SOCK_STREAM)
                    except socket.gaierror as ex:
                        log.warning('DNS lookup failed for %s:%d,'
                                    ' exception was %s. Is your'
                                    ' advertised.listeners (called'
                                    ' advertised.host.name before Kafka 9)'
                                    ' correct and resolvable?',
                                    self._init_host, self._init_port, ex)
                        self._gai = []
                    self._gai_index = 0
                else:
                    # if self._gai already exists, then we should try the next
                    # name
                    self._gai_index += 1
                # advance past non-INET address families until we find a
                # usable entry or exhaust the resolved list
                while True:
                    if self._gai_index >= len(self._gai):
                        error = 'Unable to connect to any of the names for {0}:{1}'.format(
                            self._init_host, self._init_port)
                        log.error(error)
                        self.close(Errors.ConnectionError(error))
                        return
                    afi, _, __, ___, sockaddr = self._gai[self._gai_index]
                    if afi not in (socket.AF_INET, socket.AF_INET6):
                        self._gai_index += 1
                        continue
                    break
                self.host, self.port = sockaddr[:2]
                self._sock = socket.socket(afi, socket.SOCK_STREAM)
            else:
                self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM)

            for option in self.config['socket_options']:
                log.debug('%s: setting socket option %s', self, option)
                self._sock.setsockopt(*option)

            self._sock.setblocking(False)
            if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                self._wrap_ssl()
            log.info('%s: connecting to %s:%d', self, self.host, self.port)
            self.state = ConnectionStates.CONNECTING
            self.last_attempt = time.time()
            self.config['state_change_callback'](self)

        if self.state is ConnectionStates.CONNECTING:
            # in non-blocking mode, use repeated calls to socket.connect_ex
            # to check connection status
            request_timeout = self.config['request_timeout_ms'] / 1000.0
            ret = None
            try:
                ret = self._sock.connect_ex((self.host, self.port))
                # if we got here through a host lookup, we've found a host,port,af tuple
                # that works save it so we don't do a GAI lookup again
                if self._gai is not None:
                    self.afi = self._sock.family
                    self._gai = None
            except socket.error as err:
                ret = err.errno

            # Connection succeeded
            if not ret or ret == errno.EISCONN:
                log.debug('%s: established TCP connection', self)
                if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
                    log.debug('%s: initiating SSL handshake', self)
                    self.state = ConnectionStates.HANDSHAKE
                elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    # security_protocol PLAINTEXT
                    log.debug('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                    self._reset_reconnect_backoff()
                self.config['state_change_callback'](self)

            # Connection failed
            # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
            elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
                log.error('Connect attempt to %s returned error %s.'
                          ' Disconnecting.', self, ret)
                self.close(Errors.ConnectionError(ret))

            # Connection timed out
            elif time.time() > request_timeout + self.last_attempt:
                log.error('Connection attempt to %s timed out', self)
                self.close(Errors.ConnectionError('timeout'))

            # Needs retry
            else:
                pass

        if self.state is ConnectionStates.HANDSHAKE:
            if self._try_handshake():
                log.debug('%s: completed SSL handshake.', self)
                if self.config['security_protocol'] == 'SASL_SSL':
                    log.debug('%s: initiating SASL authentication', self)
                    self.state = ConnectionStates.AUTHENTICATING
                else:
                    log.debug('%s: Connection complete.', self)
                    self.state = ConnectionStates.CONNECTED
                self.config['state_change_callback'](self)

        if self.state is ConnectionStates.AUTHENTICATING:
            assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
            if self._try_authenticate():
                log.debug('%s: Connection complete.', self)
                self.state = ConnectionStates.CONNECTED
                self._reset_reconnect_backoff()
                self.config['state_change_callback'](self)

        return self.state
    def _wrap_ssl(self):
        """Wrap self._sock in an SSL socket (non-blocking handshake deferred).

        Builds a default SSLContext from the ssl_* config options unless a
        pre-configured ssl_context was supplied. On wrap failure, closes
        the connection with the underlying SSLError.
        """
        assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
        if self._ssl_context is None:
            log.debug('%s: configuring default SSL Context', self)
            self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)  # pylint: disable=no-member
            # disable the known-broken SSLv2/SSLv3 protocols
            self._ssl_context.options |= ssl.OP_NO_SSLv2  # pylint: disable=no-member
            self._ssl_context.options |= ssl.OP_NO_SSLv3  # pylint: disable=no-member
            self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
            if self.config['ssl_check_hostname']:
                self._ssl_context.check_hostname = True
            if self.config['ssl_cafile']:
                log.info('%s: Loading SSL CA from %s', self, self.config['ssl_cafile'])
                self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
                # a CA file implies the caller wants real verification
                self._ssl_context.verify_mode = ssl.CERT_REQUIRED
            if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
                log.info('%s: Loading SSL Cert from %s', self, self.config['ssl_certfile'])
                log.info('%s: Loading SSL Key from %s', self, self.config['ssl_keyfile'])
                self._ssl_context.load_cert_chain(
                    certfile=self.config['ssl_certfile'],
                    keyfile=self.config['ssl_keyfile'],
                    password=self.config['ssl_password'])
            if self.config['ssl_crlfile']:
                if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
                    # CRL checking requires Python 3.4+ / 2.7.9+
                    error = 'No CRL support with this version of Python.'
                    log.error('%s: %s Disconnecting.', self, error)
                    self.close(Errors.ConnectionError(error))
                    return
                log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
                self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
                # pylint: disable=no-member
                self._ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        log.debug('%s: wrapping socket in ssl context', self)
        try:
            self._sock = self._ssl_context.wrap_socket(
                self._sock,
                server_hostname=self.hostname,
                do_handshake_on_connect=False)
        except ssl.SSLError as e:
            log.exception('%s: Failed to wrap socket in SSLContext!', self)
            self.close(e)
  387. def _try_handshake(self):
  388. assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
  389. try:
  390. self._sock.do_handshake()
  391. return True
  392. # old ssl in python2.6 will swallow all SSLErrors here...
  393. except (SSLWantReadError, SSLWantWriteError):
  394. pass
  395. except (SSLZeroReturnError, ConnectionError, SSLEOFError):
  396. log.warning('SSL connection closed by server during handshake.')
  397. self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
  398. # Other SSLErrors will be raised to user
  399. return False
  400. def _try_authenticate(self):
  401. assert self.config['api_version'] is None or self.config['api_version'] >= (0, 10)
  402. if self._sasl_auth_future is None:
  403. # Build a SaslHandShakeRequest message
  404. request = SaslHandShakeRequest[0](self.config['sasl_mechanism'])
  405. future = Future()
  406. sasl_response = self._send(request)
  407. sasl_response.add_callback(self._handle_sasl_handshake_response, future)
  408. sasl_response.add_errback(lambda f, e: f.failure(e), future)
  409. self._sasl_auth_future = future
  410. self._recv()
  411. if self._sasl_auth_future.failed():
  412. raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
  413. return self._sasl_auth_future.succeeded()
  414. def _handle_sasl_handshake_response(self, future, response):
  415. error_type = Errors.for_code(response.error_code)
  416. if error_type is not Errors.NoError:
  417. error = error_type(self)
  418. self.close(error=error)
  419. return future.failure(error_type(self))
  420. if self.config['sasl_mechanism'] == 'PLAIN':
  421. return self._try_authenticate_plain(future)
  422. elif self.config['sasl_mechanism'] == 'GSSAPI':
  423. return self._try_authenticate_gssapi(future)
  424. else:
  425. return future.failure(
  426. Errors.UnsupportedSaslMechanismError(
  427. 'kafka-python does not support SASL mechanism %s' %
  428. self.config['sasl_mechanism']))
    def _try_authenticate_plain(self, future):
        """Authenticate via SASL PLAIN (RFC 4616), blocking on the socket.

        Temporarily switches the socket to blocking mode for the exchange.
        Resolves `future` with True on success, or an error on failure.
        """
        if self.config['security_protocol'] == 'SASL_PLAINTEXT':
            log.warning('%s: Sending username and password in the clear', self)

        data = b''
        try:
            self._sock.setblocking(True)
            # Send PLAIN credentials per RFC-4616: authzid NUL authcid NUL passwd.
            # The username is deliberately sent twice -- once as the
            # authorization identity and once as the authentication identity.
            msg = bytes('\0'.join([self.config['sasl_plain_username'],
                                   self.config['sasl_plain_username'],
                                   self.config['sasl_plain_password']]).encode('utf-8'))
            size = Int32.encode(len(msg))
            self._sock.sendall(size + msg)

            # The server will send a zero sized message (that is Int32(0)) on success.
            # The connection is closed on failure
            while len(data) < 4:
                fragment = self._sock.recv(4 - len(data))
                if not fragment:
                    # empty recv => server closed the connection => auth rejected
                    log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username'])
                    error = Errors.AuthenticationFailedError(
                        'Authentication failed for user {0}'.format(
                            self.config['sasl_plain_username']))
                    future.failure(error)
                    raise error
                data += fragment
            self._sock.setblocking(False)
        except (AssertionError, ConnectionError) as e:
            log.exception("%s: Error receiving reply from server", self)
            error = Errors.ConnectionError("%s: %s" % (self, e))
            future.failure(error)
            self.close(error=error)

        if data != b'\x00\x00\x00\x00':
            return future.failure(Errors.AuthenticationFailedError())

        log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
        return future.success(True)
    def _try_authenticate_gssapi(self, future):
        """Authenticate via SASL GSSAPI (Kerberos), blocking on the socket.

        Exchanges GSSAPI tokens with the broker until the security context
        is complete. Resolves `future` with True on success, or an error.
        """
        data = b''  # NOTE(review): assigned but never read in this method
        gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
        ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
        ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
        log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
        ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
        # Exchange tokens until authentication either succeeds or fails:
        received_token = None
        try:
            while not ctx_Context.complete:
                # calculate the output token
                try:
                    output_token = ctx_Context.step(received_token)
                except GSSError as e:
                    # NOTE(review): this error is logged and constructed but
                    # neither raised nor attached to `future`; the loop then
                    # continues and may hit an unbound `output_token` -- the
                    # outer `except Exception` is what ultimately fails the
                    # future. Verify this is intended.
                    log.exception("%s: Error invalid token received from server", self)
                    error = Errors.ConnectionError("%s: %s" % (self, e))
                if not output_token:
                    if ctx_Context.complete:
                        log.debug("%s: Security Context complete ", self)
                        log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name)
                    break
                try:
                    self._sock.setblocking(True)
                    # Send output token
                    msg = output_token
                    size = Int32.encode(len(msg))
                    self._sock.sendall(size + msg)

                    # The server will send a token back. Processing of this token either
                    # establishes a security context, or it needs further token exchange.
                    # The gssapi will be able to identify the needed next step.
                    # The connection is closed on failure.
                    response = self._sock.recv(2000)
                    self._sock.setblocking(False)
                except (AssertionError, ConnectionError) as e:
                    log.exception("%s: Error receiving reply from server", self)
                    error = Errors.ConnectionError("%s: %s" % (self, e))
                    future.failure(error)
                    self.close(error=error)

                # pass the received token back to gssapi, strip the first 4 bytes
                # (the length prefix)
                received_token = response[4:]
        except Exception as e:
            log.exception("%s: GSSAPI handshake error", self)
            error = Errors.ConnectionError("%s: %s" % (self, e))
            future.failure(error)
            self.close(error=error)

        log.info('%s: Authenticated as %s', self, gssname)
        return future.success(True)
  511. def blacked_out(self):
  512. """
  513. Return true if we are disconnected from the given node and can't
  514. re-establish a connection yet
  515. """
  516. if self.state is ConnectionStates.DISCONNECTED:
  517. if time.time() < self.last_attempt + self._reconnect_backoff:
  518. return True
  519. return False
  520. def connection_delay(self):
  521. time_waited_ms = time.time() - (self.last_attempt or 0)
  522. if self.state is ConnectionStates.DISCONNECTED:
  523. return max(self._reconnect_backoff - time_waited_ms, 0)
  524. elif self.connecting():
  525. return 0
  526. else:
  527. return 999999999
    def connected(self):
        """Return True iff socket is connected."""
        return self.state is ConnectionStates.CONNECTED
  531. def connecting(self):
  532. """Returns True if still connecting (this may encompass several
  533. different states, such as SSL handshake, authorization, etc)."""
  534. return self.state in (ConnectionStates.CONNECTING,
  535. ConnectionStates.HANDSHAKE,
  536. ConnectionStates.AUTHENTICATING)
    def disconnected(self):
        """Return True iff socket is closed"""
        return self.state is ConnectionStates.DISCONNECTED
  540. def _reset_reconnect_backoff(self):
  541. self._failures = 0
  542. self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0
  543. def _update_reconnect_backoff(self):
  544. if self.config['reconnect_backoff_max_ms'] > self.config['reconnect_backoff_ms']:
  545. self._failures += 1
  546. self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1)
  547. self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms'])
  548. self._reconnect_backoff *= uniform(0.8, 1.2)
  549. self._reconnect_backoff /= 1000.0
  550. log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures)
    def close(self, error=None):
        """Close socket and fail all in-flight-requests.

        Arguments:
            error (Exception, optional): pending in-flight-requests
                will be failed with this exception.
                Default: kafka.errors.ConnectionError.
        """
        if self.state is ConnectionStates.DISCONNECTED:
            # already closed; closing with an error here usually indicates
            # a caller bug, so log loudly (with a stack where supported)
            if error is not None:
                if sys.version_info >= (3, 2):
                    log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
                else:
                    log.warning('%s: close() called on disconnected connection with error: %s', self, error)
            return

        log.info('%s: Closing connection. %s', self, error or '')
        # state-change callback fires twice: once for DISCONNECTING (below)
        # and once for DISCONNECTED (at the end)
        self.state = ConnectionStates.DISCONNECTING
        self.config['state_change_callback'](self)
        self._update_reconnect_backoff()
        if self._sock:
            self._sock.close()
            self._sock = None
        self.state = ConnectionStates.DISCONNECTED
        self.last_attempt = time.time()
        self._sasl_auth_future = None
        self._reset_buffer()
        if error is None:
            error = Errors.Cancelled(str(self))
        # fail every pending in-flight request future with the close error
        while self.in_flight_requests:
            ifr = self.in_flight_requests.popleft()
            ifr.future.failure(error)
        self.config['state_change_callback'](self)
  582. def _reset_buffer(self):
  583. self._receiving = False
  584. self._header.seek(0)
  585. self._rbuffer = None
  586. def send(self, request):
  587. """send request, return Future()
  588. Can block on network if request is larger than send_buffer_bytes
  589. """
  590. future = Future()
  591. if self.connecting():
  592. return future.failure(Errors.NodeNotReadyError(str(self)))
  593. elif not self.connected():
  594. return future.failure(Errors.ConnectionError(str(self)))
  595. elif not self.can_send_more():
  596. return future.failure(Errors.TooManyInFlightRequests(str(self)))
  597. return self._send(request)
    def _send(self, request):
        """Encode and transmit a request, returning a Future for its response.

        Temporarily switches the socket to blocking mode and sends the full
        size-prefixed payload. If the request expects a response, an
        InFlightRequest is queued; otherwise the future resolves immediately.
        """
        assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
        future = Future()
        correlation_id = self._next_correlation_id()
        header = RequestHeader(request,
                               correlation_id=correlation_id,
                               client_id=self.config['client_id'])
        message = b''.join([header.encode(), request.encode()])
        size = Int32.encode(len(message))
        data = size + message
        try:
            # In the future we might manage an internal write buffer
            # and send bytes asynchronously. For now, just block
            # sending each request payload
            self._sock.setblocking(True)
            total_sent = 0
            while total_sent < len(data):
                sent_bytes = self._sock.send(data[total_sent:])
                total_sent += sent_bytes
            assert total_sent == len(data)
            if self._sensors:
                self._sensors.bytes_sent.record(total_sent)
            self._sock.setblocking(False)
        except (AssertionError, ConnectionError) as e:
            log.exception("Error sending %s to %s", request, self)
            error = Errors.ConnectionError("%s: %s" % (self, e))
            self.close(error=error)
            return future.failure(error)
        log.debug('%s Request %d: %s', self, correlation_id, request)

        if request.expect_response():
            # queue bookkeeping so recv() can match the response by
            # correlation id and resolve this future
            ifr = InFlightRequest(request=request,
                                  correlation_id=correlation_id,
                                  response_type=request.RESPONSE_TYPE,
                                  future=future,
                                  timestamp=time.time())
            self.in_flight_requests.append(ifr)
        else:
            future.success(None)

        return future
  637. def can_send_more(self):
  638. """Return True unless there are max_in_flight_requests_per_connection."""
  639. max_ifrs = self.config['max_in_flight_requests_per_connection']
  640. return len(self.in_flight_requests) < max_ifrs
  641. def recv(self):
  642. """Non-blocking network receive.
  643. Return response if available
  644. """
  645. assert not self._processing, 'Recursion not supported'
  646. if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
  647. log.warning('%s cannot recv: socket not connected', self)
  648. # If requests are pending, we should close the socket and
  649. # fail all the pending request futures
  650. if self.in_flight_requests:
  651. self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
  652. return ()
  653. elif not self.in_flight_requests:
  654. log.warning('%s: No in-flight-requests to recv', self)
  655. return ()
  656. response = self._recv()
  657. if not response and self.requests_timed_out():
  658. log.warning('%s timed out after %s ms. Closing connection.',
  659. self, self.config['request_timeout_ms'])
  660. self.close(error=Errors.RequestTimedOutError(
  661. 'Request timed out after %s ms' %
  662. self.config['request_timeout_ms']))
  663. return ()
  664. return response
def _recv(self):
    # Drain all currently-readable bytes from the non-blocking socket and
    # return the list of fully-decoded responses (possibly empty).
    responses = []
    SOCK_CHUNK_BYTES = 4096
    while True:
        try:
            data = self._sock.recv(SOCK_CHUNK_BYTES)
            # We expect socket.recv to raise an exception if there is not
            # enough data to read the full bytes_to_read
            # but if the socket is disconnected, we will get empty data
            # without an exception raised
            if not data:
                log.error('%s: socket disconnected', self)
                self.close(error=Errors.ConnectionError('socket disconnected'))
                break
            else:
                responses.extend(self.receive_bytes(data))
                if len(data) < SOCK_CHUNK_BYTES:
                    # Short read: kernel buffer is drained for now, so stop
                    # instead of looping into another (empty) recv.
                    break
        except SSLWantReadError:
            # TLS layer needs more ciphertext before yielding plaintext.
            break
        except ConnectionError as e:
            # On Python 2, EWOULDBLOCK surfaces through this exception type
            # (presumably via the compat ConnectionError alias imported at
            # the top of the file — confirm against module imports); treat
            # it as "no more data available right now".
            if six.PY2 and e.errno == errno.EWOULDBLOCK:
                break
            log.exception('%s: Error receiving network data'
                          ' closing socket', self)
            self.close(error=Errors.ConnectionError(e))
            break
        except BlockingIOError:
            # Python 3: non-blocking socket had no data ready.
            if six.PY3:
                break
            raise
    return responses
def receive_bytes(self, data):
    # Feed raw bytes into the framing state machine and return the list of
    # responses completed by these bytes (may be empty).
    # Wire framing: a 4-byte size header, then `size` payload bytes.
    # Partial frames are staged across calls in self._header (size prefix)
    # and self._rbuffer (payload), with self._receiving marking which of
    # the two is currently being filled.
    i = 0
    n = len(data)
    responses = []
    if self._sensors:
        self._sensors.bytes_received.record(n)
    while i < n:
        # Not receiving is the state of reading the payload header
        if not self._receiving:
            bytes_to_read = min(4 - self._header.tell(), n - i)
            self._header.write(data[i:i+bytes_to_read])
            i += bytes_to_read
            if self._header.tell() == 4:
                self._header.seek(0)
                nbytes = Int32.decode(self._header)
                # reset buffer and switch state to receiving payload bytes
                self._rbuffer = KafkaBytes(nbytes)
                self._receiving = True
            elif self._header.tell() > 4:
                # Header can only overflow if two readers race on the state.
                raise Errors.KafkaError('this should not happen - are you threading?')
        if self._receiving:
            total_bytes = len(self._rbuffer)
            staged_bytes = self._rbuffer.tell()
            bytes_to_read = min(total_bytes - staged_bytes, n - i)
            self._rbuffer.write(data[i:i+bytes_to_read])
            i += bytes_to_read
            staged_bytes = self._rbuffer.tell()
            if staged_bytes > total_bytes:
                # Overfilled payload buffer means framing state is corrupt;
                # drop the connection (the != check below then breaks out).
                self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
            if staged_bytes != total_bytes:
                # Payload incomplete: wait for more bytes on a later call.
                break
            # Full frame staged: decode it and rearm for the next header.
            self._receiving = False
            self._rbuffer.seek(0)
            resp = self._process_response(self._rbuffer)
            if resp is not None:
                responses.append(resp)
            self._reset_buffer()
    return responses
def _process_response(self, read_buffer):
    # Decode one complete response frame from read_buffer and resolve the
    # oldest in-flight request's future with it. Returns the decoded
    # response, or None when the connection had to be closed (no matching
    # request, correlation-id mismatch, or undecodable payload).
    assert not self._processing, 'Recursion not supported'
    self._processing = True
    recv_correlation_id = Int32.decode(read_buffer)

    if not self.in_flight_requests:
        # A response with nothing outstanding means protocol state is
        # broken: fail the connection.
        error = Errors.CorrelationIdError(
            '%s: No in-flight-request found for server response'
            ' with correlation ID %d'
            % (self, recv_correlation_id))
        self.close(error)
        self._processing = False
        return None
    else:
        # Responses arrive in request order, so the oldest request matches.
        ifr = self.in_flight_requests.popleft()

    if self._sensors:
        self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)

    # verify send/recv correlation ids match

    # 0.8.2 quirk
    if (self.config['api_version'] == (0, 8, 2) and
        ifr.response_type is GroupCoordinatorResponse[0] and
        ifr.correlation_id != 0 and
        recv_correlation_id == 0):
        log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
                    ' Correlation ID does not match request. This'
                    ' should go away once at least one topic has been'
                    ' initialized on the broker.')

    elif ifr.correlation_id != recv_correlation_id:
        error = Errors.CorrelationIdError(
            '%s: Correlation IDs do not match: sent %d, recv %d'
            % (self, ifr.correlation_id, recv_correlation_id))
        ifr.future.failure(error)
        self.close(error)
        self._processing = False
        return None

    # decode response
    try:
        response = ifr.response_type.decode(read_buffer)
    except ValueError:
        # Dump the raw bytes to aid debugging, fail the request, and drop
        # the connection -- framing can no longer be trusted after this.
        read_buffer.seek(0)
        buf = read_buffer.read()
        log.error('%s Response %d [ResponseType: %s Request: %s]:'
                  ' Unable to decode %d-byte buffer: %r', self,
                  ifr.correlation_id, ifr.response_type,
                  ifr.request, len(buf), buf)
        error = Errors.UnknownError('Unable to decode response')
        ifr.future.failure(error)
        self.close(error)
        self._processing = False
        return None

    log.debug('%s Response %d: %s', self, ifr.correlation_id, response)
    ifr.future.success(response)
    self._processing = False
    return response
  788. def requests_timed_out(self):
  789. if self.in_flight_requests:
  790. oldest_at = self.in_flight_requests[0].timestamp
  791. timeout = self.config['request_timeout_ms'] / 1000.0
  792. if time.time() >= oldest_at + timeout:
  793. return True
  794. return False
  795. def _next_correlation_id(self):
  796. self._correlation_id = (self._correlation_id + 1) % 2**31
  797. return self._correlation_id
  798. def _handle_api_version_response(self, response):
  799. error_type = Errors.for_code(response.error_code)
  800. assert error_type is Errors.NoError, "API version check failed"
  801. self._api_versions = dict([
  802. (api_key, (min_version, max_version))
  803. for api_key, min_version, max_version in response.api_versions
  804. ])
  805. return self._api_versions
  806. def _infer_broker_version_from_api_versions(self, api_versions):
  807. # The logic here is to check the list of supported request versions
  808. # in reverse order. As soon as we find one that works, return it
  809. test_cases = [
  810. # format (<broker version>, <needed struct>)
  811. ((0, 11, 0), MetadataRequest[4]),
  812. ((0, 10, 2), OffsetFetchRequest[2]),
  813. ((0, 10, 1), MetadataRequest[2]),
  814. ]
  815. # Get the best match of test cases
  816. for broker_version, struct in sorted(test_cases, reverse=True):
  817. if struct.API_KEY not in api_versions:
  818. continue
  819. min_version, max_version = api_versions[struct.API_KEY]
  820. if min_version <= struct.API_VERSION <= max_version:
  821. return broker_version
  822. # We know that ApiVersionResponse is only supported in 0.10+
  823. # so if all else fails, choose that
  824. return (0, 10, 0)
  825. def check_version(self, timeout=2, strict=False):
  826. """Attempt to guess the broker version.
  827. Note: This is a blocking call.
  828. Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
  829. """
  830. # Monkeypatch some connection configurations to avoid timeouts
  831. override_config = {
  832. 'request_timeout_ms': timeout * 1000,
  833. 'max_in_flight_requests_per_connection': 5
  834. }
  835. stashed = {}
  836. for key in override_config:
  837. stashed[key] = self.config[key]
  838. self.config[key] = override_config[key]
  839. # kafka kills the connection when it doesn't recognize an API request
  840. # so we can send a test request and then follow immediately with a
  841. # vanilla MetadataRequest. If the server did not recognize the first
  842. # request, both will be failed with a ConnectionError that wraps
  843. # socket.error (32, 54, or 104)
  844. from .protocol.admin import ApiVersionRequest, ListGroupsRequest
  845. from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
  846. # Socket errors are logged as exceptions and can alarm users. Mute them
  847. from logging import Filter
  848. class ConnFilter(Filter):
  849. def filter(self, record):
  850. if record.funcName == 'check_version':
  851. return True
  852. return False
  853. log_filter = ConnFilter()
  854. log.addFilter(log_filter)
  855. test_cases = [
  856. # All cases starting from 0.10 will be based on ApiVersionResponse
  857. ((0, 10), ApiVersionRequest[0]()),
  858. ((0, 9), ListGroupsRequest[0]()),
  859. ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
  860. ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
  861. ((0, 8, 0), MetadataRequest[0]([])),
  862. ]
  863. def connect():
  864. self.connect()
  865. if self.connected():
  866. return
  867. timeout_at = time.time() + timeout
  868. while time.time() < timeout_at and self.connecting():
  869. if self.connect() is ConnectionStates.CONNECTED:
  870. return
  871. time.sleep(0.05)
  872. raise Errors.NodeNotReadyError()
  873. for version, request in test_cases:
  874. connect()
  875. f = self.send(request)
  876. # HACK: sleeping to wait for socket to send bytes
  877. time.sleep(0.1)
  878. # when broker receives an unrecognized request API
  879. # it abruptly closes our socket.
  880. # so we attempt to send a second request immediately
  881. # that we believe it will definitely recognize (metadata)
  882. # the attempt to write to a disconnected socket should
  883. # immediately fail and allow us to infer that the prior
  884. # request was unrecognized
  885. mr = self.send(MetadataRequest[0]([]))
  886. if self._sock:
  887. self._sock.setblocking(True)
  888. while not (f.is_done and mr.is_done):
  889. self.recv()
  890. if self._sock:
  891. self._sock.setblocking(False)
  892. if f.succeeded():
  893. if isinstance(request, ApiVersionRequest[0]):
  894. # Starting from 0.10 kafka broker we determine version
  895. # by looking at ApiVersionResponse
  896. api_versions = self._handle_api_version_response(f.value)
  897. version = self._infer_broker_version_from_api_versions(api_versions)
  898. log.info('Broker version identifed as %s', '.'.join(map(str, version)))
  899. log.info('Set configuration api_version=%s to skip auto'
  900. ' check_version requests on startup', version)
  901. break
  902. # Only enable strict checking to verify that we understand failure
  903. # modes. For most users, the fact that the request failed should be
  904. # enough to rule out a particular broker version.
  905. if strict:
  906. # If the socket flush hack did not work (which should force the
  907. # connection to close and fail all pending requests), then we
  908. # get a basic Request Timeout. This is not ideal, but we'll deal
  909. if isinstance(f.exception, Errors.RequestTimedOutError):
  910. pass
  911. # 0.9 brokers do not close the socket on unrecognized api
  912. # requests (bug...). In this case we expect to see a correlation
  913. # id mismatch
  914. elif (isinstance(f.exception, Errors.CorrelationIdError) and
  915. version == (0, 10)):
  916. pass
  917. elif six.PY2:
  918. assert isinstance(f.exception.args[0], socket.error)
  919. assert f.exception.args[0].errno in (32, 54, 104)
  920. else:
  921. assert isinstance(f.exception.args[0], ConnectionError)
  922. log.info("Broker is not v%s -- it did not recognize %s",
  923. version, request.__class__.__name__)
  924. else:
  925. raise Errors.UnrecognizedBrokerVersion()
  926. log.removeFilter(log_filter)
  927. for key in stashed:
  928. self.config[key] = stashed[key]
  929. return version
  930. def __repr__(self):
  931. return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
  932. self.node_id, self.hostname, self.host, self.port)
class BrokerConnectionMetrics(object):
    """Registers and holds the per-connection network metric sensors.

    Aggregate (all-connections) sensors are created once per metrics
    registry; the per-node sensors are parented to them so that recording
    on a node-level sensor also feeds the aggregate statistics.
    """
    def __init__(self, metrics, metric_group_prefix, node_id):
        self.metrics = metrics

        # Any broker may have registered summary metrics already
        # but if not, we need to create them so we can set as parents below
        all_conns_transferred = metrics.get_sensor('bytes-sent-received')
        if not all_conns_transferred:
            metric_group_name = metric_group_prefix + '-metrics'

            bytes_transferred = metrics.sensor('bytes-sent-received')
            bytes_transferred.add(metrics.metric_name(
                'network-io-rate', metric_group_name,
                'The average number of network operations (reads or writes) on all'
                ' connections per second.'), Rate(sampled_stat=Count()))

            bytes_sent = metrics.sensor('bytes-sent',
                                        parents=[bytes_transferred])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second to all'
                ' servers.'), Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'), Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'), Max())

            bytes_received = metrics.sensor('bytes-received',
                                            parents=[bytes_transferred])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off all sockets'), Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'Responses received sent per second.'),
                Rate(sampled_stat=Count()))

            request_latency = metrics.sensor('request-latency')
            request_latency.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_latency.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        # if one sensor of the metrics has been registered for the connection,
        # then all other sensors should have been registered; and vice versa
        node_str = 'node-{0}'.format(node_id)
        node_sensor = metrics.get_sensor(node_str + '.bytes-sent')
        if not node_sensor:
            metric_group_name = metric_group_prefix + '-node-metrics.' + node_str

            bytes_sent = metrics.sensor(
                node_str + '.bytes-sent',
                parents=[metrics.get_sensor('bytes-sent')])
            bytes_sent.add(metrics.metric_name(
                'outgoing-byte-rate', metric_group_name,
                'The average number of outgoing bytes sent per second.'),
                Rate())
            bytes_sent.add(metrics.metric_name(
                'request-rate', metric_group_name,
                'The average number of requests sent per second.'),
                Rate(sampled_stat=Count()))
            bytes_sent.add(metrics.metric_name(
                'request-size-avg', metric_group_name,
                'The average size of all requests in the window.'),
                Avg())
            bytes_sent.add(metrics.metric_name(
                'request-size-max', metric_group_name,
                'The maximum size of any request sent in the window.'),
                Max())

            bytes_received = metrics.sensor(
                node_str + '.bytes-received',
                parents=[metrics.get_sensor('bytes-received')])
            bytes_received.add(metrics.metric_name(
                'incoming-byte-rate', metric_group_name,
                'Bytes/second read off node-connection socket'),
                Rate())
            bytes_received.add(metrics.metric_name(
                'response-rate', metric_group_name,
                'The average number of responses received per second.'),
                Rate(sampled_stat=Count()))

            request_time = metrics.sensor(
                node_str + '.latency',
                parents=[metrics.get_sensor('request-latency')])
            request_time.add(metrics.metric_name(
                'request-latency-avg', metric_group_name,
                'The average request latency in ms.'),
                Avg())
            request_time.add(metrics.metric_name(
                'request-latency-max', metric_group_name,
                'The maximum request latency in ms.'),
                Max())

        # NOTE(review): these calls look like lookups of the sensors
        # registered above (sensor() presumably returns an existing sensor
        # with the same name rather than creating a duplicate) -- confirm
        # against the metrics registry API.
        self.bytes_sent = metrics.sensor(node_str + '.bytes-sent')
        self.bytes_received = metrics.sensor(node_str + '.bytes-received')
        self.request_time = metrics.sensor(node_str + '.latency')
  1030. def _address_family(address):
  1031. """
  1032. Attempt to determine the family of an address (or hostname)
  1033. :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
  1034. could not be determined
  1035. """
  1036. if address.startswith('[') and address.endswith(']'):
  1037. return socket.AF_INET6
  1038. for af in (socket.AF_INET, socket.AF_INET6):
  1039. try:
  1040. socket.inet_pton(af, address)
  1041. return af
  1042. except (ValueError, AttributeError, socket.error):
  1043. continue
  1044. return socket.AF_UNSPEC
  1045. def get_ip_port_afi(host_and_port_str):
  1046. """
  1047. Parse the IP and port from a string in the format of:
  1048. * host_or_ip <- Can be either IPv4 address literal or hostname/fqdn
  1049. * host_or_ipv4:port <- Can be either IPv4 address literal or hostname/fqdn
  1050. * [host_or_ip] <- IPv6 address literal
  1051. * [host_or_ip]:port. <- IPv6 address literal
  1052. .. note:: IPv6 address literals with ports *must* be enclosed in brackets
  1053. .. note:: If the port is not specified, default will be returned.
  1054. :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
  1055. """
  1056. host_and_port_str = host_and_port_str.strip()
  1057. if host_and_port_str.startswith('['):
  1058. af = socket.AF_INET6
  1059. host, rest = host_and_port_str[1:].split(']')
  1060. if rest:
  1061. port = int(rest[1:])
  1062. else:
  1063. port = DEFAULT_KAFKA_PORT
  1064. return host, port, af
  1065. else:
  1066. if ':' not in host_and_port_str:
  1067. af = _address_family(host_and_port_str)
  1068. return host_and_port_str, DEFAULT_KAFKA_PORT, af
  1069. else:
  1070. # now we have something with a colon in it and no square brackets. It could be
  1071. # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
  1072. try:
  1073. # if it decodes as an IPv6 address, use that
  1074. socket.inet_pton(socket.AF_INET6, host_and_port_str)
  1075. return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
  1076. except AttributeError:
  1077. log.warning('socket.inet_pton not available on this platform.'
  1078. ' consider `pip install win_inet_pton`')
  1079. pass
  1080. except (ValueError, socket.error):
  1081. # it's a host:port pair
  1082. pass
  1083. host, port = host_and_port_str.rsplit(':', 1)
  1084. port = int(port)
  1085. af = _address_family(host)
  1086. return host, port, af
  1087. def collect_hosts(hosts, randomize=True):
  1088. """
  1089. Collects a comma-separated set of hosts (host:port) and optionally
  1090. randomize the returned list.
  1091. """
  1092. if isinstance(hosts, six.string_types):
  1093. hosts = hosts.strip().split(',')
  1094. result = []
  1095. afi = socket.AF_INET
  1096. for host_port in hosts:
  1097. host, port, afi = get_ip_port_afi(host_port)
  1098. if port < 0:
  1099. port = DEFAULT_KAFKA_PORT
  1100. result.append((host, port, afi))
  1101. if randomize:
  1102. shuffle(result)
  1103. return result