Skip to content

Commit

Permalink
Cache stream query plans in vttablet (#13264)
Browse files Browse the repository at this point in the history
Signed-off-by: Samantha Drago-Kramer <[email protected]>
Co-authored-by: Adam Saponara <[email protected]>
  • Loading branch information
samanthadrago and adsr authored Oct 5, 2023
1 parent 52a0f4b commit 963b48d
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 14 deletions.
7 changes: 1 addition & 6 deletions go/vt/vttablet/tabletserver/planbuilder/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,7 @@ func Build(statement sqlparser.Statement, tables map[string]*schema.Table, dbNam
}

// BuildStreaming builds a streaming plan based on the schema.
func BuildStreaming(sql string, tables map[string]*schema.Table) (*Plan, error) {
statement, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}

func BuildStreaming(statement sqlparser.Statement, tables map[string]*schema.Table) (*Plan, error) {
plan := &Plan{
PlanID: PlanSelectStream,
FullQuery: GenerateFullQuery(statement),
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ func TestCustom(t *testing.T) {
func TestStreamPlan(t *testing.T) {
testSchema := loadSchema("schema_test.json")
for tcase := range iterateExecFile("stream_cases.txt") {
plan, err := BuildStreaming(tcase.input, testSchema)
var plan *Plan
var err error
statement, err := sqlparser.Parse(tcase.input)
if err == nil {
plan, err = BuildStreaming(statement, testSchema)
}
var out string
if err != nil {
out = err.Error()
Expand Down
49 changes: 44 additions & 5 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (qe *QueryEngine) getPlan(curSchema *currentSchema, sql string) (*TabletPla
return plan, nil
}

// GetPlan returns the TabletPlan that for the query. Plans are cached in a cache.LRUCache.
// GetPlan returns the TabletPlan that for the query. Plans are cached in a theine LRU cache.
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool) (*TabletPlan, error) {
span, _ := trace.NewSpan(ctx, "QueryEngine.GetPlan")
defer span.Finish()
Expand All @@ -401,19 +401,58 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
return plan, err
}

// GetStreamPlan is similar to GetPlan, but doesn't use the cache
// and doesn't enforce a limit. It just returns the parsed query.
func (qe *QueryEngine) GetStreamPlan(sql string) (*TabletPlan, error) {
splan, err := planbuilder.BuildStreaming(sql, qe.schema.Load().tables)
func (qe *QueryEngine) getStreamPlan(curSchema *currentSchema, sql string) (*TabletPlan, error) {
statement, err := sqlparser.Parse(sql)
if err != nil {
return nil, err
}

splan, err := planbuilder.BuildStreaming(statement, curSchema.tables)

if err != nil {
return nil, err
}

plan := &TabletPlan{Plan: splan, Original: sql}
plan.Rules = qe.queryRuleSources.FilterByPlan(sql, plan.PlanID, plan.TableName().String())
plan.buildAuthorized()

if sqlparser.SkipQueryPlanCacheDirective(statement) {
return plan, errNoCache
}

return plan, nil
}

// GetStreamPlan returns the TabletPlan that for the query. Plans are cached in a theine LRU cache.
func (qe *QueryEngine) GetStreamPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool) (*TabletPlan, error) {
span, _ := trace.NewSpan(ctx, "QueryEngine.GetStreamPlan")
defer span.Finish()

var plan *TabletPlan
var err error

curSchema := qe.schema.Load()

if skipQueryPlanCache {
plan, err = qe.getStreamPlan(curSchema, sql)
} else {
plan, logStats.CachedPlan, err = qe.plans.GetOrLoad(PlanCacheKey(qe.getStreamPlanCacheKey(sql)), curSchema.epoch, func() (*TabletPlan, error) {
return qe.getStreamPlan(curSchema, sql)
})
}

if errors.Is(err, errNoCache) {
err = nil
}
return plan, err
}

// gets key used to cache stream query plan
func (qe *QueryEngine) getStreamPlanCacheKey(sql string) string {
return "__STREAM__" + sql
}

// GetMessageStreamPlan builds a plan for Message streaming.
func (qe *QueryEngine) GetMessageStreamPlan(name string) (*TabletPlan, error) {
splan, err := planbuilder.BuildMessageStreaming(name, qe.schema.Load().tables)
Expand Down
71 changes: 71 additions & 0 deletions go/vt/vttablet/tabletserver/query_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,77 @@ func TestNoQueryPlanCacheDirective(t *testing.T) {
qe.ClearQueryPlanCache()
}

func TestStreamQueryPlanCache(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
schematest.AddDefaultQueries(db)

firstQuery := "select * from test_table_01"
db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{})
db.AddQuery("select * from test_table_02 where 1 != 1", &sqltypes.Result{})

qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db))
qe.se.Open()
qe.Open()
defer qe.Close()

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")

firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, false)
require.NoError(t, err)
require.NotNil(t, firstPlan, "plan should not be nil")
assertPlanCacheSize(t, qe, 1)
qe.ClearQueryPlanCache()
}

func TestNoStreamQueryPlanCache(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
schematest.AddDefaultQueries(db)

firstQuery := "select * from test_table_01"
db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{})
db.AddQuery("select * from test_table_02 where 1 != 1", &sqltypes.Result{})

qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db))
qe.se.Open()
qe.Open()
defer qe.Close()

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, true)
require.NoError(t, err)
require.NotNil(t, firstPlan)
assertPlanCacheSize(t, qe, 0)
qe.ClearQueryPlanCache()
}

func TestNoStreamQueryPlanCacheDirective(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
schematest.AddDefaultQueries(db)

firstQuery := "select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from test_table_01"
db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{})
db.AddQuery("select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from test_table_01 where 1 != 1", &sqltypes.Result{})
db.AddQuery("select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from test_table_02 where 1 != 1", &sqltypes.Result{})

qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db))
qe.se.Open()
qe.Open()
defer qe.Close()

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
firstPlan, err := qe.GetStreamPlan(ctx, logStats, firstQuery, false)
require.NoError(t, err)
require.NotNil(t, firstPlan)
assertPlanCacheSize(t, qe, 0)
qe.ClearQueryPlanCache()
}

func TestStatsURL(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, tx

func newTestQueryExecutorStreaming(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor {
logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutorStreaming")
plan, err := tsv.qe.GetStreamPlan(sql)
plan, err := tsv.qe.GetStreamPlan(ctx, logStats, sql, false)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (tsv *TabletServer) streamExecute(ctx context.Context, target *querypb.Targ
bindVariables = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
plan, err := tsv.qe.GetStreamPlan(query)
plan, err := tsv.qe.GetStreamPlan(ctx, logStats, query, skipQueryPlanCache(options))
if err != nil {
return err
}
Expand Down

0 comments on commit 963b48d

Please sign in to comment.