From 7953d60c1b5f2f1bca88720fcb578fe3a666f48f Mon Sep 17 00:00:00 2001 From: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com> Date: Thu, 14 Jul 2022 17:37:06 +0800 Subject: [PATCH] *: switch to flat plan in stmt summary, slow log, etc (#36069) ref pingcap/tidb#35888 --- bindinfo/capture_test.go | 10 +- executor/adapter.go | 109 +++++++++----- executor/compiler.go | 1 + executor/executor_test.go | 3 +- executor/resource_tag_test.go | 4 + infoschema/tables_test.go | 24 +-- planner/core/common_plans.go | 5 +- planner/core/encode.go | 150 ++++++++++++++++++- planner/core/exhaust_physical_plans.go | 1 + planner/core/find_best_task.go | 2 + planner/core/hints.go | 41 +++++ planner/core/physical_plan_test.go | 69 ++++++++- planner/core/plan_test.go | 140 +++++++++++++++-- planner/core/planbuilder.go | 1 + planner/core/testdata/analyze_suite_out.json | 2 +- planner/core/util.go | 19 +++ session/session.go | 1 + sessionctx/stmtctx/stmtctx.go | 44 +++++- util/plancodec/codec.go | 21 +-- 19 files changed, 548 insertions(+), 99 deletions(-) diff --git a/bindinfo/capture_test.go b/bindinfo/capture_test.go index 7ebb419a8adaf..0836196d86852 100644 --- a/bindinfo/capture_test.go +++ b/bindinfo/capture_test.go @@ -441,7 +441,7 @@ func TestUpdateSubqueryCapture(t *testing.T) { tk.MustExec("admin capture bindings") rows := tk.MustQuery("show global bindings").Rows() require.Len(t, rows, 1) - bindSQL := "UPDATE /*+ use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), hash_join(@`upd_1` `test`.`t1`), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b` = 2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1))" + bindSQL := "UPDATE /*+ hash_join(@`upd_1` `test`.`t1`), use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b` = 2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1))" require.Equal(t, bindSQL, rows[0][1]) tk.MustExec(bindSQL) require.Len(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings(), 0) @@ -566,7 +566,7 @@ func TestIssue25505(t *testing.T) { spmMap := map[string]string{} spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` < ? ) select * from `cte`"] = - "WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` < 5) SELECT /*+ use_index(@`sel_3` `test`.`t1` `idx_ab`), hash_agg(@`sel_1`)*/ * FROM `cte`" + "WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` < 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_ab`)*/ * FROM `cte`" spmMap["with recursive `cte1` ( `a` , `b` ) as ( select * from `test` . `t` where `b` = ? union select `a` + ? , `b` + ? from `cte1` where `a` < ? ) select * from `test` . `t`"] = "WITH RECURSIVE `cte1` (`a`, `b`) AS (SELECT * FROM `test`.`t` WHERE `b` = 1 UNION SELECT `a` + 1,`b` + 1 FROM `cte1` WHERE `a` < 2) SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`" spmMap["with `cte1` as ( select * from `test` . `t` ) , `cte2` as ( select ? ) select * from `test` . `t`"] = @@ -574,11 +574,11 @@ func TestIssue25505(t *testing.T) { spmMap["with `cte` as ( select * from `test` . `t` where `b` = ? ) select * from `test` . `t`"] = "WITH `cte` AS (SELECT * FROM `test`.`t` WHERE `b` = 6) SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`" spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` > ? ) select * from `cte`"] = - "WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ use_index(@`sel_3` `test`.`t1` `idx_b`), hash_agg(@`sel_1`)*/ * FROM `cte`" + "WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_b`)*/ * FROM `cte`" spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` > ? and `b` > ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] = - "WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_ab`), use_index(@`sel_1` `test`.`t1` `idx_ab`), inl_join(@`sel_1` `test`.`t1`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`" + "WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_ab`), use_index(@`sel_3` `test`.`t2` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`" spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` = ? and `b` = ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] = - "WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_a`), use_index(@`sel_1` `test`.`t1` `idx_a`), inl_join(@`sel_1` `test`.`t1`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`" + "WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_a`), use_index(@`sel_3` `test`.`t2` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`" tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;") tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;") diff --git a/executor/adapter.go b/executor/adapter.go index 1ac4ecef0194d..88bd635b6e670 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -332,6 +332,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { } a.OutputNames = names a.Plan = p + a.Ctx.GetSessionVars().StmtCtx.SetPlan(p) return a.InfoSchema.SchemaMetaVersion(), nil } @@ -847,6 +848,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { a.OutputNames = executorExec.outputNames a.isPreparedStmt = true a.Plan = executorExec.plan + a.Ctx.GetSessionVars().StmtCtx.SetPlan(executorExec.plan) if executorExec.lowerPriority { ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow } @@ -926,6 +928,11 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo a.Ctx.GetTxnWriteThroughputSLI().AddReadKeys(execDetail.ScanDetail.ProcessedKeys) } succ := err == nil + if a.Plan != nil { + // If this statement has a Plan, the StmtCtx.plan should have been set when it comes here, + // but we set it again in case we missed some code paths. + sessVars.StmtCtx.SetPlan(a.Plan) + } // `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`. a.LogSlowQuery(txnTS, succ, hasMoreResults) a.SummaryStmt(succ) @@ -973,6 +980,7 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) { // LogSlowQuery is used to print the slow query in the log files. func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { sessVars := a.Ctx.GetSessionVars() + stmtCtx := sessVars.StmtCtx level := log.GetLevel() cfg := config.GetGlobalConfig() costTime := time.Since(sessVars.StartTime) + sessVars.DurationParse @@ -984,15 +992,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { return } sql := FormatSQL(a.GetTextToLog()) - _, digest := sessVars.StmtCtx.SQLDigest() + _, digest := stmtCtx.SQLDigest() var indexNames string - if len(sessVars.StmtCtx.IndexNames) > 0 { + if len(stmtCtx.IndexNames) > 0 { // remove duplicate index. idxMap := make(map[string]struct{}) buf := bytes.NewBuffer(make([]byte, 0, 4)) buf.WriteByte('[') - for _, idx := range sessVars.StmtCtx.IndexNames { + for _, idx := range stmtCtx.IndexNames { _, ok := idxMap[idx] if ok { continue @@ -1006,6 +1014,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { buf.WriteByte(']') indexNames = buf.String() } + flat := getFlatPlan(stmtCtx) var stmtDetail execdetails.StmtExecDetails stmtDetailRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey) if stmtDetailRaw != nil { @@ -1016,12 +1025,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { if tikvExecDetailRaw != nil { tikvExecDetail = *(tikvExecDetailRaw.(*util.ExecDetails)) } - execDetail := sessVars.StmtCtx.GetExecDetails() - copTaskInfo := sessVars.StmtCtx.CopTasksDetails() - statsInfos := plannercore.GetStatsInfo(a.Plan) - memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() - diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed() - _, planDigest := getPlanDigest(sessVars.StmtCtx, a.Plan) + execDetail := stmtCtx.GetExecDetails() + copTaskInfo := stmtCtx.CopTasksDetails() + statsInfos := plannercore.GetStatsInfoFromFlatPlan(flat) + memMax := stmtCtx.MemTracker.MaxConsumed() + diskMax := stmtCtx.DiskTracker.MaxConsumed() + _, planDigest := getPlanDigest(stmtCtx) + + resultRows := GetResultRowsCount(stmtCtx, a.Plan) + slowItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql.String(), @@ -1038,7 +1050,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { MemMax: memMax, DiskMax: diskMax, Succ: succ, - Plan: getPlanTree(a.Ctx, a.Plan), + Plan: getPlanTree(stmtCtx), PlanDigest: planDigest.String(), Prepared: a.isPreparedStmt, HasMoreResults: hasMoreResults, @@ -1049,10 +1061,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { PDTotal: time.Duration(atomic.LoadInt64(&tikvExecDetail.WaitPDRespDuration)), BackoffTotal: time.Duration(atomic.LoadInt64(&tikvExecDetail.BackoffDuration)), WriteSQLRespTotal: stmtDetail.WriteSQLRespDuration, - ResultRows: GetResultRowsCount(a.Ctx, a.Plan), + ResultRows: resultRows, ExecRetryCount: a.retryCount, IsExplicitTxn: sessVars.TxnCtx.IsExplicit, - IsWriteCacheTable: sessVars.StmtCtx.WaitLockLeaseTime > 0, + IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0, } if a.retryCount > 0 { slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime) @@ -1080,15 +1092,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { userString = sessVars.User.String() } var tableIDs string - if len(sessVars.StmtCtx.TableIDs) > 0 { - tableIDs = strings.Replace(fmt.Sprintf("%v", sessVars.StmtCtx.TableIDs), " ", ",", -1) + if len(stmtCtx.TableIDs) > 0 { + tableIDs = strings.Replace(fmt.Sprintf("%v", stmtCtx.TableIDs), " ", ",", -1) } domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{ SQL: sql.String(), Digest: digest.String(), Start: sessVars.StartTime, Duration: costTime, - Detail: sessVars.StmtCtx.GetExecDetails(), + Detail: stmtCtx.GetExecDetails(), Succ: succ, ConnID: sessVars.ConnectionID, TxnTS: txnTS, @@ -1102,8 +1114,8 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { } // GetResultRowsCount gets the count of the statement result rows. -func GetResultRowsCount(sctx sessionctx.Context, p plannercore.Plan) int64 { - runtimeStatsColl := sctx.GetSessionVars().StmtCtx.RuntimeStatsColl +func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p plannercore.Plan) int64 { + runtimeStatsColl := stmtCtx.RuntimeStatsColl if runtimeStatsColl == nil { return 0 } @@ -1115,13 +1127,33 @@ func GetResultRowsCount(sctx sessionctx.Context, p plannercore.Plan) int64 { return rootStats.GetActRows() } +// getFlatPlan generates a FlatPhysicalPlan from the plan stored in stmtCtx.plan, +// then stores it in stmtCtx.flatPlan. +func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPlan { + pp := stmtCtx.GetPlan() + if pp == nil { + return nil + } + if flat := stmtCtx.GetFlatPlan(); flat != nil { + f := flat.(*plannercore.FlatPhysicalPlan) + return f + } + p := pp.(plannercore.Plan) + flat := plannercore.FlattenPhysicalPlan(p, false) + if flat != nil { + stmtCtx.SetFlatPlan(flat) + return flat + } + return nil +} + // getPlanTree will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { +func getPlanTree(stmtCtx *stmtctx.StatementContext) string { cfg := config.GetGlobalConfig() if atomic.LoadUint32(&cfg.Instance.RecordPlanInSlowLog) == 0 { return "" } - planTree, _ := getEncodedPlan(sctx, p, false) + planTree, _ := getEncodedPlan(stmtCtx, false) if len(planTree) == 0 { return planTree } @@ -1129,31 +1161,33 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { } // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanDigest(sc *stmtctx.StatementContext, p plannercore.Plan) (string, *parser.Digest) { - normalized, planDigest := sc.GetPlanDigest() +func getPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) { + normalized, planDigest := stmtCtx.GetPlanDigest() if len(normalized) > 0 && planDigest != nil { return normalized, planDigest } - normalized, planDigest = plannercore.NormalizePlan(p) - sc.SetPlanDigest(normalized, planDigest) + flat := getFlatPlan(stmtCtx) + normalized, planDigest = plannercore.NormalizeFlatPlan(flat) + stmtCtx.SetPlanDigest(normalized, planDigest) return normalized, planDigest } // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. -func getEncodedPlan(sctx sessionctx.Context, p plannercore.Plan, genHint bool) (encodedPlan, hintStr string) { +func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) { var hintSet bool - encodedPlan = sctx.GetSessionVars().StmtCtx.GetEncodedPlan() - hintStr, hintSet = sctx.GetSessionVars().StmtCtx.GetPlanHint() + encodedPlan = stmtCtx.GetEncodedPlan() + hintStr, hintSet = stmtCtx.GetPlanHint() if len(encodedPlan) > 0 && (!genHint || hintSet) { return } + flat := getFlatPlan(stmtCtx) if len(encodedPlan) == 0 { - encodedPlan = plannercore.EncodePlan(p) - sctx.GetSessionVars().StmtCtx.SetEncodedPlan(encodedPlan) + encodedPlan = plannercore.EncodeFlatPlan(flat) + stmtCtx.SetEncodedPlan(encodedPlan) } if genHint { - hints := plannercore.GenHintsFromPhysicalPlan(p) - for _, tableHint := range sctx.GetSessionVars().StmtCtx.OriginalTableHints { + hints := plannercore.GenHintsFromFlatPlan(flat) + for _, tableHint := range stmtCtx.OriginalTableHints { // some hints like 'memory_quota' cannot be extracted from the PhysicalPlan directly, // so we have to iterate all hints from the customer and keep some other necessary hints. switch tableHint.HintName.L { @@ -1165,7 +1199,7 @@ func getEncodedPlan(sctx sessionctx.Context, p plannercore.Plan, genHint bool) ( } hintStr = hint.RestoreOptimizerHints(hints) - sctx.GetSessionVars().StmtCtx.SetPlanHint(hintStr) + stmtCtx.SetPlanHint(hintStr) } return } @@ -1209,7 +1243,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { // No need to encode every time, so encode lazily. planGenerator := func() (string, string) { - return getEncodedPlan(a.Ctx, a.Plan, !sessVars.InRestrictedSQL) + return getEncodedPlan(stmtCtx, !sessVars.InRestrictedSQL) } // Generating plan digest is slow, only generate it once if it's 'Point_Get'. // If it's a point get, different SQLs leads to different plans, so SQL digest @@ -1218,11 +1252,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - _, planDigest := getPlanDigest(stmtCtx, a.Plan) + _, planDigest := getPlanDigest(stmtCtx) return planDigest.String() } } else { - _, tmp := getPlanDigest(stmtCtx, a.Plan) + _, tmp := getPlanDigest(stmtCtx) planDigest = tmp.String() } @@ -1250,6 +1284,9 @@ func (a *ExecStmt) SummaryStmt(succ bool) { execDetail.BackoffTime += stmtCtx.WaitLockLeaseTime execDetail.TimeDetail.WaitTime += stmtCtx.WaitLockLeaseTime } + + resultRows := GetResultRowsCount(stmtCtx, a.Plan) + stmtExecInfo := &stmtsummary.StmtExecInfo{ SchemaName: strings.ToLower(sessVars.CurrentDB), OriginalSQL: sql, @@ -1278,7 +1315,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) { PlanInBinding: sessVars.FoundInBinding, ExecRetryCount: a.retryCount, StmtExecDetails: stmtDetail, - ResultRows: GetResultRowsCount(a.Ctx, a.Plan), + ResultRows: resultRows, TiKVExecDetails: tikvExecDetail, Prepared: a.isPreparedStmt, } @@ -1306,7 +1343,7 @@ func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Contex vars := a.Ctx.GetSessionVars() sc := vars.StmtCtx normalizedSQL, sqlDigest := sc.SQLDigest() - normalizedPlan, planDigest := getPlanDigest(sc, a.Plan) + normalizedPlan, planDigest := getPlanDigest(sc) var sqlDigestByte, planDigestByte []byte if sqlDigest != nil { sqlDigestByte = sqlDigest.Bytes() diff --git a/executor/compiler.go b/executor/compiler.go index f4633c85f70c6..17c09041a5932 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -90,6 +90,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority { lowerPriority = needLowerPriority(finalPlan) } + c.Ctx.GetSessionVars().StmtCtx.SetPlan(finalPlan) return &ExecStmt{ GoCtx: ctx, InfoSchema: is, diff --git a/executor/executor_test.go b/executor/executor_test.go index 2c13530dad3fe..1277386725a6b 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4359,7 +4359,8 @@ func TestGetResultRowsCount(t *testing.T) { require.NotNil(t, info) p, ok := info.Plan.(plannercore.Plan) require.True(t, ok) - cnt := executor.GetResultRowsCount(tk.Session(), p) + cnt := executor.GetResultRowsCount(tk.Session().GetSessionVars().StmtCtx, p) + require.Equal(t, ca.row, cnt, fmt.Sprintf("sql: %v", ca.sql)) require.Equal(t, cnt, ca.row, fmt.Sprintf("sql: %v", ca.sql)) } } diff --git a/executor/resource_tag_test.go b/executor/resource_tag_test.go index 605a7f323b573..db59828aa72be 100644 --- a/executor/resource_tag_test.go +++ b/executor/resource_tag_test.go @@ -196,6 +196,10 @@ func TestResourceGroupTag(t *testing.T) { p, ok := info.Plan.(plannercore.Plan) require.True(t, ok) _, expectPlanDigest = plannercore.NormalizePlan(p) + + flat := plannercore.FlattenPhysicalPlan(p, false) + _, newPlanDigest := plannercore.NormalizeFlatPlan(flat) + require.Equal(t, expectPlanDigest, newPlanDigest) } require.Equal(t, sqlDigest.String(), expectSQLDigest.String(), "%v", ca.sql) require.Equal(t, planDigest.String(), expectPlanDigest.String()) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 42a1cc3ece50d..5e7b5b3bcc209 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -825,10 +825,10 @@ func TestStmtSummaryTable(t *testing.T) { "max_prewrite_regions, avg_affected_rows, query_sample_text, plan " + "from information_schema.statements_summary " + "where digest_text like 'select * from `t`%'" - tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + - "\tIndexLookUp_10 \troot \t100 \t\n" + - "\t├─IndexRangeScan_8\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + - "\t└─TableRowIDScan_9\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo")) + tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + + "\tIndexLookUp_10 \troot \t100 \t\n" + + "\t├─IndexRangeScan_8(Build)\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + + "\t└─TableRowIDScan_9(Probe)\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo")) // select ... order by tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys, @@ -847,10 +847,10 @@ func TestStmtSummaryTable(t *testing.T) { "from information_schema.statements_summary " + "where digest_text like 'select * from `t`%'" tk.MustQuery(sql).Check(testkit.Rows( - "Select test test.t t:k 2 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + - "\tIndexLookUp_10 \troot \t100 \t\n" + - "\t├─IndexRangeScan_8\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + - "\t└─TableRowIDScan_9\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo")) + "Select test test.t t:k 2 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + + "\tIndexLookUp_10 \troot \t100 \t\n" + + "\t├─IndexRangeScan_8(Build)\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + + "\t└─TableRowIDScan_9(Probe)\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo")) // Disable it again. tk.MustExec("set global tidb_enable_stmt_summary = false") @@ -894,10 +894,10 @@ func TestStmtSummaryTable(t *testing.T) { "max_prewrite_regions, avg_affected_rows, query_sample_text, plan " + "from information_schema.statements_summary " + "where digest_text like 'select * from `t`%'" - tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + - "\tIndexLookUp_10 \troot \t1000 \t\n" + - "\t├─IndexRangeScan_8\tcop[tikv]\t1000 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + - "\t└─TableRowIDScan_9\tcop[tikv]\t1000 \ttable:t, keep order:false, stats:pseudo")) + tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" + + "\tIndexLookUp_10 \troot \t1000 \t\n" + + "\t├─IndexRangeScan_8(Build)\tcop[tikv]\t1000 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" + + "\t└─TableRowIDScan_9(Probe)\tcop[tikv]\t1000 \ttable:t, keep order:false, stats:pseudo")) // Disable it in global scope. tk.MustExec("set global tidb_enable_stmt_summary = false") diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index e5c04aa6f538b..31ad78c12036c 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -500,6 +500,7 @@ REBUILD: } cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, isBinProtocol, binVarTypes, txtVarTypes, sessVars.StmtCtx.BindSQL) preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p) + stmtCtx.SetPlan(p) stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) if cacheVals, exists := sctx.PreparedPlanCache().Get(cacheKey); exists { hitVal := false @@ -564,6 +565,7 @@ func (e *Execute) tryCachePointPlan(ctx context.Context, sctx sessionctx.Context prepared.CachedPlan = p prepared.CachedNames = names preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p) + sctx.GetSessionVars().StmtCtx.SetPlan(p) sctx.GetSessionVars().StmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) } return err @@ -1251,7 +1253,8 @@ func (e *Explain) RenderResult() error { e.prepareDotInfo(physicalPlan) } case types.ExplainFormatHint: - hints := GenHintsFromPhysicalPlan(e.TargetPlan) + flat := FlattenPhysicalPlan(e.TargetPlan, false) + hints := GenHintsFromFlatPlan(flat) hints = append(hints, hint.ExtractTableHintsFromStmtNode(e.ExecStmt, nil)...) e.Rows = append(e.Rows, []string{hint.RestoreOptimizerHints(hints)}) default: diff --git a/planner/core/encode.go b/planner/core/encode.go index 6df510e7449ed..cc7b3955ecd9c 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -18,6 +18,7 @@ import ( "bytes" "crypto/sha256" "hash" + "strconv" "sync" "github.com/pingcap/failpoint" @@ -26,6 +27,106 @@ import ( "github.com/pingcap/tidb/util/plancodec" ) +// EncodeFlatPlan encodes a FlatPhysicalPlan with compression. +func EncodeFlatPlan(flat *FlatPhysicalPlan) string { + if len(flat.Main) == 0 { + return "" + } + // We won't collect the plan when we're in "EXPLAIN FOR" statement and the plan is from EXECUTE statement (please + // read comments of InExecute for details about the meaning of InExecute) because we are unable to get some + // necessary information when the execution of the plan is finished and some states in the session such as + // PreparedParams are cleaned. + // The behavior in BinaryPlanStrFromFlatPlan() is also the same. + if flat.InExecute { + return "" + } + failpoint.Inject("mockPlanRowCount", func(val failpoint.Value) { + selectPlan := flat.Main.GetSelectPlan() + for _, op := range selectPlan { + op.Origin.statsInfo().RowCount = float64(val.(int)) + } + }) + pn := encoderPool.Get().(*planEncoder) + defer func() { + pn.buf.Reset() + encoderPool.Put(pn) + }() + buf := pn.buf + buf.Reset() + opCount := len(flat.Main) + for _, cte := range flat.CTEs { + opCount += len(cte) + } + // assume an operator costs around 80 bytes, preallocate space for them + buf.Grow(80 * opCount) + encodeFlatPlanTree(flat.Main, 0, &buf) + for _, cte := range flat.CTEs { + op := cte[0] + cteDef := cte[0].Origin.(*CTEDefinition) + id := cteDef.CTE.IDForStorage + tp := plancodec.TypeCTEDefinition + taskTypeInfo := plancodec.EncodeTaskType(op.IsRoot, op.StoreType) + p := op.Origin + actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfoStr(p.SCtx(), p, nil) + var estRows float64 + if statsInfo := p.statsInfo(); statsInfo != nil { + estRows = statsInfo.RowCount + } + plancodec.EncodePlanNode( + int(op.Depth), + strconv.Itoa(id)+op.Label.String(), + tp, + estRows, + taskTypeInfo, + op.Origin.ExplainInfo(), + actRows, + analyzeInfo, + memoryInfo, + diskInfo, + &buf, + ) + if len(cte) > 1 { + encodeFlatPlanTree(cte[1:], 1, &buf) + } + } + return plancodec.Compress(buf.Bytes()) +} + +func encodeFlatPlanTree(flatTree FlatPlanTree, offset int, buf *bytes.Buffer) { + for _, op := range flatTree { + taskTypeInfo := plancodec.EncodeTaskType(op.IsRoot, op.StoreType) + p := op.Origin + actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfoStr(p.SCtx(), p, nil) + var estRows float64 + if statsInfo := p.statsInfo(); statsInfo != nil { + estRows = statsInfo.RowCount + } + plancodec.EncodePlanNode( + int(op.Depth), + strconv.Itoa(op.Origin.ID())+op.Label.String(), + op.Origin.TP(), + estRows, + taskTypeInfo, + op.Origin.ExplainInfo(), + actRows, + analyzeInfo, + memoryInfo, + diskInfo, + buf, + ) + + // If NeedReverseDriverSide is true, we stop using the order of the slice and switch to recursively + // call encodeFlatPlanTree to keep build side before probe side. + if op.NeedReverseDriverSide { + buildSide := flatTree[op.ChildrenIdx[1]-offset:] + probeSide := flatTree[op.ChildrenIdx[0]-offset : op.ChildrenIdx[1]-offset] + encodeFlatPlanTree(buildSide, op.ChildrenIdx[1], buf) + encodeFlatPlanTree(probeSide, op.ChildrenIdx[0], buf) + break + } + } +} + var encoderPool = sync.Pool{ New: func() interface{} { return &planEncoder{} @@ -40,6 +141,7 @@ type planEncoder struct { } // EncodePlan is used to encodePlan the plan to the plan tree with compressing. +// Deprecated: FlattenPhysicalPlan() + EncodeFlatPlan() is preferred. func EncodePlan(p Plan) string { if explain, ok := p.(*Explain); ok { p = explain.TargetPlan @@ -84,7 +186,7 @@ func (pn *planEncoder) encodeCTEPlan() { if statsInfo := x.statsInfo(); statsInfo != nil { rowCount = x.statsInfo().RowCount } - plancodec.EncodePlanNode(0, x.CTE.IDForStorage, plancodec.TypeCTEDefinition, rowCount, taskTypeInfo, x.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf) + plancodec.EncodePlanNode(0, strconv.Itoa(x.CTE.IDForStorage), plancodec.TypeCTEDefinition, rowCount, taskTypeInfo, x.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf) pn.encodePlan(x.SeedPlan, true, kv.TiKV, 1) if x.RecurPlan != nil { pn.encodePlan(x.RecurPlan, true, kv.TiKV, 1) @@ -100,7 +202,7 @@ func (pn *planEncoder) encodePlan(p Plan, isRoot bool, store kv.StoreType, depth if statsInfo := p.statsInfo(); statsInfo != nil { rowCount = p.statsInfo().RowCount } - plancodec.EncodePlanNode(depth, p.ID(), p.TP(), rowCount, taskTypeInfo, p.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf) + plancodec.EncodePlanNode(depth, strconv.Itoa(p.ID()), p.TP(), rowCount, taskTypeInfo, p.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf) pn.encodedPlans[p.ID()] = true depth++ @@ -152,23 +254,61 @@ type planDigester struct { hasher hash.Hash } +// NormalizeFlatPlan normalizes a FlatPhysicalPlan and generates plan digest. +func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parser.Digest) { + selectPlan := flat.Main.GetSelectPlan() + if len(selectPlan) == 0 || !selectPlan[0].IsPhysicalPlan { + return "", parser.NewDigest(nil) + } + d := digesterPool.Get().(*planDigester) + defer func() { + d.buf.Reset() + d.hasher.Reset() + digesterPool.Put(d) + }() + // assume an operator costs around 30 bytes, preallocate space for them + d.buf.Grow(30 * len(selectPlan)) + depthOffset := len(flat.Main) - len(selectPlan) + for _, op := range selectPlan { + taskTypeInfo := plancodec.EncodeTaskTypeForNormalize(op.IsRoot, op.StoreType) + p := op.Origin.(PhysicalPlan) + plancodec.NormalizePlanNode( + int(op.Depth-uint32(depthOffset)), + op.Origin.TP(), + taskTypeInfo, + p.ExplainNormalizedInfo(), + &d.buf, + ) + } + normalized = d.buf.String() + _, err := d.hasher.Write(d.buf.Bytes()) + if err != nil { + panic(err) + } + digest = parser.NewDigest(d.hasher.Sum(nil)) + return +} + // NormalizePlan is used to normalize the plan and generate plan digest. +// Deprecated: FlattenPhysicalPlan() + NormalizeFlatPlan() is preferred. func NormalizePlan(p Plan) (normalized string, digest *parser.Digest) { selectPlan := getSelectPlan(p) if selectPlan == nil { return "", parser.NewDigest(nil) } d := digesterPool.Get().(*planDigester) - defer digesterPool.Put(d) + defer func() { + d.buf.Reset() + d.hasher.Reset() + digesterPool.Put(d) + }() d.normalizePlanTree(selectPlan) normalized = d.buf.String() _, err := d.hasher.Write(d.buf.Bytes()) if err != nil { panic(err) } - d.buf.Reset() digest = parser.NewDigest(d.hasher.Sum(nil)) - d.hasher.Reset() return } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 77ea6f73d847a..79cc45416fe80 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1071,6 +1071,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask( Columns: ds.Columns, Table: is.Table, TableAsName: ds.TableAsName, + DBName: ds.DBName, isPartition: ds.isPartition, physicalTableID: ds.physicalTableID, tblCols: ds.TblCols, diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 00c5099388e08..12e1685b2f61d 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1401,6 +1401,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, Columns: ds.Columns, Table: is.Table, TableAsName: ds.TableAsName, + DBName: ds.DBName, isPartition: ds.isPartition, physicalTableID: ds.physicalTableID, tblCols: ds.TblCols, @@ -2079,6 +2080,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, accessCnt := math.Min(candidate.path.CountAfterAccess, float64(len(candidate.path.Ranges))) batchPointGetPlan := BatchPointGetPlan{ ctx: ds.ctx, + dbName: ds.DBName.L, AccessConditions: candidate.path.AccessConds, TblInfo: ds.TableInfo(), KeepOrder: !prop.IsSortItemEmpty(), diff --git a/planner/core/hints.go b/planner/core/hints.go index b4ad15042ef43..241b95f1c47dd 100644 --- a/planner/core/hints.go +++ b/planner/core/hints.go @@ -22,7 +22,44 @@ import ( utilhint "github.com/pingcap/tidb/util/hint" ) +// GenHintsFromFlatPlan generates hints from a FlatPhysicalPlan. +func GenHintsFromFlatPlan(flat *FlatPhysicalPlan) []*ast.TableOptimizerHint { + if len(flat.Main) == 0 { + return nil + } + nodeTp := utilhint.TypeSelect + switch flat.Main[0].Origin.(type) { + case *Update: + nodeTp = utilhint.TypeUpdate + case *Delete: + nodeTp = utilhint.TypeDelete + } + var hints []*ast.TableOptimizerHint + selectPlan := flat.Main.GetSelectPlan() + if len(selectPlan) == 0 || !selectPlan[0].IsPhysicalPlan { + return nil + } + for _, op := range selectPlan { + if !op.IsRoot { + continue + } + p := op.Origin.(PhysicalPlan) + hints = genHintsFromSingle(p, nodeTp, hints) + } + for _, cte := range flat.CTEs { + for i, op := range cte { + if i == 0 || !op.IsRoot { + continue + } + p := op.Origin.(PhysicalPlan) + hints = genHintsFromSingle(p, nodeTp, hints) + } + } + return hints +} + // GenHintsFromPhysicalPlan generates hints from physical plan. +// Deprecated: FlattenPhysicalPlan() + GenHintsFromFlatPlan() is preferred. func GenHintsFromPhysicalPlan(p Plan) []*ast.TableOptimizerHint { var hints []*ast.TableOptimizerHint switch pp := p.(type) { @@ -127,6 +164,10 @@ func genHintsFromPhysicalPlan(p PhysicalPlan, nodeType utilhint.NodeType) (res [ res = append(res, genHintsFromPhysicalPlan(phCte.CTE.recursivePartPhysicalPlan, nodeType)...) } + return genHintsFromSingle(p, nodeType, res) +} + +func genHintsFromSingle(p PhysicalPlan, nodeType utilhint.NodeType, res []*ast.TableOptimizerHint) []*ast.TableOptimizerHint { qbName, err := utilhint.GenerateQBName(nodeType, p.SelectBlockOffset()) if err != nil { return res diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index ec9c71d6fa245..ac00996af0d84 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -278,6 +278,18 @@ func TestDAGPlanTopN(t *testing.T) { } } +func assertSameHints(t *testing.T, expected, actual []*ast.TableOptimizerHint) { + expectedStr := make([]string, 0, len(expected)) + actualStr := make([]string, 0, len(actual)) + for _, h := range expected { + expectedStr = append(expectedStr, hint.RestoreTableOptimizerHint(h)) + } + for _, h := range actual { + actualStr = append(actualStr, hint.RestoreTableOptimizerHint(h)) + } + require.ElementsMatch(t, expectedStr, actualStr) +} + func TestDAGPlanBuilderBasePhysicalPlan(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() @@ -312,7 +324,14 @@ func TestDAGPlanBuilderBasePhysicalPlan(t *testing.T) { output[i].Hints = hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)) }) require.Equal(t, output[i].Best, core.ToString(p), fmt.Sprintf("input: %s", tt)) - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), fmt.Sprintf("input: %s", tt)) + hints := core.GenHintsFromPhysicalPlan(p) + + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), fmt.Sprintf("input: %s", tt)) } } @@ -852,8 +871,14 @@ func TestJoinHints(t *testing.T) { require.Equal(t, stmtctx.WarnLevelWarning, warnings[0].Level) require.Equal(t, output[i].Warning, warnings[0].Err.Error()) } + hints := core.GenHintsFromPhysicalPlan(p) - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), comment) + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), comment) } } @@ -970,10 +995,10 @@ func TestExplainJoinHints(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int, c int, key(b), key(c))") tk.MustQuery("explain format='hint' select /*+ inl_merge_join(t2) */ * from t t1 inner join t t2 on t1.b = t2.b and t1.c = 1").Check(testkit.Rows( - "use_index(@`sel_1` `test`.`t1` `c`), use_index(@`sel_1` `test`.`t2` `b`), inl_merge_join(@`sel_1` `test`.`t2`), inl_merge_join(`t2`)", + "inl_merge_join(@`sel_1` `test`.`t2`), use_index(@`sel_1` `test`.`t1` `c`), use_index(@`sel_1` `test`.`t2` `b`), inl_merge_join(`t2`)", )) tk.MustQuery("explain format='hint' select /*+ inl_hash_join(t2) */ * from t t1 inner join t t2 on t1.b = t2.b and t1.c = 1").Check(testkit.Rows( - "use_index(@`sel_1` `test`.`t1` `c`), use_index(@`sel_1` `test`.`t2` `b`), inl_hash_join(@`sel_1` `test`.`t2`), inl_hash_join(`t2`)", + "inl_hash_join(@`sel_1` `test`.`t2`), use_index(@`sel_1` `test`.`t1` `c`), use_index(@`sel_1` `test`.`t2` `b`), inl_hash_join(`t2`)", )) } @@ -1319,7 +1344,14 @@ func TestIndexHint(t *testing.T) { } else { require.Len(t, warnings, 0, comment) } - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), comment) + hints := core.GenHintsFromPhysicalPlan(p) + + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), comment) } } @@ -1366,7 +1398,14 @@ func TestIndexMergeHint(t *testing.T) { } else { require.Len(t, warnings, 0, comment) } - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), comment) + hints := core.GenHintsFromPhysicalPlan(p) + + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), comment) } } @@ -1402,7 +1441,14 @@ func TestQueryBlockHint(t *testing.T) { output[i].Hints = hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)) }) require.Equal(t, output[i].Plan, core.ToString(p), comment) - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), comment) + hints := core.GenHintsFromPhysicalPlan(p) + + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), comment) } } @@ -1442,7 +1488,14 @@ func TestInlineProjection(t *testing.T) { output[i].Hints = hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)) }) require.Equal(t, output[i].Plan, core.ToString(p), comment) - require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(core.GenHintsFromPhysicalPlan(p)), comment) + hints := core.GenHintsFromPhysicalPlan(p) + + // test the new genHints code + flat := core.FlattenPhysicalPlan(p, false) + newHints := core.GenHintsFromFlatPlan(flat) + assertSameHints(t, hints, newHints) + + require.Equal(t, output[i].Hints, hint.RestoreOptimizerHints(hints), comment) } } diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 003ca690a206d..8b3c61f7a2316 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -81,7 +81,14 @@ func TestPreferRangeScan(t *testing.T) { require.NotNil(t, info) p, ok := info.Plan.(core.Plan) require.True(t, ok) - normalized, _ := core.NormalizePlan(p) + normalized, digest := core.NormalizePlan(p) + + // test the new normalization code + flat := core.FlattenPhysicalPlan(p, false) + newNormalized, newDigest := core.NormalizeFlatPlan(flat) + require.Equal(t, normalized, newNormalized) + require.Equal(t, digest, newDigest) + normalizedPlan, err := plancodec.DecodeNormalizedPlan(normalized) normalizedPlanRows := getPlanRows(normalizedPlan) require.NoError(t, err) @@ -118,7 +125,14 @@ func TestNormalizedPlan(t *testing.T) { require.NotNil(t, info) p, ok := info.Plan.(core.Plan) require.True(t, ok) - normalized, _ := core.NormalizePlan(p) + normalized, digest := core.NormalizePlan(p) + + // test the new normalization code + flat := core.FlattenPhysicalPlan(p, false) + newNormalized, newDigest := core.NormalizeFlatPlan(flat) + require.Equal(t, normalized, newNormalized) + require.Equal(t, digest, newDigest) + normalizedPlan, err := plancodec.DecodeNormalizedPlan(normalized) normalizedPlanRows := getPlanRows(normalizedPlan) require.NoError(t, err) @@ -159,6 +173,13 @@ func TestNormalizedPlanForDiffStore(t *testing.T) { ep, ok := info.Plan.(*core.Explain) require.True(t, ok) normalized, digest := core.NormalizePlan(ep.TargetPlan) + + // test the new normalization code + flat := core.FlattenPhysicalPlan(ep.TargetPlan, false) + newNormalized, newPlanDigest := core.NormalizeFlatPlan(flat) + require.Equal(t, digest, newPlanDigest) + require.Equal(t, normalized, newNormalized) + normalizedPlan, err := plancodec.DecodeNormalizedPlan(normalized) normalizedPlanRows := getPlanRows(normalizedPlan) require.NoError(t, err) @@ -184,7 +205,7 @@ func TestEncodeDecodePlan(t *testing.T) { tk.MustExec("set tidb_partition_prune_mode='static';") tk.Session().GetSessionVars().PlanID = 0 - getPlanTree := func() string { + getPlanTree := func() (str1, str2 string) { info := tk.Session().ShowProcess() require.NotNil(t, info) p, ok := info.Plan.(core.Plan) @@ -192,41 +213,89 @@ func TestEncodeDecodePlan(t *testing.T) { encodeStr := core.EncodePlan(p) planTree, err := plancodec.DecodePlan(encodeStr) require.NoError(t, err) - return planTree + + // test the new encoding method + flat := core.FlattenPhysicalPlan(p, true) + newEncodeStr := core.EncodeFlatPlan(flat) + newPlanTree, err := plancodec.DecodePlan(newEncodeStr) + require.NoError(t, err) + + return planTree, newPlanTree } tk.MustExec("select max(a) from t1 where a>0;") - planTree := getPlanTree() + planTree, newplanTree := getPlanTree() require.Contains(t, planTree, "time") require.Contains(t, planTree, "loops") - - tk.MustExec("insert into t1 values (1,1,1);") - planTree = getPlanTree() + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") + + tk.MustExec("prepare stmt from \"select max(a) from t1 where a > ?\";") + tk.MustExec("set @a = 1;") + tk.MustExec("execute stmt using @a;") + planTree, newplanTree = getPlanTree() + require.Empty(t, planTree) + require.Empty(t, newplanTree) + + tk.MustExec("insert into t1 values (1,1,1), (2,2,2);") + planTree, newplanTree = getPlanTree() require.Contains(t, planTree, "Insert") require.Contains(t, planTree, "time") require.Contains(t, planTree, "loops") + require.Contains(t, newplanTree, "Insert") + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") + + tk.MustExec("update t1 set b = 3 where c = 1;") + planTree, newplanTree = getPlanTree() + require.Contains(t, planTree, "Update") + require.Contains(t, planTree, "time") + require.Contains(t, planTree, "loops") + require.Contains(t, newplanTree, "Update") + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") + + tk.MustExec("delete from t1 where b = 3;") + planTree, newplanTree = getPlanTree() + require.Contains(t, planTree, "Delete") + require.Contains(t, planTree, "time") + require.Contains(t, planTree, "loops") + require.Contains(t, newplanTree, "Delete") + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") tk.MustExec("with cte(a) as (select 1) select * from cte") - planTree = getPlanTree() + planTree, newplanTree = getPlanTree() require.Contains(t, planTree, "CTE") require.Contains(t, planTree, "1->Column#1") require.Contains(t, planTree, "time") require.Contains(t, planTree, "loops") + require.Contains(t, newplanTree, "CTE") + require.Contains(t, newplanTree, "1->Column#1") + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") tk.MustExec("with cte(a) as (select 2) select * from cte") - planTree = getPlanTree() + planTree, newplanTree = getPlanTree() require.Contains(t, planTree, "CTE") require.Contains(t, planTree, "2->Column#1") require.Contains(t, planTree, "time") require.Contains(t, planTree, "loops") + require.Contains(t, newplanTree, "CTE") + require.Contains(t, newplanTree, "2->Column#1") + require.Contains(t, newplanTree, "time") + require.Contains(t, newplanTree, "loops") tk.MustExec("select * from tp") - planTree = getPlanTree() + planTree, newplanTree = getPlanTree() require.Contains(t, planTree, "PartitionUnion") + require.Contains(t, newplanTree, "PartitionUnion") tk.MustExec("select row_number() over (partition by c) from t1;") - planTree = getPlanTree() + planTree, newplanTree = getPlanTree() require.Contains(t, planTree, "Shuffle") require.Contains(t, planTree, "ShuffleReceiver") + require.Contains(t, newplanTree, "Shuffle") + require.Contains(t, newplanTree, "ShuffleReceiver") } func TestNormalizedDigest(t *testing.T) { @@ -419,12 +488,25 @@ func testNormalizeDigest(tk *testkit.TestKit, t *testing.T, sql1, sql2 string, i require.True(t, ok) normalized1, digest1 := core.NormalizePlan(physicalPlan) + // test the new normalization code + flat := core.FlattenPhysicalPlan(physicalPlan, false) + newNormalized, newPlanDigest := core.NormalizeFlatPlan(flat) + require.Equal(t, digest1, newPlanDigest) + require.Equal(t, normalized1, newNormalized) + tk.MustQuery(sql2) info = tk.Session().ShowProcess() require.NotNil(t, info) physicalPlan, ok = info.Plan.(core.PhysicalPlan) require.True(t, ok) normalized2, digest2 := core.NormalizePlan(physicalPlan) + + // test the new normalization code + flat = core.FlattenPhysicalPlan(physicalPlan, false) + newNormalized, newPlanDigest = core.NormalizeFlatPlan(flat) + require.Equal(t, digest2, newPlanDigest) + require.Equal(t, normalized2, newNormalized) + comment := fmt.Sprintf("sql1: %v, sql2: %v\n%v !=\n%v\n", sql1, sql2, normalized1, normalized2) if isSame { require.Equal(t, normalized1, normalized2, comment) @@ -455,8 +537,12 @@ func TestExplainFormatHint(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (c1 int not null, c2 int not null, key idx_c2(c2)) partition by range (c2) (partition p0 values less than (10), partition p1 values less than (20))") - tk.MustQuery("explain format='hint' select /*+ use_index(@`sel_2` `test`.`t2` `idx_c2`), hash_agg(@`sel_2`), use_index(@`sel_1` `test`.`t1` `idx_c2`), hash_agg(@`sel_1`) */ count(1) from t t1 where c2 in (select c2 from t t2 where t2.c2 < 15 and t2.c2 > 12)").Check(testkit.Rows( - "use_index(@`sel_2` `test`.`t2` `idx_c2`), hash_agg(@`sel_2`), use_index(@`sel_1` `test`.`t1` `idx_c2`), hash_agg(@`sel_1`)")) + tk.MustQuery("explain format='hint'" + + "select " + + "/*+ use_index(@`sel_2` `test`.`t2` `idx_c2`), hash_agg(@`sel_2`), use_index(@`sel_1` `test`.`t1` `idx_c2`), hash_agg(@`sel_1`) */ " + + "count(1) from t t1 " + + "where c2 in (select c2 from t t2 where t2.c2 < 15 and t2.c2 > 12)").Check(testkit.Rows( + "hash_agg(@`sel_1`), hash_agg(@`sel_2`), use_index(@`sel_2` `test`.`t2` `idx_c2`), use_index(@`sel_1` `test`.`t1` `idx_c2`)")) } func TestExplainFormatHintRecoverableForTiFlashReplica(t *testing.T) { @@ -609,6 +695,32 @@ func BenchmarkEncodePlan(b *testing.B) { } } +func BenchmarkEncodeFlatPlan(b *testing.B) { + store, clean := testkit.CreateMockStore(b) + defer clean() + tk := testkit.NewTestKit(b, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists th") + tk.MustExec("set @@session.tidb_enable_table_partition = 1") + tk.MustExec(`set @@tidb_partition_prune_mode='` + string(variable.Static) + `'`) + tk.MustExec("create table th (i int, a int,b int, c int, index (a)) partition by hash (a) partitions 8192;") + tk.MustExec("set @@tidb_slow_log_threshold=200000") + + query := "select count(*) from th t1 join th t2 join th t3 join th t4 join th t5 join th t6 where t1.i=t2.a and t1.i=t3.i and t3.i=t4.i and t4.i=t5.i and t5.i=t6.i" + tk.Session().GetSessionVars().PlanID = 0 + tk.MustExec(query) + info := tk.Session().ShowProcess() + require.NotNil(b, info) + p, ok := info.Plan.(core.PhysicalPlan) + require.True(b, ok) + tk.Session().GetSessionVars().StmtCtx.RuntimeStatsColl = nil + b.ResetTimer() + for i := 0; i < b.N; i++ { + flat := core.FlattenPhysicalPlan(p, false) + core.EncodeFlatPlan(flat) + } +} + // Close issue 25729 func TestIssue25729(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 9b1a1fbe0206b..4e052cb7bc3a3 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1486,6 +1486,7 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(ctx context.Context, dbName Columns: idxColInfos, Table: tblInfo, TableAsName: &tblInfo.Name, + DBName: dbName, physicalTableID: physicalID, isPartition: isPartition, tblColHists: &(statistics.PseudoTable(tblInfo)).HistColl, diff --git a/planner/core/testdata/analyze_suite_out.json b/planner/core/testdata/analyze_suite_out.json index 71a54e4d86f28..32924912e9848 100644 --- a/planner/core/testdata/analyze_suite_out.json +++ b/planner/core/testdata/analyze_suite_out.json @@ -17,7 +17,7 @@ { "SQL": "explain format = 'hint' select * from t1, t2 where t1.a = t2.a", "Plan": [ - "use_index(@`sel_1` `test`.`t1` ), use_index(@`sel_1` `test`.`t2` ), hash_join(@`sel_1` `test`.`t1`)" + "hash_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` ), use_index(@`sel_1` `test`.`t2` )" ] } ] diff --git a/planner/core/util.go b/planner/core/util.go index ccf43fc3808f7..8cbf54bab31e4 100644 --- a/planner/core/util.go +++ b/planner/core/util.go @@ -256,7 +256,26 @@ func BuildPhysicalJoinSchema(joinType JoinType, join PhysicalPlan) *expression.S return newSchema } +// GetStatsInfoFromFlatPlan gets the statistics info from a FlatPhysicalPlan. +func GetStatsInfoFromFlatPlan(flat *FlatPhysicalPlan) map[string]uint64 { + res := make(map[string]uint64) + for _, op := range flat.Main { + switch p := op.Origin.(type) { + case *PhysicalIndexScan: + if _, ok := res[p.Table.Name.O]; p.stats != nil && !ok { + res[p.Table.Name.O] = p.stats.StatsVersion + } + case *PhysicalTableScan: + if _, ok := res[p.Table.Name.O]; p.stats != nil && !ok { + res[p.Table.Name.O] = p.stats.StatsVersion + } + } + } + return res +} + // GetStatsInfo gets the statistics info from a physical plan tree. +// Deprecated: FlattenPhysicalPlan() + GetStatsInfoFromFlatPlan() is preferred. func GetStatsInfo(i interface{}) map[string]uint64 { if i == nil { // it's a workaround for https://github.com/pingcap/tidb/issues/17419 diff --git a/session/session.go b/session/session.go index 8bbf90a7c8550..e9d22b01a9fa4 100644 --- a/session/session.go +++ b/session/session.go @@ -2312,6 +2312,7 @@ func (s *session) cachedPointPlanExec(ctx context.Context, stmt.Text = prepared.Stmt.Text() stmtCtx.OriginalSQL = stmt.Text + stmtCtx.SetPlan(execPlan) stmtCtx.InitSQLDigest(prepareStmt.NormalizedSQL, prepareStmt.SQLDigest) stmtCtx.SetPlanDigest(prepareStmt.NormalizedPlan, prepareStmt.PlanDigest) logGeneralQuery(stmt, s, false) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index da79f9c842007..f6be6534be7fd 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -208,12 +208,24 @@ type StatementContext struct { // BindSQL used to construct the key for plan cache. It records the binding used by the stmt. // If the binding is not used by the stmt, the value is empty BindSQL string - // planNormalized use for cache the normalized plan, avoid duplicate builds. - planNormalized string - planDigest *parser.Digest - encodedPlan string - planHint string - planHintSet bool + + // The several fields below are mainly for some diagnostic features, like stmt summary and slow query. + // We cache the values here to avoid calculating them multiple times. + // Note: + // Avoid accessing these fields directly, use their Setter/Getter methods instead. + // Other fields should be the zero value or be consistent with the plan field. + // TODO: more clearly distinguish between the value is empty and the value has not been set + planNormalized string + planDigest *parser.Digest + encodedPlan string + planHint string + planHintSet bool + // To avoid cycle import, we use interface{} for the following two fields. + // flatPlan should be a *plannercore.FlatPhysicalPlan if it's not nil + flatPlan interface{} + // plan should be a plannercore.Plan if it's not nil + plan interface{} + Tables []TableEntry PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" lockWaitStartTime int64 // LockWaitStartTime stores the pessimistic lock wait start time @@ -379,6 +391,26 @@ func (sc *StatementContext) GetPlanDigest() (normalized string, planDigest *pars return sc.planNormalized, sc.planDigest } +// GetPlan gets the plan field of stmtctx +func (sc *StatementContext) GetPlan() interface{} { + return sc.plan +} + +// SetPlan sets the plan field of stmtctx +func (sc *StatementContext) SetPlan(plan interface{}) { + sc.plan = plan +} + +// GetFlatPlan gets the flatPlan field of stmtctx +func (sc *StatementContext) GetFlatPlan() interface{} { + return sc.flatPlan +} + +// SetFlatPlan sets the flatPlan field of stmtctx +func (sc *StatementContext) SetFlatPlan(flat interface{}) { + sc.flatPlan = flat +} + // GetResourceGroupTagger returns the implementation of tikvrpc.ResourceGroupTagger related to self. func (sc *StatementContext) GetResourceGroupTagger() tikvrpc.ResourceGroupTagger { normalized, digest := sc.SQLDigest() diff --git a/util/plancodec/codec.go b/util/plancodec/codec.go index 28284020aca16..8f84c4d40db69 100644 --- a/util/plancodec/codec.go +++ b/util/plancodec/codec.go @@ -25,6 +25,7 @@ import ( "github.com/golang/snappy" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/texttree" ) @@ -92,14 +93,14 @@ type planInfo struct { } func (pd *planDecoder) decode(planString string) (string, error) { - str, err := decompress(planString) + b, err := decompress(planString) if err != nil { if planString == PlanDiscardedEncoded { return planDiscardedDecoded, nil } return "", err } - return pd.buildPlanTree(str) + return pd.buildPlanTree(string(hack.String(b))) } func (pd *planDecoder) buildPlanTree(planString string) (string, error) { @@ -323,7 +324,7 @@ func decodePlanInfo(str string) (*planInfo, error) { } // EncodePlanNode is used to encode the plan to a string. -func EncodePlanNode(depth, pid int, planType string, rowCount float64, +func EncodePlanNode(depth int, pid, planType string, rowCount float64, taskTypeInfo, explainInfo, actRows, analyzeInfo, memoryInfo, diskInfo string, buf *bytes.Buffer) { explainInfo = escapeString(explainInfo) buf.WriteString(strconv.Itoa(depth)) @@ -371,9 +372,9 @@ func NormalizePlanNode(depth int, planType string, taskTypeInfo string, explainI buf.WriteByte(lineBreaker) } -func encodeID(planType string, id int) string { +func encodeID(planType, id string) string { planID := TypeStringToPhysicalID(planType) - return strconv.Itoa(planID) + idSeparator + strconv.Itoa(id) + return strconv.Itoa(planID) + idSeparator + id } // EncodeTaskType is used to encode task type to a string. @@ -409,21 +410,21 @@ func decodeTaskType(str string) (string, error) { return "cop[" + ((kv.StoreType)(storeType)).Name() + "]", nil } -// Compress is used to compress the input with zlib. +// Compress compresses the input with snappy then encodes it with base64. func Compress(input []byte) string { compressBytes := snappy.Encode(nil, input) return base64.StdEncoding.EncodeToString(compressBytes) } -func decompress(str string) (string, error) { +func decompress(str string) ([]byte, error) { decodeBytes, err := base64.StdEncoding.DecodeString(str) if err != nil { - return "", err + return nil, err } bs, err := snappy.Decode(nil, decodeBytes) if err != nil { - return "", err + return nil, err } - return string(bs), nil + return bs, nil }