Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Jan 17, 2023
2 parents 8961afd + 17df596 commit 812612d
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 85 deletions.
12 changes: 6 additions & 6 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, true
}
if mustInt64orUint64 {
if expected := checkParamTypeInt64orUint64(v); !expected {
if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
return 0, false, false
}
}
Expand Down Expand Up @@ -2054,19 +2054,19 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
return 0, false, false
}

// check param type for plan cache limit, only allow int64 and uint64 now
// CheckParamTypeInt64orUint64 check param type for plan cache limit, only allow int64 and uint64 now
// eg: set @a = 1;
func checkParamTypeInt64orUint64(param *driver.ParamMarkerExpr) bool {
func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
val := param.GetValue()
switch v := val.(type) {
case int64:
if v >= 0 {
return true
return true, uint64(v)
}
case uint64:
return true
return true, v
}
return false
return false, 0
}

func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
Expand Down
22 changes: 13 additions & 9 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,18 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
return plan, names, err
}
}

limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
if paramErr != nil {
return nil, nil, paramErr
}
if stmtCtx.UseCache { // for non-point plans
if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
paramTypes); err != nil || ok {
paramTypes, limitCountAndOffset); err != nil || ok {
return plan, names, err
}
}

return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL)
return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL, limitCountAndOffset)
}

// parseParamTypes get parameters' types in PREPARE statement
Expand Down Expand Up @@ -220,12 +223,12 @@ func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmt
}

func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
[]*types.FieldName, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx

candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes)
candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
if !exist {
return nil, nil, false, nil
}
Expand Down Expand Up @@ -263,8 +266,9 @@ func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache

// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64,
paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramTypes []*types.FieldType,
bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
stmtAst := stmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
Expand Down Expand Up @@ -295,11 +299,11 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes)
sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
}
sessVars.FoundInPlanCache = false
return p, names, err
Expand Down
36 changes: 28 additions & 8 deletions planner/core/plan_cache_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type LRUPlanCache struct {
lock sync.Mutex

// pickFromBucket get one element from bucket. The LRUPlanCache can not work if it is nil
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool)
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool)
// onEvict will be called if any eviction happened, only for test use now
onEvict func(kvcache.Key, kvcache.Value)

Expand All @@ -68,7 +68,7 @@ type LRUPlanCache struct {
// NewLRUPlanCache creates a PCLRUCache object, whose capacity is "capacity".
// NOTE: "capacity" should be a positive value.
func NewLRUPlanCache(capacity uint, guard float64, quota uint64,
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
if capacity < 1 {
capacity = 100
logutil.BgLogger().Info("capacity of LRU cache is less than 1, will use default value(100) init cache")
Expand All @@ -94,13 +94,13 @@ func strHashKey(key kvcache.Key, deepCopy bool) string {
}

// Get tries to find the corresponding value according to the given key.
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool) {
l.lock.Lock()
defer l.lock.Unlock()

bucket, bucketExist := l.buckets[strHashKey(key, false)]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.lruList.MoveToFront(element)
return element.Value.(*planCacheEntry).PlanValue, true
}
Expand All @@ -109,14 +109,14 @@ func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (valu
}

// Put puts the (key, value) pair into the LRU Cache.
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType) {
func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64) {
l.lock.Lock()
defer l.lock.Unlock()

hash := strHashKey(key, true)
bucket, bucketExist := l.buckets[hash]
if bucketExist {
if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
l.updateInstanceMetric(&planCacheEntry{PlanKey: key, PlanValue: value}, element.Value.(*planCacheEntry))
element.Value.(*planCacheEntry).PlanValue = value
l.lruList.MoveToFront(element)
Expand Down Expand Up @@ -252,16 +252,36 @@ func (l *LRUPlanCache) memoryControl() {
}

// PickPlanFromBucket pick one plan from bucket
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType) (*list.Element, bool) {
func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType, limitParams []uint64) (*list.Element, bool) {
for k := range bucket {
plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
if plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes) {
ok1 := plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes)
if !ok1 {
continue
}
ok2 := checkUint64SliceIfEqual(plan.limitOffsetAndCount, limitParams)
if ok2 {
return k, true
}
}
return nil, false
}

func checkUint64SliceIfEqual(a, b []uint64) bool {
if (a == nil && b != nil) || (a != nil && b == nil) {
return false
}
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

// updateInstanceMetric update the memory usage and plan num for show in grafana
func (l *LRUPlanCache) updateInstanceMetric(in, out *planCacheEntry) {
updateInstancePlanNum(in, out)
Expand Down
51 changes: 32 additions & 19 deletions planner/core/plan_cache_lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,18 @@ func TestLRUPCPut(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}

// one key corresponding to multi values
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
Expand Down Expand Up @@ -103,7 +107,7 @@ func TestLRUPCPut(t *testing.T) {

bucket, exist := lru.buckets[string(hack.String(keys[i].Hash()))]
require.True(t, exist)
element, exist := lru.pickFromBucket(bucket, pTypes[i])
element, exist := lru.pickFromBucket(bucket, pTypes[i], limitParams[i])
require.NotNil(t, element)
require.True(t, exist)
require.Equal(t, root, element)
Expand Down Expand Up @@ -131,22 +135,25 @@ func TestLRUPCGet(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
}
limitParams := [][]uint64{
{1}, {2}, {3}, {4}, {5},
}
// 5 bucket
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i%4), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i], limitOffsetAndCount: limitParams[i]}
lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
}

// test for non-existent elements
for i := 0; i < 2; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.False(t, exists)
require.Nil(t, value)
}

for i := 2; i < 5; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
require.True(t, exists)
require.NotNil(t, value)
require.Equal(t, vals[i], value)
Expand Down Expand Up @@ -175,23 +182,29 @@ func TestLRUPCDelete(t *testing.T) {
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeEnum)},
{types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeDate)},
}
limitParams := [][]uint64{
{1}, {2}, {3},
}
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
vals[i] = &PlanCacheValue{
ParamTypes: pTypes[i],
limitOffsetAndCount: limitParams[i],
}
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.Delete(keys[1])
value, exists := lru.Get(keys[1], pTypes[1])
value, exists := lru.Get(keys[1], pTypes[1], limitParams[1])
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 2, int(lru.size))

_, exists = lru.Get(keys[0], pTypes[0])
_, exists = lru.Get(keys[0], pTypes[0], limitParams[0])
require.True(t, exists)

_, exists = lru.Get(keys[2], pTypes[2])
_, exists = lru.Get(keys[2], pTypes[2], limitParams[2])
require.True(t, exists)
}

Expand All @@ -207,14 +220,14 @@ func TestLRUPCDeleteAll(t *testing.T) {
for i := 0; i < 3; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, 3, int(lru.size))

lru.DeleteAll()

for i := 0; i < 3; i++ {
value, exists := lru.Get(keys[i], pTypes[i])
value, exists := lru.Get(keys[i], pTypes[i], []uint64{})
require.False(t, exists)
require.Nil(t, value)
require.Equal(t, 0, int(lru.size))
Expand Down Expand Up @@ -242,7 +255,7 @@ func TestLRUPCSetCapacity(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(5), lru.size)
Expand Down Expand Up @@ -292,7 +305,7 @@ func TestIssue37914(t *testing.T) {
val := &PlanCacheValue{ParamTypes: pTypes}

require.NotPanics(t, func() {
lru.Put(key, val, pTypes)
lru.Put(key, val, pTypes, []uint64{})
})
}

Expand All @@ -313,7 +326,7 @@ func TestIssue38244(t *testing.T) {
for i := 0; i < 5; i++ {
keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
lru.Put(keys[i], vals[i], pTypes[i])
lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
}
require.Equal(t, lru.size, lru.capacity)
require.Equal(t, uint(3), lru.size)
Expand All @@ -334,15 +347,15 @@ func TestLRUPlanCacheMemoryUsage(t *testing.T) {
for i := 0; i < 3; i++ {
k := randomPlanCacheKey()
v := randomPlanCacheValue(pTypes)
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
require.Equal(t, lru.MemoryUsage(), res)
}
// evict
p := &PhysicalTableScan{}
k := &planCacheKey{database: "3"}
v := &PlanCacheValue{Plan: p}
lru.Put(k, v, pTypes)
lru.Put(k, v, pTypes, []uint64{})
res += k.MemoryUsage() + v.MemoryUsage()
for kk, vv := range evict {
res -= kk.(*planCacheKey).MemoryUsage() + vv.(*PlanCacheValue).MemoryUsage()
Expand Down
Loading

0 comments on commit 812612d

Please sign in to comment.