diff --git a/checks.d/kafka_consumer.py b/checks.d/kafka_consumer.py index 5df75795ed..8497768d3b 100644 --- a/checks.d/kafka_consumer.py +++ b/checks.d/kafka_consumer.py @@ -1,20 +1,30 @@ # stdlib from collections import defaultdict -import random -# project -from checks import AgentCheck - -# 3rd party +# 3p from kafka.client import KafkaClient from kafka.common import OffsetRequest from kazoo.client import KazooClient from kazoo.exceptions import NoNodeError +# project +from checks import AgentCheck + +DEFAULT_KAFKA_TIMEOUT = 5 +DEFAULT_ZK_TIMEOUT = 5 + + class KafkaCheck(AgentCheck): SOURCE_TYPE_NAME = 'kafka' + def __init__(self, name, init_config, agentConfig, instances=None): + AgentCheck.__init__(self, name, init_config, agentConfig, instances=instances) + self.zk_timeout = int( + init_config.get('zk_timeout', DEFAULT_ZK_TIMEOUT)) + self.kafka_timeout = int( + init_config.get('kafka_timeout', DEFAULT_KAFKA_TIMEOUT)) + def check(self, instance): consumer_groups = self.read_config(instance, 'consumer_groups', cast=self._validate_consumer_groups) @@ -26,7 +36,7 @@ def check(self, instance): zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s' # Connect to Zookeeper - zk_conn = KazooClient(zk_connect_str) + zk_conn = KazooClient(zk_connect_str, timeout=self.zk_timeout) zk_conn.start() try: @@ -56,7 +66,7 @@ def check(self, instance): self.log.exception('Error cleaning up Zookeeper connection') # Connect to Kafka - kafka_conn = KafkaClient(kafka_host_ports) + kafka_conn = KafkaClient(kafka_host_ports, timeout=self.kafka_timeout) try: # Query Kafka for the broker offsets diff --git a/conf.d/kafka_consumer.yaml.example b/conf.d/kafka_consumer.yaml.example index 0ac9e5e230..f6b9a71d1d 100644 --- a/conf.d/kafka_consumer.yaml.example +++ b/conf.d/kafka_consumer.yaml.example @@ -1,4 +1,8 @@ init_config: +# Customize the ZooKeeper connection timeout here +# zk_timeout: 5 +# Customize the Kafka connection timeout here +# kafka_timeout: 5 instances: # - kafka_connect_str: localhost:19092