From 7110c5693890ce047aadc033c174ffabfbea59be Mon Sep 17 00:00:00 2001 From: db-storage <35760977+db-storage@users.noreply.github.com> Date: Tue, 25 Jun 2019 01:18:11 +0800 Subject: [PATCH] *: Add support for MAX_EXECUTION_TIME. (#10541) --- domain/domain.go | 44 ++++++++++------ executor/adapter.go | 19 ++++++- server/conn.go | 21 ++++++-- server/conn_stmt.go | 2 +- server/conn_test.go | 72 +++++++++++++++++++++++++++ server/driver.go | 2 +- server/driver_tidb.go | 4 +- server/server.go | 3 +- session/session.go | 15 +++++- session/session_test.go | 26 ++++++++++ sessionctx/variable/session.go | 23 ++++++--- sessionctx/variable/sysvar.go | 1 + sessionctx/variable/varsutil.go | 2 + tidb-server/main.go | 1 + util/expensivequery/expensivequery.go | 69 +++++++++++++++++++++++++ util/processinfo.go | 3 ++ 16 files changed, 271 insertions(+), 36 deletions(-) create mode 100644 util/expensivequery/expensivequery.go diff --git a/domain/domain.go b/domain/domain.go index 1f779350d500e..862562f68c6ef 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/expensivequery" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" @@ -52,22 +53,23 @@ import ( // Domain represents a storage space. Different domains can use the same database name. // Multiple domains can be used in parallel without synchronization. type Domain struct { - store kv.Storage - infoHandle *infoschema.Handle - privHandle *privileges.Handle - statsHandle unsafe.Pointer - statsLease time.Duration - statsUpdating sync2.AtomicInt32 - ddl ddl.DDL - info *InfoSyncer - m sync.Mutex - SchemaValidator SchemaValidator - sysSessionPool *sessionPool - exit chan struct{} - etcdClient *clientv3.Client - wg sync.WaitGroup - gvc GlobalVariableCache - slowQuery *topNSlowQueries + store kv.Storage + infoHandle *infoschema.Handle + privHandle *privileges.Handle + statsHandle unsafe.Pointer + statsLease time.Duration + statsUpdating sync2.AtomicInt32 + ddl ddl.DDL + info *InfoSyncer + m sync.Mutex + SchemaValidator SchemaValidator + sysSessionPool *sessionPool + exit chan struct{} + etcdClient *clientv3.Client + wg sync.WaitGroup + gvc GlobalVariableCache + slowQuery *topNSlowQueries + expensiveQueryHandle *expensivequery.Handle MockReloadFailed MockFailure // It mocks reload failed. } @@ -980,6 +982,16 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) { } } +// ExpensiveQueryHandle returns the expensive query handle. +func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle { + return do.expensiveQueryHandle +} + +// InitExpensiveQueryHandle init the expensive query handler. +func (do *Domain) InitExpensiveQueryHandle() { + do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit) +} + const privilegeKey = "/tidb/privilege" // NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches diff --git a/executor/adapter.go b/executor/adapter.go index 18042dc62c5d1..573feffbfe319 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -46,7 +46,7 @@ import ( // processinfoSetter is the interface use to set current running process info. type processinfoSetter interface { - SetProcessInfo(string, time.Time, byte) + SetProcessInfo(string, time.Time, byte, uint64) } // recordSet wraps an executor, implements sqlexec.RecordSet interface @@ -234,8 +234,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { sql = ss.SecureText() } } + maxExecutionTime := getMaxExecutionTime(sctx, a.StmtNode) // Update processinfo, ShowProcess() will use it. - pi.SetProcessInfo(sql, time.Now(), cmd) + pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime) a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode) } @@ -264,6 +265,20 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) { }, nil } +// getMaxExecutionTime get the max execution timeout value. +func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 { + ret := sctx.GetSessionVars().MaxExecutionTime + if sel, ok := stmtNode.(*ast.SelectStmt); ok { + for _, hint := range sel.TableHints { + if hint.HintName.L == variable.MaxExecutionTime { + ret = hint.MaxExecutionTime + break + } + } + } + return ret +} + func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) { // Check if "tidb_snapshot" is set for the write executors. // In history read mode, we can not do write operations. diff --git a/server/conn.go b/server/conn.go index ed0d972727afa..260062cbc5bbe 100644 --- a/server/conn.go +++ b/server/conn.go @@ -50,6 +50,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -219,6 +220,11 @@ func (cc *clientConn) readPacket() ([]byte, error) { } func (cc *clientConn) writePacket(data []byte) error { + failpoint.Inject("FakeClientConn", func() { + if cc.pkt == nil { + failpoint.Return(nil) + } + }) return cc.pkt.writePacket(data) } @@ -619,7 +625,11 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.lastCmd = hack.String(data) token := cc.server.getToken() defer func() { - cc.ctx.SetProcessInfo("", t, mysql.ComSleep) + // if handleChangeUser failed, cc.ctx may be nil + if cc.ctx != nil { + cc.ctx.SetProcessInfo("", t, mysql.ComSleep, 0) + } + cc.server.releaseToken(token) span.Finish() }() @@ -633,9 +643,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { switch cmd { case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset, mysql.ComSetOption, mysql.ComChangeUser: - cc.ctx.SetProcessInfo("", t, cmd) + cc.ctx.SetProcessInfo("", t, cmd, 0) case mysql.ComInitDB: - cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd) + cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd, 0) } switch cmd { @@ -697,6 +707,11 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) { } func (cc *clientConn) flush() error { + failpoint.Inject("FakeClientConn", func() { + if cc.pkt == nil { + failpoint.Return(nil) + } + }) return cc.pkt.flush() } diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 6803f17e550e9..1c09932dca8f2 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -223,7 +223,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok { sql = prepared.sql } - cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute) + cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0) rs := stmt.GetResultSet() if rs == nil { return mysql.NewErr(mysql.ErrUnknownStmtHandler, diff --git a/server/conn_test.go b/server/conn_test.go index 2e6f593398283..7f81340d4c0d7 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -18,9 +18,14 @@ import ( "bytes" "context" "encoding/binary" + "fmt" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/util/arena" ) type ConnTestSuite struct{} @@ -163,3 +168,70 @@ func mapBelong(m1, m2 map[string]string) bool { } return true } + +func (ts ConnTestSuite) TestConnExecutionTimeout(c *C) { + //There is no underlying netCon, use failpoint to avoid panic + c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/FakeClientConn", "return(1)"), IsNil) + + c.Parallel() + store, err := mockstore.NewMockTikvStore() + c.Assert(err, IsNil) + defer store.Close() + dom, err := session.BootstrapSession(store) + c.Assert(err, IsNil) + defer dom.Close() + se, err := session.CreateSession4Test(store) + c.Assert(err, IsNil) + + connID := 1 + se.SetConnectionID(uint64(connID)) + tc := &TiDBContext{ + session: se, + stmts: make(map[int]*TiDBStatement), + } + cc := &clientConn{ + connectionID: uint32(connID), + server: &Server{ + capability: defaultCapability, + }, + ctx: tc, + alloc: arena.NewAllocator(32 * 1024), + } + srv := &Server{ + clients: map[uint32]*clientConn{ + uint32(connID): cc, + }, + } + handle := dom.ExpensiveQueryHandle().SetSessionManager(srv) + go handle.Run() + + _, err = se.Execute(context.Background(), "use test;") + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "CREATE TABLE testTable2 (id bigint PRIMARY KEY, age int)") + c.Assert(err, IsNil) + for i := 0; i < 10; i++ { + str := fmt.Sprintf("insert into testTable2 values(%d, %d)", i, i%80) + _, err = se.Execute(context.Background(), str) + c.Assert(err, IsNil) + } + + _, err = se.Execute(context.Background(), "select SLEEP(1);") + c.Assert(err, IsNil) + + _, err = se.Execute(context.Background(), "set @@max_execution_time = 500;") + c.Assert(err, IsNil) + + err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);") + c.Assert(err, NotNil) + + _, err = se.Execute(context.Background(), "set @@max_execution_time = 0;") + c.Assert(err, IsNil) + + err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);") + c.Assert(err, IsNil) + + err = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);") + c.Assert(err, NotNil) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/FakeClientConn"), IsNil) +} diff --git a/server/driver.go b/server/driver.go index 734956b5117e1..5106e52eb5e42 100644 --- a/server/driver.go +++ b/server/driver.go @@ -48,7 +48,7 @@ type QueryCtx interface { // SetValue saves a value associated with this context for key. SetValue(key fmt.Stringer, value interface{}) - SetProcessInfo(sql string, t time.Time, command byte) + SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) // CommitTxn commits the transaction operations. CommitTxn(ctx context.Context) error diff --git a/server/driver_tidb.go b/server/driver_tidb.go index dd3dfc6f00428..4aa564b3d4ce6 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -213,8 +213,8 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error { } // SetProcessInfo implements QueryCtx SetProcessInfo method. -func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) { - tc.session.SetProcessInfo(sql, t, command) +func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) { + tc.session.SetProcessInfo(sql, t, command, maxExecutionTime) } // RollbackTxn implements QueryCtx RollbackTxn method. diff --git a/server/server.go b/server/server.go index 197af6e6d4e88..9fccf685594c7 100644 --- a/server/server.go +++ b/server/server.go @@ -105,7 +105,7 @@ type Server struct { tlsConfig *tls.Config driver IDriver listener net.Listener - rwlock *sync.RWMutex + rwlock sync.RWMutex concurrentLimiter *TokenLimiter clients map[uint32]*clientConn capability uint32 @@ -164,7 +164,6 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { cfg: cfg, driver: driver, concurrentLimiter: NewTokenLimiter(cfg.TokenLimit), - rwlock: &sync.RWMutex{}, clients: make(map[uint32]*clientConn), stopListenerCh: make(chan struct{}, 1), } diff --git a/session/session.go b/session/session.go index b34dda6646034..a6179b8118f00 100644 --- a/session/session.go +++ b/session/session.go @@ -83,7 +83,7 @@ type Session interface { SetClientCapability(uint32) // Set client capability flags. SetConnectionID(uint64) SetCommandValue(byte) - SetProcessInfo(string, time.Time, byte) + SetProcessInfo(string, time.Time, byte, uint64) SetTLSState(*tls.ConnectionState) SetCollation(coID int) error SetSessionManager(util.SessionManager) @@ -661,6 +661,10 @@ func createSessionFunc(store kv.Storage) pools.Factory { if err != nil { return nil, errors.Trace(err) } + err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0)) + if err != nil { + return nil, errors.Trace(err) + } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true return se, nil @@ -677,6 +681,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R if err != nil { return nil, errors.Trace(err) } + err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0)) + if err != nil { + return nil, errors.Trace(err) + } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true return se, nil @@ -783,7 +791,7 @@ func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string) return s.parser.Parse(sql, charset, collation) } -func (s *session) SetProcessInfo(sql string, t time.Time, command byte) { +func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) { pi := util.ProcessInfo{ ID: s.sessionVars.ConnectionID, DB: s.sessionVars.CurrentDB, @@ -791,6 +799,8 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte) { Time: t, State: s.Status(), Info: sql, + + MaxExecutionTime: maxExecutionTime, } if s.sessionVars.User != nil { pi.User = s.sessionVars.User.Username @@ -1232,6 +1242,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { timeutil.SetSystemTZ(tz) dom := domain.GetDomain(se) + dom.InitExpensiveQueryHandle() if !config.GetGlobalConfig().Security.SkipGrantTable { err = dom.LoadPrivilegeLoop(se) diff --git a/session/session_test.go b/session/session_test.go index 61a640b80fdd1..1d235aa0c61cb 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2432,3 +2432,29 @@ func (s *testSessionSuite) TestTxnGoString(c *C) { tk.MustExec("rollback") c.Assert(fmt.Sprintf("%#v", txn), Equals, "Txn{state=invalid}") } + +func (s *testSessionSuite) TestMaxExeucteTime(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + + tk.MustExec("create table MaxExecTime( id int,name varchar(128),age int);") + tk.MustExec("begin") + tk.MustExec("insert into MaxExecTime (id,name,age) values (1,'john',18),(2,'lary',19),(3,'lily',18);") + + tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("0")) + tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("0")) + tk.MustQuery("select /*+ MAX_EXECUTION_TIME(1000) */ * FROM MaxExecTime;") + + tk.MustExec("set @@global.MAX_EXECUTION_TIME = 300;") + tk.MustQuery("select * FROM MaxExecTime;") + + tk.MustExec("set @@MAX_EXECUTION_TIME = 150;") + tk.MustQuery("select * FROM MaxExecTime;") + + tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("300")) + tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("150")) + + tk.MustExec("set @@global.MAX_EXECUTION_TIME = 0;") + tk.MustExec("set @@MAX_EXECUTION_TIME = 0;") + tk.MustExec("commit") + tk.MustExec("drop table if exists MaxExecTime;") +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cf291c96c4dfa..ca7cb7f61d5f6 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -105,7 +105,7 @@ type TransactionContext struct { Shard *int64 TableDeltaMap map[int64]TableDelta - // For metrics. + // CreateTime For metrics. CreateTime time.Time StatementCount int @@ -186,12 +186,12 @@ type SessionVars struct { PreparedStmtNameToID map[string]uint32 // preparedStmtID is id of prepared statement. preparedStmtID uint32 - // params for prepared statements + // PreparedParams params for prepared statements PreparedParams []types.Datum // retry information RetryInfo *RetryInfo - // Should be reset on transaction finished. + // TxnCtx Should be reset on transaction finished. TxnCtx *TransactionContext // KVVars is the variables for KV storage. @@ -199,9 +199,9 @@ type SessionVars struct { // TxnIsolationLevelOneShot is used to implements "set transaction isolation level ..." TxnIsolationLevelOneShot struct { - // state 0 means default - // state 1 means it's set in current transaction. - // state 2 means it should be used in current transaction. + // State 0 means default + // State 1 means it's set in current transaction. + // State 2 means it should be used in current transaction. State int Value string } @@ -326,6 +326,11 @@ type SessionVars struct { // SlowQueryFile indicates which slow query log file for SLOW_QUERY table to parse. SlowQueryFile string + // MaxExecutionTime is the timeout for select statement, in milliseconds. + // If the value is 0, timeouts are not enabled. + // See https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_execution_time + MaxExecutionTime uint64 + // Killed is a flag to indicate that this query is killed. Killed uint32 } @@ -578,6 +583,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { if isAutocommit { s.SetStatusFlag(mysql.ServerStatusInTrans, false) } + case MaxExecutionTime: + timeoutMS := tidbOptPositiveInt32(val, 0) + s.MaxExecutionTime = uint64(timeoutMS) case TiDBSkipUTF8Check: s.SkipUTF8Check = TiDBOptOn(val) case TiDBOptAggPushDown: @@ -689,6 +697,7 @@ const ( TxnIsolation = "tx_isolation" TransactionIsolation = "transaction_isolation" TxnIsolationOneShot = "tx_isolation_one_shot" + MaxExecutionTime = "max_execution_time" ) var ( @@ -729,7 +738,7 @@ type Concurrency struct { // HashAggPartialConcurrency is the number of concurrent hash aggregation partial worker. HashAggPartialConcurrency int - // HashAggPartialConcurrency is the number of concurrent hash aggregation final worker. + // HashAggFinalConcurrency is the number of concurrent hash aggregation final worker. HashAggFinalConcurrency int // IndexSerialScanConcurrency is the number of concurrent index serial scan worker. diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a8247c4069af7..a6f55ce1390a0 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -169,6 +169,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, "range_alloc_block_size", "4096"}, {ScopeGlobal, ConnectTimeout, "10"}, {ScopeGlobal | ScopeSession, "collation_server", mysql.DefaultCollationName}, + {ScopeGlobal | ScopeSession, MaxExecutionTime, "0"}, {ScopeNone, "have_rtree_keys", "YES"}, {ScopeGlobal, "innodb_old_blocks_pct", "37"}, {ScopeGlobal, "innodb_file_format", "Antelope"}, diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 8b412c3619fe0..94e5435dd6108 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -333,6 +333,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) case TiDBDDLReorgBatchSize: return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars) + case MaxExecutionTime: + return checkUInt64SystemVar(name, value, 0, math.MaxUint64, vars) case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize, TiDBIndexLookupSize, TiDBHashJoinConcurrency, diff --git a/tidb-server/main.go b/tidb-server/main.go index 23d57ec4182cf..2cc72f0977109 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -506,6 +506,7 @@ func createServer() { xsvr, err = xserver.NewServer(xcfg) terror.MustNil(err, closeDomainAndStorage) } + go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run() } func serverShutdown(isgraceful bool) { diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go new file mode 100644 index 0000000000000..6a164d98f757c --- /dev/null +++ b/util/expensivequery/expensivequery.go @@ -0,0 +1,69 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package expensivequery + +import ( + "sync" + "time" + + "github.com/pingcap/tidb/util" +) + +// Handle is the handler for expensive query. +type Handle struct { + mu sync.RWMutex + exitCh chan struct{} + sm util.SessionManager +} + +// NewExpensiveQueryHandle builds a new expensive query handler. +func NewExpensiveQueryHandle(exitCh chan struct{}) *Handle { + return &Handle{exitCh: exitCh} +} + +// SetSessionManager sets the SessionManager which is used to fetching the info +// of all active sessions. +func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle { + eqh.sm = sm + return eqh +} + +// Run starts a expensive query checker goroutine at the start time of the server. +func (eqh *Handle) Run() { + // use 100ms as tickInterval temply, may use given interval or use defined variable later + tickInterval := time.Millisecond * time.Duration(100) + ticker := time.NewTicker(tickInterval) + for { + select { + case <-ticker.C: + processInfo := eqh.sm.ShowProcessList() + for _, info := range processInfo { + if len(info.Info) == 0 { + continue + } + costTime := time.Since(info.Time) + if info.MaxExecutionTime > 0 && costTime > time.Duration(info.MaxExecutionTime)*time.Millisecond { + eqh.sm.Kill(info.ID, true) + } + } + case <-eqh.exitCh: + return + } + } +} + +// Close closes the handle and release the background goroutine. +func (eqh *Handle) Close() { + close(eqh.exitCh) +} diff --git a/util/processinfo.go b/util/processinfo.go index 9c43d0573e856..5aa224ba3caaa 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -28,6 +28,9 @@ type ProcessInfo struct { State uint16 Info string Mem int64 + // MaxExecutionTime is the timeout for select statement, in milliseconds. + // If the query takes too long, kill it. + MaxExecutionTime uint64 } // SessionManager is an interface for session manage. Show processlist and