Skip to content

Commit

Permalink
[receiver/kafkametrics] Migrate receiver to the new metrics builder
Browse files Browse the repository at this point in the history
This allows users to disable particular metrics in through user settings.
  • Loading branch information
dmitryax committed Aug 13, 2022
1 parent 42e3646 commit 6ef11d0
Show file tree
Hide file tree
Showing 17 changed files with 983 additions and 333 deletions.
23 changes: 14 additions & 9 deletions receiver/kafkametricsreceiver/broker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

type brokerScraper struct {
client sarama.Client
logger *zap.Logger
settings component.ReceiverCreateSettings
config Config
saramaConfig *sarama.Config
mb *metadata.MetricsBuilder
}

func (s *brokerScraper) Name() string {
return brokersScraperName
}

func (s *brokerScraper) start(_ context.Context, _ component.Host) error {
s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
return nil
}

func (s *brokerScraper) shutdown(context.Context) error {
if s.client != nil && !s.client.Closed() {
return s.client.Close()
Expand All @@ -57,23 +63,22 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {

brokers := s.client.Brokers()

md := pmetric.NewMetrics()
ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(instrumentationLibName)
addIntGauge(ilm.Metrics(), metadata.M.KafkaBrokers.Name(), pcommon.NewTimestampFromTime(time.Now()), pcommon.NewMap(), int64(len(brokers)))
s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))

return md, nil
return s.mb.Emit(), nil
}

func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
s := brokerScraper{
logger: logger,
settings: settings,
config: cfg,
saramaConfig: saramaConfig,
}
return scraperhelper.NewScraper(
s.Name(),
s.scrape,
scraperhelper.WithStart(s.start),
scraperhelper.WithShutdown(s.shutdown),
)
}
34 changes: 19 additions & 15 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

func TestBrokerShutdown(t *testing.T) {
Expand All @@ -32,9 +35,9 @@ func TestBrokerShutdown(t *testing.T) {
On("Close").Return(nil).
On("Closed").Return(false)
scraper := brokerScraper{
client: client,
logger: zap.NewNop(),
config: Config{},
client: client,
settings: componenttest.NewNopReceiverCreateSettings(),
config: Config{},
}
_ = scraper.shutdown(context.Background())
client.AssertExpectations(t)
Expand All @@ -46,9 +49,9 @@ func TestBrokerShutdown_closed(t *testing.T) {
client.Mock.
On("Closed").Return(true)
scraper := brokerScraper{
client: client,
logger: zap.NewNop(),
config: Config{},
client: client,
settings: componenttest.NewNopReceiverCreateSettings(),
config: Config{},
}
_ = scraper.shutdown(context.Background())
client.AssertExpectations(t)
Expand All @@ -62,15 +65,15 @@ func TestBrokerScraper_Name(t *testing.T) {
func TestBrokerScraper_createBrokerScraper(t *testing.T) {
sc := sarama.NewConfig()
newSaramaClient = mockNewSaramaClient
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
assert.NoError(t, err)
assert.NotNil(t, bs)
}

func TestBrokerScraperStart(t *testing.T) {
newSaramaClient = mockNewSaramaClient
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
assert.NoError(t, err)
assert.NotNil(t, bs)
assert.NoError(t, bs.Start(context.Background(), nil))
Expand All @@ -81,7 +84,7 @@ func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
assert.NoError(t, err)
assert.NotNil(t, bs)
_, err = bs.Scrape(context.Background())
Expand All @@ -93,7 +96,7 @@ func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
assert.NoError(t, err)
assert.NotNil(t, bs)
err = bs.Shutdown(context.Background())
Expand All @@ -104,10 +107,11 @@ func TestBrokerScraper_scrape(t *testing.T) {
client := newMockClient()
client.Mock.On("Brokers").Return(testBrokers)
bs := brokerScraper{
client: client,
logger: zap.NewNop(),
config: Config{},
client: client,
settings: componenttest.NewNopReceiverCreateSettings(),
config: Config{Metrics: metadata.DefaultMetricsSettings()},
}
require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost()))
md, err := bs.scrape(context.Background())
assert.NoError(t, err)
expectedDp := int64(len(testBrokers))
Expand All @@ -119,7 +123,7 @@ func TestBrokerScraper_scrape(t *testing.T) {
func TestBrokersScraper_createBrokerScraper(t *testing.T) {
sc := sarama.NewConfig()
newSaramaClient = mockNewSaramaClient
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
bs, err := createBrokerScraper(context.Background(), Config{}, sc, componenttest.NewNopReceiverCreateSettings())
assert.NoError(t, err)
assert.NotNil(t, bs)
}
4 changes: 4 additions & 0 deletions receiver/kafkametricsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/receiver/scraperhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

// Config represents user settings for kafkametrics receiver
Expand All @@ -44,4 +45,7 @@ type Config struct {

// ClientID is the id associated with the consumer that reads from topics in kafka.
ClientID string `mapstructure:"client_id"`

// Metrics allows customizing scraped metrics representation.
Metrics metadata.MetricsSettings `mapstructure:"metrics"`
}
2 changes: 2 additions & 0 deletions receiver/kafkametricsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -57,5 +58,6 @@ func TestLoadConfig(t *testing.T) {
},
ClientID: defaultClientID,
Scrapers: []string{"brokers", "topics", "consumers"},
Metrics: metadata.DefaultMetricsSettings(),
}, r)
}
40 changes: 22 additions & 18 deletions receiver/kafkametricsreceiver/consumer_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,35 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)

type consumerScraper struct {
client sarama.Client
logger *zap.Logger
settings component.ReceiverCreateSettings
groupFilter *regexp.Regexp
topicFilter *regexp.Regexp
clusterAdmin sarama.ClusterAdmin
saramaConfig *sarama.Config
config Config
mb *metadata.MetricsBuilder
}

func (s *consumerScraper) Name() string {
return consumersScraperName
}

func (s *consumerScraper) start(_ context.Context, _ component.Host) error {
s.mb = metadata.NewMetricsBuilder(s.config.Metrics, s.settings.BuildInfo)
return nil
}

func (s *consumerScraper) shutdown(_ context.Context) error {
if s.client != nil && !s.client.Closed() {
return s.client.Close()
Expand Down Expand Up @@ -120,18 +126,16 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
}

now := pcommon.NewTimestampFromTime(time.Now())
md := pmetric.NewMetrics()
ilm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()
ilm.Scope().SetName(instrumentationLibName)

for _, group := range consumerGroups {
labels := pcommon.NewMap()
labels.UpsertString(metadata.A.Group, group.GroupId)
addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupMembers.Name(), now, labels, int64(len(group.Members)))
s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(group.Members)), group.GroupId)

groupOffsetFetchResponse, err := s.clusterAdmin.ListConsumerGroupOffsets(group.GroupId, topicPartitions)
if err != nil {
scrapeError = multierr.Append(scrapeError, err)
continue
}

for topic, partitions := range groupOffsetFetchResponse.Blocks {
// tracking matchedTopics consumed by this group
// by checking if any of the blocks has an offset
Expand All @@ -142,15 +146,14 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
break
}
}
labels.UpsertString(metadata.A.Topic, topic)
if isConsumed {
var lagSum int64
var offsetSum int64
for partition, block := range partitions {
labels.UpsertInt(metadata.A.Partition, int64(partition))
consumerOffset := block.Offset
offsetSum += consumerOffset
addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffset.Name(), now, labels, consumerOffset)
s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, offsetSum, group.GroupId, topic, int64(partition))

// default -1 to indicate no lag measured.
var consumerLag int64 = -1
if partitionOffset, ok := topicPartitionOffset[topic][partition]; ok {
Expand All @@ -160,19 +163,19 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
lagSum += consumerLag
}
}
addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLag.Name(), now, labels, consumerLag)
s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, group.GroupId, topic, int64(partition))
}
labels.Remove(metadata.A.Partition)
addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupOffsetSum.Name(), now, labels, offsetSum)
addIntGauge(ilm.Metrics(), metadata.M.KafkaConsumerGroupLagSum.Name(), now, labels, lagSum)
s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group.GroupId, topic)
s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group.GroupId, topic)
}
}
}

return md, scrapeError
return s.mb.Emit(), scrapeError
}

func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.Scraper, error) {
func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Config,
settings component.ReceiverCreateSettings) (scraperhelper.Scraper, error) {
groupFilter, err := regexp.Compile(cfg.GroupMatch)
if err != nil {
return nil, fmt.Errorf("failed to compile group_match: %w", err)
Expand All @@ -182,7 +185,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
return nil, fmt.Errorf("failed to compile topic filter: %w", err)
}
s := consumerScraper{
logger: logger,
settings: settings,
groupFilter: groupFilter,
topicFilter: topicFilter,
config: cfg,
Expand All @@ -191,6 +194,7 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
return scraperhelper.NewScraper(
s.Name(),
s.scrape,
scraperhelper.WithStart(s.start),
scraperhelper.WithShutdown(s.shutdown),
)
}
Loading

0 comments on commit 6ef11d0

Please sign in to comment.