From 25d7ef790aa636b99869734a1bf58a859076c21b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 10 Jun 2022 15:44:28 -0700 Subject: [PATCH] kvcoord: truncate BatchRequest to range boundaries serially This commit refactors the DistSender's loop of iterating over the range descriptors so that the truncation of the BatchRequest happens serially. This incurs a minor performance hit when the requests are sent in parallel, but it makes it possible to apply the optimizations to this iteration in the following commits. Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 52 +++++++++++++------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index f55c496b07be..e5f22213e6a0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1208,7 +1208,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( // Take the fast path if this batch fits within a single range. if !ri.NeedAnother(rs) { resp := ds.sendPartialBatch( - ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), false, /* needsTruncate */ + ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */ ) return resp.reply, resp.pErr } @@ -1367,16 +1367,34 @@ func (ds *DistSender) divideAndSendBatchToRanges( return } + // Truncate the request to range descriptor. + curRangeRS, err := rs.Intersect(ri.Token().Desc()) + if err != nil { + responseCh <- response{pErr: roachpb.NewError(err)} + return + } + curRangeBatch := ba + var positions []int + curRangeBatch.Requests, positions, err = Truncate(ba.Requests, curRangeRS) + if len(positions) == 0 && err == nil { + // This shouldn't happen in the wild, but some tests exercise it. + err = errors.Newf("truncation resulted in empty batch on %s: %s", rs, ba) + } + if err != nil { + responseCh <- response{pErr: roachpb.NewError(err)} + return + } + lastRange := !ri.NeedAnother(rs) // Send the next partial batch to the first range in the "rs" span. // If we can reserve one of the limited goroutines available for parallel // batch RPCs, send asynchronously. if canParallelize && !lastRange && !ds.disableParallelBatches && - ds.sendPartialBatchAsync(ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh) { + ds.sendPartialBatchAsync(ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) { // Sent the batch asynchronously. } else { resp := ds.sendPartialBatch( - ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), true, /* needsTruncate */ + ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), positions, ) responseCh <- resp if resp.pErr != nil { @@ -1467,6 +1485,7 @@ func (ds *DistSender) sendPartialBatchAsync( batchIdx int, routing rangecache.EvictionToken, responseCh chan response, + positions []int, ) bool { if err := ds.rpcContext.Stopper.RunAsyncTaskEx( ctx, @@ -1479,7 +1498,7 @@ func (ds *DistSender) sendPartialBatchAsync( func(ctx context.Context) { ds.metrics.AsyncSentCount.Inc(1) responseCh <- ds.sendPartialBatch( - ctx, ba, rs, isReverse, withCommit, batchIdx, routing, true, /* needsTruncate */ + ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions, ) }, ); err != nil { @@ -1518,9 +1537,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at // replicas, we backoff and retry by refetching the range // descriptor. If the underlying range seems to have split, we // recursively invoke divideAndSendBatchToRanges to re-enumerate the -// ranges in the span and resend to each. If needsTruncate is true, -// the supplied batch and span must be truncated to the supplied range -// descriptor. +// ranges in the span and resend to each. func (ds *DistSender) sendPartialBatch( ctx context.Context, ba roachpb.BatchRequest, @@ -1529,7 +1546,7 @@ func (ds *DistSender) sendPartialBatch( withCommit bool, batchIdx int, routingTok rangecache.EvictionToken, - needsTruncate bool, + positions []int, ) response { if batchIdx == 1 { ds.metrics.PartialBatchCount.Inc(2) // account for first batch @@ -1539,25 +1556,6 @@ func (ds *DistSender) sendPartialBatch( var reply *roachpb.BatchResponse var pErr *roachpb.Error var err error - var positions []int - - if needsTruncate { - // Truncate the request to range descriptor. - rs, err = rs.Intersect(routingTok.Desc()) - if err != nil { - return response{pErr: roachpb.NewError(err)} - } - ba.Requests, positions, err = Truncate(ba.Requests, rs) - if len(positions) == 0 && err == nil { - // This shouldn't happen in the wild, but some tests exercise it. - return response{ - pErr: roachpb.NewErrorf("truncation resulted in empty batch on %s: %s", rs, ba), - } - } - if err != nil { - return response{pErr: roachpb.NewError(err)} - } - } // Start a retry loop for sending the batch to the range. Each iteration of // this loop uses a new descriptor. Attempts to send to multiple replicas in