Skip to content

Commit

Permalink
executor: kill tidb [session id] can't stop executors and release r…
Browse files Browse the repository at this point in the history
…esources quickly (#9844)
  • Loading branch information
qw4990 authored and zz-jason committed Apr 1, 2019
1 parent 873d951 commit 77e91d1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
10 changes: 10 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type clientConn struct {
mu struct {
sync.RWMutex
cancelFunc context.CancelFunc
resultSets []ResultSet
}
}

Expand Down Expand Up @@ -1047,6 +1048,15 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err)).Inc()
return errors.Trace(err)
}
cc.mu.Lock()
cc.mu.resultSets = rs
status := atomic.LoadInt32(&cc.status)
if status == connStatusShutdown || status == connStatusWaitShutdown {
cc.mu.Unlock()
killConn(cc)
return errors.New("killed by another connection")
}
cc.mu.Unlock()
if rs != nil {
if len(rs) == 1 {
err = cc.writeResultset(ctx, rs[0], false, 0, 0)
Expand Down
6 changes: 3 additions & 3 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"crypto/tls"
"fmt"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -354,7 +355,7 @@ type tidbResultSet struct {
recordSet sqlexec.RecordSet
columns []*ColumnInfo
rows []chunk.Row
closed bool
closed int32
}

func (trs *tidbResultSet) NewRecordBatch() *chunk.RecordBatch {
Expand All @@ -377,10 +378,9 @@ func (trs *tidbResultSet) GetFetchedRows() []chunk.Row {
}

func (trs *tidbResultSet) Close() error {
if trs.closed {
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return nil
}
trs.closed = true
return trs.recordSet.Close()
}

Expand Down
21 changes: 11 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,19 +508,25 @@ func (s *Server) Kill(connectionID uint64, query bool) {
return
}

killConn(conn, query)
}

func killConn(conn *clientConn, query bool) {
if !query {
// Mark the client connection status as WaitShutdown, when the goroutine detect
// this, it will end the dispatch loop and exit.
atomic.StoreInt32(&conn.status, connStatusWaitShutdown)
}
killConn(conn)
}

func killConn(conn *clientConn) {
conn.mu.RLock()
resultSets := conn.mu.resultSets
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()
for _, resultSet := range resultSets {
// resultSet.Close() is reentrant so it's safe to kill a same connID multiple times
if err := resultSet.Close(); err != nil {
logutil.Logger(context.Background()).Error("close result set error", zap.Uint32("connID", conn.connectionID), zap.Error(err))
}
}
if cancelFunc != nil {
cancelFunc()
}
Expand All @@ -535,12 +541,7 @@ func (s *Server) KillAllConnections() {
for _, conn := range s.clients {
atomic.StoreInt32(&conn.status, connStatusShutdown)
terror.Log(errors.Trace(conn.closeWithoutLock()))
conn.mu.RLock()
cancelFunc := conn.mu.cancelFunc
conn.mu.RUnlock()
if cancelFunc != nil {
cancelFunc()
}
killConn(conn)
}
}

Expand Down

0 comments on commit 77e91d1

Please sign in to comment.