kvstreamer: clean up Result struct #82422

Merged · 1 commit · Jun 6, 2022
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -71,7 +71,7 @@ type singleRangeBatch struct {
//
// subRequestIdx is only allocated in InOrder mode when
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int
subRequestIdx []int32
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs.
reqsReservedBytes int64
@@ -114,7 +114,7 @@ func (r singleRangeBatch) priority() int {
// compared when two batches have the same priority value.
//
// It is invalid to call this method on a batch with no requests.
func (r singleRangeBatch) subPriority() int {
func (r singleRangeBatch) subPriority() int32 {
if r.subRequestIdx == nil {
return 0
}
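For context on the int32 change above: priority() returns the Position of the batch's first request, and subPriority() (per its doc comment) breaks ties between batches that share the same priority value, presumably using the subRequestIdx of the batch's first request. Below is a minimal, self-contained sketch of such a tie-breaking comparator; the batch type and field names are illustrative stand-ins, not the PR's actual requests-provider code.

```go
// Illustrative sketch only: how a requests provider could order two
// single-range batches using priority (request position) first and the new
// int32 subPriority as a tie-breaker. All names here are hypothetical.
package main

import "fmt"

type batch struct {
	priority    int   // Position of the first request in the batch
	subPriority int32 // subRequestIdx of the first request in the batch
}

// less reports whether a should be served before b: lower priority wins, and
// subPriority breaks ties between batches of the same priority.
func less(a, b batch) bool {
	if a.priority != b.priority {
		return a.priority < b.priority
	}
	return a.subPriority < b.subPriority
}

func main() {
	a := batch{priority: 3, subPriority: 0}
	b := batch{priority: 3, subPriority: 2}
	fmt.Println(less(a, b)) // true: same priority, earlier sub-request first
}
```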
15 changes: 8 additions & 7 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -157,7 +157,7 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e

func (b *resultsBufferBase) findCompleteResponses(results []Result) {
for i := range results {
if results[i].GetResp != nil || results[i].ScanResp.Complete {
if results[i].GetResp != nil || results[i].scanComplete {
b.numCompleteResponses++
}
}
@@ -320,7 +320,7 @@ type inOrderResultsBuffer struct {
// multiple ranges - in such a scenario, multiple Results are created. For
// Get requests and for single-range Scan requests this will always stay at
// zero.
headOfLineSubRequestIdx int
headOfLineSubRequestIdx int32
// buffered contains all buffered Results, regardless of whether they are
// stored in-memory or on disk.
buffered []inOrderBufferedResult
@@ -475,7 +475,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error)
// of the sub-request.
b.headOfLineSubRequestIdx++
}
if result.GetResp != nil || result.ScanResp.Complete {
if result.GetResp != nil || result.scanComplete {
// If the current Result is complete, then we need to advance the
// head-of-the-line position.
b.headOfLinePosition++
@@ -595,8 +595,9 @@ func (b *inOrderResultsBuffer) close(ctx context.Context) {
// inOrderBufferedResult describes a single Result for InOrder mode, regardless
// of where it is stored (in-memory or on disk).
type inOrderBufferedResult struct {
// If onDisk is true, then only Result.ScanResp.Complete, Result.memoryTok,
// Result.Position, Result.subRequestIdx, and Result.subRequestDone are set.
// If onDisk is true, then only Result.Position, Result.memoryTok,
// Result.subRequestIdx, Result.subRequestDone, and Result.scanComplete are
// set.
Result
// addEpoch indicates the value of addCounter variable when this result was
// added to the buffer. This "epoch" allows us to order correctly two
@@ -623,7 +624,7 @@ type inOrderBufferedResult struct {
// spill updates r to represent a result that has been spilled to disk and is
// identified by the provided ordinal in the disk buffer.
func (r *inOrderBufferedResult) spill(diskResultID int) {
isScanComplete := r.ScanResp.Complete
isScanComplete := r.scanComplete
*r = inOrderBufferedResult{
Result: Result{
memoryTok: r.memoryTok,
@@ -635,7 +636,7 @@ func (r *inOrderBufferedResult) spill(diskResultID int) {
onDisk: true,
diskResultID: diskResultID,
}
r.ScanResp.Complete = isScanComplete
r.scanComplete = isScanComplete
}

// get returns the Result, deserializing it from disk if necessary. toConsume
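The comment rewritten above spells out the spill contract: once a Result is on disk, only Result.Position, Result.memoryTok, Result.subRequestIdx, Result.subRequestDone, and the new scanComplete field remain meaningful in memory, while the payload is addressed by an ordinal in the disk buffer. A simplified sketch of that pattern follows, using hypothetical stand-in types rather than the real Result and inOrderBufferedResult.

```go
// Simplified sketch (hypothetical types) of the spill pattern used by
// inOrderBufferedResult: drop the large response payload, keep only the
// metadata needed for ordering and memory accounting, and remember where the
// payload lives on disk.
package main

import "fmt"

type resultStub struct {
	position       int    // ordering among originally enqueued requests
	subRequestIdx  int32  // ordering among ranges of a single Scan
	subRequestDone bool   // last Result for this sub-request
	scanComplete   bool   // last Result for the whole scan
	toRelease      int64  // memory reservation to release on Release()
	payload        []byte // nil once spilled
}

type bufferedResult struct {
	resultStub
	onDisk       bool
	diskResultID int // ordinal in the disk buffer, valid only if onDisk
}

// spill clears the payload but preserves the metadata, mirroring the contract
// described in the comment above.
func (r *bufferedResult) spill(diskResultID int) {
	meta := r.resultStub
	meta.payload = nil
	*r = bufferedResult{resultStub: meta, onDisk: true, diskResultID: diskResultID}
}

func main() {
	r := bufferedResult{resultStub: resultStub{
		position: 7, scanComplete: true, toRelease: 128, payload: []byte("row data"),
	}}
	r.spill(42)
	fmt.Printf("%+v\n", r) // payload gone, metadata and disk ID retained
}
```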
15 changes: 3 additions & 12 deletions pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -85,10 +85,10 @@ func TestInOrderResultsBuffer(t *testing.T) {
}
for j := 0; j < numRanges; j++ {
scan := makeResultWithScanResp(rng)
scan.ScanResp.Complete = j+1 == numRanges
scan.scanComplete = j+1 == numRanges
scan.memoryTok.toRelease = rng.Int63n(100)
scan.Position = i
scan.subRequestIdx = j
scan.subRequestIdx = int32(j)
scan.subRequestDone = true
results = append(results, scan)
}
@@ -160,13 +160,6 @@ func TestInOrderResultsBuffer(t *testing.T) {
}
}

func fillEnqueueKeys(r *Result, rng *rand.Rand) {
r.EnqueueKeysSatisfied = make([]int, rng.Intn(20)+1)
for i := range r.EnqueueKeysSatisfied {
r.EnqueueKeysSatisfied[i] = rng.Int()
}
}

func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
var r Result
r.GetResp = &roachpb.GetResponse{}
@@ -182,7 +175,6 @@ func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
},
}
}
fillEnqueueKeys(&r, rng)
return r
}

@@ -195,9 +187,8 @@ func makeResultWithScanResp(rng *rand.Rand) Result {
rng.Read(batchResponse)
batchResponses[i] = batchResponse
}
r.ScanResp.ScanResponse = &roachpb.ScanResponse{
r.ScanResp = &roachpb.ScanResponse{
BatchResponses: batchResponses,
}
fillEnqueueKeys(&r, rng)
return r
}
129 changes: 47 additions & 82 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -80,34 +80,10 @@ type Result struct {
// is false). In that case, there will be a further result with the
// continuation; that result will use the same Key. Notably, SQL rows will
// never be split across multiple results.
ScanResp struct {
// The response is always using BATCH_RESPONSE format (meaning that Rows
// field is always nil). IntentRows field is also nil.
*roachpb.ScanResponse
// If the Result represents a scan result, Complete indicates whether
// this is the last response for the respective scan, or if there are
// more responses to come. In any case, ScanResp never contains partial
// rows (i.e. a single row is never split into different Results).
//
// When running in InOrder mode, Results for a single scan will be
// delivered in key order (in addition to results for different scans
// being delivered in request order). When running in OutOfOrder mode,
// Results for a single scan can be delivered out of key order (in
// addition to results for different scans being delivered out of
// request order).
Complete bool
}
// EnqueueKeysSatisfied identifies the requests that this Result satisfies.
// In OutOfOrder mode, a single Result can satisfy multiple identical
// requests. In InOrder mode a Result can only satisfy multiple consecutive
// requests.
EnqueueKeysSatisfied []int
// memoryTok describes the memory reservation of this Result that needs to
// be released back to the Streamer's budget when the Result is Release()'d.
memoryTok struct {
streamer *Streamer
toRelease int64
}
//
// The response is always using BATCH_RESPONSE format (meaning that Rows
// field is always nil). IntentRows field is also nil.
ScanResp *roachpb.ScanResponse
// Position tracks the ordinal among all originally enqueued requests that
// this result satisfies. See singleRangeBatch.positions for more details.
//
@@ -118,11 +94,17 @@ type Result struct {
// TODO(yuzefovich): this might need to be []int when non-unique requests
// are supported.
Position int
// memoryTok describes the memory reservation of this Result that needs to
// be released back to the Streamer's budget when the Result is Release()'d.
memoryTok struct {
streamer *Streamer
toRelease int64
}
// subRequestIdx allows us to order two Results that come for the same
// original Scan request but from different ranges. It is non-zero only in
// InOrder mode when Hints.SingleRowLookup is false, in all other cases it
// will remain zero. See singleRangeBatch.subRequestIdx for more details.
subRequestIdx int
subRequestIdx int32
// subRequestDone is true if the current Result is the last one for the
// corresponding sub-request. For all Get requests and for Scan requests
// contained within a single range, it is always true since those can only
@@ -132,6 +114,17 @@ type Result struct {
// properly if this Result is a Scan response and Hints.SingleRowLookup is
// false.
subRequestDone bool
// If the Result represents a scan result, scanComplete indicates whether
// this is the last response for the respective scan, or if there are more
// responses to come. In any case, ScanResp never contains partial rows
// (i.e. a single row is never split into different Results).
//
// When running in InOrder mode, Results for a single scan will be delivered
// in key order (in addition to results for different scans being delivered
// in request order). When running in OutOfOrder mode, Results for a single
// scan can be delivered out of key order (in addition to results for
// different scans being delivered out of request order).
scanComplete bool
}

// Hints provides different hints to the Streamer for optimization purposes.
@@ -189,7 +182,7 @@ func (r Result) Release(ctx context.Context) {
// }
// // All previously enqueued requests have already been responded to.
// if moreRequestsToEnqueue {
// err := s.Enqueue(ctx, requests, enqueueKeys)
// err := s.Enqueue(ctx, requests)
// // err check
// ...
// } else {
@@ -238,8 +231,6 @@ type Streamer struct {

waitGroup sync.WaitGroup

enqueueKeys []int

// requestsToServe contains all single-range sub-requests that have yet
// to be served.
requestsToServe requestsProvider
@@ -266,7 +257,7 @@
// It is allocated lazily if Hints.SingleRowLookup is false when the
// first ScanRequest is encountered in Enqueue.
// TODO(yuzefovich): perform memory accounting for this.
numRangesPerScanRequest []int
numRangesPerScanRequest []int32

// numRequestsInFlight tracks the number of single-range batches that
// are currently being served asynchronously (i.e. those that have
@@ -378,16 +369,7 @@ func (s *Streamer) Init(
}

// Enqueue dispatches multiple requests for execution. Results are delivered
// through the GetResults call. If enqueueKeys is not nil, it needs to contain
// one ID for each request; responses will reference that ID so that the client
// can associate them to the requests. If enqueueKeys is nil, then the responses
// will reference the ordinals of the corresponding requests among reqs.
//
// Multiple requests can specify the same key. In this case, their respective
// responses will also reference the same key. This is useful, for example, for
// "range-based lookup joins" where multiple spans are read in the context of
// the same input-side row (see multiSpanGenerator implementation of
// rowexec.joinReaderSpanGenerator interface for more details).
// through the GetResults call.
//
// The Streamer takes over the given requests, will perform the memory
// accounting against its budget and might modify the requests in place.
@@ -405,9 +387,7 @@
// Currently, enqueuing new requests while there are still requests in progress
// from the previous invocation is prohibited.
// TODO(yuzefovich): lift this restriction and introduce the pipelining.
func (s *Streamer) Enqueue(
ctx context.Context, reqs []roachpb.RequestUnion, enqueueKeys []int,
) (retErr error) {
func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (retErr error) {
if !s.coordinatorStarted {
var coordinatorCtx context.Context
coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx)
@@ -436,11 +416,6 @@ func (s *Streamer) Enqueue(
}
}()

if enqueueKeys != nil && len(enqueueKeys) != len(reqs) {
return errors.AssertionFailedf("invalid enqueueKeys: len(reqs) = %d, len(enqueueKeys) = %d", len(reqs), len(enqueueKeys))
}
s.enqueueKeys = enqueueKeys

if err := s.results.init(ctx, len(reqs)); err != nil {
return err
}
@@ -486,7 +461,7 @@ func (s *Streamer) Enqueue(
if err != nil {
return err
}
var subRequestIdx []int
var subRequestIdx []int32
if !s.hints.SingleRowLookup {
for i, pos := range positions {
if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan {
@@ -497,20 +472,20 @@
streamerLocked = true
s.mu.Lock()
if cap(s.mu.numRangesPerScanRequest) < len(reqs) {
s.mu.numRangesPerScanRequest = make([]int, len(reqs))
s.mu.numRangesPerScanRequest = make([]int32, len(reqs))
} else {
// We can reuse numRangesPerScanRequest allocated on
// the previous call to Enqueue after we zero it
// out.
s.mu.numRangesPerScanRequest = s.mu.numRangesPerScanRequest[:len(reqs)]
for n := 0; n < len(s.mu.numRangesPerScanRequest); {
n += copy(s.mu.numRangesPerScanRequest[n:], zeroIntSlice)
n += copy(s.mu.numRangesPerScanRequest[n:], zeroInt32Slice)
}
}
}
if s.mode == InOrder {
if subRequestIdx == nil {
subRequestIdx = make([]int, len(singleRangeReqs))
subRequestIdx = make([]int32, len(singleRangeReqs))
}
subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos]
}
@@ -1280,11 +1255,7 @@ func (w *workerCoordinator) processSingleRangeResults(
var memoryTokensBytes int64
for i, resp := range br.Responses {
position := req.positions[i]
enqueueKey := position
if w.s.enqueueKeys != nil {
enqueueKey = w.s.enqueueKeys[position]
}
var subRequestIdx int
var subRequestIdx int32
if req.subRequestIdx != nil {
subRequestIdx = req.subRequestIdx[i]
}
@@ -1318,13 +1289,10 @@
)
}
result := Result{
GetResp: get,
// This currently only works because all requests are
// unique.
EnqueueKeysSatisfied: []int{enqueueKey},
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: true,
GetResp: get,
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: true,
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
@@ -1354,21 +1322,18 @@
// want to be able to set Complete field on such an empty
// Result).
result := Result{
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: scan.ResumeSpan == nil,
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: scan.ResumeSpan == nil,
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = scanResponseSize(scan)
memoryTokensBytes += result.memoryTok.toRelease
result.ScanResp.ScanResponse = scan
result.ScanResp = scan
if w.s.hints.SingleRowLookup {
// When SingleRowLookup is false, Complete field will be set
// in finalizeSingleRangeResults().
result.ScanResp.Complete = true
// When SingleRowLookup is false, scanComplete field will be
// set in finalizeSingleRangeResults().
result.scanComplete = true
}
results = append(results, result)
hasNonEmptyScanResponse = true
@@ -1482,14 +1447,14 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
// field has already been set correctly.
if hasNonEmptyScanResponse && !w.s.hints.SingleRowLookup {
for i := range results {
if results[i].ScanResp.ScanResponse != nil {
if results[i].ScanResp != nil {
if results[i].ScanResp.ResumeSpan == nil {
// The scan within the range is complete.
if w.s.mode == OutOfOrder {
w.s.mu.numRangesPerScanRequest[results[i].Position]--
if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 {
// The scan across all ranges is now complete too.
results[i].ScanResp.Complete = true
results[i].scanComplete = true
}
} else {
// In InOrder mode, the scan is marked as complete when
@@ -1499,7 +1464,7 @@
// Result until the previous sub-requests are responded
// to.
numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position]
results[i].ScanResp.Complete = results[i].subRequestIdx+1 == numSubRequests
results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests
}
} else {
// Unset the ResumeSpan on the result in order to not
@@ -1524,10 +1489,10 @@
w.s.results.add(results)
}

var zeroIntSlice []int
var zeroInt32Slice []int32

func init() {
zeroIntSlice = make([]int, 1<<10)
zeroInt32Slice = make([]int32, 1<<10)
}

const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
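One detail carried through the retyping above: Enqueue reuses numRangesPerScanRequest across calls and resets it by copying from a preallocated all-zero slice in chunks, which is why zeroIntSlice becomes zeroInt32Slice. Here is a self-contained sketch of that zeroing idiom; the helper function name is made up for illustration.

```go
// Standalone sketch of the zeroInt32Slice trick used in Enqueue: reuse a
// previously allocated slice and reset it by copying from a fixed all-zero
// slice in chunks, avoiding a fresh allocation on every call.
package main

import "fmt"

var zeroInt32Slice = make([]int32, 1<<10)

// resetInt32s zeroes buf in place, up to 1024 elements per copy.
func resetInt32s(buf []int32) {
	for n := 0; n < len(buf); {
		n += copy(buf[n:], zeroInt32Slice)
	}
}

func main() {
	buf := []int32{1, 2, 3, 4, 5}
	resetInt32s(buf)
	fmt.Println(buf) // [0 0 0 0 0]
}
```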
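Taken together, the user-visible change is that Enqueue no longer takes an enqueueKeys slice and Result no longer carries EnqueueKeysSatisfied; a caller matches each Result back to its request via Result.Position alone. The sketch below shows a plausible consumer loop under that model. It assumes GetResults has the signature implied by the surrounding doc comments (context in, []Result and error out) and that it returns an empty slice once everything enqueued has been responded to; neither detail appears in this diff, so treat both as assumptions.

```go
// Hedged sketch of a Streamer consumer after this change; not code from the
// PR. Association between responses and requests is by Result.Position.
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// consume enqueues reqs and drains the streamer, matching each Result back to
// the originating request via its Position ordinal.
func consume(ctx context.Context, s *kvstreamer.Streamer, reqs []roachpb.RequestUnion) error {
	if err := s.Enqueue(ctx, reqs); err != nil {
		return err
	}
	for {
		// Assumed signature: GetResults(ctx) ([]Result, error).
		results, err := s.GetResults(ctx)
		if err != nil {
			return err
		}
		if len(results) == 0 {
			// Assumed: all enqueued requests have been responded to.
			return nil
		}
		for _, r := range results {
			_ = reqs[r.Position] // the original request this Result satisfies
			switch {
			case r.GetResp != nil:
				// Handle the Get response.
			case r.ScanResp != nil:
				// Handle a Scan response; more Results with the same Position
				// may follow until the scan is complete.
			}
			// Release the memory reservation back to the Streamer's budget.
			r.Release(ctx)
		}
	}
}
```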