From 3bb41bf4d6616f4cfbb12f0443fb331976cfa8d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Jankovi=C4=87?= Date: Thu, 15 Sep 2022 11:12:20 +0200 Subject: [PATCH] processor: add WaitForReadyContext method Currently the only option to test for processor readines is WaitForReady method. This method is blocking until processor is ready. When processor is dealing with long initialization, this call becomes unresponsive for virtually indefinite time. This behaviour is deal breaker for monitoring calls that just want to check on the status of the processor and return in reasonable amount of time i.e. Kubernetes healthcheck. This change adds context aware method WaitForReadyContext that has all the same checks with addition to context cancelation check. This allows it to return when context is canceled with the context reported error. If the error is not nil it indicates that processor is not ready. --- processor.go | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/processor.go b/processor.go index 8948ceef..8d61d850 100644 --- a/processor.go +++ b/processor.go @@ -734,22 +734,38 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error { return err } -// WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages) +// WaitForReady waits until the processor is ready to consume messages +// (or is actually consuming messages) // i.e., it is done catching up all partition tables, joins and lookup tables func (g *Processor) WaitForReady() { + g.waitForReady(context.Background()) +} + +// WaitForReadyContext is context aware option of WaitForReady. +// It either waits until the processor is ready or until context is canceled. +// If the return was caused by context it will return context reported error. +func (g *Processor) WaitForReadyContext(ctx context.Context) error { + return g.waitForReady(ctx) +} + +func (g *Processor) waitForReady(ctx context.Context) error { // wait for the processor to be started (or stopped) select { case <-g.state.WaitForStateMin(ProcStateStarting): case <-g.done: - return + return nil + case <-ctx.Done(): + return ctx.Err() } // wait that the processor is actually running select { case <-g.state.WaitForState(ProcStateRunning): case <-g.done: + case <-ctx.Done(): + return ctx.Err() } - // wait for all partitionprocessors to be running + // wait for all PartitionProcessors to be running // copy them first with the mutex so we don't run into a deadlock @@ -764,8 +780,12 @@ func (g *Processor) WaitForReady() { select { case <-part.state.WaitForState(PPStateRunning): case <-g.done: + case <-ctx.Done(): + return ctx.Err() } } + + return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). @@ -1015,7 +1035,7 @@ func ensureCopartitioned(tm TopicManager, topics []string) (int, error) { // Note that the offset to be committed must be the offset that the consumer expects to consume next, not the offset of the message. // See documentation at https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html which says: // -// Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed. +// Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed. // // This has the same semantics as sarama's implementation of session.MarkMessage (which calls MarkOffset with offset+1) func createMessageCommitter(session sarama.ConsumerGroupSession) commitCallback {