From e806438eb3b8070d0a20bb0f0eab795110b09bce Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 2 May 2024 13:57:54 -0700 Subject: [PATCH 1/8] feat(pubsub): batch receipt modacks --- pubsub/iterator.go | 58 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 8e3155dca883..04e03c431f2d 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -62,20 +62,22 @@ var ( ) type messageIterator struct { - ctx context.Context - cancel func() // the function that will cancel ctx; called in stop - po *pullOptions - ps *pullStream - subc *vkit.SubscriberClient - subName string - kaTick <-chan time.Time // keep-alive (deadline extensions) - ackTicker *time.Ticker // message acks - nackTicker *time.Ticker // message nacks - pingTicker *time.Ticker // sends to the stream to keep it open - failed chan struct{} // closed on stream error - drained chan struct{} // closed when stopped && no more pending messages - wg sync.WaitGroup - + ctx context.Context + cancel func() // the function that will cancel ctx; called in stop + po *pullOptions + ps *pullStream + subc *vkit.SubscriberClient + subName string + kaTick <-chan time.Time // keep-alive (deadline extensions) + ackTicker *time.Ticker // message acks + nackTicker *time.Ticker // message nacks + pingTicker *time.Ticker // sends to the stream to keep it open + receiptTicker *time.Ticker // sends receipt modacks + failed chan struct{} // closed on stream error + drained chan struct{} // closed when stopped && no more pending messages + wg sync.WaitGroup + + // This mutex guards the structs related to lease extension. mu sync.Mutex ackTimeDist *distribution.D // dist uses seconds @@ -91,7 +93,9 @@ type messageIterator struct { // ack IDs whose ack deadline is to be modified // ModAcks don't have AckResults but allows reuse of the SendModAck function. pendingModAcks map[string]*AckResult - err error // error from stream failure + // ack IDs whose receipt need to be acknowledged with a modack. + pendingReceipts map[string]*AckResult + err error // error from stream failure eoMu sync.RWMutex enableExactlyOnceDelivery bool @@ -127,6 +131,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt ackTicker := time.NewTicker(100 * time.Millisecond) nackTicker := time.NewTicker(100 * time.Millisecond) pingTicker := time.NewTicker(30 * time.Second) + receiptTicker := time.NewTicker(100 * time.Millisecond) cctx, cancel := context.WithCancel(context.Background()) cctx = withSubscriptionKey(cctx, subName) it := &messageIterator{ @@ -140,6 +145,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt ackTicker: ackTicker, nackTicker: nackTicker, pingTicker: pingTicker, + receiptTicker: receiptTicker, failed: make(chan struct{}), drained: make(chan struct{}), ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1), @@ -147,6 +153,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt pendingAcks: map[string]*AckResult{}, pendingNacks: map[string]*AckResult{}, pendingModAcks: map[string]*AckResult{}, + pendingReceipts: map[string]*AckResult{}, } it.wg.Add(1) go it.sender() @@ -299,7 +306,12 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // When exactly once delivery is not enabled, modacks are fire and forget. 
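// With receipt batching, the non-exactly-once path below no longer issues a
// modack RPC for every pull response. Instead the ack IDs are queued in
// pendingReceipts under it.mu, and sender() drains that queue on receiptTicker,
// so receipts from many responses are coalesced into batched ModifyAckDeadline calls.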
if !exactlyOnceDelivery { go func() { - it.sendModAck(ackIDs, deadline, false) + // Add pending receipt modacks to queue to batch with other modacks + it.mu.Lock() + for id := range ackIDs { + it.pendingReceipts[id] = newSuccessAckResult() + } + it.mu.Unlock() }() return msgs, nil } @@ -391,6 +403,7 @@ func (it *messageIterator) sender() { defer it.ackTicker.Stop() defer it.nackTicker.Stop() defer it.pingTicker.Stop() + defer it.receiptTicker.Stop() defer func() { if it.ps != nil { it.ps.CloseSend() @@ -403,6 +416,7 @@ func (it *messageIterator) sender() { sendNacks := false sendModAcks := false sendPing := false + sendReceipt := false dl := it.ackDeadline() @@ -445,9 +459,12 @@ func (it *messageIterator) sender() { it.mu.Lock() // Ping only if we are processing messages via streaming. sendPing = !it.po.synchronous + case <-it.receiptTicker.C: + it.mu.Lock() + sendReceipt = (len(it.pendingReceipts) > 0) } // Lock is held here. - var acks, nacks, modAcks map[string]*AckResult + var acks, nacks, modAcks, receipts map[string]*AckResult if sendAcks { acks = it.pendingAcks it.pendingAcks = map[string]*AckResult{} @@ -460,6 +477,10 @@ func (it *messageIterator) sender() { modAcks = it.pendingModAcks it.pendingModAcks = map[string]*AckResult{} } + if sendReceipt { + receipts = it.pendingReceipts + it.pendingReceipts = map[string]*AckResult{} + } it.mu.Unlock() // Make Ack and ModAck RPCs. if sendAcks { @@ -475,6 +496,9 @@ func (it *messageIterator) sender() { if sendPing { it.pingStream() } + if sendReceipt { + it.sendModAck(receipts, dl, true) + } } } From 1f0364e95ec3564f9d67e59612b18dae29be589d Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 13 May 2024 19:38:00 -0700 Subject: [PATCH 2/8] make ack/modack calls concurrent --- pubsub/iterator.go | 131 +++++++++++++++++++--------------- pubsub/streaming_pull_test.go | 2 +- 2 files changed, 75 insertions(+), 58 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 04e03c431f2d..7bb47486a1e7 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -534,34 +534,34 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { exactlyOnceDelivery := it.enableExactlyOnceDelivery it.eoMu.RUnlock() - var toSend []string - for len(ackIDs) > 0 { - toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) - - recordStat(it.ctx, AckCount, int64(len(toSend))) - addAcks(toSend) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ - Subscription: it.subName, - AckIds: toSend, - }) - if exactlyOnceDelivery { - resultsByAckID := make(map[string]*AckResult) - for _, ackID := range toSend { - resultsByAckID[ackID] = m[ackID] - } - st, md := extractMetadata(err) - _, toRetry := processResults(st, resultsByAckID, md) - if len(toRetry) > 0 { - // Retry acks in a separate goroutine. - go func() { - it.retryAcks(toRetry) - }() + batches := makeBatches(ackIDs, ackIDBatchSize) + for _, batch := range batches { + go func(toSend []string) { + recordStat(it.ctx, AckCount, int64(len(toSend))) + addAcks(toSend) + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. 
+ cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: toSend, + }) + if exactlyOnceDelivery { + resultsByAckID := make(map[string]*AckResult) + for _, ackID := range toSend { + resultsByAckID[ackID] = m[ackID] + } + st, md := extractMetadata(err) + _, toRetry := processResults(st, resultsByAckID, md) + if len(toRetry) > 0 { + // Retry acks in a separate goroutine. + go func() { + it.retryAcks(toRetry) + }() + } } - } + }(batch) } } @@ -580,39 +580,40 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur it.eoMu.RLock() exactlyOnceDelivery := it.enableExactlyOnceDelivery it.eoMu.RUnlock() - var toSend []string - for len(ackIDs) > 0 { - toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize) - if deadline == 0 { - recordStat(it.ctx, NackCount, int64(len(toSend))) - } else { - recordStat(it.ctx, ModAckCount, int64(len(toSend))) - } - addModAcks(toSend, deadlineSec) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ - Subscription: it.subName, - AckDeadlineSeconds: deadlineSec, - AckIds: toSend, - }) - if exactlyOnceDelivery { - resultsByAckID := make(map[string]*AckResult) - for _, ackID := range toSend { - resultsByAckID[ackID] = m[ackID] + batches := makeBatches(ackIDs, ackIDBatchSize) + for _, batch := range batches { + go func(toSend []string) { + if deadline == 0 { + recordStat(it.ctx, NackCount, int64(len(toSend))) + } else { + recordStat(it.ctx, ModAckCount, int64(len(toSend))) } + addModAcks(toSend, deadlineSec) + // Use context.Background() as the call's context, not it.ctx. We don't + // want to cancel this RPC when the iterator is stopped. + cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ + Subscription: it.subName, + AckDeadlineSeconds: deadlineSec, + AckIds: toSend, + }) + if exactlyOnceDelivery { + resultsByAckID := make(map[string]*AckResult) + for _, ackID := range toSend { + resultsByAckID[ackID] = m[ackID] + } - st, md := extractMetadata(err) - _, toRetry := processResults(st, resultsByAckID, md) - if len(toRetry) > 0 { - // Retry modacks/nacks in a separate goroutine. - go func() { - it.retryModAcks(toRetry, deadlineSec, logOnInvalid) - }() + st, md := extractMetadata(err) + _, toRetry := processResults(st, resultsByAckID, md) + if len(toRetry) > 0 { + // Retry modacks/nacks in a separate goroutine. + go func() { + it.retryModAcks(toRetry, deadlineSec, logOnInvalid) + }() + } } - } + }(batch) } } @@ -750,6 +751,22 @@ func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string return ids[:maxBatchSize], ids[maxBatchSize:] } +// makeBatches takes a slice of ackIDs and returns a slice of ackID batches. +// Each ackID batch can be used in a request where the payload does not exceed ackIDBatchSize. 
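+// For example, makeBatches([]string{"a", "b", "c", "d", "e"}, 2) returns
+// [][]string{{"a", "b"}, {"c", "d"}, {"e"}}.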
+func makeBatches(ids []string, maxBatchSize int) [][]string { + var batches [][]string + for len(ids) > 0 { + if len(ids) < maxBatchSize { + batches = append(batches, ids) + ids = []string{} + } else { + batches = append(batches, ids[:maxBatchSize]) + ids = ids[maxBatchSize:] + } + } + return batches +} + // The deadline to ack is derived from a percentile distribution based // on the time it takes to process messages. The percentile chosen is the 99%th // percentile - that is, processing times up to the 99%th longest processing diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index de832f1c7152..f300a20ea6fd 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -196,7 +196,7 @@ func TestStreamingPullRetry(t *testing.T) { sub := client.Subscription("S") sub.ReceiveSettings.NumGoroutines = 1 - gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 2*time.Second, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) From f9c80a3edc3de4c9bcee8162dc86437b516c8ff8 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 13 May 2024 20:01:50 -0700 Subject: [PATCH 3/8] fix streaming pull test --- pubsub/streaming_pull_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index f300a20ea6fd..9de277d5f291 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -196,7 +196,8 @@ func TestStreamingPullRetry(t *testing.T) { sub := client.Subscription("S") sub.ReceiveSettings.NumGoroutines = 1 - gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 2*time.Second, func(_ context.Context, m *Message) { + minDurationPerLeaseExtension = 1 * time.Minute + gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 1*time.Second, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) @@ -244,15 +245,17 @@ func TestStreamingPullRetry(t *testing.T) { server.wait() for i := 0; i < len(testMessages); i++ { id := testMessages[i].AckId + server.mu.Lock() if i%2 == 0 { if !server.Acked[id] { t.Errorf("msg %q should have been acked but wasn't", id) } } else { - if dl, ok := server.Deadlines[id]; !ok || dl != 0 { - t.Errorf("msg %q should have been nacked but wasn't", id) + if server.Acked[id] { + t.Errorf("msg %q should have not been acked", id) } } + server.mu.Unlock() } } From 5aafa5e829b50ed68a706cf6bc08740c260e57ee Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 20 May 2024 12:36:08 -0700 Subject: [PATCH 4/8] add waitgroups to ack/modack --- pubsub/iterator.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 7bb47486a1e7..9ca432942404 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -535,8 +535,11 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { it.eoMu.RUnlock() batches := makeBatches(ackIDs, ackIDBatchSize) + wg := sync.WaitGroup{} for _, batch := range batches { + wg.Add(1) go func(toSend []string) { + defer wg.Done() recordStat(it.ctx, AckCount, int64(len(toSend))) addAcks(toSend) // Use context.Background() as the call's context, not it.ctx. 
We don't @@ -563,6 +566,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { } }(batch) } + wg.Wait() } // sendModAck is used to extend the lease of messages or nack them. @@ -581,8 +585,12 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur exactlyOnceDelivery := it.enableExactlyOnceDelivery it.eoMu.RUnlock() batches := makeBatches(ackIDs, ackIDBatchSize) + wg := sync.WaitGroup{} + for _, batch := range batches { + wg.Add(1) go func(toSend []string) { + defer wg.Done() if deadline == 0 { recordStat(it.ctx, NackCount, int64(len(toSend))) } else { @@ -615,6 +623,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur } }(batch) } + wg.Wait() } // retryAcks retries the ack RPC with backoff. This must be called in a goroutine From 0381cd80d3644d6c6ab40bf1824ed48772f91e48 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 20 May 2024 14:15:30 -0700 Subject: [PATCH 5/8] revert change to streaming pull retry test --- pubsub/streaming_pull_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pubsub/streaming_pull_test.go b/pubsub/streaming_pull_test.go index f3c1d8c34bc5..7f9943d4b7af 100644 --- a/pubsub/streaming_pull_test.go +++ b/pubsub/streaming_pull_test.go @@ -197,8 +197,7 @@ func TestStreamingPullRetry(t *testing.T) { sub := client.Subscription("S") sub.ReceiveSettings.NumGoroutines = 1 - minDurationPerLeaseExtension = 1 * time.Minute - gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 1*time.Second, func(_ context.Context, m *Message) { + gotMsgs, err := pullN(context.Background(), sub, len(testMessages), 0, func(_ context.Context, m *Message) { id, err := strconv.Atoi(msgAckID(m)) if err != nil { t.Fatalf("pullN err: %v", err) From fe960171d7ede6750cd5dcf4482204ca4ecb64ad Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 20 May 2024 14:43:35 -0700 Subject: [PATCH 6/8] remove SplitRequestIDs, add test for makeBatches --- pubsub/iterator.go | 9 --------- pubsub/iterator_test.go | 29 +++++++++++++---------------- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 784b499b4c2f..30dfddc4fb20 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -761,15 +761,6 @@ func calcFieldSizeInt(fields ...int) int { return overhead } -// splitRequestIDs takes a slice of ackIDs and returns two slices such that the first -// ackID slice can be used in a request where the payload does not exceed ackIDBatchSize. -func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) { - if len(ids) < maxBatchSize { - return ids, []string{} - } - return ids[:maxBatchSize], ids[maxBatchSize:] -} - // makeBatches takes a slice of ackIDs and returns a slice of ackID batches. // Each ackID batch can be used in a request where the payload does not exceed ackIDBatchSize. 
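// makeBatches supersedes splitRequestIDs: instead of peeling off one batch per
// call, it returns every batch up front so callers can fan the corresponding
// RPCs out concurrently.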
func makeBatches(ids []string, maxBatchSize int) [][]string { diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index ddd1a442428d..64e12edb37ce 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -44,25 +44,22 @@ var ( fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName) ) -func TestSplitRequestIDs(t *testing.T) { +func TestMakeBatches(t *testing.T) { t.Parallel() - ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"} - for _, test := range []struct { - ids []string - splitIndex int + ids := []string{"a", "b", "c", "d", "e"} + for i, test := range []struct { + ids []string + want [][]string }{ - {[]string{}, 0}, // empty slice, no split - {ids, 2}, // slice of size 5, split at index 2 - {ids[:2], 2}, // slice of size 3, split at index 2 - {ids[:1], 1}, // slice of size 1, split at index 1 + {[]string{}, [][]string{}}, // empty slice + {ids, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}}, // slice of size 5 + {ids[:3], [][]string{{"a", "b"}, {"c"}}}, // slice of size 3 + {ids[:1], [][]string{{"a"}}}, // slice of size 1 } { - got1, got2 := splitRequestIDs(test.ids, 2) - want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:] - if !testutil.Equal(len(got1), len(want1)) { - t.Errorf("%v, 1: got %v, want %v", test, got1, want1) - } - if !testutil.Equal(len(got2), len(want2)) { - t.Errorf("%v, 2: got %v, want %v", test, got2, want2) + got := makeBatches(test.ids, 2) + want := test.want + if !testutil.Equal(len(got), len(want)) { + t.Errorf("test %d: %v, got %v, want %v", i, test, got, want) } } } From 5dfb7619feb55f72a2b494acd04031a7440accb2 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 20 May 2024 14:45:33 -0700 Subject: [PATCH 7/8] fix comment for makeBatches --- pubsub/iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 30dfddc4fb20..ade695dc19d1 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -762,7 +762,7 @@ func calcFieldSizeInt(fields ...int) int { } // makeBatches takes a slice of ackIDs and returns a slice of ackID batches. -// Each ackID batch can be used in a request where the payload does not exceed ackIDBatchSize. +// Each ackID batch can be used in a request where the payload does not exceed maxBatchSize. func makeBatches(ids []string, maxBatchSize int) [][]string { var batches [][]string for len(ids) > 0 { From 6f59fbaa3b1d68b8179ae3c98e0304984afb6cab Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 17 Jun 2024 22:25:04 -0700 Subject: [PATCH 8/8] reduce duplicate code in sendack/modack func --- pubsub/iterator.go | 102 +++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 60 deletions(-) diff --git a/pubsub/iterator.go b/pubsub/iterator.go index ade695dc19d1..85740b57a469 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -533,9 +533,11 @@ func (it *messageIterator) handleKeepAlives() { it.checkDrained() } -// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is -// enabled, we'll retry these messages for a short duration in a goroutine. 
-func (it *messageIterator) sendAck(m map[string]*AckResult) { +type ackFunc = func(ctx context.Context, subName string, ackIds []string) error +type ackRecordStat = func(ctx context.Context, toSend []string) +type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult) + +func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) { ackIDs := make([]string, 0, len(m)) for k := range m { ackIDs = append(ackIDs, k) @@ -543,34 +545,31 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { it.eoMu.RLock() exactlyOnceDelivery := it.enableExactlyOnceDelivery it.eoMu.RUnlock() - batches := makeBatches(ackIDs, ackIDBatchSize) wg := sync.WaitGroup{} + for _, batch := range batches { wg.Add(1) go func(toSend []string) { defer wg.Done() - recordStat(it.ctx, AckCount, int64(len(toSend))) - addAcks(toSend) + ackRecordStat(it.ctx, toSend) // Use context.Background() as the call's context, not it.ctx. We don't // want to cancel this RPC when the iterator is stopped. - cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) defer cancel2() - err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{ - Subscription: it.subName, - AckIds: toSend, - }) + err := ackFunc(cctx, it.subName, toSend) if exactlyOnceDelivery { resultsByAckID := make(map[string]*AckResult) for _, ackID := range toSend { resultsByAckID[ackID] = m[ackID] } + st, md := extractMetadata(err) _, toRetry := processResults(st, resultsByAckID, md) if len(toRetry) > 0 { - // Retry acks in a separate goroutine. + // Retry modacks/nacks in a separate goroutine. go func() { - it.retryAcks(toRetry) + retryAckFunc(toRetry) }() } } @@ -579,6 +578,20 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { wg.Wait() } +// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is +// enabled, we'll retry these messages for a short duration in a goroutine. +func (it *messageIterator) sendAck(m map[string]*AckResult) { + it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error { + return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{ + Subscription: it.subName, + AckIds: ackIds, + }) + }, it.retryAcks, func(ctx context.Context, toSend []string) { + recordStat(it.ctx, AckCount, int64(len(toSend))) + addAcks(toSend) + }) +} + // sendModAck is used to extend the lease of messages or nack them. // The receipt mod-ack amount is derived from a percentile distribution based // on the time it takes to process messages. The percentile chosen is the 99%th @@ -587,53 +600,22 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { // enabled, we retry it in a separate goroutine for a short duration. 
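// Batching, the per-batch goroutines, and exactly-once retry handling are shared
// with sendAck via sendAckWithFunc; sendModAck only supplies the ModifyAckDeadline
// RPC, the retry callback, and the nack/modack stats callback.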
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) { deadlineSec := int32(deadline / time.Second) - ackIDs := make([]string, 0, len(m)) - for k := range m { - ackIDs = append(ackIDs, k) - } - it.eoMu.RLock() - exactlyOnceDelivery := it.enableExactlyOnceDelivery - it.eoMu.RUnlock() - batches := makeBatches(ackIDs, ackIDBatchSize) - wg := sync.WaitGroup{} - - for _, batch := range batches { - wg.Add(1) - go func(toSend []string) { - defer wg.Done() - if deadline == 0 { - recordStat(it.ctx, NackCount, int64(len(toSend))) - } else { - recordStat(it.ctx, ModAckCount, int64(len(toSend))) - } - addModAcks(toSend, deadlineSec) - // Use context.Background() as the call's context, not it.ctx. We don't - // want to cancel this RPC when the iterator is stopped. - cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{ - Subscription: it.subName, - AckDeadlineSeconds: deadlineSec, - AckIds: toSend, - }) - if exactlyOnceDelivery { - resultsByAckID := make(map[string]*AckResult) - for _, ackID := range toSend { - resultsByAckID[ackID] = m[ackID] - } - - st, md := extractMetadata(err) - _, toRetry := processResults(st, resultsByAckID, md) - if len(toRetry) > 0 { - // Retry modacks/nacks in a separate goroutine. - go func() { - it.retryModAcks(toRetry, deadlineSec, logOnInvalid) - }() - } - } - }(batch) - } - wg.Wait() + it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error { + return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{ + Subscription: it.subName, + AckDeadlineSeconds: deadlineSec, + AckIds: ackIds, + }) + }, func(toRetry map[string]*ipubsub.AckResult) { + it.retryModAcks(toRetry, deadlineSec, logOnInvalid) + }, func(ctx context.Context, toSend []string) { + if deadline == 0 { + recordStat(it.ctx, NackCount, int64(len(toSend))) + } else { + recordStat(it.ctx, ModAckCount, int64(len(toSend))) + } + addModAcks(toSend, deadlineSec) + }) } // retryAcks retries the ack RPC with backoff. This must be called in a goroutine
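Taken together, the series makes two structural changes: receipt modacks are queued and flushed on a ticker instead of being sent inline from receive(), and each flush of acks or modacks is split into fixed-size batches that are dispatched concurrently and awaited with a sync.WaitGroup. The following is a minimal, self-contained sketch of that shape, not the iterator code; names such as batcher, flushLoop, and sendBatches are illustrative only, and the 60-second per-RPC timeout mirrors the one used above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// batcher queues ack IDs that still need a receipt modack and flushes them on a
// ticker, mirroring the pendingReceipts map and receiptTicker in the iterator.
type batcher struct {
	mu      sync.Mutex
	pending []string
}

func (b *batcher) add(ackIDs ...string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, ackIDs...)
}

// flushLoop drains the queue every interval and hands the drained IDs to send.
func (b *batcher) flushLoop(interval time.Duration, send func([]string)) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for range t.C {
		b.mu.Lock()
		ids := b.pending
		b.pending = nil
		b.mu.Unlock()
		if len(ids) > 0 {
			send(ids)
		}
	}
}

// sendBatches splits ids into batches of at most maxBatchSize, issues each batch
// concurrently through rpc, and blocks until every batch goroutine is done,
// the way the WaitGroup added in patch 4 does.
func sendBatches(ids []string, maxBatchSize int, rpc func(context.Context, []string) error) {
	var wg sync.WaitGroup
	for len(ids) > 0 {
		n := maxBatchSize
		if len(ids) < n {
			n = len(ids)
		}
		batch := ids[:n]
		ids = ids[n:]
		wg.Add(1)
		go func(toSend []string) {
			defer wg.Done()
			// Each RPC gets its own timeout, detached from the iterator's context.
			ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
			defer cancel()
			if err := rpc(ctx, toSend); err != nil {
				fmt.Println("would hand off for retry:", toSend, err)
			}
		}(batch)
	}
	wg.Wait()
}

func main() {
	b := &batcher{}
	go b.flushLoop(100*time.Millisecond, func(ids []string) {
		sendBatches(ids, 2, func(ctx context.Context, toSend []string) error {
			fmt.Println("modack batch:", toSend)
			return nil
		})
	})
	b.add("ack-1", "ack-2", "ack-3", "ack-4", "ack-5")
	time.Sleep(300 * time.Millisecond)
}

In the real iterator the per-batch callback also records metrics and, when exactly-once delivery is enabled, routes failed ack IDs through processResults so that only retriable ones are re-sent.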