Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/kafkareceiver] fix: Kafka receiver blocking shutdown #35767

Merged

Conversation

dpaasman00
Copy link
Contributor

@dpaasman00 dpaasman00 commented Oct 14, 2024

Description

Fixes an issue where the Kafka receiver would block on shutdown.

There was an earlier fix for this issue here. This does solve the issue, but it was only applied to the traces receiver, not the logs or metrics receiver.

The issue is this go routine in the Start() functions for logs and metrics:

go func() {
        if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()

The consumeLoop() function returns a context.Canceled error when Shutdown() is called, which is expected. However componentstatus.ReportStatus() blocks while attempting to report this error. The reason/bug for this can be found here.

The previously mentioned PR fixed this for the traces receiver by checking if the error returned by consumeLoop() is context.Canceled:

go func() {
	if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()

Additionally, this is consumeLoop() for the traces receiver, with the logs and metrics versions being identical:

func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
	for {
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return ctx.Err()
		}
	}
}

This does fix the issue, however the only error that can be returned by consumeLoop() is a canceled context. When we create the context and cancel function, we use context.Background():

ctx, cancel := context.WithCancel(context.Background())

This context is only used by consumeLoop() and the cancel function is only called in Shutdown().

Because consumeLoop() can only return a context.Canceled error, this PR removes this unused code for the logs, metrics, and traces receivers. Instead, consumeLoop() still logs the context.Canceled error but it does not return any error and the go routine simply just calls consumeLoop().

Additional motivation for removing the call to componentstatus.ReportStatus() is the underlying function called by it, componentstatus.Report() says it does not need to be called during Shutdown() or Start() as the service already does so for the given component, comment here. Even if there wasn't a bug causing this call to block, the component still shouldn't call it since it would only be called during Shutdown().

Link to tracking issue

Fixes #30789

Testing

Tested in a build of the collector with these changes scraping logs from a Kafka instance. When the collector is stopped and Shutdown() gets called, the receiver did not block and the collector stopped gracefully as expected.

@dpaasman00 dpaasman00 marked this pull request as ready for review October 14, 2024 15:02
@dpaasman00 dpaasman00 requested a review from a team as a code owner October 14, 2024 15:02
@jsirianni
Copy link
Member

Thanks for taking care of this, it resolves an issue we are seeing in production with a high throughput logs pipeline.

@djaglowski
Copy link
Member

LGTM. @pavolloffay, @MovieStoreGuy, PTAL

Copy link
Contributor

@MovieStoreGuy MovieStoreGuy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One comment from me trying to sneak in some improvements, but the changes seem sensible.

receiver/kafkareceiver/kafka_receiver.go Show resolved Hide resolved
@djaglowski djaglowski merged commit 58a77db into open-telemetry:main Oct 22, 2024
158 checks passed
@github-actions github-actions bot added this to the next release milestone Oct 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Kafka receiver stuck while shutting down at v0.93.0
4 participants