diff --git a/pkg/sql/app_stats.go b/pkg/sql/app_stats.go index 9d13b21d5b65..f6b3fafab0ea 100644 --- a/pkg/sql/app_stats.go +++ b/pkg/sql/app_stats.go @@ -111,7 +111,7 @@ const saveFingerprintPlanOnceEvery = 1000 // // samplePlanDescription can be nil, as these are only sampled periodically per unique fingerprint. func (a *appStats) recordStatement( - stmt Statement, + stmt *Statement, samplePlanDescription *roachpb.ExplainTreePlanNode, distSQLUsed bool, optUsed bool, @@ -157,7 +157,7 @@ func (a *appStats) recordStatement( // getStatsForStmt retrieves the per-stmt stat object. func (a *appStats) getStatsForStmt( - stmt Statement, distSQLUsed bool, optimizerUsed bool, err error, createIfNonexistent bool, + stmt *Statement, distSQLUsed bool, optimizerUsed bool, err error, createIfNonexistent bool, ) *stmtStats { // Extend the statement key with various characteristics, so // that we use separate buckets for the different situations. @@ -185,7 +185,7 @@ func (a *appStats) getStatsForStmtWithKey(key stmtKey, createIfNonexistent bool) return s } -func anonymizeStmt(stmt Statement) string { +func anonymizeStmt(stmt *Statement) string { return tree.AsStringWithFlags(stmt.AST, tree.FmtHideConstants) } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 9bb8106d0cb3..35b94f871928 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1994,7 +1994,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // // If an error is returned, it is to be considered a query execution error. func (ex *connExecutor) initStatementResult( - ctx context.Context, res RestrictedCommandResult, stmt Statement, cols sqlbase.ResultColumns, + ctx context.Context, res RestrictedCommandResult, stmt *Statement, cols sqlbase.ResultColumns, ) error { for _, c := range cols { if err := checkResultType(c.Typ); err != nil { diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index c3f5153d6f3e..d02736ec3e71 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -420,7 +420,7 @@ func (ex *connExecutor) execStmtInOpenState( defer constantMemAcc.Close(ctx) if runInParallel { - cols, err := ex.execStmtInParallel(ctx, stmt, p, queryDone) + cols, err := ex.execStmtInParallel(ctx, p, queryDone) queryDone = nil if err != nil { return makeErrEvent(err) @@ -429,12 +429,12 @@ func (ex *connExecutor) execStmtInOpenState( // statement's result type: // - tree.Rows -> an empty set of rows // - tree.RowsAffected -> zero rows affected - if err := ex.initStatementResult(ctx, res, stmt, cols); err != nil { + if err := ex.initStatementResult(ctx, res, p.stmt, cols); err != nil { return makeErrEvent(err) } } else { p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit - if err := ex.dispatchToExecutionEngine(ctx, stmt, p, res); err != nil { + if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil { return nil, nil, err } if err := res.Err(); err != nil { @@ -651,10 +651,7 @@ func (ex *connExecutor) rollbackSQLTransaction(ctx context.Context) (fsm.Event, // Args: // queryDone: A cleanup function to be called when the execution is done. 
func (ex *connExecutor) execStmtInParallel( - ctx context.Context, - stmt Statement, - planner *planner, - queryDone func(context.Context, RestrictedCommandResult), + ctx context.Context, planner *planner, queryDone func(context.Context, RestrictedCommandResult), ) (sqlbase.ResultColumns, error) { params := runParams{ ctx: ctx, @@ -662,9 +659,25 @@ func (ex *connExecutor) execStmtInParallel( p: planner, } + stmt := planner.stmt ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag()) - err := planner.makePlan(ctx, stmt) + planner.statsCollector.PhaseTimes()[plannerStartLogicalPlan] = timeutil.Now() + + err := planner.makePlan(ctx) + // Ensure that the plan is collected just before closing. + if sampleLogicalPlans.Get(&ex.appStats.st.SV) { + // Note: if sampleLogicalPlans is false, + // planner.curPlan.maybeSavePlan remains nil (because makePlan has + // cleared curPlan at this point) and plan collection will not + // happen. + planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode { + return ex.maybeSavePlan(ctx, planner) + } + } + + planner.statsCollector.PhaseTimes()[plannerEndLogicalPlan] = timeutil.Now() ex.sessionTracing.TracePlanEnd(ctx, err) + if err != nil { planner.maybeLogStatement(ctx, "par-prepare" /* lbl */, 0 /* rows */, err) return nil, err @@ -714,23 +727,33 @@ func (ex *connExecutor) execStmtInParallel( planner.statsCollector.PhaseTimes()[plannerStartExecStmt] = timeutil.Now() - samplePlanDescription := ex.sampleLogicalPlanDescription( - stmt, false /* optimizerUsed */, planner) + // We need to set the "exec done" flag early because + // curPlan.close(), which will need to observe it, may be closed + // during execution (distsqlrun.PlanAndRun). + // + // TODO(knz): This is a mis-design. Andrei says "it's OK if + // execution closes the plan" but it transfers responsibility to + // run any "finalizers" on the plan (including plan sampling for + // stats) to the execution engine. That's a lot of responsibility + // to transfer! It would be better if this responsibility remained + // around here. + planner.curPlan.flags.Set(planFlagExecDone) - var flags planFlags if distributePlan { - flags.Set(planFlagDistributed) + planner.curPlan.flags.Set(planFlagDistributed) } else { - flags.Set(planFlagDistSQLLocal) + planner.curPlan.flags.Set(planFlagDistSQLLocal) } ex.sessionTracing.TraceExecStart(ctx, "parallel") err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan) ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected()) planner.statsCollector.PhaseTimes()[plannerEndExecStmt] = timeutil.Now() + // Record the statement summary. This also closes the plan if the + // plan has not been closed earlier. ex.recordStatementSummary( - planner, stmt, samplePlanDescription, flags, ex.extraTxnState.autoRetryCounter, - res.RowsAffected(), err, + ctx, planner, ex.extraTxnState.autoRetryCounter, + res.RowsAffected(), res.Err(), ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) @@ -792,14 +815,24 @@ func enhanceErrWithCorrelation(err error, isCorrelated bool) { // expected that the caller will inspect res and react to query errors by // producing an appropriate state machine event. 
func (ex *connExecutor) dispatchToExecutionEngine( - ctx context.Context, stmt Statement, planner *planner, res RestrictedCommandResult, + ctx context.Context, planner *planner, res RestrictedCommandResult, ) error { + stmt := planner.stmt ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag()) planner.statsCollector.PhaseTimes()[plannerStartLogicalPlan] = timeutil.Now() - flags, err := ex.makeExecPlan(ctx, stmt, planner) + err := ex.makeExecPlan(ctx, planner) + // We'll be closing the plan manually below after execution; this + // defer is a catch-all in case some other return path is taken. defer planner.curPlan.close(ctx) + // Ensure that the plan is collected just before closing. + if sampleLogicalPlans.Get(&ex.appStats.st.SV) { + planner.curPlan.maybeSavePlan = func(ctx context.Context) *roachpb.ExplainTreePlanNode { + return ex.maybeSavePlan(ctx, planner) + } + } + defer func() { planner.maybeLogStatement(ctx, "exec", res.RowsAffected(), res.Err()) }() planner.statsCollector.PhaseTimes()[plannerEndLogicalPlan] = timeutil.Now() @@ -845,99 +878,98 @@ func (ex *connExecutor) dispatchToExecutionEngine( queryMeta.isDistributed = distributePlan ex.mu.Unlock() - samplePlanDescription := ex.sampleLogicalPlanDescription(stmt, flags.IsSet(planFlagOptUsed), planner) + // We need to set the "exec done" flag early because + // curPlan.close(), which will need to observe it, may be closed + // during execution (distsqlrun.PlanAndRun). + // + // TODO(knz): This is a mis-design. Andrei says "it's OK if + // execution closes the plan" but it transfers responsibility to + // run any "finalizers" on the plan (including plan sampling for + // stats) to the execution engine. That's a lot of responsibility + // to transfer! It would be better if this responsibility remained + // around here. + planner.curPlan.flags.Set(planFlagExecDone) if distributePlan { - flags.Set(planFlagDistributed) + planner.curPlan.flags.Set(planFlagDistributed) } else { - flags.Set(planFlagDistSQLLocal) + planner.curPlan.flags.Set(planFlagDistSQLLocal) } ex.sessionTracing.TraceExecStart(ctx, "distributed") err = ex.execWithDistSQLEngine(ctx, planner, stmt.AST.StatementType(), res, distributePlan) ex.sessionTracing.TraceExecEnd(ctx, res.Err(), res.RowsAffected()) planner.statsCollector.PhaseTimes()[plannerEndExecStmt] = timeutil.Now() - if err != nil { - return err - } + + // Record the statement summary. This also closes the plan if the + // plan has not been closed earlier. ex.recordStatementSummary( - planner, stmt, samplePlanDescription, flags, + ctx, planner, ex.extraTxnState.autoRetryCounter, res.RowsAffected(), res.Err(), ) if ex.server.cfg.TestingKnobs.AfterExecute != nil { ex.server.cfg.TestingKnobs.AfterExecute(ctx, stmt.String(), res.Err()) } - return nil + return err } // makeExecPlan creates an execution plan and populates planner.curPlan, using // either the optimizer or the heuristic planner. -func (ex *connExecutor) makeExecPlan( - ctx context.Context, stmt Statement, planner *planner, -) (planFlags, error) { +func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) error { + stmt := planner.stmt // Initialize planner.curPlan.AST early; it might be used by maybeLogStatement // in error cases. 
planner.curPlan = planTop{AST: stmt.AST} - var flags planFlags var isCorrelated bool if optMode := ex.sessionData.OptimizerMode; optMode != sessiondata.OptimizerOff { log.VEvent(ctx, 2, "generating optimizer plan") var result *planTop var err error - result, flags, isCorrelated, err = planner.makeOptimizerPlan(ctx, stmt) + result, isCorrelated, err = planner.makeOptimizerPlan(ctx) if err == nil { planner.curPlan = *result - return flags, nil + return nil } log.VEventf(ctx, 1, "optimizer plan failed (isCorrelated=%t): %v", isCorrelated, err) if !canFallbackFromOpt(err, optMode, stmt) { - return 0, err + return err } - flags = planFlagOptFallback + planner.curPlan.flags.Set(planFlagOptFallback) log.VEvent(ctx, 1, "optimizer falls back on heuristic planner") } else { log.VEvent(ctx, 2, "optimizer disabled") } // Use the heuristic planner. - err := planner.makePlan(ctx, stmt) + optFlags := planner.curPlan.flags + err := planner.makePlan(ctx) + planner.curPlan.flags |= optFlags enhanceErrWithCorrelation(err, isCorrelated) - return flags, err -} - -// sampleLogicalPlanDescription returns a serialized representation of a statement's logical plan. -// The returned ExplainTreePlanNode will be nil if plan should not be sampled. -func (ex *connExecutor) sampleLogicalPlanDescription( - stmt Statement, optimizerUsed bool, planner *planner, -) *roachpb.ExplainTreePlanNode { - if !sampleLogicalPlans.Get(&ex.appStats.st.SV) { - return nil - } - - if ex.saveLogicalPlanDescription(stmt, optimizerUsed) { - return planToTree(context.Background(), planner.curPlan) - } - return nil + return err } -// saveLogicalPlanDescription returns if we should save this as a sample logical plan +// saveLogicalPlanDescription returns whether we should save this as a sample logical plan // for its corresponding fingerprint. We use `saveFingerprintPlanOnceEvery` // to assess how frequently to sample logical plans. -func (ex *connExecutor) saveLogicalPlanDescription(stmt Statement, optimizerUsed bool) bool { - stats := ex.appStats.getStatsForStmt(stmt, true /* distSQLUsed */, optimizerUsed, nil, false /* createIfNonexistent */) +func (ex *connExecutor) saveLogicalPlanDescription( + stmt *Statement, useDistSQL bool, optimizerUsed bool, err error, +) bool { + stats := ex.appStats.getStatsForStmt( + stmt, useDistSQL, optimizerUsed, err, false /* createIfNonexistent */) if stats == nil { // Save logical plan the first time we see new statement fingerprint. return true } stats.Lock() - defer stats.Unlock() + count := stats.data.Count + stats.Unlock() - return stats.data.Count%saveFingerprintPlanOnceEvery == 0 + return count%saveFingerprintPlanOnceEvery == 0 } // canFallbackFromOpt returns whether we can fallback on the heuristic planner // when the optimizer hits an error. -func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt Statement) bool { +func canFallbackFromOpt(err error, optMode sessiondata.OptimizerMode, stmt *Statement) bool { pgerr, ok := err.(*pgerror.Error) if !ok || pgerr.Code != pgerror.CodeFeatureNotSupportedError { // We only fallback on "feature not supported" errors. 
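
Note on the new saveLogicalPlanDescription signature above: it now takes the DistSQL/optimizer flags and the execution error so that the per-fingerprint stats lookup (getStatsForStmt) hits the same bucket that recordStatement writes to. The sampling policy itself is unchanged: save a plan the first time a fingerprint is seen, then once every saveFingerprintPlanOnceEvery executions. Below is a minimal, self-contained sketch of that policy only; fingerprintStats and its methods are hypothetical stand-ins for appStats, and the bucketing by distsql/opt/error status is omitted.

package main

import (
	"fmt"
	"sync"
)

// saveFingerprintPlanOnceEvery mirrors the constant of the same name in
// pkg/sql/app_stats.go.
const saveFingerprintPlanOnceEvery = 1000

// fingerprintStats is a stripped-down stand-in for appStats: it only keeps
// an execution count per statement fingerprint.
type fingerprintStats struct {
	mu     sync.Mutex
	counts map[string]int64
}

// record bumps the execution count for a fingerprint, as recordStatement
// does for the real stats object.
func (s *fingerprintStats) record(fingerprint string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[fingerprint]++
}

// shouldSamplePlan mirrors the decision in saveLogicalPlanDescription:
// sample the first time a fingerprint is seen (no stats entry yet), then
// once every saveFingerprintPlanOnceEvery executions.
func (s *fingerprintStats) shouldSamplePlan(fingerprint string) bool {
	s.mu.Lock()
	count, seen := s.counts[fingerprint]
	s.mu.Unlock()
	if !seen {
		return true
	}
	return count%saveFingerprintPlanOnceEvery == 0
}

func main() {
	s := &fingerprintStats{counts: map[string]int64{}}
	fp := "SELECT * FROM t WHERE k = _"
	fmt.Println(s.shouldSamplePlan(fp)) // true: first execution of this fingerprint
	s.record(fp)
	fmt.Println(s.shouldSamplePlan(fp)) // false: count is 1, not a multiple of 1000
}

In the real code the count lives in stmtStats.data.Count; the rewritten version copies it out under the lock instead of holding the lock across the modulo check.
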
diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index 5c047513b097..a3c2e9c4c1e7 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -142,7 +142,7 @@ func (ex *connExecutor) prepare( return prepared, nil } prepared.Statement = stmt.Statement - prepared.AnonymizedStr = anonymizeStmt(stmt) + prepared.AnonymizedStr = anonymizeStmt(&stmt) // Point to the prepared state, which can be further populated during query // preparation. @@ -160,7 +160,8 @@ func (ex *connExecutor) prepare( p := &ex.planner ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTimestamp */) - flags, err := ex.populatePrepared(ctx, txn, stmt, placeholderHints, p) + p.stmt = &stmt + flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p) if err != nil { txn.CleanupOnError(ctx, err) return nil, err @@ -180,12 +181,9 @@ func (ex *connExecutor) prepare( // populatePrepared analyzes and type-checks the query and populates // stmt.Prepared. func (ex *connExecutor) populatePrepared( - ctx context.Context, - txn *client.Txn, - stmt Statement, - placeholderHints tree.PlaceholderTypes, - p *planner, + ctx context.Context, txn *client.Txn, placeholderHints tree.PlaceholderTypes, p *planner, ) (planFlags, error) { + stmt := p.stmt if err := p.semaCtx.Placeholders.Init(stmt.NumPlaceholders, placeholderHints); err != nil { return 0, err } @@ -221,7 +219,7 @@ func (ex *connExecutor) populatePrepared( if optMode := ex.sessionData.OptimizerMode; optMode != sessiondata.OptimizerOff { log.VEvent(ctx, 2, "preparing using optimizer") var err error - flags, isCorrelated, err = p.prepareUsingOptimizer(ctx, stmt) + flags, isCorrelated, err = p.prepareUsingOptimizer(ctx) if err == nil { log.VEvent(ctx, 2, "optimizer prepare succeeded") // stmt.Prepared fields have been populated. @@ -231,7 +229,7 @@ func (ex *connExecutor) populatePrepared( if !canFallbackFromOpt(err, optMode, stmt) { return 0, err } - flags = planFlagOptFallback + flags.Set(planFlagOptFallback) log.VEvent(ctx, 1, "prepare falls back on heuristic planner") } else { log.VEvent(ctx, 2, "optimizer disabled (prepare)") @@ -247,7 +245,12 @@ func (ex *connExecutor) populatePrepared( if p.curPlan.plan == nil { // Statement with no result columns and no support for placeholders. - return flags, nil + // + // Note: we're combining `flags` which comes from + // `prepareUsingOptimizer`, with `p.curPlan.flags` which ensures + // the new flags combine with the existing flags (this is used + // e.g. to maintain the count of times the optimizer was used). + return flags | p.curPlan.flags, nil } defer p.curPlan.close(ctx) @@ -262,7 +265,8 @@ func (ex *connExecutor) populatePrepared( return 0, err } prepared.Types = p.semaCtx.Placeholders.Types - return flags, nil + // The flags are combined, see the comment above for why. 
+ return flags | p.curPlan.flags, nil } func (ex *connExecutor) execBind( diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index a11601f97a7e..2246df5acd8a 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -34,7 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" ) @@ -255,6 +255,7 @@ func (dsp *DistSQLPlanner) Run( // We need to close the planNode tree we translated into a DistSQL plan before // flow.Cleanup, which closes memory accounts that expect to be emptied. if planCtx.planner != nil && !planCtx.ignoreClose { + planCtx.planner.curPlan.execErr = recv.resultWriter.Err() planCtx.planner.curPlan.close(ctx) } flow.Cleanup(ctx) diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index da55a86700e7..5641fbb5816b 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -151,7 +151,8 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { // We need to re-plan every time, since close() below makes // the plan unusable across retries. - if err := p.makePlan(ctx, Statement{Statement: stmt}); err != nil { + p.stmt = &Statement{Statement: stmt} + if err := p.makePlan(ctx); err != nil { t.Fatal(err) } defer p.curPlan.close(ctx) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index d7cfe9f378d3..b6f21511c7ca 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1739,7 +1739,7 @@ func (s *sqlStatsCollectorImpl) PhaseTimes() *phaseTimes { // // samplePlanDescription can be nil, as these are only sampled periodically per unique fingerprint. func (s *sqlStatsCollectorImpl) RecordStatement( - stmt Statement, + stmt *Statement, samplePlanDescription *roachpb.ExplainTreePlanNode, distSQLUsed bool, optUsed bool, diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index a618db2182ba..da2c8f2331a5 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -15,6 +15,7 @@ package sql import ( + "context" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -86,23 +87,30 @@ var _ metric.Struct = EngineMetrics{} // MetricStruct is part of the metric.Struct interface. func (EngineMetrics) MetricStruct() {} +func (ex *connExecutor) maybeSavePlan( + ctx context.Context, p *planner, +) *roachpb.ExplainTreePlanNode { + if ex.saveLogicalPlanDescription( + p.stmt, + p.curPlan.flags.IsSet(planFlagDistributed), + p.curPlan.flags.IsSet(planFlagOptUsed), + p.curPlan.execErr) { + // If statement plan sample is requested, collect a sample. + return planToTree(ctx, &p.curPlan) + } + return nil +} + // recordStatementSummery gathers various details pertaining to the // last executed statement/query and performs the associated // accounting in the passed-in EngineMetrics. -// - samplePlanDescription can be nil, as these are only sampled periodically per unique fingerprint. // - distSQLUsed reports whether the query was distributed. // - automaticRetryCount is the count of implicit txn retries // so far. // - result is the result set computed by the query/statement. // - err is the error encountered, if any. 
func (ex *connExecutor) recordStatementSummary( - planner *planner, - stmt Statement, - samplePlanDescription *roachpb.ExplainTreePlanNode, - planFlags planFlags, - automaticRetryCount int, - rowsAffected int, - err error, + ctx context.Context, planner *planner, automaticRetryCount int, rowsAffected int, err error, ) { phaseTimes := planner.statsCollector.PhaseTimes() @@ -127,10 +135,12 @@ func (ex *connExecutor) recordStatementSummary( // overhead latency: txn/retry management, error checking, etc execOverhead := svcLat - processingLat + stmt := planner.stmt + flags := planner.curPlan.flags if automaticRetryCount == 0 { - ex.updateOptCounters(planFlags) + ex.updateOptCounters(flags) m := &ex.metrics.EngineMetrics - if planFlags.IsSet(planFlagDistributed) { + if flags.IsSet(planFlagDistributed) { if _, ok := stmt.AST.(*tree.Select); ok { m.DistSQLSelectCount.Inc(1) } @@ -141,9 +151,15 @@ func (ex *connExecutor) recordStatementSummary( m.SQLServiceLatency.RecordValue(svcLatRaw.Nanoseconds()) } + // Close the plan if this was not done earlier. + // This also ensures that curPlan.savedPlanForStats is + // collected (see maybeSavePlan). + planner.curPlan.execErr = err + planner.curPlan.close(ctx) + planner.statsCollector.RecordStatement( - stmt, samplePlanDescription, - planFlags.IsSet(planFlagDistributed), planFlags.IsSet(planFlagOptUsed), + stmt, planner.curPlan.savedPlanForStats, + flags.IsSet(planFlagDistributed), flags.IsSet(planFlagOptUsed), automaticRetryCount, rowsAffected, err, parseLat, planLat, runLat, svcLat, execOverhead, ) diff --git a/pkg/sql/explain_distsql.go b/pkg/sql/explain_distsql.go index d91f0f4b136e..af94421de7da 100644 --- a/pkg/sql/explain_distsql.go +++ b/pkg/sql/explain_distsql.go @@ -23,7 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" ) // explainDistSQLNode is a planNode that wraps a plan and returns diff --git a/pkg/sql/explain_tree.go b/pkg/sql/explain_tree.go index 44a398d37389..322cba478eb1 100644 --- a/pkg/sql/explain_tree.go +++ b/pkg/sql/explain_tree.go @@ -44,7 +44,7 @@ import ( // leaveNode // keep root node on stack (base case because it's the root). // // and planToTree would return the join node. 
-func planToTree(ctx context.Context, top planTop) *roachpb.ExplainTreePlanNode { +func planToTree(ctx context.Context, top *planTop) *roachpb.ExplainTreePlanNode { nodeStack := &planNodeStack{} observer := planObserver{ enterNode: func(ctx context.Context, nodeName string, plan planNode) (bool, error) { diff --git a/pkg/sql/explain_tree_test.go b/pkg/sql/explain_tree_test.go index c7d2f3ca6169..6d623fefbc8b 100644 --- a/pkg/sql/explain_tree_test.go +++ b/pkg/sql/explain_tree_test.go @@ -375,10 +375,11 @@ func assertExpectedPlansForTests(t *testing.T, sqlSetup string, plansToTest []*T if err != nil { t.Fatal(err) } - if err := p.makePlan(ctx, Statement{Statement: stmt}); err != nil { + p.stmt = &Statement{Statement: stmt} + if err := p.makePlan(ctx); err != nil { t.Fatal(err) } - actualPlanTree := planToTree(ctx, p.curPlan) + actualPlanTree := planToTree(ctx, &p.curPlan) assert.Equal(t, test.ExpectedPlanTree, actualPlanTree, "planToTree for %s:\nexpected:%s\nactual:%s", test.SQL, test.ExpectedPlanTree, actualPlanTree) actualPlanString := planToString(ctx, p.curPlan.plan, p.curPlan.subqueryPlans) diff --git a/pkg/sql/parallel_stmts_test.go b/pkg/sql/parallel_stmts_test.go index 2b5acbea8158..921a5660d110 100644 --- a/pkg/sql/parallel_stmts_test.go +++ b/pkg/sql/parallel_stmts_test.go @@ -329,7 +329,8 @@ func planQuery(t *testing.T, s serverutils.TestServerInterface, sql string) (*pl if err != nil { t.Fatal(err) } - if err := p.makePlan(context.TODO(), Statement{Statement: stmt}); err != nil { + p.stmt = &Statement{Statement: stmt} + if err := p.makePlan(context.TODO()); err != nil { t.Fatal(err) } return p, func() { diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 1ff19ada855c..a1bef1c46bd1 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -53,7 +53,7 @@ type planMaker interface { // iterating using curPlan.plan.Next() and curPlan.plan.Values() in // order to retrieve matching rows. Finally, the plan must be closed // with curPlan.close(). - makePlan(ctx context.Context, stmt Statement) error + makePlan(ctx context.Context) error // prepare does the same checks as makePlan but skips building some // data structures necessary for execution, based on the assumption @@ -291,6 +291,20 @@ type planTop struct { // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. auditEvents []auditEvent + + // flags is populated during planning and execution. + flags planFlags + + // execErr retains the last execution error, if any. + execErr error + + // maybeSavePlan, if defined, is called during close() to + // conditionally save the logical plan to savedPlanForStats. + maybeSavePlan func(context.Context) *roachpb.ExplainTreePlanNode + + // savedPlanForStats is conditionally populated at the end of + // statement execution, for registration in statement statistics. + savedPlanForStats *roachpb.ExplainTreePlanNode } // makePlan implements the Planner interface. It populates the @@ -301,8 +315,9 @@ type planTop struct { // // After makePlan(), the caller should be careful to also call // p.curPlan.Close(). -func (p *planner) makePlan(ctx context.Context, stmt Statement) error { +func (p *planner) makePlan(ctx context.Context) error { // Reinitialize. + stmt := p.stmt p.curPlan = planTop{AST: stmt.AST} log.VEvent(ctx, 2, "heuristic planner starts") @@ -393,6 +408,9 @@ func (p *planner) hideHiddenColumns( // close ensures that the plan's resources have been deallocated. 
func (p *planTop) close(ctx context.Context) { if p.plan != nil { + if p.maybeSavePlan != nil && p.flags.IsSet(planFlagExecDone) { + p.savedPlanForStats = p.maybeSavePlan(ctx) + } p.plan.Close(ctx) p.plan = nil } @@ -922,6 +940,9 @@ const ( // planFlagDistSQLLocal is set if the plan is for the DistSQL engine, // but in local mode. planFlagDistSQLLocal + + // planFlagExecDone marks that execution has been completed. + planFlagExecDone ) func (pf planFlags) IsSet(flag planFlags) bool { diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index a4a3f120180a..7879696e142e 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -39,14 +39,15 @@ import ( // isCorrelated is set in error cases if we detect a correlated subquery; it is // used in the fallback case to create a better error. func (p *planner) prepareUsingOptimizer( - ctx context.Context, stmt Statement, + ctx context.Context, ) (_ planFlags, isCorrelated bool, _ error) { + stmt := p.stmt if err := checkOptSupportForTopStatement(stmt.AST); err != nil { return 0, false, err } var opc optPlanningCtx - opc.init(p, stmt) + opc.init(p) if opc.useCache { cachedData, ok := p.execCfg.QueryCache.Find(&p.queryCacheSession, stmt.SQL) @@ -119,19 +120,18 @@ func (p *planner) prepareUsingOptimizer( // // isCorrelated is set in error cases if we detect a correlated subquery; it is // used in the fallback case to create a better error. -func (p *planner) makeOptimizerPlan( - ctx context.Context, stmt Statement, -) (_ *planTop, _ planFlags, isCorrelated bool, _ error) { +func (p *planner) makeOptimizerPlan(ctx context.Context) (_ *planTop, isCorrelated bool, _ error) { + stmt := p.stmt if err := checkOptSupportForTopStatement(stmt.AST); err != nil { - return nil, 0, false, err + return nil, false, err } var opc optPlanningCtx - opc.init(p, stmt) + opc.init(p) execMemo, isCorrelated, err := opc.buildExecMemo(ctx) if err != nil { - return nil, 0, isCorrelated, err + return nil, isCorrelated, err } // Build the plan tree. 
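
The planTop additions above (flags, execErr, maybeSavePlan, savedPlanForStats) together with the close() hook are the core of the new plan-sampling flow: since the DistSQL engine may close the plan mid-execution, the sampling "finalizer" now lives on the plan itself and fires on whichever close() happens first after planFlagExecDone is set. The sketch below is a minimal, self-contained illustration of that pattern; planSnapshot, plan, and the bool in place of the planFlagExecDone bit are illustrative stand-ins, not the real types.

package main

import (
	"context"
	"fmt"
)

// planSnapshot is a hypothetical placeholder for roachpb.ExplainTreePlanNode.
type planSnapshot struct{ desc string }

// plan is a stripped-down analogue of planTop. Whichever party closes it
// first after execution (the conn executor or the DistSQL engine) triggers
// the optional save hook; later close() calls are no-ops.
type plan struct {
	node              interface{} // the planNode tree; nil once closed
	execDone          bool        // stands in for flags.IsSet(planFlagExecDone)
	execErr           error       // last execution error, if any
	maybeSavePlan     func(context.Context) *planSnapshot
	savedPlanForStats *planSnapshot
}

func (p *plan) close(ctx context.Context) {
	if p.node == nil {
		return // already closed
	}
	// Collect the plan sample just before tearing the plan down, and only
	// once execution has actually completed.
	if p.maybeSavePlan != nil && p.execDone {
		p.savedPlanForStats = p.maybeSavePlan(ctx)
	}
	p.node = nil
}

func main() {
	ctx := context.Background()
	p := &plan{node: struct{}{}}
	p.maybeSavePlan = func(context.Context) *planSnapshot {
		// In the real code this is connExecutor.maybeSavePlan, which consults
		// the per-fingerprint sampling policy before serializing the plan.
		return &planSnapshot{desc: "sampled logical plan"}
	}
	p.execDone = true
	p.close(ctx) // e.g. closed by the execution engine during PlanAndRun
	p.close(ctx) // the later close is a no-op; the sample is kept
	fmt.Println(p.savedPlanForStats.desc)
}

recordStatementSummary then closes the plan (if it is still open) and reads savedPlanForStats, which is why the explicit sampleLogicalPlanDescription call before execution could be removed.
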
@@ -139,22 +139,23 @@ func (p *planner) makeOptimizerPlan( execFactory := makeExecFactory(p) plan, err := execbuilder.New(&execFactory, execMemo, root, p.EvalContext()).Build() if err != nil { - return nil, 0, false, err + return nil, false, err } result := plan.(*planTop) result.AST = stmt.AST + result.flags = opc.flags cols := planColumns(result.plan) if stmt.ExpectedTypes != nil { if !stmt.ExpectedTypes.TypesEqual(cols) { - return nil, 0, false, pgerror.NewError( + return nil, false, pgerror.NewError( pgerror.CodeFeatureNotSupportedError, "cached plan must not change result type", ) } } - return result, opc.flags, false, nil + return result, false, nil } func checkOptSupportForTopStatement(AST tree.Statement) error { @@ -171,8 +172,7 @@ func checkOptSupportForTopStatement(AST tree.Statement) error { } type optPlanningCtx struct { - p *planner - stmt Statement + p *planner catalog optCatalog @@ -186,9 +186,8 @@ type optPlanningCtx struct { flags planFlags } -func (opc *optPlanningCtx) init(p *planner, stmt Statement) { +func (opc *optPlanningCtx) init(p *planner) { opc.p = p - opc.stmt = stmt opc.catalog.init(p.execCfg.TableStatsCache, p) p.optimizer.Init(p.EvalContext()) @@ -205,7 +204,7 @@ func (opc *optPlanningCtx) init(p *planner, stmt Statement) { func (opc *optPlanningCtx) log(ctx context.Context, msg string) { if log.VDepth(1, 1) { - log.InfofDepth(ctx, 1, "%s: %s", msg, opc.stmt) + log.InfofDepth(ctx, 1, "%s: %s", msg, opc.p.stmt) } else { log.Event(ctx, msg) } @@ -229,7 +228,7 @@ func (opc *optPlanningCtx) buildReusableMemo( // that there's even less to do during the EXECUTE phase. // f := p.optimizer.Factory() - bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.stmt.AST) + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.p.stmt.AST) bld.KeepPlaceholders = true if err := bld.Build(); err != nil { return nil, bld.IsCorrelated, err @@ -282,7 +281,7 @@ func (opc *optPlanningCtx) reuseMemo(cachedMemo *memo.Memo) (*memo.Memo, error) func (opc *optPlanningCtx) buildExecMemo( ctx context.Context, ) (_ *memo.Memo, isCorrelated bool, _ error) { - prepared := opc.stmt.Prepared + prepared := opc.p.stmt.Prepared p := opc.p if opc.allowMemoReuse && prepared != nil && prepared.Memo != nil { // We are executing a previously prepared statement and a reusable memo is @@ -304,7 +303,7 @@ func (opc *optPlanningCtx) buildExecMemo( if opc.useCache { // Consult the query cache. - cachedData, ok := p.execCfg.QueryCache.Find(&p.queryCacheSession, opc.stmt.SQL) + cachedData, ok := p.execCfg.QueryCache.Find(&p.queryCacheSession, opc.p.stmt.SQL) if ok { if isStale, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), &opc.catalog); err != nil { return nil, false, err @@ -333,7 +332,7 @@ func (opc *optPlanningCtx) buildExecMemo( // We are executing a statement for which there is no reusable memo // available. 
f := opc.p.optimizer.Factory() - bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.stmt.AST) + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.p.stmt.AST) if err := bld.Build(); err != nil { return nil, bld.IsCorrelated, err } @@ -345,7 +344,7 @@ func (opc *optPlanningCtx) buildExecMemo( if opc.useCache && !bld.HadPlaceholders { memo := p.optimizer.DetachMemo() cachedData := querycache.CachedData{ - SQL: opc.stmt.SQL, + SQL: opc.p.stmt.SQL, Memo: memo, } p.execCfg.QueryCache.Add(&p.queryCacheSession, &cachedData) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index fa5c0bd87bac..ec2d1ca7112e 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -551,7 +551,7 @@ type sqlStatsCollector interface { // // samplePlanDescription can be nil, as these are only sampled periodically per unique fingerprint. RecordStatement( - stmt Statement, + stmt *Statement, samplePlanDescription *roachpb.ExplainTreePlanNode, distSQLUsed bool, optUsed bool, diff --git a/pkg/sql/subquery_test.go b/pkg/sql/subquery_test.go index 158465280160..73e668fb0dd7 100644 --- a/pkg/sql/subquery_test.go +++ b/pkg/sql/subquery_test.go @@ -33,7 +33,8 @@ func TestStartSubqueriesReturnsError(t *testing.T) { if err != nil { t.Fatal(err) } - if err := p.makePlan(context.TODO(), Statement{Statement: stmt}); err != nil { + p.stmt = &Statement{Statement: stmt} + if err := p.makePlan(context.TODO()); err != nil { t.Fatal(err) } params := runParams{ctx: context.TODO(), p: p, extendedEvalCtx: &p.extendedEvalCtx} diff --git a/pkg/sql/testutils.go b/pkg/sql/testutils.go index 92a8bbb915d6..9e947d9774ed 100644 --- a/pkg/sql/testutils.go +++ b/pkg/sql/testutils.go @@ -102,7 +102,8 @@ func (dsp *DistSQLPlanner) Exec( return err } p := localPlanner.(*planner) - if err := p.makePlan(ctx, Statement{Statement: stmt}); err != nil { + p.stmt = &Statement{Statement: stmt} + if err := p.makePlan(ctx); err != nil { return err } rw := newCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
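
Several hunks above replace separately returned planFlags values with flags accumulated on planTop via bitwise OR (planner.curPlan.flags |= optFlags, return flags | p.curPlan.flags). The self-contained sketch below shows why OR-combining is safe for this kind of bitmask: each flag occupies its own bit, so merging two flag sets never drops information. The flag names and their ordering here are illustrative, not the exact constants from pkg/sql/plan.go.

package main

import "fmt"

// planFlags mirrors the 1<<iota bitmask pattern used by pkg/sql's planFlags.
type planFlags uint32

const (
	flagOptUsed planFlags = 1 << iota
	flagOptFallback
	flagDistributed
	flagExecDone
)

func (pf planFlags) IsSet(f planFlags) bool { return pf&f != 0 }
func (pf *planFlags) Set(f planFlags)       { *pf |= f }

func main() {
	// Flags collected by the optimizer path (e.g. during prepare)...
	var optFlags planFlags
	optFlags.Set(flagOptUsed)

	// ...and flags collected later on the plan itself (e.g. during
	// execution) are combined with a bitwise OR, so neither set is lost.
	var execFlags planFlags
	execFlags.Set(flagDistributed)
	execFlags.Set(flagExecDone)

	combined := optFlags | execFlags
	fmt.Println(combined.IsSet(flagOptUsed))     // true
	fmt.Println(combined.IsSet(flagDistributed)) // true
	fmt.Println(combined.IsSet(flagOptFallback)) // false
}
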