From 726986320043cf5f685d5aa2f564c11f36556d27 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Fri, 31 Mar 2023 17:11:25 -0600 Subject: [PATCH] kgo tests: improve resilience * Allow more time for the specific Issue tests * Block rebalance in poll in the group tests, to avoid some rare duplicate processing --- pkg/kgo/consumer_direct_test.go | 6 +++--- pkg/kgo/group_test.go | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index d61b30a2..3a2d01ef 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -21,11 +21,11 @@ func TestIssue325(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() cl.AddConsumeTopics(topic) recs := cl.PollFetches(ctx).Records() - if len(recs) != 1 && string(recs[0].Value) != "foo" { + if len(recs) != 1 || string(recs[0].Value) != "foo" { t.Fatal(recs) } } @@ -53,7 +53,7 @@ func TestIssue337(t *testing.T) { t.Fatal(err) } - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var recs []*Record out: diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 867434e4..59f4f7ee 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { Balancers(c.balancer), MaxBufferedRecords(10000), ConsumePreferringLagFn(PreferLagAt(1)), + BlockRebalanceOnPoll(), // Even with autocommitting, autocommitting does not commit // *the latest* when being revoked. We always want to commit @@ -160,7 +161,9 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { } }() + defer cl.AllowRebalance() for { + cl.AllowRebalance() // If we etl a few times before quitting, then we want to // commit at least some of our work (except the last commit, // see above). To do so, we commit every time _before_ we poll.