Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make DDL unaffected by killing TiDB instance (#43871) #43910

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
@@ -1091,6 +1091,10 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

// If the connection is being killed, we need to CANCEL the DDL job.
if atomic.LoadUint32(&sessVars.Killed) == 1 {
if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown {
logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done")
return context.Canceled
}
if sessVars.StmtCtx.DDLJobID != 0 {
se, err := d.sessPool.Get()
if err != nil {
31 changes: 23 additions & 8 deletions server/conn.go
Original file line number Diff line number Diff line change
@@ -101,8 +101,8 @@ import (
const (
connStatusDispatching int32 = iota
connStatusReading
connStatusShutdown // Closed by server.
connStatusWaitShutdown // Notified by server to close.
connStatusShutdown = variable.ConnStatusShutdown // Closed by server.
connStatusWaitShutdown = 3 // Notified by server to close.
)

// newClientConn creates a *clientConn object.
@@ -187,6 +187,21 @@ func (cc *clientConn) String() string {
)
}

// setStatus atomically records status as the connection's current status.
// When a session context is attached to the connection (getCtx returns
// non-nil), the same value is also stored atomically into that session's
// ConnectionStatus variable, so that code which only has the session
// variables at hand can observe the connection status.
func (cc *clientConn) setStatus(status int32) {
	atomic.StoreInt32(&cc.status, status)

	sessCtx := cc.getCtx()
	if sessCtx == nil {
		// No session context attached; only the connection-level status is kept.
		return
	}
	atomic.StoreInt32(&sessCtx.GetSessionVars().ConnectionStatus, status)
}

// getStatus atomically loads and returns the connection's current status.
func (cc *clientConn) getStatus() int32 {
	current := atomic.LoadInt32(&cc.status)
	return current
}

// CompareAndSwapStatus atomically replaces the connection status with
// newStatus if, and only if, it currently equals oldStatus. It reports
// whether the swap took place.
//
// Only cc.status is touched here; the session variables' ConnectionStatus
// is not updated by this method.
func (cc *clientConn) CompareAndSwapStatus(oldStatus, newStatus int32) bool {
	swapped := atomic.CompareAndSwapInt32(&cc.status, oldStatus, newStatus)
	return swapped
}

// authSwitchRequest is used by the server to ask the client to switch to a different authentication
// plugin. MySQL 8.0 libmysqlclient based clients by default always try `caching_sha2_password`, even
// when the server advertises its default to be `mysql_native_password`. In addition to this switching
@@ -1063,7 +1078,7 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if atomic.LoadInt32(&cc.status) != connStatusShutdown {
if cc.getStatus() != connStatusShutdown {
err := cc.Close()
terror.Log(err)
}
@@ -1086,10 +1101,10 @@ func (cc *clientConn) Run(ctx context.Context) {
}
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) ||
if !cc.CompareAndSwapStatus(connStatusDispatching, connStatusReading) ||
// The check below should never be hit in practice,
// but it is kept as a reminder and as a code reference for connStatusWaitShutdown.
atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
cc.getStatus() == connStatusWaitShutdown {
return
}

@@ -1103,7 +1118,7 @@ func (cc *clientConn) Run(ctx context.Context) {
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() {
if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
if cc.getStatus() == connStatusWaitShutdown {
logutil.Logger(ctx).Info("read packet timeout because of killed connection")
} else {
idleTime := time.Since(start)
@@ -1130,7 +1145,7 @@ func (cc *clientConn) Run(ctx context.Context) {
return
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusDispatching) {
if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) {
return
}

@@ -2103,7 +2118,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
}

if rs != nil {
if connStatus := atomic.LoadInt32(&cc.status); connStatus == connStatusShutdown {
if cc.getStatus() == connStatusShutdown {
return false, exeerrors.ErrQueryInterrupted
}
if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil {
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -811,7 +811,7 @@ func (s *Server) Kill(connectionID uint64, query bool) {
if !query {
// Mark the client connection status as WaitShutdown, when clientConn.Run detect
// this, it will end the dispatch loop and exit.
atomic.StoreInt32(&conn.status, connStatusWaitShutdown)
conn.setStatus(connStatusWaitShutdown)
}
killQuery(conn)
}
@@ -861,7 +861,7 @@ func (s *Server) KillAllConnections() {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
for _, conn := range s.clients {
atomic.StoreInt32(&conn.status, connStatusShutdown)
conn.setStatus(connStatusShutdown)
if err := conn.closeWithoutLock(); err != nil {
terror.Log(err)
}
7 changes: 7 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
@@ -75,6 +75,10 @@ var (
enableAdaptiveReplicaRead uint32 = 1
)

// ConnStatusShutdown is the connection status value meaning the connection
// has been closed by the server.
// It is declared in this package because of package import constraints; its
// value is the one originally held by server.connStatusShutdown.
const ConnStatusShutdown = int32(2)

// SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value.
// return true if the value is changed.
func SetEnableAdaptiveReplicaRead(enabled bool) bool {
@@ -1052,6 +1056,9 @@ type SessionVars struct {
// Killed is a flag to indicate that this query is killed.
Killed uint32

// ConnectionStatus indicates current connection status.
ConnectionStatus int32

// ConnectionInfo indicates current connection info used by current session.
ConnectionInfo *ConnectionInfo