[receiver/kafkametrics] Migrate receiver to the new metrics builder #13319

Merged
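
This change migrates the kafkametrics scrapers off hand-assembled pmetric structures and onto the generated metadata.MetricsBuilder: scrapers now take component.ReceiverCreateSettings instead of a *zap.Logger, construct the builder in a new start() hook registered via scraperhelper.WithStart, record data points through typed Record*DataPoint helpers, and return s.mb.Emit() from scrape(). A metrics section is also added to the receiver Config so the scraped metrics representation can be customized.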
23 changes: 14 additions & 9 deletions receiver/kafkametricsreceiver/broker_scraper.go
@@ -20,25 +20,31 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 type brokerScraper struct {
 	client       sarama.Client
-	logger       *zap.Logger
+	settings     component.ReceiverCreateSettings
 	config       Config
 	saramaConfig *sarama.Config
+	mb           *metadata.MetricsBuilder
 }
 
 func (s *brokerScraper) Name() string {
 	return brokersScraperName
 }
 
+func (s *brokerScraper) start(_ context.Context, _ component.Host) error {
+	s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
+	return nil
+}
+
 func (s *brokerScraper) shutdown(context.Context) error {
 	if s.client != nil && !s.client.Closed() {
 		return s.client.Close()
@@ -57,23 +63,22 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 
 	brokers := s.client.Brokers()
 
-	md := pmetric.NewMetrics()
-	ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(instrumentationLibName)
-	addIntGauge(ilm.Metrics(), metadata.M.KafkaBrokers.Name(), pcommon.NewTimestampFromTime(time.Now()), pcommon.NewMap(), int64(len(brokers)))
+	s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
 
-	return md, nil
+	return s.mb.Emit(), nil
 }
 
-func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
+func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
+	settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 	s := brokerScraper{
-		logger:       logger,
+		settings:     settings,
 		config:       cfg,
 		saramaConfig: saramaConfig,
 	}
 	return scraperhelper.NewScraper(
 		s.Name(),
 		s.scrape,
+		scraperhelper.WithStart(s.start),
 		scraperhelper.WithShutdown(s.shutdown),
 	)
 }
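
Review note: every migrated scraper follows the same builder lifecycle. A minimal sketch assembled from identifiers in this diff, assuming the usual mdatagen semantics that Emit() returns the accumulated batch and resets the builder for the next scrape:

	// start(): create the builder once from the receiver's config and build info.
	s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)

	// scrape(): record typed data points instead of appending to a
	// hand-built ResourceMetrics/ScopeMetrics tree.
	now := pcommon.NewTimestampFromTime(time.Now())
	s.mb.RecordKafkaBrokersDataPoint(now, int64(len(brokers)))

	// Emit() hands back the pmetric.Metrics accumulated this cycle.
	return s.mb.Emit(), nil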
34 changes: 19 additions & 15 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -21,7 +21,10 @@ import (
 
 	"github.com/Shopify/sarama"
 	"github.com/stretchr/testify/assert"
-	"go.uber.org/zap"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component/componenttest"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 func TestBrokerShutdown(t *testing.T) {
@@ -32,9 +35,9 @@ func TestBrokerShutdown(t *testing.T) {
 		On("Close").Return(nil).
 		On("Closed").Return(false)
 	scraper := brokerScraper{
-		client: client,
-		logger: zap.NewNop(),
-		config: Config{},
+		client:   client,
+		settings: componenttest.NewNopReceiverCreateSettings(),
+		config:   Config{},
 	}
 	_ = scraper.shutdown(context.Background())
 	client.AssertExpectations(t)
@@ -46,9 +49,9 @@ func TestBrokerShutdown_closed(t *testing.T) {
 	client.Mock.
 		On("Closed").Return(true)
 	scraper := brokerScraper{
-		client: client,
-		logger: zap.NewNop(),
-		config: Config{},
+		client:   client,
+		settings: componenttest.NewNopReceiverCreateSettings(),
+		config:   Config{},
 	}
 	_ = scraper.shutdown(context.Background())
 	client.AssertExpectations(t)
@@ -62,15 +65,15 @@ func TestBrokerScraper_Name(t *testing.T) {
 func TestBrokerScraper_createBrokerScraper(t *testing.T) {
 	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+	bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
 
 func TestBrokerScraperStart(t *testing.T) {
 	newSaramaClient = mockNewSaramaClient
 	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+	bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	assert.NoError(t, bs.Start(context.Background(), nil))
@@ -81,7 +84,7 @@ func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
 		return nil, fmt.Errorf("new client failed")
 	}
 	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+	bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	_, err = bs.Scrape(context.Background())
@@ -93,7 +96,7 @@ func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
 		return nil, fmt.Errorf("new client failed")
 	}
 	sc := sarama.NewConfig()
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+	bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 	err = bs.Shutdown(context.Background())
@@ -104,10 +107,11 @@ func TestBrokerScraper_scrape(t *testing.T) {
 	client := newMockClient()
 	client.Mock.On("Brokers").Return(testBrokers)
 	bs := brokerScraper{
-		client: client,
-		logger: zap.NewNop(),
-		config: Config{},
+		client:   client,
+		settings: componenttest.NewNopReceiverCreateSettings(),
+		config:   Config{Metrics: metadata.DefaultMetricsSettings()},
 	}
+	require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost()))
 	md, err := bs.scrape(context.Background())
 	assert.NoError(t, err)
 	expectedDp := int64(len(testBrokers))
@@ -119,7 +123,7 @@
 func TestBrokersScraper_createBrokerScraper(t *testing.T) {
 	sc := sarama.NewConfig()
 	newSaramaClient = mockNewSaramaClient
-	bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+	bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
 	assert.NoError(t, err)
 	assert.NotNil(t, bs)
 }
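
Review note: the updated TestBrokerScraper_scrape encodes two new requirements: start() must run before scrape() so bs.mb is non-nil, and the config must opt into metrics, since a zero-value Config{} leaves every metric disabled. A sketch of the quiet failure mode, based on my reading of mdatagen's per-metric enabled flag rather than anything stated in this diff:

	bs := brokerScraper{
		client:   client,
		settings: componenttest.NewNopReceiverCreateSettings(),
		config:   Config{}, // zero-value Metrics: every metric's enabled flag is false
	}
	require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost()))
	md, _ := bs.scrape(context.Background())
	// md carries no data points: the recorder silently drops the
	// kafka.brokers point because the metric is not enabled.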
4 changes: 4 additions & 0 deletions receiver/kafkametricsreceiver/config.go
@@ -18,6 +18,7 @@ import (
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 // Config represents user settings for kafkametrics receiver
@@ -44,4 +45,7 @@ type Config struct {
 
 	// ClientID is the id associated with the consumer that reads from topics in kafka.
 	ClientID string `mapstructure:"client_id"`
+
+	// Metrics allows customizing scraped metrics representation.
+	Metrics metadata.MetricsSettings `mapstructure:"metrics"`
 }
2 changes: 2 additions & 0 deletions receiver/kafkametricsreceiver/config_test.go
@@ -27,6 +27,7 @@ import (
 	"go.opentelemetry.io/collector/service/servicetest"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 func TestLoadConfig(t *testing.T) {
@@ -57,5 +58,6 @@ func TestLoadConfig(t *testing.T) {
 		},
 		ClientID: defaultClientID,
 		Scrapers: []string{"brokers", "topics", "consumers"},
+		Metrics:  metadata.DefaultMetricsSettings(),
 	}, r)
 }
40 changes: 22 additions & 18 deletions receiver/kafkametricsreceiver/consumer_scraper.go
@@ -21,29 +21,35 @@ import (
 	"time"
 
 	"github.com/Shopify/sarama"
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver/scraperhelper"
 	"go.uber.org/multierr"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
 )
 
 type consumerScraper struct {
 	client       sarama.Client
-	logger       *zap.Logger
+	settings     component.ReceiverCreateSettings
 	groupFilter  *regexp.Regexp
 	topicFilter  *regexp.Regexp
 	clusterAdmin sarama.ClusterAdmin
 	saramaConfig *sarama.Config
 	config       Config
+	mb           *metadata.MetricsBuilder
 }
 
 func (s *consumerScraper) Name() string {
 	return consumersScraperName
 }
 
+func (s *consumerScraper) start(_ context.Context, _ component.Host) error {
+	s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
+	return nil
+}
+
 func (s *consumerScraper) shutdown(_ context.Context) error {
 	if s.client != nil && !s.client.Closed() {
 		return s.client.Close()
@@ -120,18 +126,16 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 	}
 
 	now := pcommon.NewTimestampFromTime(time.Now())
-	md := pmetric.NewMetrics()
-	ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
-	ilm.Scope().SetName(instrumentationLibName)
 
 	for _, group := range consumerGroups {
-		labels := pcommon.NewMap()
-		labels.UpsertString(metadata.A.Group, group.GroupId)
-		addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupMembers.Name(), now, labels, int64(len(group.Members)))
+		s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(group.Members)), group.GroupId)
 
 		groupOffsetFetchResponse, err := s.clusterAdmin.ListConsumerGroupOffsets(group.GroupId, topicPartitions)
 		if err != nil {
 			scrapeError = multierr.Append(scrapeError, err)
 			continue
 		}
 
 		for topic, partitions := range groupOffsetFetchResponse.Blocks {
 			// tracking matchedTopics consumed by this group
 			// by checking if any of the blocks has an offset
@@ -142,15 +146,14 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 					break
 				}
 			}
-			labels.UpsertString(metadata.A.Topic, topic)
 			if isConsumed {
 				var lagSum int64
 				var offsetSum int64
 				for partition, block := range partitions {
-					labels.UpsertInt(metadata.A.Partition, int64(partition))
 					consumerOffset := block.Offset
 					offsetSum += consumerOffset
-					addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffset.Name(), now, labels, consumerOffset)
+					s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, offsetSum, group.GroupId, topic, int64(partition))
 
 					// default -1 to indicate no lag measured.
 					var consumerLag int64 = -1
 					if partitionOffset, ok := topicPartitionOffset[topic][partition]; ok {
@@ -160,19 +163,19 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
 							lagSum += consumerLag
 						}
 					}
-					addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLag.Name(), now, labels, consumerLag)
+					s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition))
 				}
-				labels.Remove(metadata.A.Partition)
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffsetSum.Name(), now, labels, offsetSum)
-				addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLagSum.Name(), now, labels, lagSum)
+				s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group.GroupId, topic)
+				s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group.GroupId, topic)
 			}
 		}
 	}
 
-	return md, scrapeError
+	return s.mb.Emit(), scrapeError
 }
 
-func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
+func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
+	settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
 	groupFilter, err := regexp.Compile(cfg.GroupMatch)
 	if err != nil {
 		return nil, fmt.Errorf("failed to compile group_match: %w", err)
@@ -182,7 +185,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
 		return nil, fmt.Errorf("failed to compile topic filter: %w", err)
 	}
 	s := consumerScraper{
-		logger:       logger,
+		settings:     settings,
 		groupFilter:  groupFilter,
 		topicFilter:  topicFilter,
 		config:       cfg,
@@ -191,6 +194,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
 	return scraperhelper.NewScraper(
 		s.Name(),
 		s.scrape,
+		scraperhelper.WithStart(s.start),
 		scraperhelper.WithShutdown(s.shutdown),
 	)
 }
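
Review note: the consumer scraper carries the largest mechanical change: attributes move from one shared, mutated pcommon.Map to typed positional arguments on each recorder, which eliminates the labels.Remove(metadata.A.Partition) bookkeeping. Side by side, using only identifiers from this diff:

	// Before: one map mutated across loop iterations, shared by every gauge.
	labels := pcommon.NewMap()
	labels.UpsertString(metadata.A.Group, group.GroupId)
	labels.UpsertString(metadata.A.Topic, topic)
	labels.UpsertInt(metadata.A.Partition, int64(partition))
	addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLag.Name(), now, labels, consumerLag)

	// After: each data point carries its attributes explicitly.
	s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition))

One thing worth double-checking: the new per-partition offset call passes offsetSum where the old gauge recorded consumerOffset.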