From 3a05903dd5c74e3a12cb237cda0c364c1dc06603 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Fri, 3 Jun 2022 11:16:25 -0700
Subject: [PATCH] kvstreamer: clean up Result struct
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit cleans up the `Result` struct in order to reduce its memory
size, bringing it down into the 48-byte size class.

```
name                             old time/op    new time/op    delta
IndexJoin/Cockroach-24           6.29ms ± 1%    6.22ms ± 2%   -1.06%  (p=0.024 n=9+9)
IndexJoin/MultinodeCockroach-24  7.99ms ± 1%    7.93ms ± 2%     ~     (p=0.165 n=10+10)

name                             old alloc/op   new alloc/op   delta
IndexJoin/Cockroach-24           1.64MB ± 1%    1.48MB ± 0%   -9.25%  (p=0.000 n=9+10)
IndexJoin/MultinodeCockroach-24  2.37MB ± 1%    2.20MB ± 1%   -7.06%  (p=0.000 n=10+10)

name                             old allocs/op  new allocs/op  delta
IndexJoin/Cockroach-24            8.15k ± 1%     7.15k ± 1%  -12.28%  (p=0.000 n=8+10)
IndexJoin/MultinodeCockroach-24   12.7k ± 1%     11.7k ± 1%   -8.18%  (p=0.000 n=10+10)
```

The main change of this commit is the removal of the concept of
"enqueue keys" from the Streamer API in favor of relying on
`Result.Position`. When requests are unique, a single `Result` can
satisfy only a single enqueue key; however, for non-unique requests a
single `Result` can satisfy multiple requests and, thus, can have
multiple enqueue keys. At the moment, though, only unique requests are
supported. Once non-unique requests are supported too, we'll need to
figure out how to handle them (maybe we'll return a `Result` `N` times
if it satisfies `N` original requests with different values for
`Position`).

Also, support for multiple "enqueue keys" was initially envisioned for
the case of `multiSpanGenerator` in the lookup joins (i.e.
multi-equality lookup joins); however, I believe we should push that
complexity out of the streamer (into `TxnKVStreamer`), which is what
this commit does.

Other changes done in this commit:
- unexport the `ScanResp.Complete` field since it is currently used
  only within the `kvstreamer` package
- reorder all existing fields so that the footprint of the struct is
  minimized (in particular, the `scanComplete` field is moved to the
  bottom and the `ScanResp` anonymous struct is removed)
- make `subRequestIdx` an `int32` rather than an `int`. This value is
  bounded by the number of ranges in the cluster, so max int32 is more
  than sufficient.

Release note: None
---
 .../kvclient/kvstreamer/requests_provider.go  |   4 +-
 pkg/kv/kvclient/kvstreamer/results_buffer.go  |  15 +-
 .../kvstreamer/results_buffer_test.go         |  15 +-
 pkg/kv/kvclient/kvstreamer/streamer.go        | 129 +++++++-----------
 pkg/kv/kvclient/kvstreamer/streamer_test.go   |  24 +---
 pkg/sql/row/kv_batch_streamer.go              |  39 ++----
 .../kvstreamer_result_disk_buffer.go          |  29 +---
 .../kvstreamer_result_disk_buffer_test.go     |  15 +-
 8 files changed, 90 insertions(+), 180 deletions(-)

diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go
index e24fe07e3d6e..e5cbc551f11a 100644
--- a/pkg/kv/kvclient/kvstreamer/requests_provider.go
+++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -71,7 +71,7 @@ type singleRangeBatch struct {
 	//
 	// subRequestIdx is only allocated in InOrder mode when
 	// Hints.SingleRowLookup is false and some Scan requests were enqueued.
-	subRequestIdx []int
+	subRequestIdx []int32
 	// reqsReservedBytes tracks the memory reservation against the budget for
 	// the memory usage of reqs.
 	reqsReservedBytes int64
@@ -114,7 +114,7 @@ func (r singleRangeBatch) priority() int {
 // compared when two batches have the same priority value.
 //
 // It is invalid to call this method on a batch with no requests.
-func (r singleRangeBatch) subPriority() int {
+func (r singleRangeBatch) subPriority() int32 {
 	if r.subRequestIdx == nil {
 		return 0
 	}
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go
index c4f368d30045..7ceebecf4def 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -157,7 +157,7 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e
 
 func (b *resultsBufferBase) findCompleteResponses(results []Result) {
 	for i := range results {
-		if results[i].GetResp != nil || results[i].ScanResp.Complete {
+		if results[i].GetResp != nil || results[i].scanComplete {
 			b.numCompleteResponses++
 		}
 	}
@@ -320,7 +320,7 @@ type inOrderResultsBuffer struct {
 	// multiple ranges - in such a scenario, multiple Results are created. For
 	// Get requests and for single-range Scan requests this will always stay at
 	// zero.
-	headOfLineSubRequestIdx int
+	headOfLineSubRequestIdx int32
 	// buffered contains all buffered Results, regardless of whether they are
 	// stored in-memory or on disk.
 	buffered []inOrderBufferedResult
@@ -475,7 +475,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error)
 			// of the sub-request.
 			b.headOfLineSubRequestIdx++
 		}
-		if result.GetResp != nil || result.ScanResp.Complete {
+		if result.GetResp != nil || result.scanComplete {
 			// If the current Result is complete, then we need to advance the
 			// head-of-the-line position.
 			b.headOfLinePosition++
@@ -595,8 +595,9 @@ func (b *inOrderResultsBuffer) close(ctx context.Context) {
 // inOrderBufferedResult describes a single Result for InOrder mode, regardless
 // of where it is stored (in-memory or on disk).
 type inOrderBufferedResult struct {
-	// If onDisk is true, then only Result.ScanResp.Complete, Result.memoryTok,
-	// Result.Position, Result.subRequestIdx, and Result.subRequestDone are set.
+	// If onDisk is true, then only Result.Position, Result.memoryTok,
+	// Result.subRequestIdx, Result.subRequestDone, and Result.scanComplete are
+	// set.
 	Result
 	// addEpoch indicates the value of addCounter variable when this result was
 	// added to the buffer. This "epoch" allows us to order correctly two
@@ -623,7 +624,7 @@ type inOrderBufferedResult struct {
 // spill updates r to represent a result that has been spilled to disk and is
 // identified by the provided ordinal in the disk buffer.
 func (r *inOrderBufferedResult) spill(diskResultID int) {
-	isScanComplete := r.ScanResp.Complete
+	isScanComplete := r.scanComplete
 	*r = inOrderBufferedResult{
 		Result: Result{
 			memoryTok: r.memoryTok,
@@ -635,7 +636,7 @@ func (r *inOrderBufferedResult) spill(diskResultID int) {
 		onDisk:       true,
 		diskResultID: diskResultID,
 	}
-	r.ScanResp.Complete = isScanComplete
+	r.scanComplete = isScanComplete
 }
 
 // get returns the Result, deserializing it from disk if necessary. toConsume
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
index efef68515f77..ede110f30360 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -85,10 +85,10 @@ func TestInOrderResultsBuffer(t *testing.T) {
 			}
 			for j := 0; j < numRanges; j++ {
 				scan := makeResultWithScanResp(rng)
-				scan.ScanResp.Complete = j+1 == numRanges
+				scan.scanComplete = j+1 == numRanges
 				scan.memoryTok.toRelease = rng.Int63n(100)
 				scan.Position = i
-				scan.subRequestIdx = j
+				scan.subRequestIdx = int32(j)
 				scan.subRequestDone = true
 				results = append(results, scan)
 			}
@@ -160,13 +160,6 @@ func TestInOrderResultsBuffer(t *testing.T) {
 	}
 }
 
-func fillEnqueueKeys(r *Result, rng *rand.Rand) {
-	r.EnqueueKeysSatisfied = make([]int, rng.Intn(20)+1)
-	for i := range r.EnqueueKeysSatisfied {
-		r.EnqueueKeysSatisfied[i] = rng.Int()
-	}
-}
-
 func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
 	var r Result
 	r.GetResp = &roachpb.GetResponse{}
@@ -182,7 +175,6 @@ func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
 			},
 		}
 	}
-	fillEnqueueKeys(&r, rng)
 	return r
 }
 
@@ -195,9 +187,8 @@ func makeResultWithScanResp(rng *rand.Rand) Result {
 		rng.Read(batchResponse)
 		batchResponses[i] = batchResponse
 	}
-	r.ScanResp.ScanResponse = &roachpb.ScanResponse{
+	r.ScanResp = &roachpb.ScanResponse{
 		BatchResponses: batchResponses,
 	}
-	fillEnqueueKeys(&r, rng)
 	return r
 }
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 71a05b4e1d14..dde2bf094f95 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -80,34 +80,10 @@ type Result struct {
 	// is false). In that case, there will be a further result with the
 	// continuation; that result will use the same Key. Notably, SQL rows will
 	// never be split across multiple results.
-	ScanResp struct {
-		// The response is always using BATCH_RESPONSE format (meaning that Rows
-		// field is always nil). IntentRows field is also nil.
-		*roachpb.ScanResponse
-		// If the Result represents a scan result, Complete indicates whether
-		// this is the last response for the respective scan, or if there are
-		// more responses to come. In any case, ScanResp never contains partial
-		// rows (i.e. a single row is never split into different Results).
-		//
-		// When running in InOrder mode, Results for a single scan will be
-		// delivered in key order (in addition to results for different scans
-		// being delivered in request order). When running in OutOfOrder mode,
-		// Results for a single scan can be delivered out of key order (in
-		// addition to results for different scans being delivered out of
-		// request order).
-		Complete bool
-	}
-	// EnqueueKeysSatisfied identifies the requests that this Result satisfies.
-	// In OutOfOrder mode, a single Result can satisfy multiple identical
-	// requests. In InOrder mode a Result can only satisfy multiple consecutive
-	// requests.
-	EnqueueKeysSatisfied []int
-	// memoryTok describes the memory reservation of this Result that needs to
-	// be released back to the Streamer's budget when the Result is Release()'d.
-	memoryTok struct {
-		streamer  *Streamer
-		toRelease int64
-	}
+	//
+	// The response is always using BATCH_RESPONSE format (meaning that Rows
+	// field is always nil). IntentRows field is also nil.
+	ScanResp *roachpb.ScanResponse
 	// Position tracks the ordinal among all originally enqueued requests that
 	// this result satisfies. See singleRangeBatch.positions for more details.
 	//
@@ -118,11 +94,17 @@ type Result struct {
 	// TODO(yuzefovich): this might need to be []int when non-unique requests
 	// are supported.
 	Position int
+	// memoryTok describes the memory reservation of this Result that needs to
+	// be released back to the Streamer's budget when the Result is Release()'d.
+	memoryTok struct {
+		streamer  *Streamer
+		toRelease int64
+	}
 	// subRequestIdx allows us to order two Results that come for the same
 	// original Scan request but from different ranges. It is non-zero only in
 	// InOrder mode when Hints.SingleRowLookup is false, in all other cases it
 	// will remain zero. See singleRangeBatch.subRequestIdx for more details.
-	subRequestIdx int
+	subRequestIdx int32
 	// subRequestDone is true if the current Result is the last one for the
 	// corresponding sub-request. For all Get requests and for Scan requests
 	// contained within a single range, it is always true since those can only
 	// have a single sub-request.
@@ -132,6 +114,17 @@ type Result struct {
 	// properly if this Result is a Scan response and Hints.SingleRowLookup is
 	// false.
 	subRequestDone bool
+	// If the Result represents a scan result, scanComplete indicates whether
+	// this is the last response for the respective scan, or if there are more
+	// responses to come. In any case, ScanResp never contains partial rows
+	// (i.e. a single row is never split into different Results).
+	//
+	// When running in InOrder mode, Results for a single scan will be delivered
+	// in key order (in addition to results for different scans being delivered
+	// in request order). When running in OutOfOrder mode, Results for a single
+	// scan can be delivered out of key order (in addition to results for
+	// different scans being delivered out of request order).
+	scanComplete bool
 }
 
 // Hints provides different hints to the Streamer for optimization purposes.
@@ -189,7 +182,7 @@ func (r Result) Release(ctx context.Context) {
 //	}
 //	// All previously enqueued requests have already been responded to.
 //	if moreRequestsToEnqueue {
-//		err := s.Enqueue(ctx, requests, enqueueKeys)
+//		err := s.Enqueue(ctx, requests)
 //		// err check
 //		...
 //	} else {
@@ -238,8 +231,6 @@ type Streamer struct {
 
 	waitGroup sync.WaitGroup
 
-	enqueueKeys []int
-
 	// requestsToServe contains all single-range sub-requests that have yet
 	// to be served.
 	requestsToServe requestsProvider
@@ -266,7 +257,7 @@ type Streamer struct {
 		// It is allocated lazily if Hints.SingleRowLookup is false when the
 		// first ScanRequest is encountered in Enqueue.
 		// TODO(yuzefovich): perform memory accounting for this.
-		numRangesPerScanRequest []int
+		numRangesPerScanRequest []int32
 
 		// numRequestsInFlight tracks the number of single-range batches that
 		// are currently being served asynchronously (i.e. those that have
@@ -378,16 +369,7 @@ func (s *Streamer) Init(
 }
 
 // Enqueue dispatches multiple requests for execution. Results are delivered
-// through the GetResults call. If enqueueKeys is not nil, it needs to contain
-// one ID for each request; responses will reference that ID so that the client
-// can associate them to the requests. If enqueueKeys is nil, then the responses
-// will reference the ordinals of the corresponding requests among reqs.
-//
-// Multiple requests can specify the same key. In this case, their respective
-// responses will also reference the same key. This is useful, for example, for
-// "range-based lookup joins" where multiple spans are read in the context of
-// the same input-side row (see multiSpanGenerator implementation of
-// rowexec.joinReaderSpanGenerator interface for more details).
+// through the GetResults call.
 //
 // The Streamer takes over the given requests, will perform the memory
 // accounting against its budget and might modify the requests in place.
@@ -405,9 +387,7 @@ func (s *Streamer) Init(
 // Currently, enqueuing new requests while there are still requests in progress
 // from the previous invocation is prohibited.
 // TODO(yuzefovich): lift this restriction and introduce the pipelining.
-func (s *Streamer) Enqueue(
-	ctx context.Context, reqs []roachpb.RequestUnion, enqueueKeys []int,
-) (retErr error) {
+func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (retErr error) {
 	if !s.coordinatorStarted {
 		var coordinatorCtx context.Context
 		coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx)
@@ -436,11 +416,6 @@ func (s *Streamer) Enqueue(
 		}
 	}()
 
-	if enqueueKeys != nil && len(enqueueKeys) != len(reqs) {
-		return errors.AssertionFailedf("invalid enqueueKeys: len(reqs) = %d, len(enqueueKeys) = %d", len(reqs), len(enqueueKeys))
-	}
-	s.enqueueKeys = enqueueKeys
-
 	if err := s.results.init(ctx, len(reqs)); err != nil {
 		return err
 	}
@@ -486,7 +461,7 @@ func (s *Streamer) Enqueue(
 		if err != nil {
 			return err
 		}
-		var subRequestIdx []int
+		var subRequestIdx []int32
 		if !s.hints.SingleRowLookup {
 			for i, pos := range positions {
 				if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan {
@@ -497,20 +472,20 @@ func (s *Streamer) Enqueue(
 						streamerLocked = true
 						s.mu.Lock()
 						if cap(s.mu.numRangesPerScanRequest) < len(reqs) {
-							s.mu.numRangesPerScanRequest = make([]int, len(reqs))
+							s.mu.numRangesPerScanRequest = make([]int32, len(reqs))
 						} else {
 							// We can reuse numRangesPerScanRequest allocated on
 							// the previous call to Enqueue after we zero it
 							// out.
 							s.mu.numRangesPerScanRequest = s.mu.numRangesPerScanRequest[:len(reqs)]
 							for n := 0; n < len(s.mu.numRangesPerScanRequest); {
-								n += copy(s.mu.numRangesPerScanRequest[n:], zeroIntSlice)
+								n += copy(s.mu.numRangesPerScanRequest[n:], zeroInt32Slice)
 							}
 						}
 					}
 					if s.mode == InOrder {
 						if subRequestIdx == nil {
-							subRequestIdx = make([]int, len(singleRangeReqs))
+							subRequestIdx = make([]int32, len(singleRangeReqs))
 						}
 						subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos]
 					}
@@ -1280,11 +1255,7 @@ func (w *workerCoordinator) processSingleRangeResults(
 	var memoryTokensBytes int64
 	for i, resp := range br.Responses {
 		position := req.positions[i]
-		enqueueKey := position
-		if w.s.enqueueKeys != nil {
-			enqueueKey = w.s.enqueueKeys[position]
-		}
-		var subRequestIdx int
+		var subRequestIdx int32
 		if req.subRequestIdx != nil {
 			subRequestIdx = req.subRequestIdx[i]
 		}
@@ -1318,13 +1289,10 @@
 				)
 			}
 			result := Result{
-				GetResp: get,
-				// This currently only works because all requests are
-				// unique.
-				EnqueueKeysSatisfied: []int{enqueueKey},
-				Position:             position,
-				subRequestIdx:        subRequestIdx,
-				subRequestDone:       true,
+				GetResp:        get,
+				Position:       position,
+				subRequestIdx:  subRequestIdx,
+				subRequestDone: true,
 			}
 			result.memoryTok.streamer = w.s
 			result.memoryTok.toRelease = getResponseSize(get)
@@ -1354,21 +1322,18 @@
 			// want to be able to set Complete field on such an empty
 			// Result).
 			result := Result{
-				// This currently only works because all requests
-				// are unique.
-				EnqueueKeysSatisfied: []int{enqueueKey},
-				Position:             position,
-				subRequestIdx:        subRequestIdx,
-				subRequestDone:       scan.ResumeSpan == nil,
+				Position:       position,
+				subRequestIdx:  subRequestIdx,
+				subRequestDone: scan.ResumeSpan == nil,
 			}
 			result.memoryTok.streamer = w.s
 			result.memoryTok.toRelease = scanResponseSize(scan)
 			memoryTokensBytes += result.memoryTok.toRelease
-			result.ScanResp.ScanResponse = scan
+			result.ScanResp = scan
 			if w.s.hints.SingleRowLookup {
-				// When SingleRowLookup is false, Complete field will be set
-				// in finalizeSingleRangeResults().
-				result.ScanResp.Complete = true
+				// When SingleRowLookup is false, scanComplete field will be
+				// set in finalizeSingleRangeResults().
+				result.scanComplete = true
 			}
 			results = append(results, result)
 			hasNonEmptyScanResponse = true
@@ -1482,14 +1447,14 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
 	// field has already been set correctly.
 	if hasNonEmptyScanResponse && !w.s.hints.SingleRowLookup {
 		for i := range results {
-			if results[i].ScanResp.ScanResponse != nil {
+			if results[i].ScanResp != nil {
 				if results[i].ScanResp.ResumeSpan == nil {
 					// The scan within the range is complete.
 					if w.s.mode == OutOfOrder {
 						w.s.mu.numRangesPerScanRequest[results[i].Position]--
 						if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 {
 							// The scan across all ranges is now complete too.
-							results[i].ScanResp.Complete = true
+							results[i].scanComplete = true
 						}
 					} else {
 						// In InOrder mode, the scan is marked as complete when
@@ -1499,7 +1464,7 @@
 						// Result until the previous sub-requests are responded
 						// to.
 						numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position]
-						results[i].ScanResp.Complete = results[i].subRequestIdx+1 == numSubRequests
+						results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests
 					}
 				} else {
 					// Unset the ResumeSpan on the result in order to not
@@ -1524,10 +1489,10 @@
 	w.s.results.add(results)
 }
 
-var zeroIntSlice []int
+var zeroInt32Slice []int32
 
 func init() {
-	zeroIntSlice = make([]int, 1<<10)
+	zeroInt32Slice = make([]int32, 1<<10)
 }
 
 const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index c8dfab68773d..df2939656cc4 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -70,16 +70,6 @@ func TestStreamerLimitations(t *testing.T) {
 		})
 	})
 
-	t.Run("invalid enqueueKeys", func(t *testing.T) {
-		streamer := getStreamer()
-		defer streamer.Close(ctx)
-		streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */, nil /* diskBuffer */)
-		// Use a single request but two keys which is invalid.
-		reqs := []roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{}}}
-		enqueueKeys := []int{0, 1}
-		require.Error(t, streamer.Enqueue(ctx, reqs, enqueueKeys))
-	})
-
 	t.Run("pipelining unsupported", func(t *testing.T) {
 		streamer := getStreamer()
 		defer streamer.Close(ctx)
@@ -90,10 +80,10 @@ func TestStreamerLimitations(t *testing.T) {
 				Get: get.(*roachpb.GetRequest),
 			},
 		}}
-		require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.NoError(t, streamer.Enqueue(ctx, reqs))
 		// It is invalid to enqueue more requests before the previous have been
 		// responded to.
-		require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.Error(t, streamer.Enqueue(ctx, reqs))
 	})
 
 	t.Run("unexpected RootTxn", func(t *testing.T) {
@@ -195,7 +185,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 		// A single request that exceeds the limit should be allowed.
 		reqs := make([]roachpb.RequestUnion, 1)
 		reqs[0] = makeGetRequest(limitBytes + 1)
-		require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.NoError(t, streamer.Enqueue(ctx, reqs))
 	})
 
 	t.Run("single key exceeds root pool size", func(t *testing.T) {
@@ -207,7 +197,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 		// should be denied.
 		reqs := make([]roachpb.RequestUnion, 1)
 		reqs[0] = makeGetRequest(rootPoolSize + 1)
-		require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.Error(t, streamer.Enqueue(ctx, reqs))
 	})
 
 	t.Run("multiple keys exceed limit", func(t *testing.T) {
@@ -219,7 +209,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 		reqs := make([]roachpb.RequestUnion, 2)
 		reqs[0] = makeGetRequest(limitBytes/2 + 1)
 		reqs[1] = makeGetRequest(limitBytes/2 + 1)
-		require.Error(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.Error(t, streamer.Enqueue(ctx, reqs))
 	})
 }
 
@@ -434,7 +424,7 @@ func TestStreamerEmptyScans(t *testing.T) {
 		// Scan the row with pk=0.
 		reqs := make([]roachpb.RequestUnion, 1)
 		reqs[0] = makeScanRequest(0, 1)
-		require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.NoError(t, streamer.Enqueue(ctx, reqs))
 		results, err := streamer.GetResults(ctx)
 		require.NoError(t, err)
 		// We expect a single empty Scan response.
@@ -448,7 +438,7 @@ func TestStreamerEmptyScans(t *testing.T) {
 		// Scan the rows with pk in range [1, 4).
 		reqs := make([]roachpb.RequestUnion, 1)
 		reqs[0] = makeScanRequest(1, 4)
-		require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
+		require.NoError(t, streamer.Enqueue(ctx, reqs))
 		// We expect an empty response for each range.
 		var numResults int
 		for {
diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go
index 94330314d13a..f83a21eddece 100644
--- a/pkg/sql/row/kv_batch_streamer.go
+++ b/pkg/sql/row/kv_batch_streamer.go
@@ -41,6 +41,7 @@ var useStreamerEnabled = settings.RegisterBoolSetting(
 type TxnKVStreamer struct {
 	streamer *kvstreamer.Streamer
 	spans    roachpb.Spans
+	spanIDs  []int
 
 	// getResponseScratch is reused to return the result of Get requests.
 	getResponseScratch [1]roachpb.KeyValue
@@ -48,9 +49,6 @@ type TxnKVStreamer struct {
 	results []kvstreamer.Result
 	lastResultState struct {
 		kvstreamer.Result
-		// numEmitted tracks the number of times this result has been fully
-		// emitted.
-		numEmitted int
 		// Used only for ScanResponses.
 		remainingBatches [][]byte
 	}
@@ -71,15 +69,23 @@ func NewTxnKVStreamer(
 	}
 	keyLocking := getKeyLockingStrength(lockStrength)
 	reqs := spansToRequests(spans, false /* reverse */, keyLocking)
-	if err := streamer.Enqueue(ctx, reqs, spanIDs); err != nil {
+	if err := streamer.Enqueue(ctx, reqs); err != nil {
 		return nil, err
 	}
 	return &TxnKVStreamer{
 		streamer: streamer,
 		spans:    spans,
+		spanIDs:  spanIDs,
 	}, nil
 }
 
+func (f *TxnKVStreamer) getSpanID(resultPosition int) int {
+	if f.spanIDs == nil {
+		return resultPosition
+	}
+	return f.spanIDs[resultPosition]
+}
+
 // proceedWithLastResult processes the result which must be already set on the
 // lastResultState and emits the first part of the response (the only part for
 // GetResponses).
@@ -89,7 +95,7 @@ func (f *TxnKVStreamer) proceedWithLastResult(
 	result := f.lastResultState.Result
 	ret := kvBatchFetcherResponse{
 		moreKVs: true,
-		spanID:  result.EnqueueKeysSatisfied[f.lastResultState.numEmitted],
+		spanID:  f.getSpanID(result.Position),
 	}
 	if get := result.GetResp; get != nil {
 		// No need to check get.IntentValue since the Streamer guarantees that
@@ -100,7 +106,6 @@ func (f *TxnKVStreamer) proceedWithLastResult(
 			return true, kvBatchFetcherResponse{}, nil
 		}
 		origSpan := f.spans[result.Position]
-		f.lastResultState.numEmitted++
 		f.getResponseScratch[0] = roachpb.KeyValue{Key: origSpan.Key, Value: *get.Value}
 		ret.kvs = f.getResponseScratch[:]
 		return false, ret, nil
@@ -109,9 +114,6 @@ func (f *TxnKVStreamer) proceedWithLastResult(
 	if len(scan.BatchResponses) > 0 {
 		ret.batchResponse, f.lastResultState.remainingBatches = scan.BatchResponses[0], scan.BatchResponses[1:]
 	}
-	if len(f.lastResultState.remainingBatches) == 0 {
-		f.lastResultState.numEmitted++
-	}
 	// We're consciously ignoring scan.Rows argument since the Streamer
 	// guarantees to always produce Scan responses using BATCH_RESPONSE format.
 	//
@@ -131,34 +133,19 @@ func (f *TxnKVStreamer) nextBatch(ctx context.Context) (kvBatchFetcherResponse,
 	if len(f.lastResultState.remainingBatches) > 0 {
 		ret := kvBatchFetcherResponse{
 			moreKVs: true,
-			spanID:  f.lastResultState.Result.EnqueueKeysSatisfied[f.lastResultState.numEmitted],
+			spanID:  f.getSpanID(f.lastResultState.Result.Position),
 		}
 		ret.batchResponse, f.lastResultState.remainingBatches = f.lastResultState.remainingBatches[0], f.lastResultState.remainingBatches[1:]
-		if len(f.lastResultState.remainingBatches) == 0 {
-			f.lastResultState.numEmitted++
-		}
 		return ret, nil
 	}
-	// Check whether the current result satisfies multiple requests.
-	if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeysSatisfied) {
-		// Note that we should never get an error here since we're processing
-		// the same result again.
-		var err error
-		_, ret, err := f.proceedWithLastResult(ctx)
-		return ret, err
-	}
-	// Release the current result.
-	if f.lastResultState.numEmitted == len(f.lastResultState.EnqueueKeysSatisfied) && f.lastResultState.numEmitted > 0 {
-		f.releaseLastResult(ctx)
-	}
+	f.releaseLastResult(ctx)
 
 	// Process the next result we have already received from the streamer.
 	for len(f.results) > 0 {
 		// Peel off the next result and set it into lastResultState.
 		f.lastResultState.Result = f.results[0]
-		f.lastResultState.numEmitted = 0
 		f.lastResultState.remainingBatches = nil
 		// Lose the reference to that result and advance the results slice for
 		// the next iteration.
diff --git a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go
index 118ccc8ce5bc..42871d970eca 100644
--- a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go
+++ b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go
@@ -146,8 +146,8 @@ func (b *kvStreamerResultDiskBuffer) Close(ctx context.Context) {
 // inOrderResultsBufferSpillTypeSchema is the type schema of a single
 // kvstreamer.Result that is spilled to disk.
 //
-// It contains all the information except for 'ScanResp.Complete', 'memoryTok',
-// 'Position', 'subRequestIdx', and 'subRequestDone' fields which are kept
+// It contains all the information except for 'Position', 'memoryTok',
+// 'subRequestIdx', 'subRequestDone', and 'scanComplete' fields which are kept
 // in-memory (because they are allocated in
 // kvstreamer.inOrderBufferedResult.Result anyway).
 var inOrderResultsBufferSpillTypeSchema = []*types.T{
@@ -162,7 +162,6 @@ var inOrderResultsBufferSpillTypeSchema = []*types.T{
 	// ScanResp:
 	//   BatchResponses [][]byte
 	types.BytesArray,
-	types.IntArray, // EnqueueKeysSatisfied []int
 }
 
 type resultSerializationIndex int
@@ -174,7 +173,6 @@ const (
 	getTSLogicalIdx
 	getTSSyntheticIdx
 	scanBatchResponsesIdx
-	enqueueKeysSatisfiedIdx
 )
 
 // serialize writes the serialized representation of the kvstreamer.Result into
@@ -209,14 +207,6 @@ func serialize(r *kvstreamer.Result, row rowenc.EncDatumRow, alloc *tree.DatumAl
 			row[scanBatchResponsesIdx] = rowenc.EncDatum{Datum: batchResponses}
 		}
 	}
-	enqueueKeysSatisfied := tree.NewDArray(types.Int)
-	enqueueKeysSatisfied.Array = make(tree.Datums, 0, len(r.EnqueueKeysSatisfied))
-	for _, k := range r.EnqueueKeysSatisfied {
-		if err := enqueueKeysSatisfied.Append(alloc.NewDInt(tree.DInt(k))); err != nil {
-			return err
-		}
-	}
-	row[enqueueKeysSatisfiedIdx] = rowenc.EncDatum{Datum: enqueueKeysSatisfied}
 	return nil
 }
 
@@ -224,8 +214,8 @@ func serialize(r *kvstreamer.Result, row rowenc.EncDatumRow, alloc *tree.DatumAl
 // state of the kvstreamer.Result according to
 // inOrderResultsBufferSpillTypeSchema.
 //
-// 'ScanResp.Complete', 'memoryTok', and 'position' fields are left unchanged
-// since those aren't serialized.
+// 'Position', 'memoryTok', 'subRequestIdx', 'subRequestDone', and
+// 'scanComplete' fields are left unchanged since those aren't serialized.
 func deserialize(r *kvstreamer.Result, row rowenc.EncDatumRow, alloc *tree.DatumAlloc) error {
 	for i := range row {
 		if err := row[i].EnsureDecoded(inOrderResultsBufferSpillTypeSchema[i], alloc); err != nil {
@@ -245,17 +235,12 @@ func deserialize(r *kvstreamer.Result, row rowenc.EncDatumRow, alloc *tree.Datum
 			}
 		}
 	} else {
-		r.ScanResp.ScanResponse = &roachpb.ScanResponse{}
+		r.ScanResp = &roachpb.ScanResponse{}
 		batchResponses := tree.MustBeDArray(row[scanBatchResponsesIdx].Datum)
-		r.ScanResp.ScanResponse.BatchResponses = make([][]byte, batchResponses.Len())
+		r.ScanResp.BatchResponses = make([][]byte, batchResponses.Len())
 		for i := range batchResponses.Array {
-			r.ScanResp.ScanResponse.BatchResponses[i] = []byte(tree.MustBeDBytes(batchResponses.Array[i]))
+			r.ScanResp.BatchResponses[i] = []byte(tree.MustBeDBytes(batchResponses.Array[i]))
 		}
 	}
-	enqueueKeysSatisfied := tree.MustBeDArray(row[enqueueKeysSatisfiedIdx].Datum)
-	r.EnqueueKeysSatisfied = make([]int, enqueueKeysSatisfied.Len())
-	for i := range enqueueKeysSatisfied.Array {
-		r.EnqueueKeysSatisfied[i] = int(tree.MustBeDInt(enqueueKeysSatisfied.Array[i]))
-	}
 	return nil
 }
diff --git a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer_test.go b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer_test.go
index 006a84b04d94..c6a9d1d61ce2 100644
--- a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer_test.go
+++ b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer_test.go
@@ -27,8 +27,8 @@ import (
 
 // TestRoundTripResult verifies that we can serialize and deserialize a Result
 // without any corruption. Note that fields that are kept in-memory
-// ('ScanResp.Complete', 'memoryTok', and 'position') aren't set on the test
-// Results.
+// ('Position', 'memoryTok', 'subRequestIdx', 'subRequestDone', and
+// 'scanComplete') aren't set on the test Results.
 func TestRoundTripResult(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -60,13 +60,6 @@ func TestRoundTripResult(t *testing.T) {
 	})
 }
 
-func fillEnqueueKeys(r *kvstreamer.Result, rng *rand.Rand) {
-	r.EnqueueKeysSatisfied = make([]int, rng.Intn(20)+1)
-	for i := range r.EnqueueKeysSatisfied {
-		r.EnqueueKeysSatisfied[i] = rng.Int()
-	}
-}
-
 func makeResultWithGetResp(rng *rand.Rand, empty bool) kvstreamer.Result {
 	var r kvstreamer.Result
 	r.GetResp = &roachpb.GetResponse{}
@@ -82,7 +75,6 @@ func makeResultWithGetResp(rng *rand.Rand, empty bool) kvstreamer.Result {
 			},
 		}
 	}
-	fillEnqueueKeys(&r, rng)
 	return r
 }
 
@@ -95,9 +87,8 @@ func makeResultWithScanResp(rng *rand.Rand) kvstreamer.Result {
 		rng.Read(batchResponse)
 		batchResponses[i] = batchResponse
 	}
-	r.ScanResp.ScanResponse = &roachpb.ScanResponse{
+	r.ScanResp = &roachpb.ScanResponse{
 		BatchResponses: batchResponses,
 	}
-	fillEnqueueKeys(&r, rng)
 	return r
 }
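
The following sketch is not part of the patch; it only illustrates the size-class claim from the commit message. The two structs are simplified, hypothetical stand-ins for `kvstreamer.Result` before and after this cleanup, with every pointer field reduced to `uintptr` (which preserves the layout on 64-bit platforms):

```
package main

import (
	"fmt"
	"unsafe"
)

// resultBefore approximates the old layout: an anonymous ScanResp wrapper
// whose trailing bool pads the inner struct to 16 bytes, a 24-byte slice
// header for EnqueueKeysSatisfied, and a full-width int for subRequestIdx.
type resultBefore struct {
	GetResp  uintptr // stand-in for *roachpb.GetResponse
	ScanResp struct {
		ScanResponse uintptr // stand-in for *roachpb.ScanResponse
		Complete     bool    // 7 bytes of padding follow
	}
	EnqueueKeysSatisfied []int
	memoryTok            struct {
		streamer  uintptr // stand-in for *Streamer
		toRelease int64
	}
	Position       int
	subRequestIdx  int
	subRequestDone bool
}

// resultAfter approximates the new layout: the slice and the anonymous
// wrapper are gone, subRequestIdx is narrowed to int32, and the two bools
// are packed right after it at the bottom of the struct.
type resultAfter struct {
	GetResp   uintptr
	ScanResp  uintptr
	Position  int
	memoryTok struct {
		streamer  uintptr
		toRelease int64
	}
	subRequestIdx  int32
	subRequestDone bool
	scanComplete   bool
}

func main() {
	fmt.Println(unsafe.Sizeof(resultBefore{})) // 88 bytes on 64-bit
	fmt.Println(unsafe.Sizeof(resultAfter{}))  // 48 bytes on 64-bit
}
```

Dropping the 24-byte slice header, removing the anonymous wrapper, narrowing `subRequestIdx`, and packing the bools together takes the stand-in from 88 bytes down to 46 bytes of fields plus padding, which the Go allocator rounds up to the 48-byte size class mentioned in the commit message.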
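A second standalone sketch, also not part of the patch, shows the zeroing idiom the patch carries over from `zeroIntSlice` to `zeroInt32Slice`: a fixed pre-zeroed buffer is copied over the reused slice in chunks, so `numRangesPerScanRequest` can be recycled across `Enqueue` calls without reallocating:

```
package main

import "fmt"

// A fixed zeroed buffer, mirroring zeroInt32Slice in the patch (1<<10
// elements). Copying from it never allocates.
var zeroInt32Slice = make([]int32, 1<<10)

func main() {
	s := []int32{3, 1, 4, 1, 5, 9, 2, 6}
	// Zero s chunk by chunk: copy reports how many elements were written,
	// so the loop also handles slices longer than the zero buffer.
	for n := 0; n < len(s); {
		n += copy(s[n:], zeroInt32Slice)
	}
	fmt.Println(s) // [0 0 0 0 0 0 0 0]
}
```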