Skip to content

Commit

Permalink
kvstreamer: clean up range loop a bit
Browse files Browse the repository at this point in the history
This commit refactors the range loop in the streamer slightly in order
to avoid seeking the range iterator to `/Max` key (which can be
confusing if seen in the trace).

Release note: None
  • Loading branch information
yuzefovich committed Jul 14, 2022
1 parent 750b231 commit 041b104
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,9 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
// of singleRangeBatch objects in flight is limited by the number of ranges
// of a single table, so it doesn't seem urgent to fix the accounting here.
var requestsToServe []singleRangeBatch
seekKey := rs.Key
const scanDir = kvcoord.Ascending
ri := kvcoord.MakeRangeIterator(s.distSender)
ri.Seek(ctx, seekKey, scanDir)
ri.Seek(ctx, rs.Key, scanDir)
if !ri.Valid() {
return ri.Error()
}
Expand Down Expand Up @@ -534,7 +533,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}
var reqsKeysScratch []roachpb.Key
var newNumRangesPerScanRequestMemoryUsage int64
for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) {
for {
// Find all requests that touch the current range.
var singleRangeReqs []roachpb.RequestUnion
var positions []int
Expand All @@ -546,19 +545,18 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
for i := range positions {
positions[i] = i
}
seekKey = roachpb.RKeyMax
rs.Key = roachpb.RKeyMax
} else {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
if err != nil {
return err
}
singleRangeReqs, positions, seekKey, err = s.truncationHelper.Truncate(singleRangeSpan)
singleRangeReqs, positions, rs.Key, err = s.truncationHelper.Truncate(singleRangeSpan)
if err != nil {
return err
}
}
rs.Key = seekKey
var subRequestIdx []int32
var subRequestIdxOverhead int64
if !s.hints.SingleRowLookup {
Expand Down Expand Up @@ -632,6 +630,12 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re

requestsToServe = append(requestsToServe, r)
s.enqueuedSingleRangeRequests += len(singleRangeReqs)

if !ri.NeedAnother(rs) {
// This was the last range.
break
}
ri.Seek(ctx, rs.Key, scanDir)
}

if streamerLocked {
Expand Down

0 comments on commit 041b104

Please sign in to comment.