feat(pubsub): make lease management RPCs concurrent #10238
Conversation
Overall looks good to me. One thing that concerns me a bit is that the logic in `sendAck` and `sendModAck` is almost identical; the duplicated code could drift out of sync over time. What differs is how telemetry is tracked, how the ack is sent, and how retries are done.

I have a suggestion, though I don't have the full context to know whether those ack methods might diverge further in the future: we could have a common `sendAckWithFunc` method that contains the shared logic and accepts functions for acking, retrying, and recording telemetry.

It would be something like this:
```go
func (it *messageIterator) sendAck(m map[string]*AckResult) {
	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
		return it.subc.Acknowledge(ctx, &pb.AcknowledgeRequest{
			Subscription: it.subName,
			AckIds:       ackIds,
		})
	}, it.retryAcks, func(ctx context.Context, toSend []string) {
		recordStat(it.ctx, AckCount, int64(len(toSend)))
		addAcks(toSend)
	})
}

func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
	deadlineSec := int32(deadline / time.Second)
	it.sendAckWithFunc(m, func(ctx context.Context, subName string, ackIds []string) error {
		return it.subc.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
			Subscription:       it.subName,
			AckDeadlineSeconds: deadlineSec,
			AckIds:             ackIds,
		})
	}, func(toRetry map[string]*ipubsub.AckResult) {
		it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
	}, func(ctx context.Context, toSend []string) {
		if deadline == 0 {
			recordStat(it.ctx, NackCount, int64(len(toSend)))
		} else {
			recordStat(it.ctx, ModAckCount, int64(len(toSend)))
		}
		addModAcks(toSend, deadlineSec)
	})
}

type ackFunc = func(ctx context.Context, subName string, ackIds []string) error
type ackRecordStat = func(ctx context.Context, toSend []string)
type retryAckFunc = func(toRetry map[string]*ipubsub.AckResult)

func (it *messageIterator) sendAckWithFunc(m map[string]*AckResult, ackFunc ackFunc, retryAckFunc retryAckFunc, ackRecordStat ackRecordStat) {
	ackIDs := make([]string, 0, len(m))
	for k := range m {
		ackIDs = append(ackIDs, k)
	}
	it.eoMu.RLock()
	exactlyOnceDelivery := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()
	batches := makeBatches(ackIDs, ackIDBatchSize)
	wg := sync.WaitGroup{}
	for _, batch := range batches {
		wg.Add(1)
		go func(toSend []string) {
			defer wg.Done()
			ackRecordStat(it.ctx, toSend)
			// Use context.Background() as the call's context, not it.ctx. We don't
			// want to cancel this RPC when the iterator is stopped.
			cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
			defer cancel2()
			err := ackFunc(cctx, it.subName, toSend)
			if exactlyOnceDelivery {
				resultsByAckID := make(map[string]*AckResult)
				for _, ackID := range toSend {
					resultsByAckID[ackID] = m[ackID]
				}
				st, md := extractMetadata(err)
				_, toRetry := processResults(st, resultsByAckID, md)
				if len(toRetry) > 0 {
					// Retry modacks/nacks in a separate goroutine.
					go func() {
						retryAckFunc(toRetry)
					}()
				}
			}
		}(batch)
	}
}
```
I haven't been a huge fan of callback-based designs, since they make jumping around in the IDE more difficult. With that said, the tradeoffs could be worth it in this case, since there is a large amount of overlapping code. I'm changing these functions a fair bit as part of the otel tracing change, but I think it should still be mostly compatible even when wrapped with functions. If we end up needing more than this set of function arguments it would be even less ergonomic, but as-is, this seems like a good change.
This PR makes the lease management RPCs concurrent rather than serial. The previous behavior takes a list of `ackIDs` that need to be sent out, takes the first `n` messages (where `n` is the max batch size), issues the RPC, and then repeats until the `ackID` slice is empty.

This PR improves performance for large numbers of ackIDs by concurrently sending out each RPC in its own goroutine and waiting for them all to complete with a waitgroup. This means rewriting the `splitRequestIDs` function into a new `makeBatches` function that does the splitting / batch making upfront, rather than per iteration.

Fixes #9727