diff --git a/charts/kminion/values.yaml b/charts/kminion/values.yaml index 5ff2807..c7d0a12 100644 --- a/charts/kminion/values.yaml +++ b/charts/kminion/values.yaml @@ -182,6 +182,10 @@ kminion: # # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics # # take precedence over allowed topics. # ignoredTopics: [ ] +# # infoMetric is a configuration object for the kminion_kafka_topic_info metric +# infoMetric: +# # ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric +# configKeys: ["cleanup.policy"] # logDirs: # # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior # # to version 1.0.0 as describing log dirs was not supported back then. diff --git a/docs/reference-config.yaml b/docs/reference-config.yaml index bcbed41..672d881 100644 --- a/docs/reference-config.yaml +++ b/docs/reference-config.yaml @@ -87,6 +87,10 @@ minion: # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics # take precedence over allowed topics. ignoredTopics: [ ] + # infoMetric is a configuration object for the kminion_kafka_topic_info metric + infoMetric: + # ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric + configKeys: ["cleanup.policy"] logDirs: # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior # to version 1.0.0 as describing log dirs was not supported back then. diff --git a/minion/config_topic_config.go b/minion/config_topic_config.go index 2b94e83..262300a 100644 --- a/minion/config_topic_config.go +++ b/minion/config_topic_config.go @@ -1,6 +1,8 @@ package minion -import "fmt" +import ( + "fmt" +) const ( TopicGranularityTopic string = "topic" @@ -18,6 +20,16 @@ type TopicConfig struct { // IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics // take precedence over allowed topics. IgnoredTopics []string `koanf:"ignoredTopics"` + + // InfoMetric configures how the kafka_topic_info metric is populated + InfoMetric InfoMetricConfig `koanf:"infoMetric"` +} + +type InfoMetricConfig struct { + // ConfigKeys configures optional topic configuration keys that should be exported + // as prometheus metric labels. + // By default only "cleanup.policy" is exported + ConfigKeys []string `koanf:"configKeys"` } // Validate if provided TopicConfig is valid. @@ -50,4 +62,5 @@ func (c *TopicConfig) Validate() error { func (c *TopicConfig) SetDefaults() { c.Granularity = TopicGranularityPartition c.AllowedTopics = []string{"/.*/"} + c.InfoMetric = InfoMetricConfig{ConfigKeys: []string{"cleanup.policy"}} } diff --git a/minion/describe_topic_config.go b/minion/describe_topic_config.go index bb7cc77..8ffaf59 100644 --- a/minion/describe_topic_config.go +++ b/minion/describe_topic_config.go @@ -20,7 +20,6 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes resourceReq := kmsg.NewDescribeConfigsRequestResource() resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic resourceReq.ResourceName = topic.Topic - resourceReq.ConfigNames = []string{"cleanup.policy"} req.Resources = append(req.Resources, resourceReq) } diff --git a/prometheus/collect_topic_info.go b/prometheus/collect_topic_info.go index ccd9277..b740378 100644 --- a/prometheus/collect_topic_info.go +++ b/prometheus/collect_topic_info.go @@ -2,10 +2,11 @@ package prometheus import ( "context" + "strconv" + "github.com/prometheus/client_golang/prometheus" "github.com/twmb/franz-go/pkg/kerr" "go.uber.org/zap" - "strconv" ) func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Metric) bool { @@ -55,7 +56,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me e.logger.Warn("failed to get metadata of a specific topic", zap.String("topic_name", topic.Topic), zap.Error(typedErr)) - return false + continue } partitionCount := len(topic.Partitions) replicationFactor := -1 @@ -64,20 +65,26 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me replicationFactor = len(topic.Partitions[0].Replicas) } - cleanupPolicy, exists := configsByTopic[topic.Topic]["cleanup.policy"] - if !exists { - cleanupPolicy = "N/A" + var labelsValues []string + labelsValues = append(labelsValues, topic.Topic) + labelsValues = append(labelsValues, strconv.Itoa(partitionCount)) + labelsValues = append(labelsValues, strconv.Itoa(replicationFactor)) + for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys { + labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A")) } - ch <- prometheus.MustNewConstMetric( e.topicInfo, prometheus.GaugeValue, float64(1), - topic.Topic, - strconv.Itoa(partitionCount), - strconv.Itoa(replicationFactor), - cleanupPolicy, + labelsValues..., ) } return isOk } + +func getOrDefault(m map[string]string, key string, defaultValue string) string { + if value, exists := m[key]; exists { + return value + } + return defaultValue +} diff --git a/prometheus/exporter.go b/prometheus/exporter.go index b0bcf98..0795c34 100644 --- a/prometheus/exporter.go +++ b/prometheus/exporter.go @@ -3,6 +3,7 @@ package prometheus import ( "context" "os" + "strings" "time" "github.com/cloudhut/kminion/v2/minion" @@ -98,10 +99,15 @@ func (e *Exporter) InitializeMetrics() { // Topic / Partition metrics // Topic info + var labels = []string{"topic_name", "partition_count", "replication_factor"} + for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys { + // prometheus does not allow . in label keys + labels = append(labels, strings.ReplaceAll(key, ".", "_")) + } e.topicInfo = prometheus.NewDesc( prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info"), "Info labels for a given topic", - []string{"topic_name", "partition_count", "replication_factor", "cleanup_policy"}, + labels, nil, ) // Partition Low Water Mark