Skip to content

Commit

Permalink
test(pubsublite): fix flaky TestAssigningSubscriberAddRemovePartitions (
Browse files Browse the repository at this point in the history
#8496)

Wait for commit requests to reach the server before ending the test.

Fixes: https://togithub.com/googleapis/google-cloud-go/issues/8459
  • Loading branch information
tmdiep authored Sep 11, 2023
1 parent 73a958d commit e4da890
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions pubsublite/internal/wire/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
cmtStream3 := test.NewRPCVerifier(t)
cmtStream3.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 3}), initCommitResp(), nil)
cmtStream3.Push(commitReq(34), commitResp(1), nil)
cmtStream3.Push(commitReq(35), commitResp(1), nil)
cmt2Barrier := cmtStream3.PushWithBarrier(commitReq(35), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 3, cmtStream3)

// Partition 6
Expand All @@ -1203,7 +1203,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {

cmtStream6 := test.NewRPCVerifier(t)
cmtStream6.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 6}), initCommitResp(), nil)
cmtStream6.Push(commitReq(67), commitResp(1), nil)
cmt3Barrier := cmtStream6.PushWithBarrier(commitReq(67), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 6, cmtStream6)

// Partition 8
Expand All @@ -1214,7 +1214,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {

cmtStream8 := test.NewRPCVerifier(t)
cmtStream8.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 8}), initCommitResp(), nil)
cmtStream8.Push(commitReq(89), commitResp(1), nil)
cmt5Barrier := cmtStream8.PushWithBarrier(commitReq(89), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 8, cmtStream8)

mockServer.OnTestStart(verifiers)
Expand Down Expand Up @@ -1243,8 +1243,11 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
msg4Barrier.Release()
receiver.ValidateMsgs(partitionMsgs(3, msg2))

// Ensure the second assignment ack is received by the server to avoid test
// flakiness.
// Ensure requests are received by the server to avoid test flakiness.
sub.FlushCommits()
cmt2Barrier.Release()
cmt3Barrier.Release()
cmt5Barrier.Release()
assignmentBarrier2.Release()

// Stop should flush all commit cursors.
Expand Down

0 comments on commit e4da890

Please sign in to comment.