diff --git a/pubsub/iterator.go b/pubsub/iterator.go
index e192172ea6e8..85740b57a469 100644
--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -533,9 +533,11 @@ func (it *messageIterator) handleKeepAlives() {
 	it.checkDrained()
 }
 
-// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
-// enabled, we'll retry these messages for a short duration in a goroutine.
-func (it *messageIterator) sendAck(m map[string]*AckResult) {
+type ackFunc = func(ctx context.Context, subName string, ackIds []string) error
+type ackRecordStat = func(ctx context.Context, toSend []string)
+type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)
+
+func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
 	ackIDs := make([]string, 0, len(m))
 	for k := range m {
 		ackIDs = append(ackIDs, k)
@@ -543,36 +545,51 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
 	it.eoMu.RLock()
 	exactlyOnceDelivery := it.enableExactlyOnceDelivery
 	it.eoMu.RUnlock()
+	batches := makeBatches(ackIDs, ackIDBatchSize)
+	wg := sync.WaitGroup{}
+
+	for _, batch := range batches {
+		wg.Add(1)
+		go func(toSend []string) {
+			defer wg.Done()
+			ackRecordStat(it.ctx, toSend)
+			// Use context.Background() as the call's context, not it.ctx. We don't
+			// want to cancel this RPC when the iterator is stopped.
+			cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
+			defer cancel2()
+			err := ackFunc(cctx, it.subName, toSend)
+			if exactlyOnceDelivery {
+				resultsByAckID := make(map[string]*AckResult)
+				for _, ackID := range toSend {
+					resultsByAckID[ackID] = m[ackID]
+				}
 
-	var toSend []string
-	for len(ackIDs) > 0 {
-		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
+				st, md := extractMetadata(err)
+				_, toRetry := processResults(st, resultsByAckID, md)
+				if len(toRetry) > 0 {
+					// Retry modacks/nacks in a separate goroutine.
+					go func() {
+						retryAckFunc(toRetry)
+					}()
+				}
+			}
+		}(batch)
+	}
+	wg.Wait()
+}
 
-		recordStat(it.ctx, AckCount, int64(len(toSend)))
-		addAcks(toSend)
-		// Use context.Background() as the call's context, not it.ctx. We don't
-		// want to cancel this RPC when the iterator is stopped.
-		cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-		defer cancel2()
-		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
+// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
+// enabled, we'll retry these messages for a short duration in a goroutine.
+func (it *messageIterator) sendAck(m map[string]*AckResult) {
+	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
+		return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
 			Subscription: it.subName,
-			AckIds:       toSend,
+			AckIds:       ackIds,
 		})
-		if exactlyOnceDelivery {
-			resultsByAckID := make(map[string]*AckResult)
-			for _, ackID := range toSend {
-				resultsByAckID[ackID] = m[ackID]
-			}
-			st, md := extractMetadata(err)
-			_, toRetry := processResults(st, resultsByAckID, md)
-			if len(toRetry) > 0 {
-				// Retry acks in a separate goroutine.
-				go func() {
-					it.retryAcks(toRetry)
-				}()
-			}
-		}
-	}
+	}, it.retryAcks, func(ctx context.Context, toSend []string) {
+		recordStat(it.ctx, AckCount, int64(len(toSend)))
+		addAcks(toSend)
+	})
 }
 
 // sendModAck is used to extend the lease of messages or nack them.
@@ -583,47 +600,22 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
 // enabled, we retry it in a separate goroutine for a short duration.
 func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
 	deadlineSec := int32(deadline / time.Second)
-	ackIDs := make([]string, 0, len(m))
-	for k := range m {
-		ackIDs = append(ackIDs, k)
-	}
-	it.eoMu.RLock()
-	exactlyOnceDelivery := it.enableExactlyOnceDelivery
-	it.eoMu.RUnlock()
-	var toSend []string
-	for len(ackIDs) > 0 {
-		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
+	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
+		return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
+			Subscription:       it.subName,
+			AckDeadlineSeconds: deadlineSec,
+			AckIds:             ackIds,
+		})
+	}, func(toRetry map[string]*ipubsub.AckResult) {
+		it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
+	}, func(ctx context.Context, toSend []string) {
 		if deadline == 0 {
 			recordStat(it.ctx, NackCount, int64(len(toSend)))
 		} else {
 			recordStat(it.ctx, ModAckCount, int64(len(toSend)))
 		}
 		addModAcks(toSend, deadlineSec)
-		// Use context.Background() as the call's context, not it.ctx. We don't
-		// want to cancel this RPC when the iterator is stopped.
-		cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
-		defer cancel2()
-		err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
-			Subscription:       it.subName,
-			AckDeadlineSeconds: deadlineSec,
-			AckIds:             toSend,
-		})
-		if exactlyOnceDelivery {
-			resultsByAckID := make(map[string]*AckResult)
-			for _, ackID := range toSend {
-				resultsByAckID[ackID] = m[ackID]
-			}
-
-			st, md := extractMetadata(err)
-			_, toRetry := processResults(st, resultsByAckID, md)
-			if len(toRetry) > 0 {
-				// Retry modacks/nacks in a separate goroutine.
-				go func() {
-					it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
-				}()
-			}
-		}
-	}
+	})
 }
 
 // retryAcks retries the ack RPC with backoff. This must be called in a goroutine
@@ -751,13 +743,20 @@ func calcFieldSizeInt(fields ...int) int {
 	return overhead
 }
 
-// splitRequestIDs takes a slice of ackIDs and returns two slices such that the first
-// ackID slice can be used in a request where the payload does not exceed ackIDBatchSize.
-func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) {
-	if len(ids) < maxBatchSize {
-		return ids, []string{}
+// makeBatches takes a slice of ackIDs and returns a slice of ackID batches.
+// Each ackID batch can be used in a request where the payload does not exceed maxBatchSize.
+func makeBatches(ids []string, maxBatchSize int) [][]string {
+	var batches [][]string
+	for len(ids) > 0 {
+		if len(ids) < maxBatchSize {
+			batches = append(batches, ids)
+			ids = []string{}
+		} else {
+			batches = append(batches, ids[:maxBatchSize])
+			ids = ids[maxBatchSize:]
+		}
 	}
-	return ids[:maxBatchSize], ids[maxBatchSize:]
+	return batches
 }
 
 // The deadline to ack is derived from a percentile distribution based
diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go
index ddd1a442428d..64e12edb37ce 100644
--- a/pubsub/iterator_test.go
+++ b/pubsub/iterator_test.go
@@ -44,25 +44,22 @@ var (
 	fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
 )
 
-func TestSplitRequestIDs(t *testing.T) {
+func TestMakeBatches(t *testing.T) {
 	t.Parallel()
-	ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
-	for _, test := range []struct {
-		ids        []string
-		splitIndex int
+	ids := []string{"a", "b", "c", "d", "e"}
+	for i, test := range []struct {
+		ids  []string
+		want [][]string
 	}{
-		{[]string{}, 0}, // empty slice, no split
-		{ids, 2},        // slice of size 5, split at index 2
-		{ids[:2], 2},    // slice of size 3, split at index 2
-		{ids[:1], 1},    // slice of size 1, split at index 1
+		{[]string{}, [][]string{}},                       // empty slice
+		{ids, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}}, // slice of size 5
+		{ids[:3], [][]string{{"a", "b"}, {"c"}}},         // slice of size 3
+		{ids[:1], [][]string{{"a"}}},                     // slice of size 1
 	} {
-		got1, got2 := splitRequestIDs(test.ids, 2)
-		want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
-		if !testutil.Equal(len(got1), len(want1)) {
-			t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
-		}
-		if !testutil.Equal(len(got2), len(want2)) {
-			t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
+		got := makeBatches(test.ids, 2)
+		want := test.want
+		if !testutil.Equal(len(got), len(want)) {
+			t.Errorf("test %d: %v, got %v, want %v", i, test, got, want)
 		}
 	}
 }
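For a quick sanity check of the new helper outside the patch, a minimal standalone sketch follows. makeBatches is copied verbatim from the diff above; the main wrapper, the batch size of 2, and the Println standing in for the ack/modack RPC are illustrative assumptions only, not part of the change.

package main

import (
	"fmt"
	"sync"
)

// makeBatches is copied verbatim from the patch: it splits ids into
// consecutive batches of at most maxBatchSize elements each.
func makeBatches(ids []string, maxBatchSize int) [][]string {
	var batches [][]string
	for len(ids) > 0 {
		if len(ids) < maxBatchSize {
			batches = append(batches, ids)
			ids = []string{}
		} else {
			batches = append(batches, ids[:maxBatchSize])
			ids = ids[maxBatchSize:]
		}
	}
	return batches
}

func main() {
	ids := []string{"a", "b", "c", "d", "e"}

	// Five ackIDs batched by 2 yield three batches, matching the test case above.
	fmt.Println(makeBatches(ids, 2)) // [[a b] [c d] [e]]

	// The same fan-out shape sendAckWithFunc uses: one goroutine per batch,
	// joined on a WaitGroup. Println stands in for the ack/modack RPC here.
	var wg sync.WaitGroup
	for _, batch := range makeBatches(ids, 2) {
		wg.Add(1)
		go func(toSend []string) {
			defer wg.Done()
			fmt.Println("sending batch:", toSend)
		}(batch)
	}
	wg.Wait()
}

The design point of the refactor: the old splitRequestIDs peeled one batch off per loop iteration and issued each RPC sequentially, while precomputing all batches up front lets sendAckWithFunc dispatch one goroutine per batch and wait on the WaitGroup, so ack and modack batches are sent in parallel.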