Skip to content

Commit

Permalink
kvcoord: truncate BatchRequest to range boundaries serially
Browse files Browse the repository at this point in the history
This commit refactors the DistSender's loop that iterates over the range
descriptors so that the truncation of the BatchRequest happens
serially. This incurs a minor performance hit when the requests are sent
in parallel, but it makes it possible to apply optimizations to this
iteration in the following commits.

Release note: None
  • Loading branch information
yuzefovich committed Jun 14, 2022
1 parent d1438e8 commit 25d7ef7
Showing 1 changed file with 25 additions and 27 deletions.
52 changes: 25 additions & 27 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Take the fast path if this batch fits within a single range.
if !ri.NeedAnother(rs) {
resp := ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), false, /* needsTruncate */
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), nil, /* positions */
)
return resp.reply, resp.pErr
}
Expand Down Expand Up @@ -1367,16 +1367,34 @@ func (ds *DistSender) divideAndSendBatchToRanges(
return
}

// Truncate the request to range descriptor.
curRangeRS, err := rs.Intersect(ri.Token().Desc())
if err != nil {
responseCh <- response{pErr: roachpb.NewError(err)}
return
}
curRangeBatch := ba
var positions []int
curRangeBatch.Requests, positions, err = Truncate(ba.Requests, curRangeRS)
if len(positions) == 0 && err == nil {
// This shouldn't happen in the wild, but some tests exercise it.
err = errors.Newf("truncation resulted in empty batch on %s: %s", rs, ba)
}
if err != nil {
responseCh <- response{pErr: roachpb.NewError(err)}
return
}

lastRange := !ri.NeedAnother(rs)
// Send the next partial batch to the first range in the "rs" span.
// If we can reserve one of the limited goroutines available for parallel
// batch RPCs, send asynchronously.
if canParallelize && !lastRange && !ds.disableParallelBatches &&
ds.sendPartialBatchAsync(ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh) {
ds.sendPartialBatchAsync(ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), responseCh, positions) {
// Sent the batch asynchronously.
} else {
resp := ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, ri.Token(), true, /* needsTruncate */
ctx, curRangeBatch, rs, isReverse, withCommit, batchIdx, ri.Token(), positions,
)
responseCh <- resp
if resp.pErr != nil {
Expand Down Expand Up @@ -1467,6 +1485,7 @@ func (ds *DistSender) sendPartialBatchAsync(
batchIdx int,
routing rangecache.EvictionToken,
responseCh chan response,
positions []int,
) bool {
if err := ds.rpcContext.Stopper.RunAsyncTaskEx(
ctx,
Expand All @@ -1479,7 +1498,7 @@ func (ds *DistSender) sendPartialBatchAsync(
func(ctx context.Context) {
ds.metrics.AsyncSentCount.Inc(1)
responseCh <- ds.sendPartialBatch(
ctx, ba, rs, isReverse, withCommit, batchIdx, routing, true, /* needsTruncate */
ctx, ba, rs, isReverse, withCommit, batchIdx, routing, positions,
)
},
); err != nil {
Expand Down Expand Up @@ -1518,9 +1537,7 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
// replicas, we backoff and retry by refetching the range
// descriptor. If the underlying range seems to have split, we
// recursively invoke divideAndSendBatchToRanges to re-enumerate the
// ranges in the span and resend to each. If needsTruncate is true,
// the supplied batch and span must be truncated to the supplied range
// descriptor.
// ranges in the span and resend to each.
func (ds *DistSender) sendPartialBatch(
ctx context.Context,
ba roachpb.BatchRequest,
Expand All @@ -1529,7 +1546,7 @@ func (ds *DistSender) sendPartialBatch(
withCommit bool,
batchIdx int,
routingTok rangecache.EvictionToken,
needsTruncate bool,
positions []int,
) response {
if batchIdx == 1 {
ds.metrics.PartialBatchCount.Inc(2) // account for first batch
Expand All @@ -1539,25 +1556,6 @@ func (ds *DistSender) sendPartialBatch(
var reply *roachpb.BatchResponse
var pErr *roachpb.Error
var err error
var positions []int

if needsTruncate {
// Truncate the request to range descriptor.
rs, err = rs.Intersect(routingTok.Desc())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
ba.Requests, positions, err = Truncate(ba.Requests, rs)
if len(positions) == 0 && err == nil {
// This shouldn't happen in the wild, but some tests exercise it.
return response{
pErr: roachpb.NewErrorf("truncation resulted in empty batch on %s: %s", rs, ba),
}
}
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
}

// Start a retry loop for sending the batch to the range. Each iteration of
// this loop uses a new descriptor. Attempts to send to multiple replicas in
Expand Down

0 comments on commit 25d7ef7

Please sign in to comment.