Skip to content

Commit

Permalink
executor: support plan replayer multi sqls (#37867)
Browse files Browse the repository at this point in the history
close #37798
  • Loading branch information
Yisaer authored Sep 16, 2022
1 parent 20d46c2 commit c19dc46
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 43 deletions.
12 changes: 10 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,16 @@ func (b *executorBuilder) buildPlanReplayer(v *plannercore.PlanReplayer) Executo
}
e := &PlanReplayerExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmts: []ast.StmtNode{v.ExecStmt},
Analyze: v.Analyze,
DumpInfo: &PlanReplayerDumpInfo{
Analyze: v.Analyze,
Path: v.File,
ctx: b.ctx,
},
}
if v.ExecStmt != nil {
e.DumpInfo.ExecStmts = []ast.StmtNode{v.ExecStmt}
} else {
e.baseExecutor = newBaseExecutor(b.ctx, nil, v.ID())
}
return e
}
Expand Down
148 changes: 107 additions & 41 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,18 @@ const (
// PlanReplayerExec represents a plan replayer executor.
type PlanReplayerExec struct {
baseExecutor
DumpInfo *PlanReplayerDumpInfo
endFlag bool
}

// PlanReplayerDumpInfo indicates dump info
type PlanReplayerDumpInfo struct {
ExecStmts []ast.StmtNode
Analyze bool

endFlag bool
Path string
File *os.File
FileName string
ctx sessionctx.Context
}

type tableNamePair struct {
Expand Down Expand Up @@ -146,16 +154,55 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.endFlag {
return nil
}
if e.ExecStmts == nil {
err := e.createFile(domain.GetPlanReplayerDirName())
if err != nil {
return err
}
if len(e.DumpInfo.Path) > 0 {
err = e.prepare()
if err != nil {
return err
}
// As we can only read file from handleSpecialQuery, thus we store the file token in the session var during `dump`
// and return nil here.
e.endFlag = true
return nil
}
if e.DumpInfo.ExecStmts == nil {
return errors.New("plan replayer: sql is empty")
}
res, err := e.dump(ctx, domain.GetPlanReplayerDirName())
err = e.DumpInfo.dump(ctx)
if err != nil {
return err
}
req.AppendString(0, res)
req.AppendString(0, e.DumpInfo.FileName)
e.endFlag = true
e.ctx.GetSessionVars().LastPlanReplayerToken = res
return nil
}

func (e *PlanReplayerExec) createFile(path string) error {
// Create path
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return errors.AddStack(err)
}

// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
if err != nil {
return err
}
key := base64.URLEncoding.EncodeToString(b)
fileName := fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return errors.AddStack(err)
}
e.DumpInfo.File = zf
e.DumpInfo.FileName = fileName
return nil
}

Expand Down Expand Up @@ -188,28 +235,9 @@ func (e *PlanReplayerExec) Next(ctx context.Context, req *chunk.Chunk) error {
|-explain2.txt
|-....
*/
func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName string, err error) {
// Create path
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", errors.AddStack(err)
}

// Generate key and create zip file
time := time.Now().UnixNano()
b := make([]byte, 16)
//nolint: gosec
_, err = rand.Read(b)
if err != nil {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName = fmt.Sprintf("replayer_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return "", errors.AddStack(err)
}

func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
// Create zip writer
zw := zip.NewWriter(zf)
defer func() {
Expand All @@ -225,12 +253,12 @@ func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName stri

// Dump config
if err = dumpConfig(zw); err != nil {
return "", err
return err
}

// Dump meta
if err = dumpMeta(zw); err != nil {
return "", err
return err
}

// Retrieve current DB
Expand All @@ -241,50 +269,51 @@ func (e *PlanReplayerExec) dump(ctx context.Context, path string) (fileName stri
// Retrieve all tables
pairs, err := e.extractTableNames(ctx, e.ExecStmts, dbName)
if err != nil {
return "", errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
return errors.AddStack(fmt.Errorf("plan replayer: invalid SQL text, err: %v", err))
}

// Dump Schema and View
if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
return "", err
return err
}

// Dump tables tiflash replicas
if err = dumpTiFlashReplica(e.ctx, zw, pairs); err != nil {
return "", err
return err
}

// Dump stats
if err = dumpStats(zw, pairs, do); err != nil {
return "", err
return err
}

// Dump variables
if err = dumpVariables(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump sql
if err = dumpSQLs(e.ExecStmts, zw); err != nil {
return "", err
return err
}

// Dump session bindings
if err = dumpSessionBindings(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump global bindings
if err = dumpGlobalBindings(e.ctx, zw); err != nil {
return "", err
return err
}

// Dump explain
if err = dumpExplain(e.ctx, zw, e.ExecStmts, e.Analyze); err != nil {
return "", err
return err
}

return fileName, nil
e.ctx.GetSessionVars().LastPlanReplayerToken = e.FileName
return nil
}

func dumpConfig(zw *zip.Writer) error {
Expand Down Expand Up @@ -498,7 +527,7 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
return nil
}

func (e *PlanReplayerExec) extractTableNames(ctx context.Context,
func (e *PlanReplayerDumpInfo) extractTableNames(ctx context.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
ctx: ctx,
Expand Down Expand Up @@ -529,6 +558,34 @@ func (e *PlanReplayerExec) extractTableNames(ctx context.Context,
return r, nil
}

func (e *PlanReplayerExec) prepare() error {
val := e.ctx.Value(PlanReplayerDumpVarKey)
if val != nil {
e.ctx.SetValue(PlanReplayerDumpVarKey, nil)
return errors.New("plan replayer: previous plan replayer dump option isn't closed normally, please try again")
}
e.ctx.SetValue(PlanReplayerDumpVarKey, e.DumpInfo)
return nil
}

// DumpSQLsFromFile dumps plan replayer results for sqls from file
func (e *PlanReplayerDumpInfo) DumpSQLsFromFile(ctx context.Context, b []byte) error {
sqls := strings.Split(string(b), ";")
e.ExecStmts = make([]ast.StmtNode, 0)
for _, sql := range sqls {
s := strings.Trim(sql, "\n")
if len(s) < 1 {
continue
}
node, err := e.ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(ctx, s)
if err != nil {
return fmt.Errorf("parse sql error, sql:%v, err:%v", s, err)
}
e.ExecStmts = append(e.ExecStmts, node)
}
return e.dump(ctx)
}

func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, error) {
is := do.InfoSchema()
h := do.StatsHandle()
Expand Down Expand Up @@ -644,6 +701,12 @@ type PlanReplayerLoadInfo struct {
Ctx sessionctx.Context
}

type planReplayerDumpKeyType int

func (k planReplayerDumpKeyType) String() string {
return "plan_replayer_dump_var"
}

type planReplayerLoadKeyType int

func (k planReplayerLoadKeyType) String() string {
Expand All @@ -653,6 +716,9 @@ func (k planReplayerLoadKeyType) String() string {
// PlanReplayerLoadVarKey is a variable key for plan replayer load.
const PlanReplayerLoadVarKey planReplayerLoadKeyType = 0

// PlanReplayerDumpVarKey is a variable key for plan replayer dump.
const PlanReplayerDumpVarKey planReplayerDumpKeyType = 1

// Next implements the Executor Next interface.
func (e *PlanReplayerLoadExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
Expand Down
27 changes: 27 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,24 @@ func (cc *clientConn) handlePlanReplayerLoad(ctx context.Context, planReplayerLo
return planReplayerLoadInfo.Update(data)
}

func (cc *clientConn) handlePlanReplayerDump(ctx context.Context, e *executor.PlanReplayerDumpInfo) error {
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if e == nil {
return errors.New("plan replayer dump: executor is empty")
}
data, err := cc.getDataFromPath(ctx, e.Path)
if err != nil {
logutil.BgLogger().Error(err.Error())
return err
}
if len(data) == 0 {
return nil
}
return e.DumpSQLsFromFile(ctx, data)
}

func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
audit := plugin.DeclareAuditManifest(p.Manifest)
Expand Down Expand Up @@ -2117,6 +2135,15 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
}
}

planReplayerDump := cc.ctx.Value(executor.PlanReplayerDumpVarKey)
if planReplayerDump != nil {
handled = true
defer cc.ctx.SetValue(executor.PlanReplayerDumpVarKey, nil)
//nolint:forcetypeassert
if err := cc.handlePlanReplayerDump(ctx, planReplayerDump.(*executor.PlanReplayerDumpInfo)); err != nil {
return handled, err
}
}
return handled, cc.writeOkWith(ctx, mysql.OKHeader, true, status)
}

Expand Down

0 comments on commit c19dc46

Please sign in to comment.