[receiver/kafkareceiver] Add settings to Kafka Receiver for group management facilities (#33082)

The previous PR was closed due to inactivity while waiting for maintainer review:
#32393

**Description:** 

To address #28630, I've wired in the session timeout and heartbeat
interval settings. I would have exposed `max.poll.interval` as well, but it
appears Sarama doesn't make that setting available. It has a similar
setting called `MaxProcessingTime` that we could implement if desired,
but its behavior is different. Note that Kafka recommends a 45s session
timeout for consumers, but Sarama actually defaults to 10s, so I've also
defaulted to 10s to keep behavior consistent.
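
For reference, a minimal sketch of how these two settings are wired into Sarama; it mirrors the `kafka_receiver.go` change in the diff below, but the standalone helper and package layout here are illustrative, not the actual receiver code:

```go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

// groupSettings holds the two new receiver options added by this PR.
type groupSettings struct {
	SessionTimeout    time.Duration // maps to session_timeout
	HeartbeatInterval time.Duration // maps to heartbeat_interval
}

// applyGroupSettings copies the receiver settings onto a Sarama config,
// as done in createKafkaClient in kafka_receiver.go.
func applyGroupSettings(gs groupSettings) *sarama.Config {
	sc := sarama.NewConfig()
	// Request timeout used to detect consumer failures when using
	// Kafka's group management facilities.
	sc.Consumer.Group.Session.Timeout = gs.SessionTimeout
	// Expected time between heartbeats to the consumer coordinator.
	sc.Consumer.Group.Heartbeat.Interval = gs.HeartbeatInterval
	return sc
}

func main() {
	sc := applyGroupSettings(groupSettings{
		SessionTimeout:    15 * time.Second,
		HeartbeatInterval: 5 * time.Second,
	})
	fmt.Println(sc.Consumer.Group.Session.Timeout, sc.Consumer.Group.Heartbeat.Interval)
}
```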

**Link to tracking Issue:** #28630

**Testing:**

I started the OTel Collector with the following configuration and
verified via a debugger that the settings were applied to the Sarama
client.
```yaml
receivers:
  kafka:
    topic: test
    session_timeout: 15s
    heartbeat_interval: 5s

exporters:
  debug:

service:
  pipelines:
    logs:
      receivers: [kafka]
      exporters: [debug]

```

I then added tests to `config_test` covering both the default values and
values provided via config.
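
A rough sketch of the shape of those assertions (the test name here is hypothetical; the real tests live in `config_test.go` and `factory_test.go` and load `testdata/config.yaml`):

```go
package kafkareceiver

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// Hypothetical test name; mirrors the default-value checks added to
// factory_test.go and config_test.go in this PR.
func TestGroupManagementDefaults(t *testing.T) {
	cfg := createDefaultConfig().(*Config)
	assert.Equal(t, 10*time.Second, cfg.SessionTimeout)
	assert.Equal(t, 3*time.Second, cfg.HeartbeatInterval)
}
```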

**Documentation:**

I modified the README to reference the new settings.

---------

Co-authored-by: Sean Marciniak <[email protected]>
strawgate and MovieStoreGuy authored Jul 23, 2024
1 parent 1fd6851 commit 821c256
Showing 8 changed files with 67 additions and 19 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafkareceiversettings.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add settings session_timeout and heartbeat_interval to Kafka Receiver for group management facilities

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [28630]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions receiver/kafkareceiver/README.md
@@ -43,6 +43,8 @@ The following settings can be optionally configured:
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities.
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
- `auth`
- `plain_text`
- `username`: The username to use.
4 changes: 4 additions & 0 deletions receiver/kafkareceiver/config.go
@@ -48,6 +48,10 @@ type Config struct {
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
// Session interval for the Kafka consumer
SessionTimeout time.Duration `mapstructure:"session_timeout"`
// Heartbeat interval for the Kafka consumer
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`
// Encoding of the messages (default "otlp_proto")
16 changes: 10 additions & 6 deletions receiver/kafkareceiver/config_test.go
@@ -40,6 +40,8 @@ func TestLoadConfig(t *testing.T) {
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
SessionTimeout: 10 * time.Second,
HeartbeatInterval: 3 * time.Second,
Authentication: kafka.Authentication{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
@@ -66,12 +68,14 @@

id: component.NewIDWithName(metadata.Type, "logs"),
expected: &Config{
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "earliest",
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "earliest",
SessionTimeout: 45 * time.Second,
HeartbeatInterval: 15 * time.Second,
Authentication: kafka.Authentication{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
30 changes: 17 additions & 13 deletions receiver/kafkareceiver/factory.go
@@ -18,14 +18,16 @@ import (
)

const (
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
defaultGroupID = defaultClientID
defaultInitialOffset = offsetLatest
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
defaultGroupID = defaultClientID
defaultInitialOffset = offsetLatest
defaultSessionTimeout = 10 * time.Second
defaultHeartbeatInterval = 3 * time.Second

// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
@@ -93,11 +95,13 @@ func NewFactory(options ...FactoryOption) receiver.Factory {

func createDefaultConfig() component.Config {
return &Config{
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
InitialOffset: defaultInitialOffset,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
InitialOffset: defaultInitialOffset,
SessionTimeout: defaultSessionTimeout,
HeartbeatInterval: defaultHeartbeatInterval,
Metadata: kafkaexporter.Metadata{
Full: defaultMetadataFull,
Retry: kafkaexporter.MetadataRetry{
2 changes: 2 additions & 0 deletions receiver/kafkareceiver/factory_test.go
@@ -26,6 +26,8 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.Equal(t, defaultGroupID, cfg.GroupID)
assert.Equal(t, defaultClientID, cfg.ClientID)
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout)
assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval)
}

func TestCreateTracesReceiver(t *testing.T) {
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
@@ -122,6 +122,9 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) {
saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
saramaConfig.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval

var err error
if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil {
return nil, err
2 changes: 2 additions & 0 deletions receiver/kafkareceiver/testdata/config.yaml
@@ -17,6 +17,8 @@ kafka:
backoff: 5s
kafka/logs:
topic: logs
session_timeout: 45s
heartbeat_interval: 15s
encoding: direct
brokers:
- "coffee:123"
