Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds session option to cache query plans #3246

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 199 additions & 187 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

47 changes: 42 additions & 5 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ func (e *Executor) handleExec(ctx context.Context, session *vtgatepb.Session, sq
// V3 mode.
query, comments := sqlparser.SplitTrailingComments(sql)
vcursor := newVCursorImpl(ctx, session, target, comments, e)
plan, err := e.getPlan(vcursor, query, bindVars)
plan, err := e.getPlan(vcursor,
query,
bindVars,
skipQueryPlanCache(session),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,6 +291,22 @@ func (e *Executor) handleSet(ctx context.Context, session *vtgatepb.Session, sql
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value for client_found_rows: %d", val)
}
case "skip_query_plan_cache":
val, ok := v.(int64)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value type for skip_query_plan_cache: %T", v)
}
if session.Options == nil {
session.Options = &querypb.ExecuteOptions{}
}
switch val {
case 0:
session.Options.SkipQueryPlanCache = false
case 1:
session.Options.SkipQueryPlanCache = true
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected value for skip_query_plan_cache: %d", val)
}
case "transaction_mode":
val, ok := v.(string)
if !ok {
Expand Down Expand Up @@ -468,7 +488,12 @@ func (e *Executor) StreamExecute(ctx context.Context, session *vtgatepb.Session,
}
query, comments := sqlparser.SplitTrailingComments(sql)
vcursor := newVCursorImpl(ctx, session, target, comments, e)
plan, err := e.getPlan(vcursor, query, bindVars)
plan, err := e.getPlan(
vcursor,
query,
bindVars,
skipQueryPlanCache(session),
)
if err != nil {
return err
}
Expand Down Expand Up @@ -737,7 +762,7 @@ func (e *Executor) ParseTarget(targetString string) querypb.Target {

// getPlan computes the plan for the given query. If one is in
// the cache, it reuses it.
func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, bindVars map[string]*querypb.BindVariable) (*engine.Plan, error) {
func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool) (*engine.Plan, error) {
if e.VSchema() == nil {
return nil, errors.New("vschema not initialized")
}
Expand All @@ -754,7 +779,9 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, bindVars map[string
if err != nil {
return nil, err
}
e.plans.Set(key, plan)
if !skipQueryPlanCache {
e.plans.Set(key, plan)
}
return plan, nil
}
// Normalize and retry.
Expand All @@ -775,10 +802,20 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, bindVars map[string
if err != nil {
return nil, err
}
e.plans.Set(normkey, plan)
if !skipQueryPlanCache {
e.plans.Set(normkey, plan)
}
return plan, nil
}

// skipQueryPlanCache extracts SkipQueryPlanCache from session
func skipQueryPlanCache(session *vtgatepb.Session) bool {
if session == nil || session.Options == nil {
return false
}
return session.Options.SkipQueryPlanCache
}

// ServeHTTP shows the current plans in the query cache.
func (e *Executor) ServeHTTP(response http.ResponseWriter, request *http.Request) {
if err := acl.CheckAccessHTTP(request, acl.DEBUGGING); err != nil {
Expand Down
71 changes: 59 additions & 12 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ func TestExecutorSet(t *testing.T) {
}, {
in: "set net_read_timeout = 600",
out: &vtgatepb.Session{},
}, {
in: "set skip_query_plan_cache = 1",
out: &vtgatepb.Session{Options: &querypb.ExecuteOptions{SkipQueryPlanCache: true}},
}, {
in: "set skip_query_plan_cache = 0",
out: &vtgatepb.Session{Options: &querypb.ExecuteOptions{}},
}}
for _, tcase := range testcases {
session := &vtgatepb.Session{}
Expand Down Expand Up @@ -727,11 +733,11 @@ func TestGetPlanUnnormalized(t *testing.T) {
unshardedvc := newVCursorImpl(context.Background(), nil, querypb.Target{Keyspace: KsTestUnsharded}, "", r)

query1 := "select * from music_user_map where id = 1"
plan1, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{})
plan1, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
plan2, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{})
plan2, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
Expand All @@ -744,14 +750,14 @@ func TestGetPlanUnnormalized(t *testing.T) {
if keys := r.plans.Keys(); !reflect.DeepEqual(keys, want) {
t.Errorf("Plan keys: %s, want %s", keys, want)
}
plan3, err := r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{})
plan3, err := r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
if plan1 == plan3 {
t.Errorf("getPlan(query1, ks): plans must not be equal: %p %p", plan1, plan3)
}
plan4, err := r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{})
plan4, err := r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
Expand All @@ -767,6 +773,47 @@ func TestGetPlanUnnormalized(t *testing.T) {
}
}

func TestGetPlanCacheUnnormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
emptyvc := newVCursorImpl(context.Background(), nil, querypb.Target{}, "", r)
query1 := "select * from music_user_map where id = 1"
_, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */)
if err != nil {
t.Error(err)
}
if r.plans.Size() != 0 {
t.Errorf("getPlan() expected cache to have size 0, but got: %b", r.plans.Size())
}
_, err = r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false /* skipQueryPlanCache */)
if err != nil {
t.Error(err)
}
if r.plans.Size() != 1 {
t.Errorf("getPlan() expected cache to have size 1, but got: %b", r.plans.Size())
}
}

func TestGetPlanCacheNormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
r.normalize = true
emptyvc := newVCursorImpl(context.Background(), nil, querypb.Target{}, "", r)
query1 := "select * from music_user_map where id = 1"
_, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */)
if err != nil {
t.Error(err)
}
if r.plans.Size() != 0 {
t.Errorf("getPlan() expected cache to have size 0, but got: %b", r.plans.Size())
}
_, err = r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false /* skipQueryPlanCache */)
if err != nil {
t.Error(err)
}
if r.plans.Size() != 1 {
t.Errorf("getPlan() expected cache to have size 1, but got: %b", r.plans.Size())
}
}

func TestGetPlanNormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
r.normalize = true
Expand All @@ -776,11 +823,11 @@ func TestGetPlanNormalized(t *testing.T) {
query1 := "select * from music_user_map where id = 1"
query2 := "select * from music_user_map where id = 2"
normalized := "select * from music_user_map where id = :vtg1"
plan1, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{})
plan1, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
plan2, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{})
plan2, err := r.getPlan(emptyvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
Expand All @@ -793,29 +840,29 @@ func TestGetPlanNormalized(t *testing.T) {
if keys := r.plans.Keys(); !reflect.DeepEqual(keys, want) {
t.Errorf("Plan keys: %s, want %s", keys, want)
}
plan3, err := r.getPlan(emptyvc, query2, map[string]*querypb.BindVariable{})
plan3, err := r.getPlan(emptyvc, query2, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
if plan1 != plan3 {
t.Errorf("getPlan(query2): plans must be equal: %p %p", plan1, plan3)
}
plan4, err := r.getPlan(emptyvc, normalized, map[string]*querypb.BindVariable{})
plan4, err := r.getPlan(emptyvc, normalized, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
if plan1 != plan4 {
t.Errorf("getPlan(normalized): plans must be equal: %p %p", plan1, plan4)
}

plan3, err = r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{})
plan3, err = r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
if plan1 == plan3 {
t.Errorf("getPlan(query1, ks): plans must not be equal: %p %p", plan1, plan3)
}
plan4, err = r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{})
plan4, err = r.getPlan(unshardedvc, query1, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Error(err)
}
Expand All @@ -831,12 +878,12 @@ func TestGetPlanNormalized(t *testing.T) {
}

// Errors
_, err = r.getPlan(emptyvc, "syntax", map[string]*querypb.BindVariable{})
_, err = r.getPlan(emptyvc, "syntax", map[string]*querypb.BindVariable{}, false)
wantErr := "syntax error at position 7 near 'syntax'"
if err == nil || err.Error() != wantErr {
t.Errorf("getPlan(syntax): %v, want %s", err, wantErr)
}
_, err = r.getPlan(emptyvc, "create table a(id int)", map[string]*querypb.BindVariable{})
_, err = r.getPlan(emptyvc, "create table a(id int)", map[string]*querypb.BindVariable{}, false)
wantErr = "unsupported construct: ddl"
if err == nil || err.Error() != wantErr {
t.Errorf("getPlan(syntax): %v, want %s", err, wantErr)
Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (qe *QueryEngine) Close() {
}

// GetPlan returns the TabletPlan that for the query. Plans are cached in a cache.LRUCache.
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string) (*TabletPlan, error) {
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool) (*TabletPlan, error) {
span := trace.NewSpanFromContext(ctx)
span.StartLocal("QueryEngine.GetPlan")
defer span.Finish()
Expand Down Expand Up @@ -326,7 +326,9 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
} else if plan.PlanID == planbuilder.PlanDDL || plan.PlanID == planbuilder.PlanSet {
return plan, nil
}
qe.queries.Set(sql, plan)
if !skipQueryPlanCache {
qe.queries.Set(sql, plan)
}
return plan, nil
}

Expand Down
47 changes: 42 additions & 5 deletions go/vt/vttablet/tabletserver/query_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) {

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
_, err := qe.GetPlan(ctx, logStats, "")
_, err := qe.GetPlan(ctx, logStats, "", false)
want := "syntax error"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("qe.GetPlan: %v, want %s", err, want)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestGetMessageStreamPlan(t *testing.T) {
}
}

func TestQueryCache(t *testing.T) {
func TestQueryPlanCache(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
Expand All @@ -153,14 +153,14 @@ func TestQueryCache(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryCacheCap(1)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false)
if err != nil {
t.Fatal(err)
}
if firstPlan == nil {
t.Fatalf("plan should not be nil")
}
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery)
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -170,6 +170,43 @@ func TestQueryCache(t *testing.T) {
expvar.Do(func(kv expvar.KeyValue) {
_ = kv.Value.String()
})
if qe.queries.Size() == 0 {
t.Fatalf("query plan cache should not be 0")
}
qe.ClearQueryPlanCache()
}

func TestNoQueryPlanCache(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
for query, result := range schematest.Queries() {
db.AddQuery(query, result)
}

firstQuery := "select * from test_table_01"
db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{})
db.AddQuery("select * from test_table_02 where 1 != 1", &sqltypes.Result{})

qe := newTestQueryEngine(10, 10*time.Second, true)
testUtils := newTestUtils()
dbconfigs := testUtils.newDBConfigs(db)
qe.se.Open(db.ConnParams())
qe.Open(dbconfigs)
defer qe.Close()

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryCacheCap(1)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true)
if err != nil {
t.Fatal(err)
}
if firstPlan == nil {
t.Fatalf("plan should not be nil")
}
if qe.queries.Size() != 0 {
t.Fatalf("query plan cache should be 0")
}
qe.ClearQueryPlanCache()
}

Expand All @@ -190,7 +227,7 @@ func TestStatsURL(t *testing.T) {
// warm up cache
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.GetPlan(ctx, logStats, query)
qe.GetPlan(ctx, logStats, query, false)

request, _ := http.NewRequest("GET", "/debug/tablet_plans", nil)
response := httptest.NewRecorder()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,7 @@ func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 {

func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor {
logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutor")
plan, err := tsv.qe.GetPlan(ctx, logStats, sql)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false)
if err != nil {
panic(err)
}
Expand Down
Loading