Skip to content

Commit

Permalink
Merge pull request #420 from twmb/test_fixup
Browse files Browse the repository at this point in the history
kgo tests: improve resilience
  • Loading branch information
twmb authored Mar 31, 2023
2 parents c92d13d + 7269863 commit c8bb63c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func TestIssue325(t *testing.T) {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
cl.AddConsumeTopics(topic)
recs := cl.PollFetches(ctx).Records()
if len(recs) != 1 && string(recs[0].Value) != "foo" {
if len(recs) != 1 || string(recs[0].Value) != "foo" {
t.Fatal(recs)
}
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestIssue337(t *testing.T) {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var recs []*Record
out:
Expand Down
3 changes: 3 additions & 0 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
Balancers(c.balancer),
MaxBufferedRecords(10000),
ConsumePreferringLagFn(PreferLagAt(1)),
BlockRebalanceOnPoll(),

// Even with autocommitting, autocommitting does not commit
// *the latest* when being revoked. We always want to commit
Expand Down Expand Up @@ -160,7 +161,9 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
}
}()

defer cl.AllowRebalance()
for {
cl.AllowRebalance()
// If we etl a few times before quitting, then we want to
// commit at least some of our work (except the last commit,
// see above). To do so, we commit every time _before_ we poll.
Expand Down

0 comments on commit c8bb63c

Please sign in to comment.