From f45a3f5c03c0218bada7b888fb0b0c7aaff21000 Mon Sep 17 00:00:00 2001 From: Alex Lopez Date: Wed, 19 Apr 2023 20:09:59 +0200 Subject: [PATCH] Fix errors related to filtering based on user-specified consumer groups filters (#14406) * Add extra test case * Move consumer group validation to config class * Move validation of correct consumer group configuration to Config class Since it's a configuration concern, that's where it seems to belong, and the behavior would thus be consistent with where other `ConfigurationError`s are raised. * Make sure we don't return metrics for consumer groups that don't exist * Separate partition filtering step * Separate filtering based on `consumer_groups` to its own function * Use consistent logic for all the types of filtering * Separate all filtering to a separate method * Fix early skip when filtering by regexes * Change looping order to simplify and avoid unnecessary work * Simplify filtering functions * Deduplicate filtered partitions and remove unnecessary option checking * Rename exact match filtering function to better reflect what it does * Fix wording on test id Co-authored-by: Ilia Kurenkov --------- Co-authored-by: Ilia Kurenkov Co-authored-by: Andrew Zhang --- .../datadog_checks/kafka_consumer/client.py | 173 ++++++++---------- kafka_consumer/tests/test_integration.py | 18 ++ 2 files changed, 90 insertions(+), 101 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 1a62e7d57ae16..9dc9bb513f1d5 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -155,27 +155,28 @@ def get_consumer_offsets(self): return consumer_offsets def _get_consumer_groups(self): - if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_regex: - # Get all consumer groups - consumer_groups = [] - consumer_groups_future = self.kafka_client.list_consumer_groups() - self.log.debug('MONITOR UNLISTED CG FUTURES: %s', consumer_groups_future) - try: - list_consumer_groups_result = consumer_groups_future.result() - self.log.debug('MONITOR UNLISTED FUTURES RESULT: %s', list_consumer_groups_result) - - consumer_groups.extend( - valid_consumer_group.group_id for valid_consumer_group in list_consumer_groups_result.valid - ) - except Exception as e: - self.log.error("Failed to collect consumer groups: %s", e) - return consumer_groups - elif self.config._consumer_groups: - return self.config._consumer_groups + # Get all consumer groups + consumer_groups = [] + consumer_groups_future = self.kafka_client.list_consumer_groups() + self.log.debug('MONITOR UNLISTED CG FUTURES: %s', consumer_groups_future) + try: + list_consumer_groups_result = consumer_groups_future.result() + self.log.debug('MONITOR UNLISTED FUTURES RESULT: %s', list_consumer_groups_result) + + consumer_groups.extend( + valid_consumer_group.group_id for valid_consumer_group in list_consumer_groups_result.valid + ) + except Exception as e: + self.log.error("Failed to collect consumer groups: %s", e) + return consumer_groups def _get_consumer_offset_futures(self, consumer_groups): - topics = self.kafka_client.list_topics(timeout=self.config._request_timeout) - # {(consumer_group, topic, partition): offset} + topic_metadata = self.kafka_client.list_topics(timeout=self.config._request_timeout).topics + topics = { + topic: list(topic_metadata[topic].partitions.keys()) + for topic in topic_metadata + if topic not in KAFKA_INTERNAL_TOPICS + } for consumer_group in consumer_groups: self.log.debug('CONSUMER GROUP: %s', consumer_group) @@ -186,91 +187,61 @@ def _get_consumer_offset_futures(self, consumer_groups): )[consumer_group] def _get_topic_partitions(self, topics, consumer_group): - for topic in topics.topics: - if topic in KAFKA_INTERNAL_TOPICS: + for topic, partitions in topics.items(): + self.log.debug('CONFIGURED TOPICS: %s', topic) + + if self.config._monitor_unlisted_consumer_groups: + filtered_partitions = partitions + else: + filtered_partitions = self._filter_partitions(consumer_group, topic, partitions) + + for partition in filtered_partitions: + topic_partition = TopicPartition(topic, partition) + self.log.debug("TOPIC PARTITION: %s", topic_partition) + yield topic_partition + + def _filter_partitions(self, consumer_group, topic, partitions): + return ( + self._filter_partitions_with_regex(consumer_group, topic, partitions) + | self._filter_partitions_with_exact_match(consumer_group, topic, partitions) + ) # fmt: skip + + def _filter_partitions_with_regex(self, consumer_group, topic, partitions): + partitions_to_collect = set() + + for consumer_group_regex, topic_filters in self.config._consumer_groups_compiled_regex.items(): + if not consumer_group_regex.match(consumer_group): continue - self.log.debug('CONFIGURED TOPICS: %s', topic) + # No topics specified means we collect all topics and partitions + if not topic_filters: + return set(partitions) - partitions = list(topics.topics[topic].partitions.keys()) + for topic_regex, topic_partitions in topic_filters.items(): + if not topic_regex.match(topic): + continue - if self.config._monitor_unlisted_consumer_groups: - for partition in partitions: - topic_partition = TopicPartition(topic, partition) - self.log.debug("TOPIC PARTITION: %s", topic_partition) - yield topic_partition - - elif self.config._consumer_groups_regex: - for filtered_topic_partition in self._get_regex_filtered_topic_partitions( - consumer_group, topic, partitions - ): - topic_partition = TopicPartition(filtered_topic_partition[0], filtered_topic_partition[1]) - self.log.debug("TOPIC PARTITION: %s", topic_partition) - yield topic_partition - - if self.config._consumer_groups: - for partition in partitions: - # Get all topic-partition combinations allowed based on config - # if topics is None => collect all topics and partitions for the consumer group - # if partitions is None => collect all partitions from the consumer group's topic - if self.config._consumer_groups.get(consumer_group): - if ( - self.config._consumer_groups[consumer_group] - and topic not in self.config._consumer_groups[consumer_group] - ): - self.log.debug( - "Partition %s skipped because the topic %s is not in the consumer_group.", - partition, - topic, - ) - continue - if ( - self.config._consumer_groups[consumer_group].get(topic) - and partition not in self.config._consumer_groups[consumer_group][topic] - ): - self.log.debug( - "Partition %s skipped because it is not defined in the consumer group for the topic %s", - partition, - topic, - ) - continue - - topic_partition = TopicPartition(topic, partition) - self.log.debug("TOPIC PARTITION: %s", topic_partition) - yield topic_partition - - def _get_regex_filtered_topic_partitions(self, consumer_group, topic, partitions): - for partition in partitions: - # Do a regex filtering here for consumer groups - for consumer_group_compiled_regex in self.config._consumer_groups_compiled_regex: - if not consumer_group_compiled_regex.match(consumer_group): - return - - consumer_group_topics_regex = self.config._consumer_groups_compiled_regex.get( - consumer_group_compiled_regex - ) - - # If topics is empty, return all combinations of topic and partition - if not consumer_group_topics_regex: - yield (topic, partition) - - # Do a regex filtering here for topics - for topic_regex in consumer_group_topics_regex: - if not topic_regex.match(topic): - self.log.debug( - "Partition %s skipped because the topic %s is not in the consumer_group.", partition, topic - ) - continue + # No partitions specified means we collect all + if not topic_partitions: + return set(partitions) - if ( - consumer_group_topics_regex.get(topic_regex) - and partition not in consumer_group_topics_regex[topic_regex] - ): - self.log.debug( - "Partition %s skipped because it is not defined in the consumer group for the topic %s", - partition, - topic, - ) - continue + partitions_to_collect.update(topic_partitions) + + return partitions_to_collect.intersection(partitions) + + def _filter_partitions_with_exact_match(self, consumer_group, topic, partitions): + if consumer_group not in self.config._consumer_groups: + return set() + + # No topics specified means we allow all topics and partitions + if not self.config._consumer_groups[consumer_group]: + return set(partitions) + + if topic not in self.config._consumer_groups[consumer_group]: + return set() + + # No partitions specified means we collect all + if not self.config._consumer_groups[consumer_group][topic]: + return set(partitions) - yield (topic, partition) + return set(self.config._consumer_groups[consumer_group][topic]).intersection(partitions) diff --git a/kafka_consumer/tests/test_integration.py b/kafka_consumer/tests/test_integration.py index 95ba75fefd18a..35bf224a1119d 100644 --- a/kafka_consumer/tests/test_integration.py +++ b/kafka_consumer/tests/test_integration.py @@ -147,6 +147,12 @@ def test_monitor_broker_highwatermarks( 2, id="One consumer group, one topic, all partitions", ), + pytest.param( + {'consumer_groups': {'nonsense': {'marvel': None}}}, + does_not_raise(), + 0, + id="Nonexistent consumer group, resulting in no metrics", + ), pytest.param( {'consumer_groups': {'my_consumer': {'marvel': [1]}}}, does_not_raise(), @@ -305,6 +311,18 @@ def test_config(dd_run_check, check, kafka_instance, override, aggregator, expec '', id="Specified topic, monitor_unlisted_consumer_groups false", ), + pytest.param( + { + 'consumer_groups': {}, + 'consumer_groups_regex': {'foo': {'bar': []}, 'my_consume*': {'dc': []}}, + 'monitor_unlisted_consumer_groups': False, + }, + 2, + 2, + 2, + '', + id="Specified topic with an extra nonmatching consumer group regex, monitor_unlisted_consumer_groups false", + ), pytest.param( { 'consumer_groups': {'my_consumer': {'marvel': []}},