Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support canceling DDL statements with KILL #35803

Merged
merged 7 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,11 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

var historyJob *model.Job
jobID := job.ID

// Attach the context of the jobId to the calling session so that
// KILL can cancel this DDL job.
ctx.GetSessionVars().StmtCtx.DDLJobID = jobID

// For a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// For every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
// But we use etcd to speed up, normally it takes less than 0.5s now, so we use 0.5s or 1s or 3s as the max value.
Expand Down
5 changes: 4 additions & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return err
}

defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }()
defer func() {
e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false
e.ctx.GetSessionVars().StmtCtx.DDLJobID = 0
}()

switch x := e.stmt.(type) {
case *ast.AlterDatabaseStmt:
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/blacktear23/go-proxyprotocol"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -718,6 +719,25 @@ func killConn(conn *clientConn) {
conn.mu.RLock()
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()

// If the connection being killed is a DDL Job,
// we need to CANCEL the matching jobID first.
if sessVars.StmtCtx.IsDDLJobInQueue {
jobID := sessVars.StmtCtx.DDLJobID
err := kv.RunInNewTxn(context.Background(), conn.ctx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
// errs is the error per job, there is only one submitted
// err is the error of the overall task
errs, err := ddl.CancelJobs(txn, []int64{jobID})
if len(errs) > 0 {
logutil.BgLogger().Warn("error canceling DDL job", zap.Error(errs[0]))
}
return err
})
if err != nil {
logutil.BgLogger().Warn("could not cancel DDL job", zap.Error(err))
}
}

if cancelFunc != nil {
cancelFunc()
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type StatementContext struct {
// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
IsDDLJobInQueue bool
DDLJobID int64
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
Expand Down