From 71c482f8abaf95da5bf2249d125d4f860f2cc065 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 27 Jun 2022 09:11:58 -0700 Subject: [PATCH 1/2] kvstreamer: refactor the loop of processing the batch response This commit refactors the code which processes the batch response in order to separate out two different concerns: - processing non-empty responses in order to create `Result`s - processing incomplete responses to populate the resume request. Now each of these "concerns" is handled in a separate loop making it easier to reason about, especially so in the following commit. This commit also extracts out multiple return arguments of a function into a struct as well as updates some of the comments and moves some of the error-checking to an earlier stage. Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 482 ++++++++++++++----------- 1 file changed, 268 insertions(+), 214 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 760ccc91bcf1..17a04bf70403 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -79,10 +79,10 @@ type Result struct { // // GetResp is guaranteed to have nil IntentValue. GetResp *roachpb.GetResponse - // ScanResp can contain a partial response to a ScanRequest (when Complete - // is false). In that case, there will be a further result with the - // continuation; that result will use the same Key. Notably, SQL rows will - // never be split across multiple results. + // ScanResp can contain a partial response to a ScanRequest (when + // scanComplete is false). In that case, there will be a further result with + // the continuation; that result will use the same Key. Notably, SQL rows + // will never be split across multiple results. // // The response is always using BATCH_RESPONSE format (meaning that Rows // field is always nil). IntentRows field is also nil. 
@@ -1144,13 +1144,13 @@ func (w *workerCoordinator) performRequestAsync( // unnecessary blocking (due to sequential evaluation of sub-batches // by the DistSender). For the initial implementation it doesn't // seem important though. - br, err := w.txn.Send(ctx, ba) - if err != nil { + br, pErr := w.txn.Send(ctx, ba) + if pErr != nil { // TODO(yuzefovich): if err is // ReadWithinUncertaintyIntervalError and there is only a single // Streamer in a single local flow, attempt to transparently // refresh. - w.s.results.setError(err.GoError()) + w.s.results.setError(pErr.GoError()) return } atomic.AddInt64(w.s.atomics.batchRequestsIssued, 1) @@ -1161,13 +1161,17 @@ func (w *workerCoordinator) performRequestAsync( // if any are present. At the moment, due to limitations of the KV // layer (#75452) we cannot reuse original requests because the KV // doesn't allow mutability. - memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans := calculateFootprint(req, br) + fp, err := calculateFootprint(req, br) + if err != nil { + w.s.results.setError(err) + return + } // Now adjust the budget based on the actual memory footprint of // non-empty responses as well as resume spans, if any. - respOverestimate := targetBytes - memoryFootprintBytes - reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage - if resumeReqsMemUsage == 0 { + respOverestimate := targetBytes - fp.memoryFootprintBytes + reqOveraccounted := req.reqsReservedBytes - fp.resumeReqsMemUsage + if fp.resumeReqsMemUsage == 0 { // There will be no resume request, so we will lose the // reference to the req.reqs slice and can release its memory // reservation. @@ -1189,7 +1193,7 @@ func (w *workerCoordinator) performRequestAsync( // headOfLine must be true (targetBytes might be 1 or higher, // but not enough for that large row). 
toConsume := -overaccountedTotal - if err := w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil { + if err = w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil { atomic.AddInt64(&w.s.atomics.droppedBatchResponses, 1) w.s.budget.release(ctx, targetBytes) if !headOfLine { @@ -1225,7 +1229,7 @@ func (w *workerCoordinator) performRequestAsync( Priority: admissionpb.WorkPriority(w.requestAdmissionHeader.Priority), CreateTime: w.requestAdmissionHeader.CreateTime, } - if _, err := w.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { + if _, err = w.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { w.s.results.setError(err) return } @@ -1233,12 +1237,7 @@ func (w *workerCoordinator) performRequestAsync( // Finally, process the results and add the ResumeSpans to be // processed as well. - if err := w.processSingleRangeResults( - req, br, memoryFootprintBytes, resumeReqsMemUsage, - numIncompleteGets, numIncompleteScans, - ); err != nil { - w.s.results.setError(err) - } + processSingleRangeResponse(w.s, req, br, fp) }); err != nil { // The new goroutine for the request wasn't spun up, so we have to // perform the cleanup of this request ourselves. @@ -1247,20 +1246,28 @@ func (w *workerCoordinator) performRequestAsync( } } +// singleRangeBatchResponseFootprint is the footprint of the shape of the +// response to a singleRangeBatch. +type singleRangeBatchResponseFootprint struct { + // memoryFootprintBytes tracks the total memory footprint of non-empty + // responses. This will be equal to the sum of memory tokens created for all + // Results. + memoryFootprintBytes int64 + // resumeReqsMemUsage tracks the memory usage of the requests for the + // ResumeSpans. 
+ resumeReqsMemUsage int64 + numIncompleteGets, numIncompleteScans int +} + +func (fp singleRangeBatchResponseFootprint) hasIncomplete() bool { + return fp.numIncompleteGets > 0 || fp.numIncompleteScans > 0 +} + // calculateFootprint calculates the memory footprint of the batch response as // well as of the requests that will have to be created for the ResumeSpans. -// - memoryFootprintBytes tracks the total memory footprint of non-empty -// responses. This will be equal to the sum of memory tokens created for all -// Results. -// - resumeReqsMemUsage tracks the memory usage of the requests for the -// ResumeSpans. func calculateFootprint( req singleRangeBatch, br *roachpb.BatchResponse, -) ( - memoryFootprintBytes int64, - resumeReqsMemUsage int64, - numIncompleteGets, numIncompleteScans int, -) { +) (fp singleRangeBatchResponseFootprint, _ error) { // Note that we cannot use Size() methods that are automatically generated // by the protobuf library because they account for things differently from // how the memory usage is accounted for by the KV layer for the purposes of @@ -1275,32 +1282,47 @@ func calculateFootprint( switch req.reqs[i].GetInner().(type) { case *roachpb.GetRequest: get := reply.(*roachpb.GetResponse) + if get.IntentValue != nil { + return fp, errors.AssertionFailedf( + "unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue, + ) + } if get.ResumeSpan != nil { // This Get wasn't completed. getRequestScratch.SetSpan(*get.ResumeSpan) - resumeReqsMemUsage += int64(getRequestScratch.Size()) - numIncompleteGets++ + fp.resumeReqsMemUsage += int64(getRequestScratch.Size()) + fp.numIncompleteGets++ } else { // This Get was completed. 
- memoryFootprintBytes += getResponseSize(get) + fp.memoryFootprintBytes += getResponseSize(get) } case *roachpb.ScanRequest: scan := reply.(*roachpb.ScanResponse) + if len(scan.Rows) > 0 { + return fp, errors.AssertionFailedf( + "unexpectedly got a ScanResponse using KEY_VALUES response format", + ) + } + if len(scan.IntentRows) > 0 { + return fp, errors.AssertionFailedf( + "unexpectedly got a ScanResponse with non-nil IntentRows", + ) + } if len(scan.BatchResponses) > 0 { - memoryFootprintBytes += scanResponseSize(scan) + fp.memoryFootprintBytes += scanResponseSize(scan) } if scan.ResumeSpan != nil { // This Scan wasn't completed. scanRequestScratch.SetSpan(*scan.ResumeSpan) - resumeReqsMemUsage += int64(scanRequestScratch.Size()) - numIncompleteScans++ + fp.resumeReqsMemUsage += int64(scanRequestScratch.Size()) + fp.numIncompleteScans++ } } } - return memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans + return fp, nil } -// processSingleRangeResults creates a Result for each non-empty response found +// processSingleRangeResponse creates a Result for each non-empty response found // in the BatchResponse. The ResumeSpans, if found, are added into a new // singleRangeBatch request that is added to be picked up by the mainLoop of the // worker coordinator. This method assumes that req is no longer needed by the @@ -1308,35 +1330,31 @@ func calculateFootprint( // // It also assumes that the budget has already been reconciled with the // reservations for Results that will be created. 
-func (w *workerCoordinator) processSingleRangeResults( +func processSingleRangeResponse( + s *Streamer, req singleRangeBatch, br *roachpb.BatchResponse, - memoryFootprintBytes int64, - resumeReqsMemUsage int64, - numIncompleteGets, numIncompleteScans int, -) error { - numIncompleteRequests := numIncompleteGets + numIncompleteScans - var resumeReq singleRangeBatch - // We have to allocate the new Get and Scan requests, but we can reuse the - // reqs and the positions slices. - resumeReq.reqs = req.reqs[:numIncompleteRequests] - resumeReq.positions = req.positions[:0] - resumeReq.subRequestIdx = req.subRequestIdx[:0] - // We've already reconciled the budget with the actual reservation for the - // requests with the ResumeSpans. - resumeReq.reqsReservedBytes = resumeReqsMemUsage - resumeReq.overheadAccountedFor = req.overheadAccountedFor - gets := make([]struct { - req roachpb.GetRequest - union roachpb.RequestUnion_Get - }, numIncompleteGets) - scans := make([]struct { - req roachpb.ScanRequest - union roachpb.RequestUnion_Scan - }, numIncompleteScans) + fp singleRangeBatchResponseFootprint, +) { + processSingleRangeResults(s, req, br, fp) + if fp.hasIncomplete() { + resumeReq := buildResumeSingeRangeBatch(s, req, br, fp) + s.requestsToServe.add(resumeReq) + } +} + +// processSingleRangeResults examines the body of a BatchResponse and its +// associated singleRangeBatch to add any results. If there are no results, this +// function is a no-op. This function handles the associated bookkeeping on the +// streamer as it processes the results. +func processSingleRangeResults( + s *Streamer, + req singleRangeBatch, + br *roachpb.BatchResponse, + fp singleRangeBatchResponseFootprint, +) { var results []Result var hasNonEmptyScanResponse bool - var resumeReqIdx int // memoryTokensBytes accumulates all reservations that are made for all // Results created below. 
The accounting for these reservations has already // been performed, and memoryTokensBytes should be exactly equal to @@ -1349,188 +1367,90 @@ func (w *workerCoordinator) processSingleRangeResults( subRequestIdx = req.subRequestIdx[i] } reply := resp.GetInner() - switch origRequest := req.reqs[i].GetInner().(type) { + switch req.reqs[i].GetInner().(type) { case *roachpb.GetRequest: get := reply.(*roachpb.GetResponse) if get.ResumeSpan != nil { - // This Get wasn't completed - update the original - // request according to the ResumeSpan and include it - // into the batch again. - newGet := gets[0] - gets = gets[1:] - newGet.req.SetSpan(*get.ResumeSpan) - newGet.req.KeyLocking = origRequest.KeyLocking - newGet.union.Get = &newGet.req - resumeReq.reqs[resumeReqIdx].Value = &newGet.union - resumeReq.positions = append(resumeReq.positions, position) - if req.subRequestIdx != nil { - resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) - } - if resumeReq.minTargetBytes == 0 { - resumeReq.minTargetBytes = get.ResumeNextBytes - } - resumeReqIdx++ - } else { - // This Get was completed. - if get.IntentValue != nil { - return errors.AssertionFailedf( - "unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue, - ) - } - result := Result{ - GetResp: get, - Position: position, - subRequestIdx: subRequestIdx, - subRequestDone: true, - } - result.memoryTok.streamer = w.s - result.memoryTok.toRelease = getResponseSize(get) - memoryTokensBytes += result.memoryTok.toRelease - results = append(results, result) + // This Get wasn't completed. + continue + } + // This Get was completed. 
+ result := Result{ + GetResp: get, + Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: true, } + result.memoryTok.streamer = s + result.memoryTok.toRelease = getResponseSize(get) + memoryTokensBytes += result.memoryTok.toRelease + results = append(results, result) case *roachpb.ScanRequest: scan := reply.(*roachpb.ScanResponse) - if len(scan.Rows) > 0 { - return errors.AssertionFailedf( - "unexpectedly got a ScanResponse using KEY_VALUES response format", - ) - } - if len(scan.IntentRows) > 0 { - return errors.AssertionFailedf( - "unexpectedly got a ScanResponse with non-nil IntentRows", - ) - } - if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil { - // Only the second part of the conditional is true whenever we + if len(scan.BatchResponses) == 0 && scan.ResumeSpan != nil { + // Only the first part of the conditional is true whenever we // received an empty response for the Scan request (i.e. there // was no data in the span to scan). In such a scenario we still // create a Result with no data that the client will skip over // (this approach makes it easier to support Scans that span // multiple ranges and the last range has no data in it - we - // want to be able to set Complete field on such an empty + // want to be able to set scanComplete field on such an empty // Result). - result := Result{ - Position: position, - subRequestIdx: subRequestIdx, - subRequestDone: scan.ResumeSpan == nil, - } - result.memoryTok.streamer = w.s - result.memoryTok.toRelease = scanResponseSize(scan) - memoryTokensBytes += result.memoryTok.toRelease - result.ScanResp = scan - if w.s.hints.SingleRowLookup { - // When SingleRowLookup is false, scanComplete field will be - // set in finalizeSingleRangeResults(). 
- result.scanComplete = true - } - results = append(results, result) - hasNonEmptyScanResponse = true + continue } - if scan.ResumeSpan != nil { - // This Scan wasn't completed - update the original - // request according to the ResumeSpan and include it - // into the batch again. - newScan := scans[0] - scans = scans[1:] - newScan.req.SetSpan(*scan.ResumeSpan) - newScan.req.ScanFormat = roachpb.BATCH_RESPONSE - newScan.req.KeyLocking = origRequest.KeyLocking - newScan.union.Scan = &newScan.req - resumeReq.reqs[resumeReqIdx].Value = &newScan.union - resumeReq.positions = append(resumeReq.positions, position) - if req.subRequestIdx != nil { - resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) - } - if resumeReq.minTargetBytes == 0 { - resumeReq.minTargetBytes = scan.ResumeNextBytes - } - resumeReqIdx++ - - if w.s.hints.SingleRowLookup { - // Unset the ResumeSpan on the result in order to not - // confuse the user of the Streamer. Non-nil resume span was - // already included into resumeReq above. - // - // When SingleRowLookup is false, this will be done in - // finalizeSingleRangeResults(). - scan.ResumeSpan = nil - } + result := Result{ + Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: scan.ResumeSpan == nil, + } + result.memoryTok.streamer = s + result.memoryTok.toRelease = scanResponseSize(scan) + memoryTokensBytes += result.memoryTok.toRelease + result.ScanResp = scan + if s.hints.SingleRowLookup { + // When SingleRowLookup is false, scanComplete field will be + // set in finalizeSingleRangeResults(). 
+ result.scanComplete = true } + results = append(results, result) + hasNonEmptyScanResponse = true } } if buildutil.CrdbTestBuild { - if memoryFootprintBytes != memoryTokensBytes { + if fp.memoryFootprintBytes != memoryTokensBytes { panic(errors.AssertionFailedf( "different calculation of memory footprint\ncalculateFootprint: %d bytes\n"+ - "processSingleRangeResults: %d bytes", memoryFootprintBytes, memoryTokensBytes, + "processSingleRangeResults: %d bytes", fp.memoryFootprintBytes, memoryTokensBytes, )) } } if len(results) > 0 { - w.finalizeSingleRangeResults( - results, memoryFootprintBytes, hasNonEmptyScanResponse, + finalizeSingleRangeResults( + s, results, fp.memoryFootprintBytes, hasNonEmptyScanResponse, ) - } else { - // We received an empty response. - atomic.AddInt64(&w.s.atomics.emptyBatchResponses, 1) - if req.minTargetBytes != 0 { - // We previously have already received an empty response for this - // request, and minTargetBytes wasn't sufficient. Make sure that - // minTargetBytes on the resume request has increased. - if resumeReq.minTargetBytes <= req.minTargetBytes { - // Since ResumeNextBytes is populated on a best-effort basis, we - // cannot rely on it to make progress, so we make sure that if - // minTargetBytes hasn't increased for the resume request, we - // use the double of the original target. - resumeReq.minTargetBytes = 2 * req.minTargetBytes - } - } - if debug { - fmt.Printf( - "request for positions %v came back empty, original minTargetBytes=%d, "+ - "resumeReq.minTargetBytes=%d\n", req.positions, req.minTargetBytes, resumeReq.minTargetBytes, - ) - } } - - // If we have any incomplete requests, add them back into the work - // pool. - if numIncompleteRequests > 0 { - // Make sure to nil out old requests that we didn't include into the - // resume request. We don't have to do this if there aren't any - // incomplete requests since req and resumeReq will be garbage collected - // on their own. 
- for i := numIncompleteRequests; i < len(req.reqs); i++ { - req.reqs[i] = roachpb.RequestUnion{} - } - w.s.requestsToServe.add(resumeReq) - atomic.AddInt64(&w.s.atomics.resumeBatchRequests, 1) - atomic.AddInt64(&w.s.atomics.resumeSingleRangeRequests, int64(numIncompleteRequests)) - } - - return nil } // finalizeSingleRangeResults "finalizes" the results of evaluation of a -// singleRangeBatch. By "finalization" we mean setting Complete field of +// singleRangeBatch. By "finalization" we mean setting scanComplete field of // ScanResp to correct value for all scan responses (when Hints.SingleRowLookup // is false), updating the estimate of an average response size, and telling the // Streamer about these results. // // This method assumes that results has length greater than zero. -func (w *workerCoordinator) finalizeSingleRangeResults( - results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool, +func finalizeSingleRangeResults( + s *Streamer, results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool, ) { if buildutil.CrdbTestBuild { if len(results) == 0 { panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results")) } } - w.s.mu.Lock() - defer w.s.mu.Unlock() + s.mu.Lock() + defer s.mu.Unlock() // If we have non-empty scan response, it might be complete. This will be // the case when a scan response doesn't have a resume span and there are no @@ -1538,20 +1458,20 @@ func (w *workerCoordinator) finalizeSingleRangeResults( // the same original ScanRequest. // // We need to do this check as well as adding the results to be returned to - // the client as an atomic operation so that Complete is set to true only on - // the last partial scan response. + // the client as an atomic operation so that scanComplete is set to true + // only on the last partial scan response. 
// // However, if we got a hint that each lookup produces a single row, then we - // know that no original ScanRequest can span multiple ranges, so Complete - // field has already been set correctly. - if hasNonEmptyScanResponse && !w.s.hints.SingleRowLookup { + // know that no original ScanRequest can span multiple ranges, so + // scanComplete field has already been set correctly. + if hasNonEmptyScanResponse && !s.hints.SingleRowLookup { for i := range results { if results[i].ScanResp != nil { if results[i].ScanResp.ResumeSpan == nil { // The scan within the range is complete. - if w.s.mode == OutOfOrder { - w.s.mu.numRangesPerScanRequest[results[i].Position]-- - if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 { + if s.mode == OutOfOrder { + s.mu.numRangesPerScanRequest[results[i].Position]-- + if s.mu.numRangesPerScanRequest[results[i].Position] == 0 { // The scan across all ranges is now complete too. results[i].scanComplete = true } @@ -1562,7 +1482,7 @@ func (w *workerCoordinator) finalizeSingleRangeResults( // yet - the inOrderResultsBuffer will not emit this // Result until the previous sub-requests are responded // to. - numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position] + numSubRequests := s.mu.numRangesPerScanRequest[results[i].Position] results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests } } else { @@ -1580,12 +1500,146 @@ func (w *workerCoordinator) finalizeSingleRangeResults( // TODO(yuzefovich): some of the responses might be partial, yet the // estimator doesn't distinguish the footprint of the full response vs the // partial one. Think more about this. 
- w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) + s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) if debug { - printSubRequestIdx := w.s.mode == InOrder && !w.s.hints.SingleRowLookup + printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation) } - w.s.results.add(results) + s.results.add(results) +} + +// buildResumeSingleRangeBatch consumes a BatchResponse for a singleRangeBatch +// which contains incomplete requests and returns the next singleRangeBatch to +// be submitted. Note that for maximal memory reuse, the original request and +// response may no longer be utilized. +// +// Note that it should only be called if the response has any incomplete +// requests. +func buildResumeSingeRangeBatch( + s *Streamer, + req singleRangeBatch, + br *roachpb.BatchResponse, + fp singleRangeBatchResponseFootprint, +) (resumeReq singleRangeBatch) { + numIncompleteRequests := fp.numIncompleteGets + fp.numIncompleteScans + // We have to allocate the new Get and Scan requests, but we can reuse the + // reqs and the positions slices. + resumeReq.reqs = req.reqs[:numIncompleteRequests] + resumeReq.positions = req.positions[:0] + resumeReq.subRequestIdx = req.subRequestIdx[:0] + // We've already reconciled the budget with the actual reservation for the + // requests with the ResumeSpans. + resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage + resumeReq.overheadAccountedFor = req.overheadAccountedFor + // TODO(yuzefovich): for incomplete Get requests, the ResumeSpan should be + // exactly the same as the original span, so we might be able to reuse the + // original Get requests. 
+ gets := make([]struct { + req roachpb.GetRequest + union roachpb.RequestUnion_Get + }, fp.numIncompleteGets) + scans := make([]struct { + req roachpb.ScanRequest + union roachpb.RequestUnion_Scan + }, fp.numIncompleteScans) + var resumeReqIdx int + emptyResponse := true + for i, resp := range br.Responses { + position := req.positions[i] + reply := resp.GetInner() + switch origRequest := req.reqs[i].GetInner().(type) { + case *roachpb.GetRequest: + get := reply.(*roachpb.GetResponse) + if get.ResumeSpan == nil { + emptyResponse = false + continue + } + // This Get wasn't completed - create a new request according to the + // ResumeSpan and include it into the batch. + newGet := gets[0] + gets = gets[1:] + newGet.req.SetSpan(*get.ResumeSpan) + newGet.req.KeyLocking = origRequest.KeyLocking + newGet.union.Get = &newGet.req + resumeReq.reqs[resumeReqIdx].Value = &newGet.union + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i]) + } + if resumeReq.minTargetBytes == 0 { + resumeReq.minTargetBytes = get.ResumeNextBytes + } + resumeReqIdx++ + + case *roachpb.ScanRequest: + scan := reply.(*roachpb.ScanResponse) + if scan.ResumeSpan == nil { + emptyResponse = false + continue + } + // This Scan wasn't completed - create a new request according to + // the ResumeSpan and include it into the batch. 
+ newScan := scans[0] + scans = scans[1:] + newScan.req.SetSpan(*scan.ResumeSpan) + newScan.req.ScanFormat = roachpb.BATCH_RESPONSE + newScan.req.KeyLocking = origRequest.KeyLocking + newScan.union.Scan = &newScan.req + resumeReq.reqs[resumeReqIdx].Value = &newScan.union + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i]) + } + if resumeReq.minTargetBytes == 0 { + resumeReq.minTargetBytes = scan.ResumeNextBytes + } + resumeReqIdx++ + + if s.hints.SingleRowLookup { + // Unset the ResumeSpan on the result in order to not + // confuse the user of the Streamer. Non-nil resume span was + // already included into resumeReq above. + // + // When SingleRowLookup is false, this will be done in + // finalizeSingleRangeResults(). + scan.ResumeSpan = nil + } + } + } + + if emptyResponse { + // We received an empty response. + atomic.AddInt64(&s.atomics.emptyBatchResponses, 1) + if req.minTargetBytes != 0 { + // We previously have already received an empty response for this + // request, and minTargetBytes wasn't sufficient. Make sure that + // minTargetBytes on the resume request has increased. + if resumeReq.minTargetBytes <= req.minTargetBytes { + // Since ResumeNextBytes is populated on a best-effort basis, we + // cannot rely on it to make progress, so we make sure that if + // minTargetBytes hasn't increased for the resume request, we + // use the double of the original target. + resumeReq.minTargetBytes = 2 * req.minTargetBytes + } + } + if debug { + fmt.Printf( + "request for positions %v came back empty, original minTargetBytes=%d, "+ + "resumeReq.minTargetBytes=%d\n", req.positions, req.minTargetBytes, resumeReq.minTargetBytes, + ) + } + } + + // Make sure to nil out old requests that we didn't include into the resume + // request. 
We don't have to do this if there aren't any incomplete requests + // since req and resumeReq will be garbage collected on their own. + for i := numIncompleteRequests; i < len(req.reqs); i++ { + req.reqs[i] = roachpb.RequestUnion{} + } + atomic.AddInt64(&s.atomics.resumeBatchRequests, 1) + atomic.AddInt64(&s.atomics.resumeSingleRangeRequests, int64(numIncompleteRequests)) + + return resumeReq } var zeroInt32Slice []int32 From 6343df3928e5fa11d9dda15eda7784dd360c6161 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 3 Jun 2022 23:09:24 -0700 Subject: [PATCH 2/2] kvstreamer: remove temporary allocations for []Result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the worker goroutine would accumulate all `Result`s it can create based on the KV response into a slice, and then the slice would be passed into the results buffer. At that point, the slice would be discarded since the results buffer would copy all `Result`s into its internal state. This commit refactors the streamer as well as the results buffer to avoid this temporary allocation of `[]Result`. The main idea is that `Result`s are now passed one-by-one into the results buffer. The worker goroutine now acquires the results buffer's mutex, processes the KV responses one at a time, and whenever a `Result` is created, it is added into the results buffer right away. However, in order to prevent the results buffer from eagerly returning a single `Result` on a `GetResults` call, the streamer's user goroutine won't be woken up until a newly introduced `doneAddingLocked` method is called by the worker goroutine. Some care needs to be taken to prevent deadlocks with all of the mutexes. Now, since we're finalizing the results one at a time, we might need to hold the streamer's mutex (so that we can set `scanComplete` correctly), and that mutex must be acquired before the results buffer's one.
This change shows a modest improvement on the microbenchmarks but is a lot more important on analytical, TPCH-like queries, where this `[]Result` is one of the largest sources of garbage. ``` name old time/op new time/op delta IndexJoin/Cockroach-24 5.98ms ± 1% 5.95ms ± 1% ~ (p=0.079 n=9+10) IndexJoin/MultinodeCockroach-24 7.55ms ± 1% 7.59ms ± 1% +0.47% (p=0.015 n=8+9) IndexJoinColumnFamilies/Cockroach-24 8.68ms ± 3% 8.56ms ± 2% ~ (p=0.133 n=9+10) IndexJoinColumnFamilies/MultinodeCockroach-24 11.8ms ± 5% 11.7ms ± 3% ~ (p=0.315 n=10+10) LookupJoinEqColsAreKeyNoOrdering/Cockroach-24 6.67ms ± 1% 6.69ms ± 1% ~ (p=0.315 n=10+9) LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24 7.87ms ± 1% 7.92ms ± 1% +0.73% (p=0.015 n=10+10) LookupJoinEqColsAreKeyOrdering/Cockroach-24 9.30ms ± 2% 9.31ms ± 4% ~ (p=0.796 n=10+10) LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24 10.9ms ± 4% 10.9ms ± 2% ~ (p=0.971 n=10+10) LookupJoinNoOrdering/Cockroach-24 8.99ms ± 1% 9.03ms ± 4% ~ (p=0.549 n=9+10) LookupJoinNoOrdering/MultinodeCockroach-24 12.1ms ± 4% 11.9ms ± 6% ~ (p=0.143 n=10+10) LookupJoinOrdering/Cockroach-24 10.9ms ± 3% 10.8ms ± 3% ~ (p=0.243 n=10+9) LookupJoinOrdering/MultinodeCockroach-24 14.2ms ± 5% 13.9ms ± 3% ~ (p=0.113 n=10+9) name old alloc/op new alloc/op delta IndexJoin/Cockroach-24 1.36MB ± 1% 1.31MB ± 0% -3.61% (p=0.000 n=10+9) IndexJoin/MultinodeCockroach-24 2.07MB ± 2% 2.04MB ± 3% ~ (p=0.063 n=10+10) IndexJoinColumnFamilies/Cockroach-24 1.43MB ± 1% 1.38MB ± 0% -3.56% (p=0.000 n=9+9) IndexJoinColumnFamilies/MultinodeCockroach-24 2.27MB ± 1% 2.22MB ± 2% -2.09% (p=0.000 n=8+10) LookupJoinEqColsAreKeyNoOrdering/Cockroach-24 1.71MB ± 0% 1.67MB ± 0% -2.70% (p=0.000 n=9+10) LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24 2.43MB ± 5% 2.35MB ± 1% -3.31% (p=0.000 n=10+10) LookupJoinEqColsAreKeyOrdering/Cockroach-24 1.72MB ± 1% 1.62MB ± 1% -6.20% (p=0.000 n=10+10) LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24 2.39MB ± 2% 2.30MB ± 3% -3.53% (p=0.000 n=10+10) 
LookupJoinNoOrdering/Cockroach-24 1.79MB ± 1% 1.74MB ± 1% -2.80% (p=0.000 n=10+9) LookupJoinNoOrdering/MultinodeCockroach-24 2.35MB ± 3% 2.32MB ± 2% ~ (p=0.079 n=10+9) LookupJoinOrdering/Cockroach-24 1.63MB ± 1% 1.53MB ± 1% -5.77% (p=0.000 n=10+10) LookupJoinOrdering/MultinodeCockroach-24 2.30MB ± 4% 2.23MB ± 2% -3.41% (p=0.002 n=9+8) name old allocs/op new allocs/op delta IndexJoin/Cockroach-24 7.15k ± 1% 7.16k ± 1% ~ (p=0.888 n=10+9) IndexJoin/MultinodeCockroach-24 11.9k ± 2% 11.9k ± 2% ~ (p=0.968 n=10+9) IndexJoinColumnFamilies/Cockroach-24 11.9k ± 0% 11.9k ± 0% ~ (p=0.075 n=9+10) IndexJoinColumnFamilies/MultinodeCockroach-24 17.6k ± 1% 17.5k ± 1% ~ (p=0.566 n=10+10) LookupJoinEqColsAreKeyNoOrdering/Cockroach-24 9.86k ± 1% 9.88k ± 1% ~ (p=0.150 n=9+10) LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24 14.1k ± 0% 14.1k ± 1% ~ (p=0.055 n=8+10) LookupJoinEqColsAreKeyOrdering/Cockroach-24 12.6k ± 1% 12.5k ± 1% -0.77% (p=0.005 n=10+10) LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24 17.2k ± 1% 17.0k ± 0% -0.88% (p=0.000 n=10+8) LookupJoinNoOrdering/Cockroach-24 12.3k ± 1% 12.3k ± 1% ~ (p=0.929 n=10+10) LookupJoinNoOrdering/MultinodeCockroach-24 16.8k ± 1% 16.8k ± 1% ~ (p=0.968 n=9+10) LookupJoinOrdering/Cockroach-24 14.5k ± 1% 14.5k ± 1% ~ (p=0.271 n=10+10) LookupJoinOrdering/MultinodeCockroach-24 19.4k ± 1% 19.3k ± 1% ~ (p=0.056 n=9+8) ``` Release note: None --- pkg/kv/kvclient/kvstreamer/results_buffer.go | 87 +++++---- .../kvstreamer/results_buffer_test.go | 9 +- pkg/kv/kvclient/kvstreamer/streamer.go | 173 +++++++++--------- 3 files changed, 141 insertions(+), 128 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go index c04123140270..15fa764c5835 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go @@ -45,6 +45,8 @@ type resultsBuffer interface { // get returns all the Results that the buffer can send to the client at the // moment. 
The boolean indicates whether all expected Results have been // returned. Must be called without holding the budget's mutex. + // TODO(yuzefovich): consider changing the interface to return a single + // Result object in order to avoid some allocations. get(context.Context) (_ []Result, allComplete bool, _ error) // wait blocks until there is at least one Result available to be returned @@ -64,10 +66,26 @@ type resultsBuffer interface { // // /////////////////////////////////////////////////////////////////////////// - // add adds the provided Results into the buffer. If any Results are - // available to be returned to the client and there is a goroutine blocked - // in wait(), the goroutine is woken up. - add([]Result) + // Lock and Unlock expose methods on the mutex of the resultsBuffer. If the + // Streamer's mutex needs to be locked, then the Streamer's mutex must be + // acquired first. + Lock() + Unlock() + + // addLocked adds the provided Result into the buffer. Note that if the + // Result is available to be returned to the client and there is a goroutine + // blocked in wait(), the goroutine is **not** woken up - doneAddingLocked() + // has to be called. + // + // The combination of multiple addLocked() calls followed by a single + // doneAddingLocked() call allows us to simulate adding many Results at + // once, without having to allocate a slice for that. + addLocked(Result) + // doneAddingLocked notifies the resultsBuffer that the worker goroutine + // added all Results it could, and the resultsBuffer checks whether any + // Results are available to be returned to the client. If there is a + // goroutine blocked in wait(), the goroutine is woken up. 
+ doneAddingLocked() /////////////////////////////////////////////////////////////////////////// // // @@ -159,11 +177,10 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e return nil } -func (b *resultsBufferBase) findCompleteResponses(results []Result) { - for i := range results { - if results[i].GetResp != nil || results[i].scanComplete { - b.numCompleteResponses++ - } +func (b *resultsBufferBase) checkIfCompleteLocked(r Result) { + b.Mutex.AssertHeld() + if r.GetResp != nil || r.scanComplete { + b.numCompleteResponses++ } } @@ -248,12 +265,15 @@ func (b *outOfOrderResultsBuffer) init(_ context.Context, numExpectedResponses i return nil } -func (b *outOfOrderResultsBuffer) add(results []Result) { - b.Lock() - defer b.Unlock() - b.results = append(b.results, results...) - b.findCompleteResponses(results) - b.numUnreleasedResults += len(results) +func (b *outOfOrderResultsBuffer) addLocked(r Result) { + b.Mutex.AssertHeld() + b.results = append(b.results, r) + b.checkIfCompleteLocked(r) + b.numUnreleasedResults++ +} + +func (b *outOfOrderResultsBuffer) doneAddingLocked() { + b.Mutex.AssertHeld() b.signal() } @@ -408,35 +428,32 @@ func (b *inOrderResultsBuffer) init(ctx context.Context, numExpectedResponses in return nil } -func (b *inOrderResultsBuffer) add(results []Result) { - b.Lock() - defer b.Unlock() +func (b *inOrderResultsBuffer) addLocked(r Result) { + b.Mutex.AssertHeld() // Note that we don't increase b.numUnreleasedResults because all these // results are "buffered". - b.findCompleteResponses(results) - foundHeadOfLine := false - for _, r := range results { - if debug { - subRequestIdx := "" - if !b.singleRowLookup { - subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx) - } - fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease) - } - // All the Results have already been registered with the budget, so - // we're keeping them in-memory. 
- heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter}) - if r.Position == b.headOfLinePosition && r.subRequestIdx == b.headOfLineSubRequestIdx { - foundHeadOfLine = true + b.checkIfCompleteLocked(r) + if debug { + subRequestIdx := "" + if !b.singleRowLookup { + subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx) } + fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease) } - if foundHeadOfLine { + // All the Results have already been registered with the budget, so we're + // keeping them in-memory. + heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter}) + b.addCounter++ +} + +func (b *inOrderResultsBuffer) doneAddingLocked() { + b.Mutex.AssertHeld() + if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx { if debug { fmt.Println("found head-of-the-line") } b.signal() } - b.addCounter++ } func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) { diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go index ede110f30360..63f31baee688 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go @@ -113,13 +113,14 @@ func TestInOrderResultsBuffer(t *testing.T) { break } + b.Lock() numToAdd := rng.Intn(len(addOrder)) + 1 - toAdd := make([]Result, numToAdd) - for i := range toAdd { - toAdd[i] = results[addOrder[0]] + for i := 0; i < numToAdd; i++ { + b.addLocked(results[addOrder[0]]) addOrder = addOrder[1:] } - b.add(toAdd) + b.doneAddingLocked() + b.Unlock() // With 50% probability, try spilling some of the buffered results // to disk. 
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 17a04bf70403..1f0439af3773 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -246,7 +246,8 @@ type Streamer struct { mu struct { // If the budget's mutex also needs to be locked, the budget's mutex - // must be acquired first. + // must be acquired first. If the results' mutex needs to be locked, + // then this mutex must be acquired first. syncutil.Mutex avgResponseEstimator avgResponseEstimator @@ -1255,10 +1256,17 @@ type singleRangeBatchResponseFootprint struct { memoryFootprintBytes int64 // resumeReqsMemUsage tracks the memory usage of the requests for the // ResumeSpans. - resumeReqsMemUsage int64 + resumeReqsMemUsage int64 + // numGetResults and numScanResults indicate how many Result objects will + // need to be created for Get and Scan responses, respectively. + numGetResults, numScanResults int numIncompleteGets, numIncompleteScans int } +func (fp singleRangeBatchResponseFootprint) hasResults() bool { + return fp.numGetResults > 0 || fp.numScanResults > 0 +} + func (fp singleRangeBatchResponseFootprint) hasIncomplete() bool { return fp.numIncompleteGets > 0 || fp.numIncompleteScans > 0 } @@ -1295,6 +1303,7 @@ func calculateFootprint( } else { // This Get was completed. fp.memoryFootprintBytes += getResponseSize(get) + fp.numGetResults++ } case *roachpb.ScanRequest: scan := reply.(*roachpb.ScanResponse) @@ -1311,6 +1320,9 @@ func calculateFootprint( if len(scan.BatchResponses) > 0 { fp.memoryFootprintBytes += scanResponseSize(scan) } + if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil { + fp.numScanResults++ + } if scan.ResumeSpan != nil { // This Scan wasn't completed. 
scanRequestScratch.SetSpan(*scan.ResumeSpan) @@ -1353,8 +1365,46 @@ func processSingleRangeResults( br *roachpb.BatchResponse, fp singleRangeBatchResponseFootprint, ) { - var results []Result - var hasNonEmptyScanResponse bool + // If there are no results, this function has nothing to do. + if !fp.hasResults() { + return + } + + // We will add some Results into the results buffer, and doneAddingLocked() + // call below requires that the budget's mutex is held. It also must be + // acquired before the streamer's mutex is locked, so we have to do this + // right away. + // TODO(yuzefovich): check whether the lock contention on this mutex is + // noticeable and possibly refactor the code so that the budget's mutex is + // only acquired for the duration of doneAddingLocked(). + s.budget.mu.Lock() + defer s.budget.mu.Unlock() + s.mu.Lock() + + // TODO(yuzefovich): some of the responses might be partial, yet the + // estimator doesn't distinguish the footprint of the full response vs + // the partial one. Think more about this. + s.mu.avgResponseEstimator.update( + fp.memoryFootprintBytes, int64(fp.numGetResults+fp.numScanResults), + ) + + // If we have any Scan results to create and the Scan requests can return + // multiple rows, we'll need to consult s.mu.numRangesPerScanRequest, so + // we'll defer unlocking the streamer's mutex. However, if only Get results + // or Scan results of single rows will be created, we can unlock the + // streamer's mutex right away. + if fp.numScanResults > 0 && !s.hints.SingleRowLookup { + defer s.mu.Unlock() + } else { + s.mu.Unlock() + } + + // Now we can get the resultsBuffer's mutex - it must be acquired after + // the Streamer's one. + s.results.Lock() + defer s.results.Unlock() + defer s.results.doneAddingLocked() + // memoryTokensBytes accumulates all reservations that are made for all // Results created below. 
The accounting for these reservations has already // been performed, and memoryTokensBytes should be exactly equal to @@ -1384,7 +1434,14 @@ func processSingleRangeResults( result.memoryTok.streamer = s result.memoryTok.toRelease = getResponseSize(get) memoryTokensBytes += result.memoryTok.toRelease - results = append(results, result) + if buildutil.CrdbTestBuild { + if fp.numGetResults == 0 { + panic(errors.AssertionFailedf( + "unexpectedly found a non-empty GetResponse when numGetResults is zero", + )) + } + } + s.results.addLocked(result) case *roachpb.ScanRequest: scan := reply.(*roachpb.ScanResponse) @@ -1409,12 +1466,30 @@ func processSingleRangeResults( memoryTokensBytes += result.memoryTok.toRelease result.ScanResp = scan if s.hints.SingleRowLookup { - // When SingleRowLookup is false, scanComplete field will be - // set in finalizeSingleRangeResults(). result.scanComplete = true + } else if scan.ResumeSpan == nil { + // The scan within the range is complete. + if s.mode == OutOfOrder { + s.mu.numRangesPerScanRequest[position]-- + result.scanComplete = s.mu.numRangesPerScanRequest[position] == 0 + } else { + // In InOrder mode, the scan is marked as complete when the + // last sub-request is satisfied. Note that it is ok if the + // previous sub-requests haven't been satisfied yet - the + // inOrderResultsBuffer will not emit this Result until the + // previous sub-requests are responded to. 
+ numSubRequests := s.mu.numRangesPerScanRequest[position] + result.scanComplete = result.subRequestIdx+1 == numSubRequests + } + } + if buildutil.CrdbTestBuild { + if fp.numScanResults == 0 { + panic(errors.AssertionFailedf( + "unexpectedly found a ScanResponse when numScanResults is zero", + )) + } } - results = append(results, result) - hasNonEmptyScanResponse = true + s.results.addLocked(result) } } @@ -1426,86 +1501,6 @@ func processSingleRangeResults( )) } } - - if len(results) > 0 { - finalizeSingleRangeResults( - s, results, fp.memoryFootprintBytes, hasNonEmptyScanResponse, - ) - } -} - -// finalizeSingleRangeResults "finalizes" the results of evaluation of a -// singleRangeBatch. By "finalization" we mean setting scanComplete field of -// ScanResp to correct value for all scan responses (when Hints.SingleRowLookup -// is false), updating the estimate of an average response size, and telling the -// Streamer about these results. -// -// This method assumes that results has length greater than zero. -func finalizeSingleRangeResults( - s *Streamer, results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool, -) { - if buildutil.CrdbTestBuild { - if len(results) == 0 { - panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results")) - } - } - s.mu.Lock() - defer s.mu.Unlock() - - // If we have non-empty scan response, it might be complete. This will be - // the case when a scan response doesn't have a resume span and there are no - // other scan requests in flight (involving other ranges) that are part of - // the same original ScanRequest. - // - // We need to do this check as well as adding the results to be returned to - // the client as an atomic operation so that scanComplete is set to true - // only on the last partial scan response. 
- // - // However, if we got a hint that each lookup produces a single row, then we - // know that no original ScanRequest can span multiple ranges, so - // scanComplete field has already been set correctly. - if hasNonEmptyScanResponse && !s.hints.SingleRowLookup { - for i := range results { - if results[i].ScanResp != nil { - if results[i].ScanResp.ResumeSpan == nil { - // The scan within the range is complete. - if s.mode == OutOfOrder { - s.mu.numRangesPerScanRequest[results[i].Position]-- - if s.mu.numRangesPerScanRequest[results[i].Position] == 0 { - // The scan across all ranges is now complete too. - results[i].scanComplete = true - } - } else { - // In InOrder mode, the scan is marked as complete when - // the last sub-request is satisfied. Note that it is ok - // if the previous sub-requests haven't been satisfied - // yet - the inOrderResultsBuffer will not emit this - // Result until the previous sub-requests are responded - // to. - numSubRequests := s.mu.numRangesPerScanRequest[results[i].Position] - results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests - } - } else { - // Unset the ResumeSpan on the result in order to not - // confuse the user of the Streamer. Non-nil resume span was - // already included into resumeReq populated in - // performRequestAsync. - results[i].ScanResp.ResumeSpan = nil - } - } - } - } - - // Update the average response size based on this batch. - // TODO(yuzefovich): some of the responses might be partial, yet the - // estimator doesn't distinguish the footprint of the full response vs the - // partial one. Think more about this. 
- s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) - if debug { - printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup - fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation) - } - s.results.add(results) } // buildResumeSingleRangeBatch consumes a BatchResponse for a singleRangeBatch