Skip to content

Commit

Permalink
executor: let PlanReplayerExec support multi sqls (#37799)
Browse files Browse the repository at this point in the history
ref #37798
  • Loading branch information
Yisaer authored Sep 14, 2022
1 parent 884898b commit ea2ddfe
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 55 deletions.
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,9 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
return e
}
e := &PlanReplayerSingleExec{
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmt: v.ExecStmt,
ExecStmts: []ast.StmtNode{v.ExecStmt},
Analyze: v.Analyze,
}
return e
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func checkFileName(s string) bool {
"schema/test.t_dump_single.schema.txt",
"table_tiflash_replica.txt",
"variables.toml",
"sqls.sql",
"session_bindings.sql",
"global_bindings.sql",
"explain.txt",
"sql/sql0.sql",
"explain/sql0.txt",
}
for _, f := range files {
if strings.Compare(f, s) == 0 {
Expand Down
119 changes: 68 additions & 51 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,23 @@ import (
"go.uber.org/zap"
)

var _ Executor = &PlanReplayerSingleExec{}
var _ Executor = &PlanReplayerExec{}
var _ Executor = &PlanReplayerLoadExec{}

const (
configFile = "config.toml"
metaFile = "meta.txt"
variablesFile = "variables.toml"
sqlFile = "sqls.sql"
tiFlashReplicasFile = "table_tiflash_replica.txt"
sessionBindingFile = "session_bindings.sql"
globalBindingFile = "global_bindings.sql"
explainFile = "explain.txt"
)

// PlanReplayerSingleExec represents a plan replayer executor.
type PlanReplayerSingleExec struct {
// PlanReplayerExec represents a plan replayer executor.
type PlanReplayerExec struct {
baseExecutor
ExecStmt ast.StmtNode
Analyze bool
ExecStmts []ast.StmtNode
Analyze bool

endFlag bool
}
Expand Down Expand Up @@ -143,15 +141,15 @@ func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
}

// Next implements the Executor Next interface.
func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
if e.endFlag {
return nil
}
if e.ExecStmt == nil {
if e.ExecStmts == nil {
return errors.New("plan replayer: sql is empty")
}
res, err := e.dumpSingle(ctx, domain.GetPlanReplayerDirName())
res, err := e.dump(ctx, domain.GetPlanReplayerDirName())
if err != nil {
return err
}
Expand All @@ -160,7 +158,7 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
return nil
}

// dumpSingle will dump the information about a single sql.
// dumpSingle will dump the information about sqls.
// The files will be organized into the following format:
/*
|-meta.txt
Expand All @@ -180,11 +178,16 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
|-table_tiflash_replica.txt
|-variables.toml
|-bindings.sql
|-sqls.sql
|-sql
| |-sql1.sql
| |-sql2.sql
| |-....
|_explain
|-explain.txt
|-explain1.txt
|-explain2.txt
|-....
*/
func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (fileName string, err error) {
func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName string, err error) {
// Create path
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
Expand All @@ -200,7 +203,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName = fmt.Sprintf("replayer_single_%v_%v.zip", key, time)
fileName = fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return "", errors.AddStack(err)
Expand Down Expand Up @@ -235,7 +238,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
do := domain.GetDomain(e.ctx)

// Retrieve all tables
pairs, err := e.extractTableNames(ctx, e.ExecStmt, dbName)
pairs, err := e.extractTableNames(ctx, e.ExecStmts, dbName)
if err != nil {
return "", errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}
Expand All @@ -261,12 +264,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

// Dump sql
sql, err := zw.Create(sqlFile)
if err != nil {
return "", nil
}
_, err = sql.Write([]byte(e.ExecStmt.Text()))
if err != nil {
if err = dumpSQLs(e.ExecStmts, zw); err != nil {
return "", err
}

Expand All @@ -281,7 +279,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(ctx context.Context, path string) (f
}

// Dump explain
if err = dumpExplain(e.ctx, zw, e.ExecStmt.Text(), e.Analyze); err != nil {
if err = dumpExplain(e.ctx, zw, e.ExecStmts, e.Analyze); err != nil {
return "", err
}

Expand Down Expand Up @@ -371,6 +369,20 @@ func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Doma
return nil
}

func dumpSQLs(execStmts []ast.StmtNode, zw *zip.Writer) error {
for i, stmtExec := range execStmts {
zf, err := zw.Create(fmt.Sprintf("sql/sql%v.sql", i))
if err != nil {
return err
}
_, err = zf.Write([]byte(stmtExec.Text()))
if err != nil {
return err
}
}
return nil
}

func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
varMap := make(map[string]string)
recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.Background(), "show variables")
Expand Down Expand Up @@ -447,43 +459,46 @@ func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
return nil
}

func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze bool) error {
var recordSets []sqlexec.RecordSet
var err error
if isAnalyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
for i, stmtExec := range execStmts {
sql := stmtExec.Text()
var recordSets []sqlexec.RecordSet
var err error
if isAnalyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
if err != nil {
return err
}
} else {
// Explain
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql))
if err != nil {
return err
}
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
}
} else {
// Explain
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain %s", sql))
fw, err := zw.Create(fmt.Sprintf("explain/sql%v.txt", i))
if err != nil {
return err
return errors.AddStack(err)
}
}
sRows, err := resultSetToStringSlice(context.Background(), recordSets[0], false)
if err != nil {
return err
}
fw, err := zw.Create(explainFile)
if err != nil {
return errors.AddStack(err)
}
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
for _, row := range sRows {
fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
}
if len(recordSets) > 0 {
if err := recordSets[0].Close(); err != nil {
return err
}
}
}
return nil
}

func (e *PlanReplayerSingleExec) extractTableNames(ctx context.Context,
ExecStmt ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
func (e *PlanReplayerExec) extractTableNames(ctx context.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
executor: e.ctx.(sqlexec.RestrictedSQLExecutor),
Expand All @@ -492,7 +507,9 @@ func (e *PlanReplayerSingleExec) extractTableNames(ctx context.Context,
names: make(map[tableNamePair]struct{}),
cteNames: make(map[string]struct{}),
}
ExecStmt.Accept(tableExtractor)
for _, execStmt := range ExecStmts {
execStmt.Accept(tableExtractor)
}
if tableExtractor.err != nil {
return nil, tableExtractor.err
}
Expand Down

0 comments on commit ea2ddfe

Please sign in to comment.