diff --git a/ddl/ddl.go b/ddl/ddl.go index fedd9ca70336a..b9d90cb87bdee 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1091,6 +1091,10 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { // If the connection being killed, we need to CANCEL the DDL job. if atomic.LoadUint32(&sessVars.Killed) == 1 { + if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown { + logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done") + return context.Canceled + } if sessVars.StmtCtx.DDLJobID != 0 { se, err := d.sessPool.Get() if err != nil { diff --git a/server/conn.go b/server/conn.go index 578987d63eab3..251a7216e5a8d 100644 --- a/server/conn.go +++ b/server/conn.go @@ -101,8 +101,8 @@ import ( const ( connStatusDispatching int32 = iota connStatusReading - connStatusShutdown // Closed by server. - connStatusWaitShutdown // Notified by server to close. + connStatusShutdown = variable.ConnStatusShutdown // Closed by server. + connStatusWaitShutdown = 3 // Notified by server to close. ) // newClientConn creates a *clientConn object. @@ -187,6 +187,21 @@ func (cc *clientConn) String() string { ) } +func (cc *clientConn) setStatus(status int32) { + atomic.StoreInt32(&cc.status, status) + if ctx := cc.getCtx(); ctx != nil { + atomic.StoreInt32(&ctx.GetSessionVars().ConnectionStatus, status) + } +} + +func (cc *clientConn) getStatus() int32 { + return atomic.LoadInt32(&cc.status) +} + +func (cc *clientConn) CompareAndSwapStatus(oldStatus, newStatus int32) bool { + return atomic.CompareAndSwapInt32(&cc.status, oldStatus, newStatus) +} + // authSwitchRequest is used by the server to ask the client to switch to a different authentication // plugin. MySQL 8.0 libmysqlclient based clients by default always try `caching_sha2_password`, even // when the server advertises the its default to be `mysql_native_password`. In addition to this switching @@ -1063,7 +1078,7 @@ func (cc *clientConn) Run(ctx context.Context) { terror.Log(err) metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc() } - if atomic.LoadInt32(&cc.status) != connStatusShutdown { + if cc.getStatus() != connStatusShutdown { err := cc.Close() terror.Log(err) } @@ -1086,10 +1101,10 @@ func (cc *clientConn) Run(ctx context.Context) { } } - if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) || + if !cc.CompareAndSwapStatus(connStatusDispatching, connStatusReading) || // The judge below will not be hit by all means, // But keep it stayed as a reminder and for the code reference for connStatusWaitShutdown. - atomic.LoadInt32(&cc.status) == connStatusWaitShutdown { + cc.getStatus() == connStatusWaitShutdown { return } @@ -1103,7 +1118,7 @@ func (cc *clientConn) Run(ctx context.Context) { if err != nil { if terror.ErrorNotEqual(err, io.EOF) { if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() { - if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown { + if cc.getStatus() == connStatusWaitShutdown { logutil.Logger(ctx).Info("read packet timeout because of killed connection") } else { idleTime := time.Since(start) @@ -1130,7 +1145,7 @@ func (cc *clientConn) Run(ctx context.Context) { return } - if !atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusDispatching) { + if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) { return } @@ -2103,7 +2118,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [ } if rs != nil { - if connStatus := atomic.LoadInt32(&cc.status); connStatus == connStatusShutdown { + if cc.getStatus() == connStatusShutdown { return false, exeerrors.ErrQueryInterrupted } if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil { diff --git a/server/server.go b/server/server.go index c0c82f7612a9a..67163e6e70991 100644 --- a/server/server.go +++ b/server/server.go @@ -811,7 +811,7 @@ func (s *Server) Kill(connectionID uint64, query bool) { if !query { // Mark the client connection status as WaitShutdown, when clientConn.Run detect // this, it will end the dispatch loop and exit. - atomic.StoreInt32(&conn.status, connStatusWaitShutdown) + conn.setStatus(connStatusWaitShutdown) } killQuery(conn) } @@ -861,7 +861,7 @@ func (s *Server) KillAllConnections() { s.rwlock.RLock() defer s.rwlock.RUnlock() for _, conn := range s.clients { - atomic.StoreInt32(&conn.status, connStatusShutdown) + conn.setStatus(connStatusShutdown) if err := conn.closeWithoutLock(); err != nil { terror.Log(err) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 8685eb43f2dac..4e0d0e0f5d657 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -75,6 +75,10 @@ var ( enableAdaptiveReplicaRead uint32 = 1 ) +// ConnStatusShutdown indicates that the connection status is closed by server. +// This code is put here because of package imports, and this value is the original server.connStatusShutdown. +const ConnStatusShutdown int32 = 2 + // SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value. // return true if the value is changed. func SetEnableAdaptiveReplicaRead(enabled bool) bool { @@ -1052,6 +1056,9 @@ type SessionVars struct { // Killed is a flag to indicate that this query is killed. Killed uint32 + // ConnectionStatus indicates current connection status. + ConnectionStatus int32 + // ConnectionInfo indicates current connection info used by current session. ConnectionInfo *ConnectionInfo