From 6ef11d0ce0f1078ff6878ec6037d440d9d2b08db Mon Sep 17 00:00:00 2001
From: Dmitry
Date: Fri, 12 Aug 2022 17:07:11 -0700
Subject: [PATCH] [receiver/kafkametrics] Migrate receiver to the new metrics builder

This allows users to disable particular metrics through user settings.
---
 .../kafkametricsreceiver/broker_scraper.go    |  23 +-
 .../broker_scraper_test.go                    |  34 +-
 receiver/kafkametricsreceiver/config.go       |   4 +
 receiver/kafkametricsreceiver/config_test.go  |   2 +
 .../kafkametricsreceiver/consumer_scraper.go  |  40 +-
 .../consumer_scraper_test.go                  |  32 +-
 receiver/kafkametricsreceiver/doc.go          |   2 +-
 .../kafkametricsreceiver/documentation.md     |   9 +-
 receiver/kafkametricsreceiver/factory.go      |   3 +
 .../internal/metadata/generated_metrics.go    | 215 -----
 .../internal/metadata/generated_metrics_v2.go | 831 ++++++++++++++++++
 receiver/kafkametricsreceiver/metadata.yaml   |   1 +
 receiver/kafkametricsreceiver/receiver.go     |  12 +-
 .../kafkametricsreceiver/receiver_test.go     |   8 +-
 .../kafkametricsreceiver/topic_scraper.go     |  46 +-
 .../topic_scraper_test.go                     |  48 +-
 unreleased/migrate-kafkametrics.yaml          |   6 +
 17 files changed, 983 insertions(+), 333 deletions(-)
 delete mode 100644 receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
 create mode 100644 receiver/kafkametricsreceiver/internal/metadata/generated_metrics_v2.go
 create mode 100755 unreleased/migrate-kafkametrics.yaml

diff --git a/receiver/kafkametricsreceiver/broker_scraper.go b/receiver/kafkametricsreceiver/broker_scraper.go
index 52ae04126ab7..edc322213416 100644
--- a/receiver/kafkametricsreceiver/broker_scraper.go
+++ b/receiver/kafkametricsreceiver/broker_scraper.go
@@ -20,25 +20,31 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 type brokerScraper struct {
 	client       sarama.Client
-	logger       *zap.Logger
+	settings     component.ReceiverCreateSettings
 	config       Config
 	saramaConfig *sarama.Config
+	mb           *metadata.MetricsBuilder
 }
 
 func (s *brokerScraper) Name() string {
 	return brokersScraperName
 }
 
+func (s *brokerScraper) start(_ context.Context, _ component.Host) error {
+	s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
+	return nil
+}
+
 func (s *brokerScraper) shutdown(context.Context) error {
 	if s.client != nil && !s.client.Closed() {
 		return s.client.Close()
@@ -57,23 +63,22 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 
 	brokers := s.client.Brokers()
 
-	md := pmetric.NewMetrics()
-	ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(instrumentationLibName)
-	addIntGauge(ilm.Metrics(), metadata.M.KafkaBrokers.Name(), pcommon.NewTimestampFromTime(time.Now()), pcommon.NewMap(), int64(len(brokers)))
+	s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
 
-	return md, nil
+	return s.mb.Emit(), nil
 }
 
-func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
+func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
+	settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 	s := brokerScraper{
-		logger:       logger,
+		settings:     settings,
 		config:       cfg,
 		saramaConfig: 
saramaConfig, } return scraperhelper.NewScraper( s.Name(), s.scrape, + scraperhelper.WithStart(s.start), scraperhelper.WithShutdown(s.shutdown), ) } diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go index 10755ac02c76..f1ba15522b37 100644 --- a/receiver/kafkametricsreceiver/broker_scraper_test.go +++ b/receiver/kafkametricsreceiver/broker_scraper_test.go @@ -21,7 +21,10 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "go.uber.org/zap" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) func TestBrokerShutdown(t *testing.T) { @@ -32,9 +35,9 @@ func TestBrokerShutdown(t *testing.T) { On("Close").Return(nil). On("Closed").Return(false) scraper := brokerScraper{ - client: client, - logger: zap.NewNop(), - config: Config{}, + client: client, + settings: componenttest.NewNopReceiverCreateSettings(), + config: Config{}, } _ = scraper.shutdown(context.Background()) client.AssertExpectations(t) @@ -46,9 +49,9 @@ func TestBrokerShutdown_closed(t *testing.T) { client.Mock. On("Closed").Return(true) scraper := brokerScraper{ - client: client, - logger: zap.NewNop(), - config: Config{}, + client: client, + settings: componenttest.NewNopReceiverCreateSettings(), + config: Config{}, } _ = scraper.shutdown(context.Background()) client.AssertExpectations(t) @@ -62,7 +65,7 @@ func TestBrokerScraper_Name(t *testing.T) { func TestBrokerScraper_createBrokerScraper(t *testing.T) { sc := sarama.NewConfig() newSaramaClient = mockNewSaramaClient - bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop()) + bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, bs) } @@ -70,7 +73,7 @@ func TestBrokerScraper_createBrokerScraper(t *testing.T) { func TestBrokerScraperStart(t *testing.T) { newSaramaClient = mockNewSaramaClient sc := sarama.NewConfig() - bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop()) + bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, bs) assert.NoError(t, bs.Start(context.Background(), nil)) @@ -81,7 +84,7 @@ func TestBrokerScraper_scrape_handles_client_error(t *testing.T) { return nil, fmt.Errorf("new client failed") } sc := sarama.NewConfig() - bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop()) + bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, bs) _, err = bs.Scrape(context.Background()) @@ -93,7 +96,7 @@ func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) { return nil, fmt.Errorf("new client failed") } sc := sarama.NewConfig() - bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop()) + bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, bs) err = bs.Shutdown(context.Background()) @@ -104,10 +107,11 @@ func TestBrokerScraper_scrape(t *testing.T) { client := newMockClient() client.Mock.On("Brokers").Return(testBrokers) bs := brokerScraper{ - client: client, - logger: zap.NewNop(), - config: 
Config{}, + client: client, + settings: componenttest.NewNopReceiverCreateSettings(), + config: Config{Metrics: metadata.DefaultMetricsSettings()}, } + require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost())) md, err := bs.scrape(context.Background()) assert.NoError(t, err) expectedDp := int64(len(testBrokers)) @@ -119,7 +123,7 @@ func TestBrokerScraper_scrape(t *testing.T) { func TestBrokersScraper_createBrokerScraper(t *testing.T) { sc := sarama.NewConfig() newSaramaClient = mockNewSaramaClient - bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop()) + bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, bs) } diff --git a/receiver/kafkametricsreceiver/config.go b/receiver/kafkametricsreceiver/config.go index 1f011fafcdd0..cbb29f93967a 100644 --- a/receiver/kafkametricsreceiver/config.go +++ b/receiver/kafkametricsreceiver/config.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/receiver/scraperhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) // Config represents user settings for kafkametrics receiver @@ -44,4 +45,7 @@ type Config struct { // ClientID is the id associated with the consumer that reads from topics in kafka. ClientID string `mapstructure:"client_id"` + + // Metrics allows customizing scraped metrics representation. + Metrics metadata.MetricsSettings `mapstructure:"metrics"` } diff --git a/receiver/kafkametricsreceiver/config_test.go b/receiver/kafkametricsreceiver/config_test.go index 9026f89ad790..80e6a9db5edc 100644 --- a/receiver/kafkametricsreceiver/config_test.go +++ b/receiver/kafkametricsreceiver/config_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/service/servicetest" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) func TestLoadConfig(t *testing.T) { @@ -57,5 +58,6 @@ func TestLoadConfig(t *testing.T) { }, ClientID: defaultClientID, Scrapers: []string{"brokers", "topics", "consumers"}, + Metrics: metadata.DefaultMetricsSettings(), }, r) } diff --git a/receiver/kafkametricsreceiver/consumer_scraper.go b/receiver/kafkametricsreceiver/consumer_scraper.go index 43576521082d..546bac3ac874 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper.go +++ b/receiver/kafkametricsreceiver/consumer_scraper.go @@ -21,29 +21,35 @@ import ( "time" "github.com/Shopify/sarama" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/multierr" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) type consumerScraper struct { client sarama.Client - logger *zap.Logger + settings component.ReceiverCreateSettings groupFilter *regexp.Regexp topicFilter *regexp.Regexp clusterAdmin sarama.ClusterAdmin saramaConfig *sarama.Config config Config + mb *metadata.MetricsBuilder } func (s *consumerScraper) Name() string { return consumersScraperName } +func (s *consumerScraper) start(_ context.Context, _ component.Host) error { + s.mb = 
metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
+	return nil
+}
+
 func (s *consumerScraper) shutdown(_ context.Context) error {
 	if s.client != nil && !s.client.Closed() {
 		return s.client.Close()
@@ -120,18 +126,16 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	}
 
 	now := pcommon.NewTimestampFromTime(time.Now())
-	md := pmetric.NewMetrics()
-	ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(instrumentationLibName)
+
 	for _, group := range consumerGroups {
-		labels := pcommon.NewMap()
-		labels.UpsertString(metadata.A.Group, group.GroupId)
-		addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupMembers.Name(), now, labels, int64(len(group.Members)))
+		s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(group.Members)), group.GroupId)
+
 		groupOffsetFetchResponse, err := s.clusterAdmin.ListConsumerGroupOffsets(group.GroupId, topicPartitions)
 		if err != nil {
 			scrapeError = multierr.Append(scrapeError, err)
 			continue
 		}
+
 		for topic, partitions := range groupOffsetFetchResponse.Blocks {
 			// tracking matchedTopics consumed by this group
 			// by checking if any of the blocks has an offset
@@ -142,15 +146,14 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 					break
 				}
 			}
-			labels.UpsertString(metadata.A.Topic, topic)
 			if isConsumed {
 				var lagSum int64
 				var offsetSum int64
 				for partition, block := range partitions {
-					labels.UpsertInt(metadata.A.Partition, int64(partition))
 					consumerOffset := block.Offset
 					offsetSum += consumerOffset
-					addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffset.Name(), now, labels, consumerOffset)
+					s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, consumerOffset, group.GroupId, topic, int64(partition))
+
 					// default -1 to indicate no lag measured. 
var consumerLag int64 = -1 if partitionOffset, ok := topicPartitionOffset[topic][partition]; ok { @@ -160,19 +163,19 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) { lagSum += consumerLag } } - addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLag.Name(), now, labels, consumerLag) + s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition)) } - labels.Remove(metadata.A.Partition) - addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffsetSum.Name(), now, labels, offsetSum) - addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLagSum.Name(), now, labels, lagSum) + s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group.GroupId, topic) + s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group.GroupId, topic) } } } - return md, scrapeError + return s.mb.Emit(), scrapeError } -func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) { +func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, + settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) { groupFilter, err := regexp.Compile(cfg.GroupMatch) if err != nil { return nil, fmt.Errorf("failed to compile group_match: %w", err) @@ -182,7 +185,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C return nil, fmt.Errorf("failed to compile topic filter: %w", err) } s := consumerScraper{ - logger: logger, + settings: settings, groupFilter: groupFilter, topicFilter: topicFilter, config: cfg, @@ -191,6 +194,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C return scraperhelper.NewScraper( s.Name(), s.scrape, + scraperhelper.WithStart(s.start), scraperhelper.WithShutdown(s.shutdown), ) } diff --git a/receiver/kafkametricsreceiver/consumer_scraper_test.go b/receiver/kafkametricsreceiver/consumer_scraper_test.go index b2495170d522..54842dfc997b 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper_test.go +++ b/receiver/kafkametricsreceiver/consumer_scraper_test.go @@ -22,7 +22,8 @@ import ( "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" - "go.uber.org/zap" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" ) func TestConsumerShutdown(t *testing.T) { @@ -60,7 +61,7 @@ func TestConsumerScraper_createConsumerScraper(t *testing.T) { sc := sarama.NewConfig() newSaramaClient = mockNewSaramaClient newClusterAdmin = mockNewClusterAdmin - cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop()) + cs, err := createConsumerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, cs) } @@ -70,7 +71,7 @@ func TestConsumerScraper_scrape_handles_client_error(t *testing.T) { return nil, fmt.Errorf("new client failed") } sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop()) + cs, err := createConsumerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, cs) _, err = cs.Scrape(context.Background()) @@ -82,7 +83,7 @@ func TestConsumerScraper_scrape_handles_nil_client(t *testing.T) { return nil, fmt.Errorf("new client failed") } sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop()) + cs, err := 
createConsumerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, cs) err = cs.Shutdown(context.Background()) @@ -100,7 +101,7 @@ func TestConsumerScraper_scrape_handles_clusterAdmin_error(t *testing.T) { return nil, fmt.Errorf("new cluster admin failed") } sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop()) + cs, err := createConsumerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, cs) _, err = cs.Scrape(context.Background()) @@ -111,7 +112,7 @@ func TestConsumerScraperStart(t *testing.T) { newSaramaClient = mockNewSaramaClient newClusterAdmin = mockNewClusterAdmin sc := sarama.NewConfig() - cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop()) + cs, err := createConsumerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, cs) err = cs.Start(context.Background(), nil) @@ -124,7 +125,7 @@ func TestConsumerScraper_createScraper_handles_invalid_topic_match(t *testing.T) sc := sarama.NewConfig() cs, err := createConsumerScraper(context.Background(), Config{ TopicMatch: "[", - }, sc, zap.NewNop()) + }, sc, componenttest.NewNopReceiverCreateSettings()) assert.Error(t, err) assert.Nil(t, cs) } @@ -135,7 +136,7 @@ func TestConsumerScraper_createScraper_handles_invalid_group_match(t *testing.T) sc := sarama.NewConfig() cs, err := createConsumerScraper(context.Background(), Config{ GroupMatch: "[", - }, sc, zap.NewNop()) + }, sc, componenttest.NewNopReceiverCreateSettings()) assert.Error(t, err) assert.Nil(t, cs) } @@ -144,11 +145,12 @@ func TestConsumerScraper_scrape(t *testing.T) { filter := regexp.MustCompile(defaultGroupMatch) cs := consumerScraper{ client: newMockClient(), - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), clusterAdmin: newMockClusterAdmin(), topicFilter: filter, groupFilter: filter, } + require.NoError(t, cs.start(context.Background(), componenttest.NewNopHost())) md, err := cs.scrape(context.Background()) assert.NoError(t, err) assert.NotNil(t, md) @@ -161,7 +163,7 @@ func TestConsumerScraper_scrape_handlesListTopicError(t *testing.T) { clusterAdmin.topics = nil cs := consumerScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), clusterAdmin: clusterAdmin, topicFilter: filter, groupFilter: filter, @@ -176,7 +178,7 @@ func TestConsumerScraper_scrape_handlesListConsumerGroupError(t *testing.T) { clusterAdmin.consumerGroups = nil cs := consumerScraper{ client: newMockClient(), - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), clusterAdmin: clusterAdmin, topicFilter: filter, groupFilter: filter, @@ -191,7 +193,7 @@ func TestConsumerScraper_scrape_handlesDescribeConsumerError(t *testing.T) { clusterAdmin.consumerGroupDescriptions = nil cs := consumerScraper{ client: newMockClient(), - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), clusterAdmin: clusterAdmin, topicFilter: filter, groupFilter: filter, @@ -208,11 +210,12 @@ func TestConsumerScraper_scrape_handlesOffsetPartialError(t *testing.T) { clusterAdmin.consumerGroupOffsets = nil cs := consumerScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), groupFilter: filter, topicFilter: filter, clusterAdmin: 
clusterAdmin,
 	}
+	require.NoError(t, cs.start(context.Background(), componenttest.NewNopHost()))
 	_, err := cs.scrape(context.Background())
 	assert.Error(t, err)
 }
@@ -225,11 +228,12 @@ func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) {
 	clusterAdmin.consumerGroupOffsets = nil
 	cs := consumerScraper{
 		client:       client,
-		logger:       zap.NewNop(),
+		settings:     componenttest.NewNopReceiverCreateSettings(),
 		groupFilter:  filter,
 		topicFilter:  filter,
 		clusterAdmin: clusterAdmin,
 	}
+	require.NoError(t, cs.start(context.Background(), componenttest.NewNopHost()))
 	_, err := cs.scrape(context.Background())
 	assert.Error(t, err)
 }
diff --git a/receiver/kafkametricsreceiver/doc.go b/receiver/kafkametricsreceiver/doc.go
index 3b014044c480..01e2c3458d50 100644
--- a/receiver/kafkametricsreceiver/doc.go
+++ b/receiver/kafkametricsreceiver/doc.go
@@ -15,6 +15,6 @@
 //go:build !windows
 // +build !windows
 
-//go:generate mdatagen metadata.yaml
+//go:generate mdatagen --experimental-gen metadata.yaml
 
 package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver"
diff --git a/receiver/kafkametricsreceiver/documentation.md b/receiver/kafkametricsreceiver/documentation.md
index 0b7470c9511b..79a988aca87f 100644
--- a/receiver/kafkametricsreceiver/documentation.md
+++ b/receiver/kafkametricsreceiver/documentation.md
@@ -20,7 +20,14 @@ These are the metrics available for this scraper.
 | **kafka.partition.replicas_in_sync** | Number of synchronized replicas of partition | {replicas} | Gauge(Int) | |
 | **kafka.topic.partitions** | Number of partitions in topic. | {partitions} | Gauge(Int) | |
 
-**Highlighted metrics** are emitted by default.
+**Highlighted metrics** are emitted by default. Other metrics are optional and not emitted by default.
+Any metric can be enabled or disabled with the following scraper configuration:
+
+```yaml
+metrics:
+  <metric_name>:
+    enabled: <true|false>
+```
 
 ## Metric attributes
 
diff --git a/receiver/kafkametricsreceiver/factory.go b/receiver/kafkametricsreceiver/factory.go
index 7db58ec3946a..0e0a5829fd2a 100644
--- a/receiver/kafkametricsreceiver/factory.go
+++ b/receiver/kafkametricsreceiver/factory.go
@@ -21,6 +21,8 @@ import (
 	"go.opentelemetry.io/collector/config"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 const (
@@ -47,6 +49,7 @@ func createDefaultConfig() config.Receiver {
 		GroupMatch: defaultGroupMatch,
 		TopicMatch: defaultTopicMatch,
 		ClientID:   defaultClientID,
+		Metrics:    metadata.DefaultMetricsSettings(),
 	}
 }
diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
deleted file mode 100644
index 0bba79e7560b..000000000000
--- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
+++ /dev/null
@@ -1,215 +0,0 @@
-// Code generated by mdatagen. DO NOT EDIT.
-
-package metadata
-
-import (
-	"go.opentelemetry.io/collector/config"
-	"go.opentelemetry.io/collector/pdata/pmetric"
-)
-
-// Type is the component type name.
-const Type config.Type = "kafkametricsreceiver"
-
-// MetricIntf is an interface to generically interact with generated metric.
-type MetricIntf interface {
-	Name() string
-	New() pmetric.Metric
-	Init(metric pmetric.Metric)
-}
-
-// Intentionally not exposing this so that it is opaque and can change freely. 
-type metricImpl struct { - name string - initFunc func(pmetric.Metric) -} - -// Name returns the metric name. -func (m *metricImpl) Name() string { - return m.name -} - -// New creates a metric object preinitialized. -func (m *metricImpl) New() pmetric.Metric { - metric := pmetric.NewMetric() - m.Init(metric) - return metric -} - -// Init initializes the provided metric object. -func (m *metricImpl) Init(metric pmetric.Metric) { - m.initFunc(metric) -} - -type metricStruct struct { - KafkaBrokers MetricIntf - KafkaConsumerGroupLag MetricIntf - KafkaConsumerGroupLagSum MetricIntf - KafkaConsumerGroupMembers MetricIntf - KafkaConsumerGroupOffset MetricIntf - KafkaConsumerGroupOffsetSum MetricIntf - KafkaPartitionCurrentOffset MetricIntf - KafkaPartitionOldestOffset MetricIntf - KafkaPartitionReplicas MetricIntf - KafkaPartitionReplicasInSync MetricIntf - KafkaTopicPartitions MetricIntf -} - -// Names returns a list of all the metric name strings. -func (m *metricStruct) Names() []string { - return []string{ - "kafka.brokers", - "kafka.consumer_group.lag", - "kafka.consumer_group.lag_sum", - "kafka.consumer_group.members", - "kafka.consumer_group.offset", - "kafka.consumer_group.offset_sum", - "kafka.partition.current_offset", - "kafka.partition.oldest_offset", - "kafka.partition.replicas", - "kafka.partition.replicas_in_sync", - "kafka.topic.partitions", - } -} - -var metricsByName = map[string]MetricIntf{ - "kafka.brokers": Metrics.KafkaBrokers, - "kafka.consumer_group.lag": Metrics.KafkaConsumerGroupLag, - "kafka.consumer_group.lag_sum": Metrics.KafkaConsumerGroupLagSum, - "kafka.consumer_group.members": Metrics.KafkaConsumerGroupMembers, - "kafka.consumer_group.offset": Metrics.KafkaConsumerGroupOffset, - "kafka.consumer_group.offset_sum": Metrics.KafkaConsumerGroupOffsetSum, - "kafka.partition.current_offset": Metrics.KafkaPartitionCurrentOffset, - "kafka.partition.oldest_offset": Metrics.KafkaPartitionOldestOffset, - "kafka.partition.replicas": Metrics.KafkaPartitionReplicas, - "kafka.partition.replicas_in_sync": Metrics.KafkaPartitionReplicasInSync, - "kafka.topic.partitions": Metrics.KafkaTopicPartitions, -} - -func (m *metricStruct) ByName(n string) MetricIntf { - return metricsByName[n] -} - -// Metrics contains a set of methods for each metric that help with -// manipulating those metrics. 
-var Metrics = &metricStruct{ - &metricImpl{ - "kafka.brokers", - func(metric pmetric.Metric) { - metric.SetName("kafka.brokers") - metric.SetDescription("Number of brokers in the cluster.") - metric.SetUnit("{brokers}") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.consumer_group.lag", - func(metric pmetric.Metric) { - metric.SetName("kafka.consumer_group.lag") - metric.SetDescription("Current approximate lag of consumer group at partition of topic") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.consumer_group.lag_sum", - func(metric pmetric.Metric) { - metric.SetName("kafka.consumer_group.lag_sum") - metric.SetDescription("Current approximate sum of consumer group lag across all partitions of topic") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.consumer_group.members", - func(metric pmetric.Metric) { - metric.SetName("kafka.consumer_group.members") - metric.SetDescription("Count of members in the consumer group") - metric.SetUnit("{members}") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.consumer_group.offset", - func(metric pmetric.Metric) { - metric.SetName("kafka.consumer_group.offset") - metric.SetDescription("Current offset of the consumer group at partition of topic") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.consumer_group.offset_sum", - func(metric pmetric.Metric) { - metric.SetName("kafka.consumer_group.offset_sum") - metric.SetDescription("Sum of consumer group offset across partitions of topic") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.partition.current_offset", - func(metric pmetric.Metric) { - metric.SetName("kafka.partition.current_offset") - metric.SetDescription("Current offset of partition of topic.") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.partition.oldest_offset", - func(metric pmetric.Metric) { - metric.SetName("kafka.partition.oldest_offset") - metric.SetDescription("Oldest offset of partition of topic") - metric.SetUnit("1") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.partition.replicas", - func(metric pmetric.Metric) { - metric.SetName("kafka.partition.replicas") - metric.SetDescription("Number of replicas for partition of topic") - metric.SetUnit("{replicas}") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.partition.replicas_in_sync", - func(metric pmetric.Metric) { - metric.SetName("kafka.partition.replicas_in_sync") - metric.SetDescription("Number of synchronized replicas of partition") - metric.SetUnit("{replicas}") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, - &metricImpl{ - "kafka.topic.partitions", - func(metric pmetric.Metric) { - metric.SetName("kafka.topic.partitions") - metric.SetDescription("Number of partitions in topic.") - metric.SetUnit("{partitions}") - metric.SetDataType(pmetric.MetricDataTypeGauge) - }, - }, -} - -// M contains a set of methods for each metric that help with -// manipulating those metrics. M is an alias for Metrics -var M = Metrics - -// Attributes contains the possible metric attributes that can be used. 
-var Attributes = struct { - // Group (The ID (string) of a consumer group) - Group string - // Partition (The number (integer) of the partition) - Partition string - // Topic (The ID (integer) of a topic) - Topic string -}{ - "group", - "partition", - "topic", -} - -// A is an alias for Attributes. -var A = Attributes diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_v2.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_v2.go new file mode 100644 index 000000000000..4ab70a6b6999 --- /dev/null +++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_v2.go @@ -0,0 +1,831 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +// MetricSettings provides common settings for a particular metric. +type MetricSettings struct { + Enabled bool `mapstructure:"enabled"` +} + +// MetricsSettings provides settings for kafkametricsreceiver metrics. +type MetricsSettings struct { + KafkaBrokers MetricSettings `mapstructure:"kafka.brokers"` + KafkaConsumerGroupLag MetricSettings `mapstructure:"kafka.consumer_group.lag"` + KafkaConsumerGroupLagSum MetricSettings `mapstructure:"kafka.consumer_group.lag_sum"` + KafkaConsumerGroupMembers MetricSettings `mapstructure:"kafka.consumer_group.members"` + KafkaConsumerGroupOffset MetricSettings `mapstructure:"kafka.consumer_group.offset"` + KafkaConsumerGroupOffsetSum MetricSettings `mapstructure:"kafka.consumer_group.offset_sum"` + KafkaPartitionCurrentOffset MetricSettings `mapstructure:"kafka.partition.current_offset"` + KafkaPartitionOldestOffset MetricSettings `mapstructure:"kafka.partition.oldest_offset"` + KafkaPartitionReplicas MetricSettings `mapstructure:"kafka.partition.replicas"` + KafkaPartitionReplicasInSync MetricSettings `mapstructure:"kafka.partition.replicas_in_sync"` + KafkaTopicPartitions MetricSettings `mapstructure:"kafka.topic.partitions"` +} + +func DefaultMetricsSettings() MetricsSettings { + return MetricsSettings{ + KafkaBrokers: MetricSettings{ + Enabled: true, + }, + KafkaConsumerGroupLag: MetricSettings{ + Enabled: true, + }, + KafkaConsumerGroupLagSum: MetricSettings{ + Enabled: true, + }, + KafkaConsumerGroupMembers: MetricSettings{ + Enabled: true, + }, + KafkaConsumerGroupOffset: MetricSettings{ + Enabled: true, + }, + KafkaConsumerGroupOffsetSum: MetricSettings{ + Enabled: true, + }, + KafkaPartitionCurrentOffset: MetricSettings{ + Enabled: true, + }, + KafkaPartitionOldestOffset: MetricSettings{ + Enabled: true, + }, + KafkaPartitionReplicas: MetricSettings{ + Enabled: true, + }, + KafkaPartitionReplicasInSync: MetricSettings{ + Enabled: true, + }, + KafkaTopicPartitions: MetricSettings{ + Enabled: true, + }, + } +} + +type metricKafkaBrokers struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.brokers metric with initial data. 
+func (m *metricKafkaBrokers) init() { + m.data.SetName("kafka.brokers") + m.data.SetDescription("Number of brokers in the cluster.") + m.data.SetUnit("{brokers}") + m.data.SetDataType(pmetric.MetricDataTypeGauge) +} + +func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaBrokers) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaBrokers) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaBrokers(settings MetricSettings) metricKafkaBrokers { + m := metricKafkaBrokers{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaConsumerGroupLag struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.consumer_group.lag metric with initial data. +func (m *metricKafkaConsumerGroupLag) init() { + m.data.SetName("kafka.consumer_group.lag") + m.data.SetDescription("Current approximate lag of consumer group at partition of topic") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaConsumerGroupLag) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("group", groupAttributeValue) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaConsumerGroupLag) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaConsumerGroupLag) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaConsumerGroupLag(settings MetricSettings) metricKafkaConsumerGroupLag { + m := metricKafkaConsumerGroupLag{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaConsumerGroupLagSum struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. 
+} + +// init fills kafka.consumer_group.lag_sum metric with initial data. +func (m *metricKafkaConsumerGroupLagSum) init() { + m.data.SetName("kafka.consumer_group.lag_sum") + m.data.SetDescription("Current approximate sum of consumer group lag across all partitions of topic") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaConsumerGroupLagSum) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("group", groupAttributeValue) + dp.Attributes().InsertString("topic", topicAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaConsumerGroupLagSum) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaConsumerGroupLagSum) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaConsumerGroupLagSum(settings MetricSettings) metricKafkaConsumerGroupLagSum { + m := metricKafkaConsumerGroupLagSum{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaConsumerGroupMembers struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.consumer_group.members metric with initial data. +func (m *metricKafkaConsumerGroupMembers) init() { + m.data.SetName("kafka.consumer_group.members") + m.data.SetDescription("Count of members in the consumer group") + m.data.SetUnit("{members}") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaConsumerGroupMembers) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, groupAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("group", groupAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaConsumerGroupMembers) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricKafkaConsumerGroupMembers) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaConsumerGroupMembers(settings MetricSettings) metricKafkaConsumerGroupMembers { + m := metricKafkaConsumerGroupMembers{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaConsumerGroupOffset struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.consumer_group.offset metric with initial data. +func (m *metricKafkaConsumerGroupOffset) init() { + m.data.SetName("kafka.consumer_group.offset") + m.data.SetDescription("Current offset of the consumer group at partition of topic") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaConsumerGroupOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("group", groupAttributeValue) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaConsumerGroupOffset) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaConsumerGroupOffset) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaConsumerGroupOffset(settings MetricSettings) metricKafkaConsumerGroupOffset { + m := metricKafkaConsumerGroupOffset{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaConsumerGroupOffsetSum struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.consumer_group.offset_sum metric with initial data. 
+func (m *metricKafkaConsumerGroupOffsetSum) init() { + m.data.SetName("kafka.consumer_group.offset_sum") + m.data.SetDescription("Sum of consumer group offset across partitions of topic") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaConsumerGroupOffsetSum) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("group", groupAttributeValue) + dp.Attributes().InsertString("topic", topicAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaConsumerGroupOffsetSum) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaConsumerGroupOffsetSum) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaConsumerGroupOffsetSum(settings MetricSettings) metricKafkaConsumerGroupOffsetSum { + m := metricKafkaConsumerGroupOffsetSum{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaPartitionCurrentOffset struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.partition.current_offset metric with initial data. +func (m *metricKafkaPartitionCurrentOffset) init() { + m.data.SetName("kafka.partition.current_offset") + m.data.SetDescription("Current offset of partition of topic.") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaPartitionCurrentOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaPartitionCurrentOffset) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricKafkaPartitionCurrentOffset) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaPartitionCurrentOffset(settings MetricSettings) metricKafkaPartitionCurrentOffset { + m := metricKafkaPartitionCurrentOffset{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaPartitionOldestOffset struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.partition.oldest_offset metric with initial data. +func (m *metricKafkaPartitionOldestOffset) init() { + m.data.SetName("kafka.partition.oldest_offset") + m.data.SetDescription("Oldest offset of partition of topic") + m.data.SetUnit("1") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaPartitionOldestOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaPartitionOldestOffset) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaPartitionOldestOffset) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaPartitionOldestOffset(settings MetricSettings) metricKafkaPartitionOldestOffset { + m := metricKafkaPartitionOldestOffset{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaPartitionReplicas struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.partition.replicas metric with initial data. 
+func (m *metricKafkaPartitionReplicas) init() { + m.data.SetName("kafka.partition.replicas") + m.data.SetDescription("Number of replicas for partition of topic") + m.data.SetUnit("{replicas}") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaPartitionReplicas) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaPartitionReplicas) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaPartitionReplicas) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaPartitionReplicas(settings MetricSettings) metricKafkaPartitionReplicas { + m := metricKafkaPartitionReplicas{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaPartitionReplicasInSync struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.partition.replicas_in_sync metric with initial data. +func (m *metricKafkaPartitionReplicasInSync) init() { + m.data.SetName("kafka.partition.replicas_in_sync") + m.data.SetDescription("Number of synchronized replicas of partition") + m.data.SetUnit("{replicas}") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaPartitionReplicasInSync) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("topic", topicAttributeValue) + dp.Attributes().InsertInt("partition", partitionAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaPartitionReplicasInSync) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricKafkaPartitionReplicasInSync) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaPartitionReplicasInSync(settings MetricSettings) metricKafkaPartitionReplicasInSync { + m := metricKafkaPartitionReplicasInSync{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricKafkaTopicPartitions struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills kafka.topic.partitions metric with initial data. +func (m *metricKafkaTopicPartitions) init() { + m.data.SetName("kafka.topic.partitions") + m.data.SetDescription("Number of partitions in topic.") + m.data.SetUnit("{partitions}") + m.data.SetDataType(pmetric.MetricDataTypeGauge) + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricKafkaTopicPartitions) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, topicAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntVal(val) + dp.Attributes().InsertString("topic", topicAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricKafkaTopicPartitions) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricKafkaTopicPartitions) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricKafkaTopicPartitions(settings MetricSettings) metricKafkaTopicPartitions { + m := metricKafkaTopicPartitions{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user settings. +type MetricsBuilder struct { + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + resourceCapacity int // maximum observed number of resource attributes. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. 
+ buildInfo component.BuildInfo // contains version information + metricKafkaBrokers metricKafkaBrokers + metricKafkaConsumerGroupLag metricKafkaConsumerGroupLag + metricKafkaConsumerGroupLagSum metricKafkaConsumerGroupLagSum + metricKafkaConsumerGroupMembers metricKafkaConsumerGroupMembers + metricKafkaConsumerGroupOffset metricKafkaConsumerGroupOffset + metricKafkaConsumerGroupOffsetSum metricKafkaConsumerGroupOffsetSum + metricKafkaPartitionCurrentOffset metricKafkaPartitionCurrentOffset + metricKafkaPartitionOldestOffset metricKafkaPartitionOldestOffset + metricKafkaPartitionReplicas metricKafkaPartitionReplicas + metricKafkaPartitionReplicasInSync metricKafkaPartitionReplicasInSync + metricKafkaTopicPartitions metricKafkaTopicPartitions +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +func NewMetricsBuilder(settings MetricsSettings, buildInfo component.BuildInfo, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: buildInfo, + metricKafkaBrokers: newMetricKafkaBrokers(settings.KafkaBrokers), + metricKafkaConsumerGroupLag: newMetricKafkaConsumerGroupLag(settings.KafkaConsumerGroupLag), + metricKafkaConsumerGroupLagSum: newMetricKafkaConsumerGroupLagSum(settings.KafkaConsumerGroupLagSum), + metricKafkaConsumerGroupMembers: newMetricKafkaConsumerGroupMembers(settings.KafkaConsumerGroupMembers), + metricKafkaConsumerGroupOffset: newMetricKafkaConsumerGroupOffset(settings.KafkaConsumerGroupOffset), + metricKafkaConsumerGroupOffsetSum: newMetricKafkaConsumerGroupOffsetSum(settings.KafkaConsumerGroupOffsetSum), + metricKafkaPartitionCurrentOffset: newMetricKafkaPartitionCurrentOffset(settings.KafkaPartitionCurrentOffset), + metricKafkaPartitionOldestOffset: newMetricKafkaPartitionOldestOffset(settings.KafkaPartitionOldestOffset), + metricKafkaPartitionReplicas: newMetricKafkaPartitionReplicas(settings.KafkaPartitionReplicas), + metricKafkaPartitionReplicasInSync: newMetricKafkaPartitionReplicasInSync(settings.KafkaPartitionReplicasInSync), + metricKafkaTopicPartitions: newMetricKafkaTopicPartitions(settings.KafkaTopicPartitions), + } + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } + if mb.resourceCapacity < rm.Resource().Attributes().Len() { + mb.resourceCapacity = rm.Resource().Attributes().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. +type ResourceMetricsOption func(pmetric.ResourceMetrics) + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. 
+func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).DataType() { + case pmetric.MetricDataTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricDataTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. +func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + rm.Resource().Attributes().EnsureCapacity(mb.resourceCapacity) + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("otelcol/kafkametricsreceiver") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricKafkaBrokers.emit(ils.Metrics()) + mb.metricKafkaConsumerGroupLag.emit(ils.Metrics()) + mb.metricKafkaConsumerGroupLagSum.emit(ils.Metrics()) + mb.metricKafkaConsumerGroupMembers.emit(ils.Metrics()) + mb.metricKafkaConsumerGroupOffset.emit(ils.Metrics()) + mb.metricKafkaConsumerGroupOffsetSum.emit(ils.Metrics()) + mb.metricKafkaPartitionCurrentOffset.emit(ils.Metrics()) + mb.metricKafkaPartitionOldestOffset.emit(ils.Metrics()) + mb.metricKafkaPartitionReplicas.emit(ils.Metrics()) + mb.metricKafkaPartitionReplicasInSync.emit(ils.Metrics()) + mb.metricKafkaTopicPartitions.emit(ils.Metrics()) + for _, op := range rmo { + op(rm) + } + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user settings, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) + metrics := pmetric.NewMetrics() + mb.metricsBuffer.MoveTo(metrics) + return metrics +} + +// RecordKafkaBrokersDataPoint adds a data point to kafka.brokers metric. +func (mb *MetricsBuilder) RecordKafkaBrokersDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricKafkaBrokers.recordDataPoint(mb.startTime, ts, val) +} + +// RecordKafkaConsumerGroupLagDataPoint adds a data point to kafka.consumer_group.lag metric. +func (mb *MetricsBuilder) RecordKafkaConsumerGroupLagDataPoint(ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string, partitionAttributeValue int64) { + mb.metricKafkaConsumerGroupLag.recordDataPoint(mb.startTime, ts, val, groupAttributeValue, topicAttributeValue, partitionAttributeValue) +} + +// RecordKafkaConsumerGroupLagSumDataPoint adds a data point to kafka.consumer_group.lag_sum metric. 
+func (mb *MetricsBuilder) RecordKafkaConsumerGroupLagSumDataPoint(ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string) {
+	mb.metricKafkaConsumerGroupLagSum.recordDataPoint(mb.startTime, ts, val, groupAttributeValue, topicAttributeValue)
+}
+
+// RecordKafkaConsumerGroupMembersDataPoint adds a data point to kafka.consumer_group.members metric.
+func (mb *MetricsBuilder) RecordKafkaConsumerGroupMembersDataPoint(ts pcommon.Timestamp, val int64, groupAttributeValue string) {
+	mb.metricKafkaConsumerGroupMembers.recordDataPoint(mb.startTime, ts, val, groupAttributeValue)
+}
+
+// RecordKafkaConsumerGroupOffsetDataPoint adds a data point to kafka.consumer_group.offset metric.
+func (mb *MetricsBuilder) RecordKafkaConsumerGroupOffsetDataPoint(ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string, partitionAttributeValue int64) {
+	mb.metricKafkaConsumerGroupOffset.recordDataPoint(mb.startTime, ts, val, groupAttributeValue, topicAttributeValue, partitionAttributeValue)
+}
+
+// RecordKafkaConsumerGroupOffsetSumDataPoint adds a data point to kafka.consumer_group.offset_sum metric.
+func (mb *MetricsBuilder) RecordKafkaConsumerGroupOffsetSumDataPoint(ts pcommon.Timestamp, val int64, groupAttributeValue string, topicAttributeValue string) {
+	mb.metricKafkaConsumerGroupOffsetSum.recordDataPoint(mb.startTime, ts, val, groupAttributeValue, topicAttributeValue)
+}
+
+// RecordKafkaPartitionCurrentOffsetDataPoint adds a data point to kafka.partition.current_offset metric.
+func (mb *MetricsBuilder) RecordKafkaPartitionCurrentOffsetDataPoint(ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) {
+	mb.metricKafkaPartitionCurrentOffset.recordDataPoint(mb.startTime, ts, val, topicAttributeValue, partitionAttributeValue)
+}
+
+// RecordKafkaPartitionOldestOffsetDataPoint adds a data point to kafka.partition.oldest_offset metric.
+func (mb *MetricsBuilder) RecordKafkaPartitionOldestOffsetDataPoint(ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) {
+	mb.metricKafkaPartitionOldestOffset.recordDataPoint(mb.startTime, ts, val, topicAttributeValue, partitionAttributeValue)
+}
+
+// RecordKafkaPartitionReplicasDataPoint adds a data point to kafka.partition.replicas metric.
+func (mb *MetricsBuilder) RecordKafkaPartitionReplicasDataPoint(ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) {
+	mb.metricKafkaPartitionReplicas.recordDataPoint(mb.startTime, ts, val, topicAttributeValue, partitionAttributeValue)
+}
+
+// RecordKafkaPartitionReplicasInSyncDataPoint adds a data point to kafka.partition.replicas_in_sync metric.
+func (mb *MetricsBuilder) RecordKafkaPartitionReplicasInSyncDataPoint(ts pcommon.Timestamp, val int64, topicAttributeValue string, partitionAttributeValue int64) {
+	mb.metricKafkaPartitionReplicasInSync.recordDataPoint(mb.startTime, ts, val, topicAttributeValue, partitionAttributeValue)
+}
+
+// RecordKafkaTopicPartitionsDataPoint adds a data point to kafka.topic.partitions metric.
+func (mb *MetricsBuilder) RecordKafkaTopicPartitionsDataPoint(ts pcommon.Timestamp, val int64, topicAttributeValue string) {
+	mb.metricKafkaTopicPartitions.recordDataPoint(mb.startTime, ts, val, topicAttributeValue)
+}
+
+// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted,
+// and metrics builder should update its startTime and reset its internal state accordingly.
+func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) {
+	mb.startTime = pcommon.NewTimestampFromTime(time.Now())
+	for _, op := range options {
+		op(mb)
+	}
+}
diff --git a/receiver/kafkametricsreceiver/metadata.yaml b/receiver/kafkametricsreceiver/metadata.yaml
index 221594d19a0e..0a5a0c40eeb7 100644
--- a/receiver/kafkametricsreceiver/metadata.yaml
+++ b/receiver/kafkametricsreceiver/metadata.yaml
@@ -5,6 +5,7 @@ attributes:
     description: The ID (integer) of a topic
   partition:
     description: The number (integer) of the partition
+    type: int
   group:
     description: The ID (string) of a consumer group
diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go
index 58ecf98a0a5d..bdc320997201 100644
--- a/receiver/kafkametricsreceiver/receiver.go
+++ b/receiver/kafkametricsreceiver/receiver.go
@@ -22,19 +22,17 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
 )
 
 const (
-	instrumentationLibName = "otelcol/kafkametricsreceiver"
-	brokersScraperName     = "brokers"
-	topicsScraperName      = "topics"
-	consumersScraperName   = "consumers"
+	brokersScraperName   = "brokers"
+	topicsScraperName    = "topics"
+	consumersScraperName = "consumers"
 )
 
-type createKafkaScraper func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.Scraper, error)
+type createKafkaScraper func(context.Context, Config, *sarama.Config, component.ReceiverCreateSettings) (scraperhelper.Scraper, error)
 
 var (
 	allScrapers = map[string]createKafkaScraper{
@@ -65,7 +63,7 @@ var newMetricsReceiver = func(
 	scraperControllerOptions := make([]scraperhelper.ScraperControllerOption, 0, len(config.Scrapers))
 	for _, scraper := range config.Scrapers {
 		if s, ok := allScrapers[scraper]; ok {
-			s, err := s(ctx, config, sc, params.Logger)
+			s, err := s(ctx, config, sc, params)
 			if err != nil {
 				return nil, err
 			}
diff --git a/receiver/kafkametricsreceiver/receiver_test.go b/receiver/kafkametricsreceiver/receiver_test.go
index a117c08335d7..d375bb094e96 100644
--- a/receiver/kafkametricsreceiver/receiver_test.go
+++ b/receiver/kafkametricsreceiver/receiver_test.go
@@ -21,11 +21,11 @@ import (
 
 	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/config/configtls"
 	"go.opentelemetry.io/collector/consumer/consumertest"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
 )
@@ -41,7 +41,7 @@ func TestNewReceiver_invalid_version_err(t *testing.T) {
 func TestNewReceiver_invalid_scraper_error(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers", "cpu"}
-	mockScraper := func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.Scraper, error) {
+	mockScraper := func(context.Context, Config, *sarama.Config, component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 		return nil, nil
 	}
 	allScrapers["brokers"] = mockScraper
@@ -71,7 +71,7 @@ func TestNewReceiver_invalid_auth_error(t *testing.T) {
 func TestNewReceiver(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers"}
-	mockScraper := func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.Scraper, error) {
+	mockScraper := func(context.Context, Config, *sarama.Config, component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 		return nil, nil
 	}
 	allScrapers["brokers"] = mockScraper
@@ -83,7 +83,7 @@ func TestNewReceiver(t *testing.T) {
 func TestNewReceiver_handles_scraper_error(t *testing.T) {
 	c := createDefaultConfig().(*Config)
 	c.Scrapers = []string{"brokers"}
-	mockScraper := func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.Scraper, error) {
+	mockScraper := func(context.Context, Config, *sarama.Config, component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 		return nil, fmt.Errorf("fail")
 	}
 	allScrapers["brokers"] = mockScraper
diff --git a/receiver/kafkametricsreceiver/topic_scraper.go b/receiver/kafkametricsreceiver/topic_scraper.go
index 6f0a3e3cff2e..60dc0a23309f 100644
--- a/receiver/kafkametricsreceiver/topic_scraper.go
+++ b/receiver/kafkametricsreceiver/topic_scraper.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver/scrapererror"
@@ -32,10 +33,11 @@ import (
 
 type topicScraper struct {
 	client       sarama.Client
-	logger       *zap.Logger
+	settings     component.ReceiverCreateSettings
 	topicFilter  *regexp.Regexp
 	saramaConfig *sarama.Config
 	config       Config
+	mb           *metadata.MetricsBuilder
 }
 
 func (s *topicScraper) Name() string {
@@ -49,6 +51,11 @@ func (s *topicScraper) shutdown(context.Context) error {
 	return nil
 }
 
+func (s *topicScraper) start(_ context.Context, _ component.Host) error {
+	s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
+	return nil
+}
+
 func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	if s.client == nil {
 		client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
@@ -60,16 +67,14 @@ func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
 
 	topics, err := s.client.Topics()
 	if err != nil {
-		s.logger.Error("Error fetching cluster topics ", zap.Error(err))
+		s.settings.Logger.Error("Error fetching cluster topics ", zap.Error(err))
 		return pmetric.Metrics{}, err
 	}
 
 	var scrapeErrors = scrapererror.ScrapeErrors{}
 
 	now := pcommon.NewTimestampFromTime(time.Now())
-	md := pmetric.NewMetrics()
-	ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(instrumentationLibName)
+
 	for _, topic := range topics {
 		if !s.topicFilter.MatchString(topic) {
 			continue
@@ -79,47 +84,45 @@ func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
 			scrapeErrors.Add(err)
 			continue
 		}
-		labels := pcommon.NewMap()
-		labels.UpsertString(metadata.A.Topic, topic)
-		addIntGauge(ilm.Metrics(), metadata.M.KafkaTopicPartitions.Name(), now, labels, int64(len(partitions)))
+
+		s.mb.RecordKafkaTopicPartitionsDataPoint(now, int64(len(partitions)), topic)
 		for _, partition := range partitions {
-			labels.UpsertInt(metadata.A.Partition, int64(partition))
 			currentOffset, err := s.client.GetOffset(topic, partition, sarama.OffsetNewest)
 			if err != nil {
 				scrapeErrors.AddPartial(1, err)
 			} else {
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaPartitionCurrentOffset.Name(), now, labels, currentOffset)
+				s.mb.RecordKafkaPartitionCurrentOffsetDataPoint(now, currentOffset, topic, int64(partition))
 			}
 			oldestOffset, err := s.client.GetOffset(topic, partition, sarama.OffsetOldest)
 			if err != nil {
 				scrapeErrors.AddPartial(1, err)
 			} else {
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaPartitionOldestOffset.Name(), now, labels, oldestOffset)
+				s.mb.RecordKafkaPartitionOldestOffsetDataPoint(now, oldestOffset, topic, int64(partition))
 			}
 			replicas, err := s.client.Replicas(topic, partition)
 			if err != nil {
 				scrapeErrors.AddPartial(1, err)
 			} else {
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaPartitionReplicas.Name(), now, labels, int64(len(replicas)))
+				s.mb.RecordKafkaPartitionReplicasDataPoint(now, int64(len(replicas)), topic, int64(partition))
 			}
 			replicasInSync, err := s.client.InSyncReplicas(topic, partition)
 			if err != nil {
 				scrapeErrors.AddPartial(1, err)
 			} else {
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaPartitionReplicasInSync.Name(), now, labels, int64(len(replicasInSync)))
+				s.mb.RecordKafkaPartitionReplicasInSyncDataPoint(now, int64(len(replicasInSync)), topic, int64(partition))
 			}
 		}
 	}
-	return md, scrapeErrors.Combine()
+	return s.mb.Emit(), scrapeErrors.Combine()
 }
 
-func createTopicsScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
+func createTopicsScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 	topicFilter, err := regexp.Compile(cfg.TopicMatch)
 	if err != nil {
 		return nil, fmt.Errorf("failed to compile topic filter: %w", err)
 	}
 	s := topicScraper{
-		logger:       logger,
+		settings:     settings,
 		topicFilter:  topicFilter,
 		saramaConfig: saramaConfig,
 		config:       cfg,
@@ -127,16 +130,7 @@ func createTopicsScraper(_ context.Context, cfg Config, saramaConfig *sarama.Con
 	return scraperhelper.NewScraper(
 		s.Name(),
 		s.scrape,
+		scraperhelper.WithStart(s.start),
 		scraperhelper.WithShutdown(s.shutdown),
 	)
 }
-
-func addIntGauge(ms pmetric.MetricSlice, name string, now pcommon.Timestamp, labels pcommon.Map, value int64) {
-	m := ms.AppendEmpty()
-	m.SetName(name)
-	m.SetDataType(pmetric.MetricDataTypeGauge)
-	dp := m.Gauge().DataPoints().AppendEmpty()
-	dp.SetTimestamp(now)
-	dp.SetIntVal(value)
-	labels.CopyTo(dp.Attributes())
-}
diff --git a/receiver/kafkametricsreceiver/topic_scraper_test.go b/receiver/kafkametricsreceiver/topic_scraper_test.go
index 0f8fc2067b17..b327b8d1766c 100644
--- a/receiver/kafkametricsreceiver/topic_scraper_test.go
+++ b/receiver/kafkametricsreceiver/topic_scraper_test.go
@@ -23,9 +23,7 @@ import (
 	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
+	"go.opentelemetry.io/collector/component/componenttest"
 )
 
 func TestTopicShutdown(t *testing.T) {
@@ -36,9 +34,9 @@ func TestTopicShutdown(t *testing.T) {
 		On("Close").Return(nil).
 		On("Closed").Return(false)
 	scraper := brokerScraper{
-		client: client,
-		logger: zap.NewNop(),
-		config: Config{},
+		client:   client,
+		settings: componenttest.NewNopReceiverCreateSettings(),
+		config:   Config{},
 	}
 	_ = scraper.shutdown(context.Background())
 	client.AssertExpectations(t)
@@ -50,9 +48,9 @@ func TestTopicShutdown_closed(t *testing.T) {
 	client.Mock.
On("Closed").Return(true) scraper := topicScraper{ - client: client, - logger: zap.NewNop(), - config: Config{}, + client: client, + settings: componenttest.NewNopReceiverCreateSettings(), + config: Config{}, } _ = scraper.shutdown(context.Background()) client.AssertExpectations(t) @@ -66,7 +64,7 @@ func TestTopicScraper_Name(t *testing.T) { func TestTopicScraper_createsScraper(t *testing.T) { sc := sarama.NewConfig() newSaramaClient = mockNewSaramaClient - ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop()) + ms, err := createTopicsScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NoError(t, err) assert.NotNil(t, ms) } @@ -76,7 +74,7 @@ func TestTopicScraper_ScrapeHandlesError(t *testing.T) { return nil, fmt.Errorf("no scraper here") } sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop()) + ms, err := createTopicsScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NotNil(t, ms) assert.Nil(t, err) _, err = ms.Scrape(context.Background()) @@ -88,7 +86,7 @@ func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) { return nil, fmt.Errorf("no scraper here") } sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop()) + ms, err := createTopicsScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NotNil(t, ms) assert.Nil(t, err) err = ms.Shutdown(context.Background()) @@ -98,7 +96,7 @@ func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) { func TestTopicScraper_startScraperCreatesClient(t *testing.T) { newSaramaClient = mockNewSaramaClient sc := sarama.NewConfig() - ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop()) + ms, err := createTopicsScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings()) assert.NotNil(t, ms) assert.NoError(t, err) err = ms.Start(context.Background(), nil) @@ -110,7 +108,7 @@ func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) { sc := sarama.NewConfig() ms, err := createTopicsScraper(context.Background(), Config{ TopicMatch: "[", - }, sc, zap.NewNop()) + }, sc, componenttest.NewNopReceiverCreateSettings()) assert.Error(t, err) assert.Nil(t, ms) } @@ -123,9 +121,11 @@ func TestTopicScraper_scrapes(t *testing.T) { match := regexp.MustCompile(config.TopicMatch) scraper := topicScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), + config: *config, topicFilter: match, } + require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost())) md, err := scraper.scrape(context.Background()) assert.NoError(t, err) require.Equal(t, 1, md.ResourceMetrics().Len()) @@ -135,15 +135,15 @@ func TestTopicScraper_scrapes(t *testing.T) { m := ms.At(i) dp := m.Gauge().DataPoints().At(0) switch m.Name() { - case metadata.M.KafkaTopicPartitions.Name(): + case "kafka.topic.partitions": assert.Equal(t, dp.IntVal(), int64(len(testPartitions))) - case metadata.M.KafkaPartitionCurrentOffset.Name(): + case "kafka.partition.current_offset": assert.Equal(t, dp.IntVal(), testOffset) - case metadata.M.KafkaPartitionOldestOffset.Name(): + case "kafka.partition.oldest_offset": assert.Equal(t, dp.IntVal(), testOffset) - case metadata.M.KafkaPartitionReplicas.Name(): + case "kafka.partition.replicas": assert.Equal(t, dp.IntVal(), int64(len(testReplicas))) - case 
metadata.M.KafkaPartitionReplicasInSync.Name(): + case "kafka.partition.replicas_in_sync": assert.Equal(t, dp.IntVal(), int64(len(testReplicas))) } } @@ -156,7 +156,7 @@ func TestTopicScraper_scrape_handlesTopicError(t *testing.T) { match := regexp.MustCompile(config.TopicMatch) scraper := topicScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), topicFilter: match, } _, err := scraper.scrape(context.Background()) @@ -170,9 +170,10 @@ func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) { match := regexp.MustCompile(config.TopicMatch) scraper := topicScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), topicFilter: match, } + require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost())) _, err := scraper.scrape(context.Background()) assert.Error(t, err) } @@ -187,9 +188,10 @@ func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) { match := regexp.MustCompile(config.TopicMatch) scraper := topicScraper{ client: client, - logger: zap.NewNop(), + settings: componenttest.NewNopReceiverCreateSettings(), topicFilter: match, } + require.NoError(t, scraper.start(context.Background(), componenttest.NewNopHost())) _, err := scraper.scrape(context.Background()) assert.Error(t, err) } diff --git a/unreleased/migrate-kafkametrics.yaml b/unreleased/migrate-kafkametrics.yaml new file mode 100755 index 000000000000..feaf844bb921 --- /dev/null +++ b/unreleased/migrate-kafkametrics.yaml @@ -0,0 +1,6 @@ +change_type: enhancement +component: kafkametricsreceiver +note: Migrate receiver to the new metrics builder +issues: [7142] +subtext: |- + This allows users to disable particular metrics in through user settings.
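
For reviewers unfamiliar with the generated builder, here is a minimal standalone sketch of the recording pattern this patch migrates the scrapers to. It is illustrative only, not part of the patch: the `DefaultMetricsSettings()` helper and the `metrics:` config shape in the comment are assumed from the usual mdatagen conventions, and the literal values are made up.

	package main

	import (
		"time"

		"go.opentelemetry.io/collector/component"
		"go.opentelemetry.io/collector/pdata/pcommon"

		"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
	)

	func main() {
		// What this change enables: individual metrics can be switched off in
		// user settings. Assumed config shape, per mdatagen conventions:
		//
		//   receivers:
		//     kafkametrics:
		//       metrics:
		//         kafka.brokers:
		//           enabled: false
		//
		settings := metadata.DefaultMetricsSettings() // assumed generated helper; enables every metric
		settings.KafkaBrokers.Enabled = false         // programmatic equivalent of the YAML above

		// In the receiver this happens in each scraper's start(): the builder is
		// constructed once from s.config.Metrics and s.settings.BuildInfo.
		mb := metadata.NewMetricsBuilder(settings, component.BuildInfo{Version: "0.0.0"})

		now := pcommon.NewTimestampFromTime(time.Now())
		mb.RecordKafkaBrokersDataPoint(now, 3)                      // dropped: metric disabled above
		mb.RecordKafkaTopicPartitionsDataPoint(now, 12, "my-topic") // recorded as a gauge data point

		md := mb.Emit() // drains the internal buffer; the builder is ready for the next scrape
		_ = md.MetricCount()
	}

This is also why the hand-rolled addIntGauge helper and the instrumentationLibName constant could be deleted: Emit() builds the ResourceMetrics/ScopeMetrics envelope (including the otelcol/kafkametricsreceiver scope name and version) that the scrapers previously assembled by hand.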