Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661) #54680

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/ddl/tests/metadatalock/mdl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,11 @@ func TestMDLPreparePlanCacheExecute(t *testing.T) {

tk.MustQuery("select * from t2")
tk.MustExec(`set @a = 2, @b=4;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`) // can't reuse the prior plan created outside this txn.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec(`execute stmt_test_1 using @a, @b;`) // can't reuse the prior plan since this table becomes dirty.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec(`execute stmt_test_1 using @a, @b;`) // can't reuse the prior plan now.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
// The plan is from cache, the metadata lock should be added to block the DDL.
ch <- struct{}{}
Expand Down
46 changes: 46 additions & 0 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,18 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
// schema version like prepared plan cache key
<<<<<<< HEAD
stmtAst.CachedPlan = nil
stmt.Executor = nil
stmt.ColumnInfos = nil
=======
stmt.PointGet.pointPlan = nil
stmt.PointGet.planCacheKey = ""
stmt.PointGet.columnNames = nil
stmt.PointGet.pointPlanHints = nil
stmt.PointGet.Executor = nil
stmt.PointGet.ColumnInfos = nil
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// Example:
Expand Down Expand Up @@ -230,9 +239,32 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
}
}

<<<<<<< HEAD
matchOpts, err := GetMatchOpts(sctx, is, stmt, params)
if err != nil {
return nil, nil, err
=======
var paramTypes []*types.FieldType
if stmtCtx.UseCache() {
var cachedVal *PlanCacheValue
var hit, isPointPlan bool
if stmt.PointGet.pointPlan != nil && stmt.PointGet.planCacheKey == cacheKey { // if it's PointGet Plan, no need to use paramTypes
cachedVal = &PlanCacheValue{
Plan: stmt.PointGet.pointPlan,
OutputColumns: stmt.PointGet.columnNames,
stmtHints: stmt.PointGet.pointPlanHints,
}
isPointPlan, hit = true, true
} else {
paramTypes = parseParamTypes(sctx, params)
cachedVal, hit = lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
}
if hit {
if plan, names, ok, err := adjustCachedPlan(ctx, sctx, cachedVal, isNonPrepared, isPointPlan, binding, is, stmt); err != nil || ok {
return plan, names, err
}
}
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
}
if stmtCtx.UseCache { // for non-point plans
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt, matchOpts); err != nil || ok {
Expand Down Expand Up @@ -373,7 +405,21 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
<<<<<<< HEAD
sctx.GetSessionPlanCache().Put(cacheKey, cached, matchOpts)
=======
if sessVars.EnableInstancePlanCache {
domain.GetDomain(sctx).GetInstancePlanCache().Put(cacheKey, cached, paramTypes)
} else {
sctx.GetSessionPlanCache().Put(cacheKey, cached, paramTypes)
}
if _, ok := p.(*PointGetPlan); ok {
stmt.PointGet.pointPlan = p
stmt.PointGet.columnNames = names
stmt.PointGet.pointPlanHints = stmtCtx.StmtHints.Clone()
stmt.PointGet.planCacheKey = cacheKey
}
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
}
sessVars.FoundInPlanCache = false
return p, names, err
Expand Down
94 changes: 94 additions & 0 deletions pkg/planner/core/plan_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,3 +1437,97 @@ func TestIndexRange(t *testing.T) {
tk.MustQuery(`SELECT t0.* FROM t0 WHERE (id = 1 or id = 9223372036854775808);`).Check(testkit.Rows("1"))
tk.MustQuery("SELECT t1.c0 FROM t1 WHERE t1.c0!=BIN(-1);").Check(testkit.Rows("1"))
}
<<<<<<< HEAD
=======

func TestPlanCacheDirtyTables(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)

for _, t1Dirty := range []bool{true, false} {
for _, t2Dirty := range []bool{true, false} {
tk.MustExec(`create table t1 (a int);`)
tk.MustExec(`create table t2 (a int);`)
tk.MustExec(`begin`)
tk.MustExec(`prepare st from 'select 1 from t1, t2'`)
if t1Dirty {
tk.MustExec(`insert into t1 values (1)`)
}
if t2Dirty {
tk.MustExec(`insert into t2 values (1)`)
}
tk.MustExec(`execute st`) // generate a cached plan with t1Dirty & t2Dirty
tk.MustExec(`commit`)

// test cases
for _, testT1Dirty := range []bool{true, false} {
for _, testT2Dirty := range []bool{true, false} {
tk.MustExec(`begin`)
if testT1Dirty {
tk.MustExec(`insert into t1 values (1)`)
}
if testT2Dirty {
tk.MustExec(`insert into t2 values (1)`)
}
tk.MustExec(`execute st`)

if testT1Dirty == t1Dirty && testT2Dirty == t2Dirty {
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
} else {
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
}

tk.MustExec(`commit`)
}
}
tk.MustExec(`drop table t1, t2`)
}
}
}

func TestInstancePlanCacheAcrossSession(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec(`use test`)
tk1.MustExec(`create table t (a int)`)
tk1.MustExec(`insert into t values (1), (2), (3), (4), (5)`)
tk1.Session().GetSessionVars().EnableInstancePlanCache = true
tk1.MustExec(`prepare st from 'select a from t where a < ?'`)
tk1.MustExec(`set @a=2`)
tk1.MustQuery(`execute st using @a`).Sort().Check(testkit.Rows(`1`))
tk1.MustExec(`set @a=3`)
tk1.MustQuery(`execute st using @a`).Sort().Check(testkit.Rows(`1`, `2`))
tk1.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

// session2 can share session1's cached plan
tk2 := testkit.NewTestKit(t, store)
tk2.Session().GetSessionVars().EnableInstancePlanCache = true
tk2.MustExec(`use test`)
tk2.MustExec(`prepare st from 'select a from t where a < ?'`)
tk2.MustExec(`set @a=4`)
tk2.MustQuery(`execute st using @a`).Sort().Check(testkit.Rows(`1`, `2`, `3`))
tk2.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
}

func TestIssue54652(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec(`use test`)
tk.MustExec(`create table t (pk int, a int, primary key(pk))`)
tk.MustExec(`set autocommit=on`)
tk.MustQuery(`select @@autocommit`).Check(testkit.Rows("1"))
tk.MustExec(`set @pk=1`)

tk.MustExec(`prepare st from 'select * from t where pk=? for update'`)
tk.MustExec(`execute st using @pk`)
tk.MustExec(`execute st using @pk`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

tk.MustExec(`begin`)
tk.MustExec(`execute st using @pk`)
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // can't reuse since it's in txn now.
tk.MustExec(`commit`)
}
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
119 changes: 119 additions & 0 deletions pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ import (
"unsafe"

"github.com/pingcap/errors"
<<<<<<< HEAD
=======
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -384,7 +390,97 @@ func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string,
for k, v := range relatedSchemaVersion {
key.tblVersionMap[k] = v
}
<<<<<<< HEAD
return key, nil
=======
hash = codec.EncodeInt(hash, int64(vars.SelectLimit))
hash = append(hash, hack.Slice(binding)...)
hash = append(hash, hack.Slice(connCollation)...)
hash = append(hash, hack.Slice(strconv.FormatBool(vars.InRestrictedSQL))...)
hash = append(hash, hack.Slice(strconv.FormatBool(variable.RestrictedReadOnly.Load()))...)
hash = append(hash, hack.Slice(strconv.FormatBool(variable.VarTiDBSuperReadOnly.Load()))...)
// expr-pushdown-blacklist can affect query optimization, so we need to consider it in plan cache.
hash = codec.EncodeInt(hash, expression.ExprPushDownBlackListReloadTimeStamp.Load())

// whether this query has sub-query
if stmt.hasSubquery {
if !vars.EnablePlanCacheForSubquery {
return "", "", false, "the switch 'tidb_enable_plan_cache_for_subquery' is off", nil
}
hash = append(hash, '1')
} else {
hash = append(hash, '0')
}

// this variable might affect the plan
hash = append(hash, bool2Byte(vars.ForeignKeyChecks))

// "limit ?" can affect the cached plan: "limit 1" and "limit 10000" should use different plans.
if len(stmt.limits) > 0 {
if !vars.EnablePlanCacheForParamLimit {
return "", "", false, "the switch 'tidb_enable_plan_cache_for_param_limit' is off", nil
}
hash = append(hash, '|')
for _, node := range stmt.limits {
for _, valNode := range []ast.ExprNode{node.Count, node.Offset} {
if valNode == nil {
continue
}
if param, isParam := valNode.(*driver.ParamMarkerExpr); isParam {
typeExpected, val := CheckParamTypeInt64orUint64(param)
if !typeExpected {
return "", "", false, "unexpected value after LIMIT", nil
}
if val > MaxCacheableLimitCount {
return "", "", false, "limit count is too large", nil
}
hash = codec.EncodeUint(hash, val)
}
}
}
hash = append(hash, '|')
}

// stats ver can affect cached plan
if sctx.GetSessionVars().PlanCacheInvalidationOnFreshStats {
var statsVerHash uint64
for _, t := range stmt.tables {
statsVerHash += getLatestVersionFromStatsTable(sctx, t.Meta(), t.Meta().ID) // use '+' as the hash function for simplicity
}
hash = codec.EncodeUint(hash, statsVerHash)
}

// handle dirty tables
dirtyTables := vars.StmtCtx.TblInfo2UnionScan
if len(dirtyTables) > 0 {
dirtyTableIDs := make([]int64, 0, len(dirtyTables)) // TODO: a Pool for this
for t, dirty := range dirtyTables {
if !dirty {
continue
}
dirtyTableIDs = append(dirtyTableIDs, t.ID)
}
sort.Slice(dirtyTableIDs, func(i, j int) bool { return dirtyTableIDs[i] < dirtyTableIDs[j] })
for _, id := range dirtyTableIDs {
hash = codec.EncodeInt(hash, id)
}
}

// txn status
hash = append(hash, '|')
hash = append(hash, bool2Byte(vars.InTxn()))
hash = append(hash, bool2Byte(vars.IsAutocommit()))
hash = append(hash, bool2Byte(config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load()))

return string(hash), binding, true, "", nil
>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
}

func bool2Byte(flag bool) byte {
if flag {
return '1'
}
return '0'
}

// PlanCacheValue stores the cached Statement and StmtNode.
Expand Down Expand Up @@ -487,6 +583,29 @@ func (*PlanCacheQueryFeatures) Leave(in ast.Node) (out ast.Node, ok bool) {
return in, true
}

<<<<<<< HEAD
=======
// PointGetExecutorCache caches the PointGetExecutor to further improve its performance.
// Don't forget to reset this executor when the prior plan is invalid.
type PointGetExecutorCache struct {
// Special (or tricky) optimization for PointGet Plan.
// Store the PointGet Plan in PlanCacheStmt directly to bypass the LRU Cache to gain some performance improvement.
// There is around 3% improvement, BenchmarkPreparedPointGet: 6450 ns/op --> 6250 ns/op.
pointPlan base.Plan
pointPlanHints *hint.StmtHints
columnNames types.NameSlice

// the cache key for this statement, have to check whether the cache key changes before reusing this plan for safety.
planCacheKey string

ColumnInfos any
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor any
}

>>>>>>> e1626a9c5b7 (planner: fix the issue of reusing wrong point-plan for "select ... for update" (#54661))
// PlanCacheStmt store prepared ast from PrepareExec and other related fields
type PlanCacheStmt struct {
PreparedAst *ast.Prepared
Expand Down
8 changes: 6 additions & 2 deletions pkg/planner/core/tests/prepare/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,10 +679,12 @@ func TestPlanCacheUnionScan(t *testing.T) {
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
tk.MustExec("begin")
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
cnt := pb.GetCounter().GetValue()
require.Equal(t, float64(0), cnt) // can't reuse the plan created outside the txn
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows())
err := counter.Write(pb)
require.NoError(t, err)
cnt := pb.GetCounter().GetValue()
require.Equal(t, float64(1), cnt)
require.Equal(t, float64(0), cnt)
tk.MustExec("insert into t1 values(1)")
// Cached plan is invalid now, it is not chosen and removed.
tk.MustQuery("execute stmt1 using @p0").Check(testkit.Rows(
Expand Down Expand Up @@ -713,6 +715,8 @@ func TestPlanCacheUnionScan(t *testing.T) {
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
tk.MustExec("begin")
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
require.Equal(t, float64(3), cnt) // can't reuse the plan created outside the txn
tk.MustQuery("execute stmt2 using @p0").Check(testkit.Rows())
err = counter.Write(pb)
require.NoError(t, err)
cnt = pb.GetCounter().GetValue()
Expand Down