Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): make lease management RPCs concurrent #10238

Merged
139 changes: 69 additions & 70 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,46 +533,63 @@ func (it *messageIterator) handleKeepAlives() {
it.checkDrained()
}

// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
type ackFunc = func(ctx context.Context, subName string, ackIds []string) error
type ackRecordStat = func(ctx context.Context, toSend []string)
type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
ackIDs := make([]string, 0, len(m))
for k := range m {
ackIDs = append(ackIDs, k)
}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
batches := makeBatches(ackIDs, ackIDBatchSize)
wg := sync.WaitGroup{}

for _, batch := range batches {
wg.Add(1)
go func(toSend []string) {
defer wg.Done()
ackRecordStat(it.ctx, toSend)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := ackFunc(cctx, it.subName, toSend)
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}

var toSend []string
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
retryAckFunc(toRetry)
}()
}
}
}(batch)
}
wg.Wait()
}

recordStat(it.ctx, AckCount, int64(len(toSend)))
addAcks(toSend)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
// sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
// enabled, we'll retry these messages for a short duration in a goroutine.
func (it *messageIterator) sendAck(m map[string]*AckResult) {
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
Subscription: it.subName,
AckIds: toSend,
AckIds: ackIds,
})
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}
st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry acks in a separate goroutine.
go func() {
it.retryAcks(toRetry)
}()
}
}
}
}, it.retryAcks, func(ctx context.Context, toSend []string) {
recordStat(it.ctx, AckCount, int64(len(toSend)))
addAcks(toSend)
})
}

// sendModAck is used to extend the lease of messages or nack them.
Expand All @@ -583,47 +600,22 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// enabled, we retry it in a separate goroutine for a short duration.
func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
deadlineSec := int32(deadline / time.Second)
ackIDs := make([]string, 0, len(m))
for k := range m {
ackIDs = append(ackIDs, k)
}
it.eoMu.RLock()
exactlyOnceDelivery := it.enableExactlyOnceDelivery
it.eoMu.RUnlock()
var toSend []string
for len(ackIDs) > 0 {
toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: deadlineSec,
AckIds: ackIds,
})
}, func(toRetry map[string]*ipubsub.AckResult) {
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}, func(ctx context.Context, toSend []string) {
if deadline == 0 {
recordStat(it.ctx, NackCount, int64(len(toSend)))
} else {
recordStat(it.ctx, ModAckCount, int64(len(toSend)))
}
addModAcks(toSend, deadlineSec)
// Use context.Background() as the call's context, not it.ctx. We don't
// want to cancel this RPC when the iterator is stopped.
cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel2()
err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
Subscription: it.subName,
AckDeadlineSeconds: deadlineSec,
AckIds: toSend,
})
if exactlyOnceDelivery {
resultsByAckID := make(map[string]*AckResult)
for _, ackID := range toSend {
resultsByAckID[ackID] = m[ackID]
}

st, md := extractMetadata(err)
_, toRetry := processResults(st, resultsByAckID, md)
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}()
}
}
}
})
}

// retryAcks retries the ack RPC with backoff. This must be called in a goroutine
Expand Down Expand Up @@ -751,13 +743,20 @@ func calcFieldSizeInt(fields ...int) int {
return overhead
}

// makeBatches splits ids into consecutive batches of at most maxBatchSize
// elements each, so that every batch can be used in a single request whose
// payload does not exceed the maximum batch size. The returned batches are
// subslices of ids (no copying). An empty input yields a nil result.
func makeBatches(ids []string, maxBatchSize int) [][]string {
	var batches [][]string
	for len(ids) > maxBatchSize {
		batches = append(batches, ids[:maxBatchSize])
		ids = ids[maxBatchSize:]
	}
	// Append the final partial (or exactly full) batch, if any.
	if len(ids) > 0 {
		batches = append(batches, ids)
	}
	return batches
}

// The deadline to ack is derived from a percentile distribution based
Expand Down
29 changes: 13 additions & 16 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,22 @@ var (
fullyQualifiedSubName = fmt.Sprintf("projects/%s/subscriptions/%s", projName, subName)
)

func TestSplitRequestIDs(t *testing.T) {
func TestMakeBatches(t *testing.T) {
t.Parallel()
ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
for _, test := range []struct {
ids []string
splitIndex int
ids := []string{"a", "b", "c", "d", "e"}
for i, test := range []struct {
ids []string
want [][]string
}{
{[]string{}, 0}, // empty slice, no split
{ids, 2}, // slice of size 5, split at index 2
{ids[:2], 2}, // slice of size 3, split at index 2
{ids[:1], 1}, // slice of size 1, split at index 1
{[]string{}, [][]string{}}, // empty slice
{ids, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}}, // slice of size 5
{ids[:3], [][]string{{"a", "b"}, {"c"}}}, // slice of size 3
{ids[:1], [][]string{{"a"}}}, // slice of size 1
} {
got1, got2 := splitRequestIDs(test.ids, 2)
want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
if !testutil.Equal(len(got1), len(want1)) {
t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
}
if !testutil.Equal(len(got2), len(want2)) {
t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
got := makeBatches(test.ids, 2)
want := test.want
if !testutil.Equal(len(got), len(want)) {
t.Errorf("test %d: %v, got %v, want %v", i, test, got, want)
}
}
}
Expand Down
Loading