Skip to content

Commit

Permalink
*: switch to flat plan in stmt summary, slow log, etc (#36069)
Browse files Browse the repository at this point in the history
ref #35888
  • Loading branch information
time-and-fate authored Jul 14, 2022
1 parent 704275b commit 7953d60
Show file tree
Hide file tree
Showing 19 changed files with 548 additions and 99 deletions.
10 changes: 5 additions & 5 deletions bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func TestUpdateSubqueryCapture(t *testing.T) {
tk.MustExec("admin capture bindings")
rows := tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 1)
bindSQL := "UPDATE /*+ use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), hash_join(@`upd_1` `test`.`t1`), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b` = 2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1))"
bindSQL := "UPDATE /*+ hash_join(@`upd_1` `test`.`t1`), use_index(@`upd_1` `test`.`t1` `idx_b`), use_index(@`sel_1` `test`.`t2` ), use_index(@`sel_2` `test`.`t2` )*/ `test`.`t1` SET `b`=1 WHERE `b` = 2 AND (`a` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1) OR `c` IN (SELECT `a` FROM `test`.`t2` WHERE `b` = 1))"
require.Equal(t, bindSQL, rows[0][1])
tk.MustExec(bindSQL)
require.Len(t, tk.Session().GetSessionVars().StmtCtx.GetWarnings(), 0)
Expand Down Expand Up @@ -566,19 +566,19 @@ func TestIssue25505(t *testing.T) {

spmMap := map[string]string{}
spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` < ? ) select * from `cte`"] =
"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` < 5) SELECT /*+ use_index(@`sel_3` `test`.`t1` `idx_ab`), hash_agg(@`sel_1`)*/ * FROM `cte`"
"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` < 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_ab`)*/ * FROM `cte`"
spmMap["with recursive `cte1` ( `a` , `b` ) as ( select * from `test` . `t` where `b` = ? union select `a` + ? , `b` + ? from `cte1` where `a` < ? ) select * from `test` . `t`"] =
"WITH RECURSIVE `cte1` (`a`, `b`) AS (SELECT * FROM `test`.`t` WHERE `b` = 1 UNION SELECT `a` + 1,`b` + 1 FROM `cte1` WHERE `a` < 2) SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`"
spmMap["with `cte1` as ( select * from `test` . `t` ) , `cte2` as ( select ? ) select * from `test` . `t`"] =
"WITH `cte1` AS (SELECT * FROM `test`.`t`), `cte2` AS (SELECT 4) SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`"
spmMap["with `cte` as ( select * from `test` . `t` where `b` = ? ) select * from `test` . `t`"] =
"WITH `cte` AS (SELECT * FROM `test`.`t` WHERE `b` = 6) SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`"
spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` > ? ) select * from `cte`"] =
"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ use_index(@`sel_3` `test`.`t1` `idx_b`), hash_agg(@`sel_1`)*/ * FROM `cte`"
"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_b`)*/ * FROM `cte`"
spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` > ? and `b` > ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_ab`), use_index(@`sel_1` `test`.`t1` `idx_ab`), inl_join(@`sel_1` `test`.`t1`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_ab`), use_index(@`sel_3` `test`.`t2` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` = ? and `b` = ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_a`), use_index(@`sel_1` `test`.`t1` `idx_a`), inl_join(@`sel_1` `test`.`t1`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_a`), use_index(@`sel_3` `test`.`t2` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"

tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
Expand Down
109 changes: 73 additions & 36 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}
a.OutputNames = names
a.Plan = p
a.Ctx.GetSessionVars().StmtCtx.SetPlan(p)
return a.InfoSchema.SchemaMetaVersion(), nil
}

Expand Down Expand Up @@ -847,6 +848,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
a.OutputNames = executorExec.outputNames
a.isPreparedStmt = true
a.Plan = executorExec.plan
a.Ctx.GetSessionVars().StmtCtx.SetPlan(executorExec.plan)
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
Expand Down Expand Up @@ -926,6 +928,11 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
a.Ctx.GetTxnWriteThroughputSLI().AddReadKeys(execDetail.ScanDetail.ProcessedKeys)
}
succ := err == nil
if a.Plan != nil {
// If this statement has a Plan, the StmtCtx.plan should have been set when it comes here,
// but we set it again in case we missed some code paths.
sessVars.StmtCtx.SetPlan(a.Plan)
}
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
Expand Down Expand Up @@ -973,6 +980,7 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
// LogSlowQuery is used to print the slow query in the log files.
func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
level := log.GetLevel()
cfg := config.GetGlobalConfig()
costTime := time.Since(sessVars.StartTime) + sessVars.DurationParse
Expand All @@ -984,15 +992,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
return
}
sql := FormatSQL(a.GetTextToLog())
_, digest := sessVars.StmtCtx.SQLDigest()
_, digest := stmtCtx.SQLDigest()

var indexNames string
if len(sessVars.StmtCtx.IndexNames) > 0 {
if len(stmtCtx.IndexNames) > 0 {
// remove duplicate index.
idxMap := make(map[string]struct{})
buf := bytes.NewBuffer(make([]byte, 0, 4))
buf.WriteByte('[')
for _, idx := range sessVars.StmtCtx.IndexNames {
for _, idx := range stmtCtx.IndexNames {
_, ok := idxMap[idx]
if ok {
continue
Expand All @@ -1006,6 +1014,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
buf.WriteByte(']')
indexNames = buf.String()
}
flat := getFlatPlan(stmtCtx)
var stmtDetail execdetails.StmtExecDetails
stmtDetailRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey)
if stmtDetailRaw != nil {
Expand All @@ -1016,12 +1025,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
if tikvExecDetailRaw != nil {
tikvExecDetail = *(tikvExecDetailRaw.(*util.ExecDetails))
}
execDetail := sessVars.StmtCtx.GetExecDetails()
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(sessVars.StmtCtx, a.Plan)
execDetail := stmtCtx.GetExecDetails()
copTaskInfo := stmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfoFromFlatPlan(flat)
memMax := stmtCtx.MemTracker.MaxConsumed()
diskMax := stmtCtx.DiskTracker.MaxConsumed()
_, planDigest := getPlanDigest(stmtCtx)

resultRows := GetResultRowsCount(stmtCtx, a.Plan)

slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Expand All @@ -1038,7 +1050,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
MemMax: memMax,
DiskMax: diskMax,
Succ: succ,
Plan: getPlanTree(a.Ctx, a.Plan),
Plan: getPlanTree(stmtCtx),
PlanDigest: planDigest.String(),
Prepared: a.isPreparedStmt,
HasMoreResults: hasMoreResults,
Expand All @@ -1049,10 +1061,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
PDTotal: time.Duration(atomic.LoadInt64(&tikvExecDetail.WaitPDRespDuration)),
BackoffTotal: time.Duration(atomic.LoadInt64(&tikvExecDetail.BackoffDuration)),
WriteSQLRespTotal: stmtDetail.WriteSQLRespDuration,
ResultRows: GetResultRowsCount(a.Ctx, a.Plan),
ResultRows: resultRows,
ExecRetryCount: a.retryCount,
IsExplicitTxn: sessVars.TxnCtx.IsExplicit,
IsWriteCacheTable: sessVars.StmtCtx.WaitLockLeaseTime > 0,
IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
}
if a.retryCount > 0 {
slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
Expand Down Expand Up @@ -1080,15 +1092,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
userString = sessVars.User.String()
}
var tableIDs string
if len(sessVars.StmtCtx.TableIDs) > 0 {
tableIDs = strings.Replace(fmt.Sprintf("%v", sessVars.StmtCtx.TableIDs), " ", ",", -1)
if len(stmtCtx.TableIDs) > 0 {
tableIDs = strings.Replace(fmt.Sprintf("%v", stmtCtx.TableIDs), " ", ",", -1)
}
domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
SQL: sql.String(),
Digest: digest.String(),
Start: sessVars.StartTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Detail: stmtCtx.GetExecDetails(),
Succ: succ,
ConnID: sessVars.ConnectionID,
TxnTS: txnTS,
Expand All @@ -1102,8 +1114,8 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
}

// GetResultRowsCount gets the count of the statement result rows.
func GetResultRowsCount(sctx sessionctx.Context, p plannercore.Plan) int64 {
runtimeStatsColl := sctx.GetSessionVars().StmtCtx.RuntimeStatsColl
func GetResultRowsCount(stmtCtx *stmtctx.StatementContext, p plannercore.Plan) int64 {
runtimeStatsColl := stmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return 0
}
Expand All @@ -1115,45 +1127,67 @@ func GetResultRowsCount(sctx sessionctx.Context, p plannercore.Plan) int64 {
return rootStats.GetActRows()
}

// getFlatPlan generates a FlatPhysicalPlan from the plan stored in stmtCtx.plan,
// then stores it in stmtCtx.flatPlan.
func getFlatPlan(stmtCtx *stmtctx.StatementContext) *plannercore.FlatPhysicalPlan {
pp := stmtCtx.GetPlan()
if pp == nil {
return nil
}
if flat := stmtCtx.GetFlatPlan(); flat != nil {
f := flat.(*plannercore.FlatPhysicalPlan)
return f
}
p := pp.(plannercore.Plan)
flat := plannercore.FlattenPhysicalPlan(p, false)
if flat != nil {
stmtCtx.SetFlatPlan(flat)
return flat
}
return nil
}

// getPlanTree will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
func getPlanTree(stmtCtx *stmtctx.StatementContext) string {
cfg := config.GetGlobalConfig()
if atomic.LoadUint32(&cfg.Instance.RecordPlanInSlowLog) == 0 {
return ""
}
planTree, _ := getEncodedPlan(sctx, p, false)
planTree, _ := getEncodedPlan(stmtCtx, false)
if len(planTree) == 0 {
return planTree
}
return variable.SlowLogPlanPrefix + planTree + variable.SlowLogPlanSuffix
}

// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanDigest(sc *stmtctx.StatementContext, p plannercore.Plan) (string, *parser.Digest) {
normalized, planDigest := sc.GetPlanDigest()
func getPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) {
normalized, planDigest := stmtCtx.GetPlanDigest()
if len(normalized) > 0 && planDigest != nil {
return normalized, planDigest
}
normalized, planDigest = plannercore.NormalizePlan(p)
sc.SetPlanDigest(normalized, planDigest)
flat := getFlatPlan(stmtCtx)
normalized, planDigest = plannercore.NormalizeFlatPlan(flat)
stmtCtx.SetPlanDigest(normalized, planDigest)
return normalized, planDigest
}

// getEncodedPlan gets the encoded plan, and generates the hint string if indicated.
func getEncodedPlan(sctx sessionctx.Context, p plannercore.Plan, genHint bool) (encodedPlan, hintStr string) {
func getEncodedPlan(stmtCtx *stmtctx.StatementContext, genHint bool) (encodedPlan, hintStr string) {
var hintSet bool
encodedPlan = sctx.GetSessionVars().StmtCtx.GetEncodedPlan()
hintStr, hintSet = sctx.GetSessionVars().StmtCtx.GetPlanHint()
encodedPlan = stmtCtx.GetEncodedPlan()
hintStr, hintSet = stmtCtx.GetPlanHint()
if len(encodedPlan) > 0 && (!genHint || hintSet) {
return
}
flat := getFlatPlan(stmtCtx)
if len(encodedPlan) == 0 {
encodedPlan = plannercore.EncodePlan(p)
sctx.GetSessionVars().StmtCtx.SetEncodedPlan(encodedPlan)
encodedPlan = plannercore.EncodeFlatPlan(flat)
stmtCtx.SetEncodedPlan(encodedPlan)
}
if genHint {
hints := plannercore.GenHintsFromPhysicalPlan(p)
for _, tableHint := range sctx.GetSessionVars().StmtCtx.OriginalTableHints {
hints := plannercore.GenHintsFromFlatPlan(flat)
for _, tableHint := range stmtCtx.OriginalTableHints {
// some hints like 'memory_quota' cannot be extracted from the PhysicalPlan directly,
// so we have to iterate all hints from the customer and keep some other necessary hints.
switch tableHint.HintName.L {
Expand All @@ -1165,7 +1199,7 @@ func getEncodedPlan(sctx sessionctx.Context, p plannercore.Plan, genHint bool) (
}

hintStr = hint.RestoreOptimizerHints(hints)
sctx.GetSessionVars().StmtCtx.SetPlanHint(hintStr)
stmtCtx.SetPlanHint(hintStr)
}
return
}
Expand Down Expand Up @@ -1209,7 +1243,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {

// No need to encode every time, so encode lazily.
planGenerator := func() (string, string) {
return getEncodedPlan(a.Ctx, a.Plan, !sessVars.InRestrictedSQL)
return getEncodedPlan(stmtCtx, !sessVars.InRestrictedSQL)
}
// Generating plan digest is slow, only generate it once if it's 'Point_Get'.
// If it's a point get, different SQLs leads to different plans, so SQL digest
Expand All @@ -1218,11 +1252,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
_, planDigest := getPlanDigest(stmtCtx, a.Plan)
_, planDigest := getPlanDigest(stmtCtx)
return planDigest.String()
}
} else {
_, tmp := getPlanDigest(stmtCtx, a.Plan)
_, tmp := getPlanDigest(stmtCtx)
planDigest = tmp.String()
}

Expand Down Expand Up @@ -1250,6 +1284,9 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
execDetail.BackoffTime += stmtCtx.WaitLockLeaseTime
execDetail.TimeDetail.WaitTime += stmtCtx.WaitLockLeaseTime
}

resultRows := GetResultRowsCount(stmtCtx, a.Plan)

stmtExecInfo := &stmtsummary.StmtExecInfo{
SchemaName: strings.ToLower(sessVars.CurrentDB),
OriginalSQL: sql,
Expand Down Expand Up @@ -1278,7 +1315,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
PlanInBinding: sessVars.FoundInBinding,
ExecRetryCount: a.retryCount,
StmtExecDetails: stmtDetail,
ResultRows: GetResultRowsCount(a.Ctx, a.Plan),
ResultRows: resultRows,
TiKVExecDetails: tikvExecDetail,
Prepared: a.isPreparedStmt,
}
Expand Down Expand Up @@ -1306,7 +1343,7 @@ func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Contex
vars := a.Ctx.GetSessionVars()
sc := vars.StmtCtx
normalizedSQL, sqlDigest := sc.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(sc, a.Plan)
normalizedPlan, planDigest := getPlanDigest(sc)
var sqlDigestByte, planDigestByte []byte
if sqlDigest != nil {
sqlDigestByte = sqlDigest.Bytes()
Expand Down
1 change: 1 addition & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
lowerPriority = needLowerPriority(finalPlan)
}
c.Ctx.GetSessionVars().StmtCtx.SetPlan(finalPlan)
return &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4359,7 +4359,8 @@ func TestGetResultRowsCount(t *testing.T) {
require.NotNil(t, info)
p, ok := info.Plan.(plannercore.Plan)
require.True(t, ok)
cnt := executor.GetResultRowsCount(tk.Session(), p)
cnt := executor.GetResultRowsCount(tk.Session().GetSessionVars().StmtCtx, p)
require.Equal(t, ca.row, cnt, fmt.Sprintf("sql: %v", ca.sql))
require.Equal(t, cnt, ca.row, fmt.Sprintf("sql: %v", ca.sql))
}
}
Expand Down
4 changes: 4 additions & 0 deletions executor/resource_tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func TestResourceGroupTag(t *testing.T) {
p, ok := info.Plan.(plannercore.Plan)
require.True(t, ok)
_, expectPlanDigest = plannercore.NormalizePlan(p)

flat := plannercore.FlattenPhysicalPlan(p, false)
_, newPlanDigest := plannercore.NormalizeFlatPlan(flat)
require.Equal(t, expectPlanDigest, newPlanDigest)
}
require.Equal(t, sqlDigest.String(), expectSQLDigest.String(), "%v", ca.sql)
require.Equal(t, planDigest.String(), expectPlanDigest.String())
Expand Down
Loading

0 comments on commit 7953d60

Please sign in to comment.