Skip to content

Commit

Permalink
kvstreamer: clean up Result struct
Browse files Browse the repository at this point in the history
This commit cleans up the `Result` struct in order to reduce its memory
size, bringing it down into 48 byte size class.
```
name                             old time/op    new time/op    delta
IndexJoin/Cockroach-24             6.29ms ± 1%    6.22ms ± 2%   -1.06%  (p=0.024 n=9+9)
IndexJoin/MultinodeCockroach-24    7.99ms ± 1%    7.93ms ± 2%     ~     (p=0.165 n=10+10)

name                             old alloc/op   new alloc/op   delta
IndexJoin/Cockroach-24             1.64MB ± 1%    1.48MB ± 0%   -9.25%  (p=0.000 n=9+10)
IndexJoin/MultinodeCockroach-24    2.37MB ± 1%    2.20MB ± 1%   -7.06%  (p=0.000 n=10+10)

name                             old allocs/op  new allocs/op  delta
IndexJoin/Cockroach-24              8.15k ± 1%     7.15k ± 1%  -12.28%  (p=0.000 n=8+10)
IndexJoin/MultinodeCockroach-24     12.7k ± 1%     11.7k ± 1%   -8.18%  (p=0.000 n=10+10)
```

The main change of this commit is the removal of the concept of "enqueue
keys" from the Streamer API in favor of relying on `Result.Position`.
When requests are unique, then a single `Result` can only satisfy
a single enqueue key; however, for non-unique requests a single `Result`
can satisfy multiple requests and, thus, can have multiple enqueue keys.
At the moment, only unique requests are supported though. Once
non-unique requests are supported too, we'll need to figure out how to
handle those (maybe we'll be returning a `Result` `N` number of times if
it satisfies `N` original requests with different values for
`Position`).

Also, initially multiple "enqueue keys" support was envisioned for the
case of `multiSpanGenerator` in the lookup joins (i.e. multi-equality
lookup joins); however, I believe we should push that complexity out of
the streamer (into `TxnKVStreamer`) which is what this commit does.

Other changes done in this commit:
- unexport `ScanResp.Complete` field since this is currently only
used within the `kvstreamer` package
- reorder all existing fields so that the footprint of the struct is
minimized (in particular, `scanComplete` field is moved to the bottom
and `ScanResp` anonymous struct is removed)
- make `subRequestIdx` `int32` rather than `int`. This value is bound
by the number of ranges in the cluster, so max int32 is more than
sufficient.

Release note: None
  • Loading branch information
yuzefovich committed Jun 3, 2022
1 parent cf36f89 commit 3a05903
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 180 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type singleRangeBatch struct {
//
// subRequestIdx is only allocated in InOrder mode when
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int
subRequestIdx []int32
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs.
reqsReservedBytes int64
Expand Down Expand Up @@ -114,7 +114,7 @@ func (r singleRangeBatch) priority() int {
// compared when two batches have the same priority value.
//
// It is invalid to call this method on a batch with no requests.
func (r singleRangeBatch) subPriority() int {
func (r singleRangeBatch) subPriority() int32 {
if r.subRequestIdx == nil {
return 0
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e

func (b *resultsBufferBase) findCompleteResponses(results []Result) {
for i := range results {
if results[i].GetResp != nil || results[i].ScanResp.Complete {
if results[i].GetResp != nil || results[i].scanComplete {
b.numCompleteResponses++
}
}
Expand Down Expand Up @@ -320,7 +320,7 @@ type inOrderResultsBuffer struct {
// multiple ranges - in such a scenario, multiple Results are created. For
// Get requests and for single-range Scan requests this will always stay at
// zero.
headOfLineSubRequestIdx int
headOfLineSubRequestIdx int32
// buffered contains all buffered Results, regardless of whether they are
// stored in-memory or on disk.
buffered []inOrderBufferedResult
Expand Down Expand Up @@ -475,7 +475,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error)
// of the sub-request.
b.headOfLineSubRequestIdx++
}
if result.GetResp != nil || result.ScanResp.Complete {
if result.GetResp != nil || result.scanComplete {
// If the current Result is complete, then we need to advance the
// head-of-the-line position.
b.headOfLinePosition++
Expand Down Expand Up @@ -595,8 +595,9 @@ func (b *inOrderResultsBuffer) close(ctx context.Context) {
// inOrderBufferedResult describes a single Result for InOrder mode, regardless
// of where it is stored (in-memory or on disk).
type inOrderBufferedResult struct {
// If onDisk is true, then only Result.ScanResp.Complete, Result.memoryTok,
// Result.Position, Result.subRequestIdx, and Result.subRequestDone are set.
// If onDisk is true, then only Result.Position, Result.memoryTok,
// Result.subRequestIdx, Result.subRequestDone, and Result.scanComplete are
// set.
Result
// addEpoch indicates the value of addCounter variable when this result was
// added to the buffer. This "epoch" allows us to order correctly two
Expand All @@ -623,7 +624,7 @@ type inOrderBufferedResult struct {
// spill updates r to represent a result that has been spilled to disk and is
// identified by the provided ordinal in the disk buffer.
func (r *inOrderBufferedResult) spill(diskResultID int) {
isScanComplete := r.ScanResp.Complete
isScanComplete := r.scanComplete
*r = inOrderBufferedResult{
Result: Result{
memoryTok: r.memoryTok,
Expand All @@ -635,7 +636,7 @@ func (r *inOrderBufferedResult) spill(diskResultID int) {
onDisk: true,
diskResultID: diskResultID,
}
r.ScanResp.Complete = isScanComplete
r.scanComplete = isScanComplete
}

// get returns the Result, deserializing it from disk if necessary. toConsume
Expand Down
15 changes: 3 additions & 12 deletions pkg/kv/kvclient/kvstreamer/results_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func TestInOrderResultsBuffer(t *testing.T) {
}
for j := 0; j < numRanges; j++ {
scan := makeResultWithScanResp(rng)
scan.ScanResp.Complete = j+1 == numRanges
scan.scanComplete = j+1 == numRanges
scan.memoryTok.toRelease = rng.Int63n(100)
scan.Position = i
scan.subRequestIdx = j
scan.subRequestIdx = int32(j)
scan.subRequestDone = true
results = append(results, scan)
}
Expand Down Expand Up @@ -160,13 +160,6 @@ func TestInOrderResultsBuffer(t *testing.T) {
}
}

func fillEnqueueKeys(r *Result, rng *rand.Rand) {
r.EnqueueKeysSatisfied = make([]int, rng.Intn(20)+1)
for i := range r.EnqueueKeysSatisfied {
r.EnqueueKeysSatisfied[i] = rng.Int()
}
}

func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
var r Result
r.GetResp = &roachpb.GetResponse{}
Expand All @@ -182,7 +175,6 @@ func makeResultWithGetResp(rng *rand.Rand, empty bool) Result {
},
}
}
fillEnqueueKeys(&r, rng)
return r
}

Expand All @@ -195,9 +187,8 @@ func makeResultWithScanResp(rng *rand.Rand) Result {
rng.Read(batchResponse)
batchResponses[i] = batchResponse
}
r.ScanResp.ScanResponse = &roachpb.ScanResponse{
r.ScanResp = &roachpb.ScanResponse{
BatchResponses: batchResponses,
}
fillEnqueueKeys(&r, rng)
return r
}
129 changes: 47 additions & 82 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,34 +80,10 @@ type Result struct {
// is false). In that case, there will be a further result with the
// continuation; that result will use the same Key. Notably, SQL rows will
// never be split across multiple results.
ScanResp struct {
// The response is always using BATCH_RESPONSE format (meaning that Rows
// field is always nil). IntentRows field is also nil.
*roachpb.ScanResponse
// If the Result represents a scan result, Complete indicates whether
// this is the last response for the respective scan, or if there are
// more responses to come. In any case, ScanResp never contains partial
// rows (i.e. a single row is never split into different Results).
//
// When running in InOrder mode, Results for a single scan will be
// delivered in key order (in addition to results for different scans
// being delivered in request order). When running in OutOfOrder mode,
// Results for a single scan can be delivered out of key order (in
// addition to results for different scans being delivered out of
// request order).
Complete bool
}
// EnqueueKeysSatisfied identifies the requests that this Result satisfies.
// In OutOfOrder mode, a single Result can satisfy multiple identical
// requests. In InOrder mode a Result can only satisfy multiple consecutive
// requests.
EnqueueKeysSatisfied []int
// memoryTok describes the memory reservation of this Result that needs to
// be released back to the Streamer's budget when the Result is Release()'d.
memoryTok struct {
streamer *Streamer
toRelease int64
}
//
// The response is always using BATCH_RESPONSE format (meaning that Rows
// field is always nil). IntentRows field is also nil.
ScanResp *roachpb.ScanResponse
// Position tracks the ordinal among all originally enqueued requests that
// this result satisfies. See singleRangeBatch.positions for more details.
//
Expand All @@ -118,11 +94,17 @@ type Result struct {
// TODO(yuzefovich): this might need to be []int when non-unique requests
// are supported.
Position int
// memoryTok describes the memory reservation of this Result that needs to
// be released back to the Streamer's budget when the Result is Release()'d.
memoryTok struct {
streamer *Streamer
toRelease int64
}
// subRequestIdx allows us to order two Results that come for the same
// original Scan request but from different ranges. It is non-zero only in
// InOrder mode when Hints.SingleRowLookup is false, in all other cases it
// will remain zero. See singleRangeBatch.subRequestIdx for more details.
subRequestIdx int
subRequestIdx int32
// subRequestDone is true if the current Result is the last one for the
// corresponding sub-request. For all Get requests and for Scan requests
// contained within a single range, it is always true since those can only
Expand All @@ -132,6 +114,17 @@ type Result struct {
// properly if this Result is a Scan response and Hints.SingleRowLookup is
// false.
subRequestDone bool
// If the Result represents a scan result, scanComplete indicates whether
// this is the last response for the respective scan, or if there are more
// responses to come. In any case, ScanResp never contains partial rows
// (i.e. a single row is never split into different Results).
//
// When running in InOrder mode, Results for a single scan will be delivered
// in key order (in addition to results for different scans being delivered
// in request order). When running in OutOfOrder mode, Results for a single
// scan can be delivered out of key order (in addition to results for
// different scans being delivered out of request order).
scanComplete bool
}

// Hints provides different hints to the Streamer for optimization purposes.
Expand Down Expand Up @@ -189,7 +182,7 @@ func (r Result) Release(ctx context.Context) {
// }
// // All previously enqueued requests have already been responded to.
// if moreRequestsToEnqueue {
// err := s.Enqueue(ctx, requests, enqueueKeys)
// err := s.Enqueue(ctx, requests)
// // err check
// ...
// } else {
Expand Down Expand Up @@ -238,8 +231,6 @@ type Streamer struct {

waitGroup sync.WaitGroup

enqueueKeys []int

// requestsToServe contains all single-range sub-requests that have yet
// to be served.
requestsToServe requestsProvider
Expand All @@ -266,7 +257,7 @@ type Streamer struct {
// It is allocated lazily if Hints.SingleRowLookup is false when the
// first ScanRequest is encountered in Enqueue.
// TODO(yuzefovich): perform memory accounting for this.
numRangesPerScanRequest []int
numRangesPerScanRequest []int32

// numRequestsInFlight tracks the number of single-range batches that
// are currently being served asynchronously (i.e. those that have
Expand Down Expand Up @@ -378,16 +369,7 @@ func (s *Streamer) Init(
}

// Enqueue dispatches multiple requests for execution. Results are delivered
// through the GetResults call. If enqueueKeys is not nil, it needs to contain
// one ID for each request; responses will reference that ID so that the client
// can associate them to the requests. If enqueueKeys is nil, then the responses
// will reference the ordinals of the corresponding requests among reqs.
//
// Multiple requests can specify the same key. In this case, their respective
// responses will also reference the same key. This is useful, for example, for
// "range-based lookup joins" where multiple spans are read in the context of
// the same input-side row (see multiSpanGenerator implementation of
// rowexec.joinReaderSpanGenerator interface for more details).
// through the GetResults call.
//
// The Streamer takes over the given requests, will perform the memory
// accounting against its budget and might modify the requests in place.
Expand All @@ -405,9 +387,7 @@ func (s *Streamer) Init(
// Currently, enqueuing new requests while there are still requests in progress
// from the previous invocation is prohibited.
// TODO(yuzefovich): lift this restriction and introduce the pipelining.
func (s *Streamer) Enqueue(
ctx context.Context, reqs []roachpb.RequestUnion, enqueueKeys []int,
) (retErr error) {
func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (retErr error) {
if !s.coordinatorStarted {
var coordinatorCtx context.Context
coordinatorCtx, s.coordinatorCtxCancel = s.stopper.WithCancelOnQuiesce(ctx)
Expand Down Expand Up @@ -436,11 +416,6 @@ func (s *Streamer) Enqueue(
}
}()

if enqueueKeys != nil && len(enqueueKeys) != len(reqs) {
return errors.AssertionFailedf("invalid enqueueKeys: len(reqs) = %d, len(enqueueKeys) = %d", len(reqs), len(enqueueKeys))
}
s.enqueueKeys = enqueueKeys

if err := s.results.init(ctx, len(reqs)); err != nil {
return err
}
Expand Down Expand Up @@ -486,7 +461,7 @@ func (s *Streamer) Enqueue(
if err != nil {
return err
}
var subRequestIdx []int
var subRequestIdx []int32
if !s.hints.SingleRowLookup {
for i, pos := range positions {
if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan {
Expand All @@ -497,20 +472,20 @@ func (s *Streamer) Enqueue(
streamerLocked = true
s.mu.Lock()
if cap(s.mu.numRangesPerScanRequest) < len(reqs) {
s.mu.numRangesPerScanRequest = make([]int, len(reqs))
s.mu.numRangesPerScanRequest = make([]int32, len(reqs))
} else {
// We can reuse numRangesPerScanRequest allocated on
// the previous call to Enqueue after we zero it
// out.
s.mu.numRangesPerScanRequest = s.mu.numRangesPerScanRequest[:len(reqs)]
for n := 0; n < len(s.mu.numRangesPerScanRequest); {
n += copy(s.mu.numRangesPerScanRequest[n:], zeroIntSlice)
n += copy(s.mu.numRangesPerScanRequest[n:], zeroInt32Slice)
}
}
}
if s.mode == InOrder {
if subRequestIdx == nil {
subRequestIdx = make([]int, len(singleRangeReqs))
subRequestIdx = make([]int32, len(singleRangeReqs))
}
subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos]
}
Expand Down Expand Up @@ -1280,11 +1255,7 @@ func (w *workerCoordinator) processSingleRangeResults(
var memoryTokensBytes int64
for i, resp := range br.Responses {
position := req.positions[i]
enqueueKey := position
if w.s.enqueueKeys != nil {
enqueueKey = w.s.enqueueKeys[position]
}
var subRequestIdx int
var subRequestIdx int32
if req.subRequestIdx != nil {
subRequestIdx = req.subRequestIdx[i]
}
Expand Down Expand Up @@ -1318,13 +1289,10 @@ func (w *workerCoordinator) processSingleRangeResults(
)
}
result := Result{
GetResp: get,
// This currently only works because all requests are
// unique.
EnqueueKeysSatisfied: []int{enqueueKey},
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: true,
GetResp: get,
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: true,
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
Expand Down Expand Up @@ -1354,21 +1322,18 @@ func (w *workerCoordinator) processSingleRangeResults(
// want to be able to set Complete field on such an empty
// Result).
result := Result{
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: scan.ResumeSpan == nil,
Position: position,
subRequestIdx: subRequestIdx,
subRequestDone: scan.ResumeSpan == nil,
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = scanResponseSize(scan)
memoryTokensBytes += result.memoryTok.toRelease
result.ScanResp.ScanResponse = scan
result.ScanResp = scan
if w.s.hints.SingleRowLookup {
// When SingleRowLookup is false, Complete field will be set
// in finalizeSingleRangeResults().
result.ScanResp.Complete = true
// When SingleRowLookup is false, scanComplete field will be
// set in finalizeSingleRangeResults().
result.scanComplete = true
}
results = append(results, result)
hasNonEmptyScanResponse = true
Expand Down Expand Up @@ -1482,14 +1447,14 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
// field has already been set correctly.
if hasNonEmptyScanResponse && !w.s.hints.SingleRowLookup {
for i := range results {
if results[i].ScanResp.ScanResponse != nil {
if results[i].ScanResp != nil {
if results[i].ScanResp.ResumeSpan == nil {
// The scan within the range is complete.
if w.s.mode == OutOfOrder {
w.s.mu.numRangesPerScanRequest[results[i].Position]--
if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 {
// The scan across all ranges is now complete too.
results[i].ScanResp.Complete = true
results[i].scanComplete = true
}
} else {
// In InOrder mode, the scan is marked as complete when
Expand All @@ -1499,7 +1464,7 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
// Result until the previous sub-requests are responded
// to.
numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position]
results[i].ScanResp.Complete = results[i].subRequestIdx+1 == numSubRequests
results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests
}
} else {
// Unset the ResumeSpan on the result in order to not
Expand All @@ -1524,10 +1489,10 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
w.s.results.add(results)
}

var zeroIntSlice []int
var zeroInt32Slice []int32

func init() {
zeroIntSlice = make([]int, 1<<10)
zeroInt32Slice = make([]int32, 1<<10)
}

const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
Expand Down
Loading

0 comments on commit 3a05903

Please sign in to comment.