executor,server: re-implement the kill statement by checking the `Next()` function (10841) (#10879)
tiancaiamao authored and winkyao committed Jun 20, 2019
1 parent 6d5f8f9 commit dc4f2f1
Showing 20 changed files with 76 additions and 102 deletions.
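The change centralizes the kill check: instead of call sites invoking an executor's Next method directly, they go through a new package-level executor.Next wrapper (added in executor/executor.go below) that first consumes the session's Killed flag and returns ErrQueryInterrupted, and only then delegates to the executor. The sketch below is a minimal, self-contained illustration of that cooperative-cancellation pattern, not TiDB code; the names session, kill, nextChunk, and errQueryInterrupted are invented for the example.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errQueryInterrupted stands in for executor.ErrQueryInterrupted.
var errQueryInterrupted = errors.New("query execution was interrupted")

// session holds the kill flag, playing the role of SessionVars.Killed.
type session struct {
	killed uint32
}

// kill models what the KILL statement handler does: set the flag atomically
// from another goroutine (another connection, in the real server).
func (s *session) kill() {
	atomic.StoreUint32(&s.killed, 1)
}

// nextChunk models one pass through the wrapper: consume the flag with a
// compare-and-swap so the interruption is reported exactly once, otherwise
// produce the next batch of rows.
func nextChunk(s *session, batch int) error {
	if atomic.CompareAndSwapUint32(&s.killed, 1, 0) {
		return errQueryInterrupted
	}
	time.Sleep(10 * time.Millisecond) // stand-in for real work on one chunk
	fmt.Println("produced batch", batch)
	return nil
}

func main() {
	s := &session{}
	go func() {
		time.Sleep(35 * time.Millisecond) // simulate KILL QUERY arriving later
		s.kill()
	}()
	for i := 0; ; i++ {
		if err := nextChunk(s, i); err != nil {
			fmt.Println("stopped:", err)
			return
		}
	}
}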
4 changes: 2 additions & 2 deletions executor/adapter.go
@@ -98,7 +98,7 @@ func schema2ResultFields(schema *expression.Schema, defaultDB string) (rfs []*as
// next query.
// If stmt is not nil and chunk with some rows inside, we simply update last query found rows by the number of row in chunk.
func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
-err := a.executor.Next(ctx, chk)
+err := Next(ctx, a.executor, chk)
if err != nil {
a.lastErr = err
return errors.Trace(err)
@@ -286,7 +286,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
a.logAudit()
}()

-err = e.Next(ctx, e.newFirstChunk())
+err = Next(ctx, e, e.newFirstChunk())
if err != nil {
return nil, errors.Trace(err)
}
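Note that both paths through adapter.go now use the wrapper: recordSet.Next, which streams result chunks back to the client, and handleNoDelayExecutor, which drives statements that return no result set. A kill therefore takes effect for long-running reads and writes alike. The executor files that follow make the same mechanical substitution everywhere a child executor is pulled.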
6 changes: 3 additions & 3 deletions executor/aggregate.go
@@ -549,7 +549,7 @@ func (e *HashAggExec) fetchChildData(ctx context.Context) {
}
chk = input.chk
}
-err = e.children[0].Next(ctx, chk)
+err = Next(ctx, e.children[0], chk)
if err != nil {
e.finalOutputCh <- &AfFinalResult{err: errors.Trace(err)}
return
@@ -669,7 +669,7 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
func (e *HashAggExec) execute(ctx context.Context) (err error) {
inputIter := chunk.NewIterator4Chunk(e.childResult)
for {
-err := e.children[0].Next(ctx, e.childResult)
+err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -856,7 +856,7 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
return errors.Trace(err)
}

-err = e.children[0].Next(ctx, e.childResult)
+err = Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions executor/delete.go
@@ -100,7 +100,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
for {
iter := chunk.NewIterator4Chunk(chk)

-err := e.children[0].Next(ctx, chk)
+err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -177,7 +177,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
chk := e.children[0].newFirstChunk()
for {
iter := chunk.NewIterator4Chunk(chk)
-err := e.children[0].Next(ctx, chk)
+err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion executor/distsql.go
@@ -754,7 +754,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
task.rows = make([]chunk.Row, 0, handleCnt)
for {
chk := tableReader.newFirstChunk()
-err = tableReader.Next(ctx, chk)
+err = Next(ctx, tableReader, chk)
if err != nil {
logutil.Logger(ctx).Error("table reader fetch next chunk failed", zap.Error(err))
return errors.Trace(err)
2 changes: 2 additions & 0 deletions executor/errors.go
@@ -45,6 +45,7 @@ var (
ErrCantChangeTxCharacteristics = terror.ClassExecutor.New(mysql.ErrCantChangeTxCharacteristics, mysql.MySQLErrName[mysql.ErrCantChangeTxCharacteristics])
ErrPsManyParam = terror.ClassExecutor.New(mysql.ErrPsManyParam, mysql.MySQLErrName[mysql.ErrPsManyParam])
ErrAdminCheckTable = terror.ClassExecutor.New(mysql.ErrAdminCheckTable, mysql.MySQLErrName[mysql.ErrAdminCheckTable])
+ErrQueryInterrupted = terror.ClassExecutor.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
)

func init() {
@@ -57,6 +58,7 @@ func init() {
mysql.ErrCantChangeTxCharacteristics: mysql.ErrCantChangeTxCharacteristics,
mysql.ErrPsManyParam: mysql.ErrPsManyParam,
mysql.ErrAdminCheckTable: mysql.ErrAdminCheckTable,
+mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}
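ErrQueryInterrupted is built from mysql.ErrQueryInterrupted, which corresponds to MySQL's ER_QUERY_INTERRUPTED (error code 1317, "Query execution was interrupted"), and the init block registers it in the executor error class's code table, so a killed statement surfaces to the client as the same error a stock MySQL server would return.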
31 changes: 23 additions & 8 deletions executor/executor.go
@@ -83,6 +83,10 @@ type baseExecutor struct {
runtimeStats *execdetails.RuntimeStats
}

+func (e *baseExecutor) base() *baseExecutor {
+return e
+}

// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *baseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
@@ -161,6 +165,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
+base() *baseExecutor
Open(context.Context) error
Next(ctx context.Context, chk *chunk.Chunk) error
Close() error
@@ -170,6 +175,16 @@ type Executor interface {
newFirstChunk() *chunk.Chunk
}

+// Next is a wrapper function on e.Next(), it handles some common codes.
+func Next(ctx context.Context, e Executor, chk *chunk.Chunk) error {
+sessVars := e.base().ctx.GetSessionVars()
+if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) {
+return ErrQueryInterrupted
+}
+
+return e.Next(ctx, chk)
+}

// CancelDDLJobsExec represents a cancel DDL jobs executor.
type CancelDDLJobsExec struct {
baseExecutor
@@ -533,7 +548,7 @@ func (e *CheckIndexExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk = e.src.newFirstChunk()
for {
-err := e.src.Next(ctx, chk)
+err := Next(ctx, e.src, chk)
if err != nil {
return errors.Trace(err)
}
@@ -636,7 +651,7 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// Next implements the Executor Next interface.
func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.GrowAndReset(e.maxChunkSize)
-err := e.children[0].Next(ctx, chk)
+err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -693,7 +708,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
for !e.meetFirstBatch {
// transfer req's requiredRows to childResult and then adjust it in childResult
e.childResult = e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
-err := e.children[0].Next(ctx, e.adjustRequiredRows(e.childResult))
+err := Next(ctx, e.children[0], e.adjustRequiredRows(e.childResult))
if err != nil {
return errors.Trace(err)
}
@@ -718,7 +733,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += batchSize
}
e.adjustRequiredRows(chk)
-err := e.children[0].Next(ctx, chk)
+err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
@@ -788,7 +803,7 @@ func init() {
}
chk := exec.newFirstChunk()
for {
-err = exec.Next(ctx, chk)
+err = Next(ctx, exec, chk)
if err != nil {
return rows, errors.Trace(err)
}
@@ -897,7 +912,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
}
chk.AppendRow(e.inputRow)
}
-err := e.children[0].Next(ctx, e.childResult)
+err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -929,7 +944,7 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
return nil
}
}
-err := e.children[0].Next(ctx, e.childResult)
+err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -1069,7 +1084,7 @@ func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
e.evaluated = true
-err := e.children[0].Next(ctx, chk)
+err := Next(ctx, e.children[0], chk)
if err != nil {
return errors.Trace(err)
}
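Two details of the wrapper are worth noting. First, it uses CompareAndSwapUint32(&sessVars.Killed, 1, 0), so the flag is consumed when observed and ErrQueryInterrupted is reported once per kill rather than failing every later statement on the connection. Second, because the check lives in the shared Next wrapper and the Executor interface now exposes base() so the wrapper can reach the session variables, a kill is honored at the next chunk boundary anywhere in the executor tree instead of depending on each operator to implement its own check.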
2 changes: 1 addition & 1 deletion executor/explain.go
@@ -73,7 +73,7 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, erro
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
-err := e.analyzeExec.Next(ctx, chk)
+err := Next(ctx, e.analyzeExec, chk)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
@@ -363,7 +363,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
-err := ow.executor.Next(ctx, ow.executorChk)
+err := Next(ctx, ow.executor, ow.executorChk)
if err != nil {
return task, errors.Trace(err)
}
@@ -555,7 +555,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
innerResult.GetMemTracker().SetLabel("inner result")
innerResult.GetMemTracker().AttachTo(task.memTracker)
for {
-err := innerExec.Next(ctx, iw.executorChk)
+err := Next(ctx, innerExec, iw.executorChk)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion executor/insert_common.go
@@ -309,7 +309,7 @@ func (e *InsertValues) insertRowsFromSelect(ctx context.Context, exec func(rows
batchInsert := (sessVars.BatchInsert && !sessVars.InTxn()) || config.GetGlobalConfig().EnableBatchDML

for {
-err := selectExec.Next(ctx, chk)
+err := Next(ctx, selectExec, chk)
if err != nil {
return errors.Trace(err)
}
8 changes: 4 additions & 4 deletions executor/join.go
@@ -203,7 +203,7 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
}
outerResult := outerResource.chk
-err := e.outerExec.Next(ctx, outerResult)
+err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: errors.Trace(err),
@@ -268,7 +268,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
return
}
chk := e.children[e.innerIdx].newFirstChunk()
-err = e.innerExec.Next(ctx, chk)
+err = Next(ctx, e.innerExec, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
@@ -648,7 +648,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
outerIter := chunk.NewIterator4Chunk(e.outerChunk)
for {
if e.outerChunkCursor >= e.outerChunk.NumRows() {
-err := e.outerExec.Next(ctx, e.outerChunk)
+err := Next(ctx, e.outerExec, e.outerChunk)
if err != nil {
return nil, errors.Trace(err)
}
@@ -685,7 +685,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
e.innerList.Reset()
innerIter := chunk.NewIterator4Chunk(e.innerChunk)
for {
-err := e.innerExec.Next(ctx, e.innerChunk)
+err := Next(ctx, e.innerExec, e.innerChunk)
if err != nil {
return errors.Trace(err)
}
4 changes: 2 additions & 2 deletions executor/merge_join.go
@@ -139,7 +139,7 @@ func (t *mergeJoinInnerTable) nextRow() (chunk.Row, error) {
if t.curRow == t.curIter.End() {
t.reallocReaderResult()
oldMemUsage := t.curResult.MemoryUsage()
-err := t.reader.Next(t.ctx, t.curResult)
+err := Next(t.ctx, t.reader, t.curResult)
// error happens or no more data.
if err != nil || t.curResult.NumRows() == 0 {
t.curRow = t.curIter.End()
@@ -378,7 +378,7 @@ func (e *MergeJoinExec) fetchNextInnerRows() (err error) {
// may not all belong to the same join key, but are guaranteed to be sorted
// according to the join key.
func (e *MergeJoinExec) fetchNextOuterRows(ctx context.Context) (err error) {
-err = e.outerTable.reader.Next(ctx, e.outerTable.chk)
+err = Next(ctx, e.outerTable.reader, e.outerTable.chk)
if err != nil {
return errors.Trace(err)
}
46 changes: 13 additions & 33 deletions executor/point_get.go
@@ -18,10 +18,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
-"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
-"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
@@ -38,22 +36,23 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
b.err = errors.Trace(err)
return nil
}
-return &PointGetExecutor{
-ctx: b.ctx,
-schema: p.Schema(),
-tblInfo: p.TblInfo,
-idxInfo: p.IndexInfo,
-idxVals: p.IndexValues,
-handle: p.Handle,
-startTS: startTS,
-}
+e := &PointGetExecutor{
+baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ExplainID()),
+tblInfo: p.TblInfo,
+idxInfo: p.IndexInfo,
+idxVals: p.IndexValues,
+handle: p.Handle,
+startTS: startTS,
+}
+e.base().initCap = 1
+e.base().maxChunkSize = 1
+return e
}

// PointGetExecutor executes point select query.
type PointGetExecutor struct {
-ctx sessionctx.Context
-schema *expression.Schema
-tps []*types.FieldType
+baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
@@ -232,22 +231,3 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

-// Schema implements the Executor interface.
-func (e *PointGetExecutor) Schema() *expression.Schema {
-return e.schema
-}
-
-func (e *PointGetExecutor) retTypes() []*types.FieldType {
-if e.tps == nil {
-e.tps = make([]*types.FieldType, e.schema.Len())
-for i := range e.schema.Columns {
-e.tps[i] = e.schema.Columns[i].RetType
-}
-}
-return e.tps
-}
-
-func (e *PointGetExecutor) newFirstChunk() *chunk.Chunk {
-return chunk.New(e.retTypes(), 1, 1)
-}
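PointGetExecutor previously carried its own ctx, schema, and field-type slice together with hand-written Schema, retTypes, and newFirstChunk methods. Embedding baseExecutor (constructed via newBaseExecutor, with initCap and maxChunkSize set to 1 to match the old single-row chunk) supplies all of that, which is why the three methods at the end of the file can be deleted, and it gives the executor a base() so point-get queries participate in the shared kill check as well.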
4 changes: 2 additions & 2 deletions executor/projection.go
@@ -173,7 +173,7 @@ func (e *ProjectionExec) isUnparallelExec() bool {
func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error {
// push requiredRows down
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
-err := e.children[0].Next(ctx, e.childResult)
+err := Next(ctx, e.children[0], e.childResult)
if err != nil {
return errors.Trace(err)
}
@@ -306,7 +306,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

requiredRows := atomic.LoadInt64(&f.proj.parentReqRows)
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
-err := f.child.Next(ctx, input.chk)
+err := Next(ctx, f.child, input.chk)
if err != nil || input.chk.NumRows() == 0 {
output.done <- errors.Trace(err)
return
(The remaining 7 changed files are not shown in this view.)
