From 27ade237fa38d3369031496d6cbee2e635a10baa Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 22 Jul 2022 16:36:13 -0700 Subject: [PATCH] distsql: change the flow setup code a bit Previously, when setting up a distributed plan, we would wait for all SetupFlow RPCs to come back before setting up the flow on the gateway. Most likely (in the happy scenario) all those RPCs would be successful, so we can parallelize the happy path a bit by setting up the local flow while the RPCs are in flight, which is what this commit does. This seems especially beneficial given the change in the previous commit to increase the number of DistSQL runners for beefy machines - we are now more likely to issue SetupFlow RPCs asynchronously. Release note: None --- pkg/sql/distsql_running.go | 85 +++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 9e5a9d36a370..bcc931d320ec 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -392,13 +392,6 @@ func (dsp *DistSQLPlanner) setupFlows( StatementSQL: statementSQL, } - // Start all the flows except the flow on this node (there is always a flow on - // this node). - var resultChan chan runnerResult - if len(flows) > 1 { - resultChan = make(chan runnerResult, len(flows)-1) - } - if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff { // Now we determine whether the vectorized engine supports the flow // specs. @@ -408,38 +401,54 @@ func (dsp *DistSQLPlanner) setupFlows( if vectorizeMode == sessiondatapb.VectorizeExperimentalAlways { return nil, nil, nil, err } - // Vectorization is not supported for this flow, so we override the - // setting. + // Vectorization is not supported for this flow, so we override + // the setting. 
setupReq.EvalContext.SessionData.VectorizeMode = sessiondatapb.VectorizeOff break } } } - for nodeID, flowSpec := range flows { - if nodeID == thisNodeID { - // Skip this node. - continue - } - req := setupReq - req.Flow = *flowSpec - runReq := runnerRequest{ - ctx: ctx, - nodeDialer: dsp.podNodeDialer, - flowReq: &req, - sqlInstanceID: nodeID, - resultChan: resultChan, - } - // Send out a request to the workers; if no worker is available, run - // directly. - select { - case dsp.runnerCoordinator.runnerChan <- runReq: - default: - runReq.run() + // Start all the flows except the flow on this node (there is always a flow + // on this node). + var resultChan chan runnerResult + if len(flows) > 1 { + resultChan = make(chan runnerResult, len(flows)-1) + for nodeID, flowSpec := range flows { + if nodeID == thisNodeID { + // Skip this node. + continue + } + req := setupReq + req.Flow = *flowSpec + runReq := runnerRequest{ + ctx: ctx, + nodeDialer: dsp.podNodeDialer, + flowReq: &req, + sqlInstanceID: nodeID, + resultChan: resultChan, + } + + // Send out a request to the workers; if no worker is available, run + // directly. + select { + case dsp.runnerCoordinator.runnerChan <- runReq: + default: + runReq.run() + } } } - var firstErr error + // Now set up the flow on this node. + setupReq.Flow = *flows[thisNodeID] + var batchReceiver execinfra.BatchReceiver + if recv.batchWriter != nil { + // Use the DistSQLReceiver as an execinfra.BatchReceiver only if the + // former has the corresponding writer set. + batchReceiver = recv + } + ctx, flow, opChains, firstErr := dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState) + // Now wait for all the flows to be scheduled on remote nodes. Note that we // are not waiting for the flows themselves to complete. 
for i := 0; i < len(flows)-1; i++ { @@ -450,19 +459,9 @@ func (dsp *DistSQLPlanner) setupFlows( // TODO(radu): accumulate the flows that we failed to set up and move them // into the local flow. } - if firstErr != nil { - return nil, nil, nil, firstErr - } - - // Set up the flow on this node. - setupReq.Flow = *flows[thisNodeID] - var batchReceiver execinfra.BatchReceiver - if recv.batchWriter != nil { - // Use the DistSQLReceiver as an execinfra.BatchReceiver only if the - // former has the corresponding writer set. - batchReceiver = recv - } - return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState) + // Note that we need to return the local flow even if firstErr is non-nil so + // that the local flow is properly cleaned up. + return ctx, flow, opChains, firstErr } const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"