Skip to content

Commit

Permalink
*: print an expensive query log when a query is out of memQuota (#10799
Browse files Browse the repository at this point in the history
…) (#10849)
  • Loading branch information
XuHuaiyu authored and zz-jason committed Jun 19, 2019
1 parent 12eacf1 commit a9298ee
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 17 deletions.
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ func (c *Config) Valid() error {
return fmt.Errorf("invalid max log file size=%v which is larger than max=%v", c.Log.File.MaxSize, MaxLogFileSize)
}
c.OOMAction = strings.ToLower(c.OOMAction)
if c.OOMAction != OOMActionLog && c.OOMAction != OOMActionCancel {
return fmt.Errorf("unsupported OOMAction %v, TiDB only supports [%v, %v]", c.OOMAction, OOMActionLog, OOMActionCancel)
}

// lower_case_table_names is allowed to be 0, 1, 2
if c.LowerCaseTableNames < 0 || c.LowerCaseTableNames > 2 {
Expand Down
18 changes: 18 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,21 @@ func (s *testConfigSuite) TestValid(c *C) {
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}

func (s *testConfigSuite) TestOOMActionValid(c *C) {
c1 := NewConfig()
tests := []struct {
oomAction string
valid bool
}{
{"log", true},
{"Log", true},
{"Cancel", true},
{"cANceL", true},
{"quit", false},
}
for _, tt := range tests {
c1.OOMAction = tt.oomAction
c.Assert(c1.Valid() == nil, Equals, tt.valid)
}
}
10 changes: 7 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,11 +1303,15 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
sc.MemTracker.SetActionOnExceed(&memory.PanicOnExceed{})
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
case config.OOMActionLog:
sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{})
fallthrough
default:
sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{})
action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
sc.MemTracker.SetActionOnExceed(action)
}

if execStmt, ok := s.(*ast.ExecuteStmt); ok {
Expand Down
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,9 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo {
if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {
continue
}
pi := client.ctx.ShowProcess()
rs[pi.ID] = pi
if pi := client.ctx.ShowProcess(); pi != nil {
rs[pi.ID] = pi
}
}
s.rwlock.RUnlock()
return rs
Expand Down
2 changes: 1 addition & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func createServer() {
svr, err = server.NewServer(cfg, driver)
// Both domain and storage have started, so we have to clean them before exiting.
terror.MustNil(err, closeDomainAndStorage)
go dom.ExpensiveQueryHandle().Run(svr)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
}

func serverShutdown(isgraceful bool) {
Expand Down
26 changes: 23 additions & 3 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,23 @@ import (
// Handle is the handler for expensive query.
type Handle struct {
exitCh chan struct{}
sm util.SessionManager
}

// NewExpensiveQueryHandle builds a new expensive query handler.
func NewExpensiveQueryHandle(exitCh chan struct{}) *Handle {
return &Handle{exitCh}
return &Handle{exitCh: exitCh}
}

// SetSessionManager sets the SessionManager which is used to fetching the info
// of all active sessions.
func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle {
eqh.sm = sm
return eqh
}

// Run starts a expensive query checker goroutine at the start time of the server.
func (eqh *Handle) Run(sm util.SessionManager) {
func (eqh *Handle) Run() {
threshold := atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold)
curInterval := time.Second * time.Duration(threshold)
ticker := time.NewTicker(curInterval / 2)
Expand All @@ -50,7 +58,7 @@ func (eqh *Handle) Run(sm util.SessionManager) {
if log.GetLevel() > zapcore.WarnLevel {
continue
}
processInfo := sm.ShowProcessList()
processInfo := eqh.sm.ShowProcessList()
for _, info := range processInfo {
if len(info.Info) == 0 || info.ExceedExpensiveTimeThresh {
continue
Expand All @@ -72,6 +80,18 @@ func (eqh *Handle) Run(sm util.SessionManager) {
}
}

// LogOnQueryExceedMemQuota prints a log when memory usage of connID is out of memory quota.
func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) {
if log.GetLevel() > zapcore.WarnLevel {
return
}
info, ok := eqh.sm.GetProcessInfo(connID)
if !ok {
return
}
logExpensiveQuery(time.Since(info.Time), info)
}

// logExpensiveQuery logs the queries which exceed the time threshold or memory threshold.
func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) {
logFields := make([]zap.Field, 0, 20)
Expand Down
40 changes: 32 additions & 8 deletions util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package memory

import (
"context"
"fmt"
"sync"

"github.com/pingcap/parser/mysql"
Expand All @@ -29,12 +30,22 @@ type ActionOnExceed interface {
// Action will be called when memory usage exceeds memory quota by the
// corresponding Tracker.
Action(t *Tracker)
// SetLogHook binds a log hook which will be triggered and log an detailed
// message for the out-of-memory sql.
SetLogHook(hook func(uint64))
}

// LogOnExceed logs a warning only once when memory usage exceeds memory quota.
type LogOnExceed struct {
mutex sync.Mutex // For synchronization.
acted bool
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
logHook func(uint64)
}

// SetLogHook sets a hook for LogOnExceed.
func (a *LogOnExceed) SetLogHook(hook func(uint64)) {
a.logHook = hook
}

// Action logs a warning only once when memory usage exceeds memory quota.
Expand All @@ -43,16 +54,26 @@ func (a *LogOnExceed) Action(t *Tracker) {
defer a.mutex.Unlock()
if !a.acted {
a.acted = true
logutil.Logger(context.Background()).Warn("memory exceeds quota",
zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String())))
return
if a.logHook == nil {
logutil.Logger(context.Background()).Warn("memory exceeds quota",
zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String())))
return
}
a.logHook(a.ConnID)
}
}

// PanicOnExceed panics when memory usage exceeds memory quota.
type PanicOnExceed struct {
mutex sync.Mutex // For synchronization.
acted bool
mutex sync.Mutex // For synchronization.
acted bool
ConnID uint64
logHook func(uint64)
}

// SetLogHook sets a hook for PanicOnExceed.
func (a *PanicOnExceed) SetLogHook(hook func(uint64)) {
a.logHook = hook
}

// Action panics when memory usage exceeds memory quota.
Expand All @@ -64,7 +85,10 @@ func (a *PanicOnExceed) Action(t *Tracker) {
}
a.acted = true
a.mutex.Unlock()
panic(PanicMemoryExceed + t.String())
if a.logHook != nil {
a.logHook(a.ConnID)
}
panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID))
}

var (
Expand Down
3 changes: 3 additions & 0 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type mockAction struct {
called bool
}

func (a *mockAction) SetLogHook(hook func(uint64)) {
}

func (a *mockAction) Action(t *Tracker) {
a.called = true
}
Expand Down

0 comments on commit a9298ee

Please sign in to comment.