From 1108e32047385d52269904686f7a8f210642b9aa Mon Sep 17 00:00:00 2001 From: Krzysztof Skitek Date: Mon, 11 Jul 2022 16:03:08 +0200 Subject: [PATCH 1/2] fix: Table load race condition --- partition_table.go | 32 +++++--------------------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/partition_table.go b/partition_table.go index 57e6ea92..d96f5825 100644 --- a/partition_table.go +++ b/partition_table.go @@ -18,8 +18,6 @@ const ( // internal offset we use to detect if the offset has never been stored locally offsetNotStored int64 = -3 - - consumerDrainTimeout = time.Second ) // Backoff is used for adding backoff capabilities to the restarting @@ -404,42 +402,22 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error { } func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error { - timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout) - defer cancel() - errg, _ := multierr.NewErrGroup(context.Background()) // drain errors channel errg.Go(func() error { var errs *multierror.Error - - for { - select { - case <-timeoutCtx.Done(): - p.log.Printf("draining errors channel timed out") - return errs - case err, ok := <-cons.Errors(): - if !ok { - return errs - } - errs = multierror.Append(errs, err) - } + for err := range cons.Errors() { + errs = multierror.Append(errs, err) } + return errs }) // drain message channel errg.Go(func() error { - for { - select { - case <-timeoutCtx.Done(): - p.log.Printf("draining messages channel timed out") - return nil - case _, ok := <-cons.Messages(): - if !ok { - return nil - } - } + for range cons.Messages() { } + return nil }) return errg.Wait().ErrorOrNil() From 676623223a910cbaae7435056391a71e1d864b90 Mon Sep 17 00:00:00 2001 From: Krzysztof Skitek Date: Mon, 11 Jul 2022 16:49:57 +0200 Subject: [PATCH 2/2] test: Increase TestRebalanceSharePartitions timeout --- systemtest/processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemtest/processor_test.go b/systemtest/processor_test.go index 7a104d88..fa47812c 100644 --- a/systemtest/processor_test.go +++ b/systemtest/processor_test.go @@ -447,7 +447,7 @@ func TestRebalanceSharePartitions(t *testing.T) { require.Equal(t, 0, p1Passive) p2, cancelP2, p2Done := runProc(createProc()) - pollTimed(t, "p2 started", 10, p2.Recovered) + pollTimed(t, "p2 started", 20, p2.Recovered) pollTimed(t, "p1 still running", 10, p1.Recovered) // now p1 and p2 share the partitions