Skip to content

Commit

Permalink
feat(inputs.kafka_consumer): Implement startup error behavior options (
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and asaharn committed Oct 16, 2024
1 parent 844d43f commit 409a1fe
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 79 deletions.
21 changes: 14 additions & 7 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->

In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:

- `error`: Telegraf with stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disables it
but continues processing for all other plugins.
- `retry`: Telegraf will try to startup the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.

## Secret-store support

This plugin supports secrets from secret-stores for the `sasl_username`,
Expand Down Expand Up @@ -161,13 +175,6 @@ to use them.
## This list of hostnames then replaces the original address list.
## resolve_canonical_bootstrap_servers_only = false

## Strategy for making connection to kafka brokers. Valid options: "startup",
## "defer". If set to "defer" the plugin is allowed to start before making a
## connection. This is useful if the broker may be down when telegraf is
## started, but if there are any typos in the broker setting, they will cause
## connection failures without warning at startup
# connection_strategy = "startup"

## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
max_message_len = 1000000
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package kafka_consumer
import (
"context"
_ "embed"
"errors"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -51,7 +52,7 @@ type KafkaConsumer struct {
MsgHeaderAsMetricName string `toml:"msg_header_as_metric_name"`
TimestampSource string `toml:"timestamp_source"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
ConnectionStrategy string `toml:"connection_strategy" deprecated:"1.33.0;1.40.0;use 'startup_error_behavior' instead"`
ResolveCanonicalBootstrapServersOnly bool `toml:"resolve_canonical_bootstrap_servers_only"`
Log telegraf.Logger `toml:"-"`
kafka.ReadConfig
Expand Down Expand Up @@ -307,7 +308,10 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
if k.ConnectionStrategy != "defer" {
err = k.create()
if err != nil {
return fmt.Errorf("create consumer: %w", err)
return &internal.StartupError{
Err: fmt.Errorf("create consumer: %w", err),
Retry: errors.Is(err, sarama.ErrOutOfBrokers),
}
}
k.startErrorAdder(acc)
}
Expand Down
Loading

0 comments on commit 409a1fe

Please sign in to comment.