Skip to content

Commit

Permalink
[receiver/kafkametricsreceiver] do not crash collector on startup whe…
Browse files Browse the repository at this point in the history
…n kafka is unavailable (#8817)

* kafkametricsreceiver initialize client in scrape

* add changelog entry

* more descriptive changelog entry

* kafkametricsreceiver: guard against nil client in shutdown

* kafkametricsreceiver: inline client creation in scrape
  • Loading branch information
mwear authored Apr 21, 2022
1 parent c2b384e commit f5f05d5
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- `fluentforwardreceiver`: Release port on shutdown (#9111)
- `prometheusexporter`: Prometheus fails to generate logs when the prometheus exporter produces a check exception. (#8949)
- `resourcedetectionprocessor`: Wire docker detector (#9372)
- `kafkametricsreceiver`: The kafkametricsreceiver was changed to connect to kafka during scrape, rather than startup. If kafka is unavailable the receiver will attempt to connect during subsequent scrapes until successful (#8817).

## v0.49.0

Expand Down
21 changes: 9 additions & 12 deletions receiver/kafkametricsreceiver/broker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -40,23 +39,22 @@ func (s *brokerScraper) Name() string {
return brokersScraperName
}

func (s *brokerScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting brokers scraper: %w", err)
}
s.client = client
return nil
}

func (s *brokerScraper) shutdown(context.Context) error {
if !s.client.Closed() {
if s.client != nil && !s.client.Closed() {
return s.client.Close()
}
return nil
}

func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err)
}
s.client = client
}

brokers := s.client.Brokers()

md := pmetric.NewMetrics()
Expand All @@ -77,6 +75,5 @@ func createBrokerScraper(_ context.Context, cfg Config, saramaConfig *sarama.Con
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
)
}
17 changes: 15 additions & 2 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,28 @@ func TestBrokerScraperStart(t *testing.T) {
assert.NoError(t, bs.Start(context.Background(), nil))
}

func TestBrokerScraper_startBrokerScraper_handles_client_error(t *testing.T) {
func TestBrokerScraper_scrape_handles_client_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, bs)
assert.Error(t, bs.Start(context.Background(), nil))
_, err = bs.Scrape(context.Background())
assert.Error(t, err)
}

func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
bs, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, bs)
err = bs.Shutdown(context.Background())
assert.NoError(t, err)
}

func TestBrokerScraper_scrape(t *testing.T) {
Expand Down
37 changes: 17 additions & 20 deletions receiver/kafkametricsreceiver/consumer_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -45,31 +44,30 @@ func (s *consumerScraper) Name() string {
return consumersScraperName
}

func (s *consumerScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting consumer scraper: %w", err)
}
clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
if err != nil {
if client != nil {
_ = client.Close()
}
return fmt.Errorf("failed to create cluster admin while starting consumer scraper: %w", err)
}
s.client = client
s.clusterAdmin = clusterAdmin
return nil
}

func (s *consumerScraper) shutdown(_ context.Context) error {
if !s.client.Closed() {
if s.client != nil && !s.client.Closed() {
return s.client.Close()
}
return nil
}

func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err)
}
clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
if err != nil {
if client != nil {
_ = client.Close()
}
return pmetric.Metrics{}, fmt.Errorf("failed to create cluster admin in consumer scraper: %w", err)
}
s.client = client
s.clusterAdmin = clusterAdmin
}

cgs, listErr := s.clusterAdmin.ListConsumerGroups()
if listErr != nil {
return pmetric.Metrics{}, listErr
Expand Down Expand Up @@ -194,6 +192,5 @@ func createConsumerScraper(_ context.Context, cfg Config, saramaConfig *sarama.C
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
)
}
20 changes: 16 additions & 4 deletions receiver/kafkametricsreceiver/consumer_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,31 @@ func TestConsumerScraper_createConsumerScraper(t *testing.T) {
assert.NotNil(t, cs)
}

func TestConsumerScraper_startScraper_handles_client_error(t *testing.T) {
func TestConsumerScraper_scrape_handles_client_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, cs)
err = cs.Start(context.Background(), nil)
_, err = cs.Scrape(context.Background())
assert.Error(t, err)
}

func TestConsumerScraper_startScraper_handles_clusterAdmin_error(t *testing.T) {
func TestConsumerScraper_scrape_handles_nil_client(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, cs)
err = cs.Shutdown(context.Background())
assert.NoError(t, err)
}

func TestConsumerScraper_scrape_handles_clusterAdmin_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
client := newMockClient()
client.Mock.
Expand All @@ -91,7 +103,7 @@ func TestConsumerScraper_startScraper_handles_clusterAdmin_error(t *testing.T) {
cs, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, cs)
err = cs.Start(context.Background(), nil)
_, err = cs.Scrape(context.Background())
assert.Error(t, err)
}

Expand Down
21 changes: 9 additions & 12 deletions receiver/kafkametricsreceiver/topic_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scrapererror"
Expand All @@ -43,23 +42,22 @@ func (s *topicScraper) Name() string {
return topicsScraperName
}

func (s *topicScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting topics scraper: %w", err)
}
s.client = client
return nil
}

func (s *topicScraper) shutdown(context.Context) error {
if !s.client.Closed() {
if s.client != nil && !s.client.Closed() {
return s.client.Close()
}
return nil
}

func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return pmetric.Metrics{}, fmt.Errorf("failed to create client in topics scraper: %w", err)
}
s.client = client
}

topics, err := s.client.Topics()
if err != nil {
s.logger.Error("Error fetching cluster topics ", zap.Error(err))
Expand Down Expand Up @@ -130,7 +128,6 @@ func createTopicsScraper(_ context.Context, cfg Config, saramaConfig *sarama.Con
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
)
}

Expand Down
16 changes: 14 additions & 2 deletions receiver/kafkametricsreceiver/topic_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,30 @@ func TestTopicScraper_createsScraper(t *testing.T) {
assert.NotNil(t, ms)
}

func TestTopicScraper_startScraperHandlesError(t *testing.T) {
func TestTopicScraper_ScrapeHandlesError(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("no scraper here")
}
sc := sarama.NewConfig()
ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Start(context.Background(), nil)
_, err = ms.Scrape(context.Background())
assert.Error(t, err)
}

func TestTopicScraper_ShutdownHandlesNilClient(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("no scraper here")
}
sc := sarama.NewConfig()
ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Shutdown(context.Background())
assert.NoError(t, err)
}

func TestTopicScraper_startScraperCreatesClient(t *testing.T) {
newSaramaClient = mockNewSaramaClient
sc := sarama.NewConfig()
Expand Down

0 comments on commit f5f05d5

Please sign in to comment.