Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: trace and control memory usage in DistSQL layer (#10003) #10197

Merged
merged 3 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
return nil, errors.Trace(err)
}

// kvReq.MemTracker is used to trace and control memory usage in DistSQL layer;
// for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it;
// for selectResult, we just use the kvReq.MemTracker prepared for co-processor
// instead of creating a new one for simplification.
if kvReq.Streaming {
return &streamResult{
resp: resp,
Expand All @@ -69,6 +73,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
ctx: sctx,
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
Expand Down
10 changes: 10 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -41,6 +43,14 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
return &builder.Request, errors.Trace(builder.err)
}

// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label string) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
return builder
}

// SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges"
// to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
Expand Down
16 changes: 16 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -68,6 +69,8 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string

memTracker *memory.Tracker
}

func (r *selectResult) Fetch(ctx context.Context) {
Expand All @@ -91,6 +94,10 @@ func (r *selectResult) fetch(ctx context.Context) {
return
}

if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
Expand Down Expand Up @@ -141,15 +148,24 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down
2 changes: 2 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexReaderDistSQLTracker").
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -421,6 +422,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "IndexLookupDistSQLTracker").
Build()
if err != nil {
return errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) {
}

func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
c.Skip("not stable because of goroutine schedule")
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
Expand Down
92 changes: 92 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/errors"
gofail "github.com/pingcap/gofail/runtime"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -63,6 +64,8 @@ import (
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/context"
)

Expand All @@ -77,6 +80,7 @@ var _ = Suite(&testSuite{})
var _ = Suite(&testContextOptionSuite{})
var _ = Suite(&testBypassSuite{})
var _ = Suite(&testUpdateSuite{})
var _ = Suite(&testOOMSuite{})

type testSuite struct {
cluster *mocktikv.Cluster
Expand Down Expand Up @@ -3452,3 +3456,91 @@ func (s *testSuite) TestDoSubquery(c *C) {
c.Assert(err, IsNil, Commentf("err %v", err))
c.Assert(r, IsNil, Commentf("result of Do not empty"))
}

type testOOMSuite struct {
store kv.Storage
do *domain.Domain
oom *oomCapturer
}

func (s *testOOMSuite) SetUpSuite(c *C) {
c.Skip("log.ReplaceGlobals(lg, r) in registerHook() may result in data race")
testleak.BeforeTest()
s.registerHook()
var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.SetSchemaLease(0)
domain.RunAutoAnalyze = false
s.do, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testOOMSuite) registerHook() {
conf := &log.Config{Level: "info", File: log.FileLogConfig{}}
_, r, _ := log.InitLogger(conf)
s.oom = &oomCapturer{r.Core, ""}
lg := zap.New(s.oom)
log.ReplaceGlobals(lg, r)
}

func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)")

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "TableReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select a from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select a from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexLookupDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1
}

type oomCapturer struct {
zapcore.Core
tracker string
}

func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if strings.Contains(entry.Message, "memory exceeds quota") {
err, _ := fields[0].Interface.(error)
str := err.Error()
begin := strings.Index(str, "8001]")
if begin == -1 {
panic("begin not found")
}
end := strings.Index(str, " holds")
if end == -1 {
panic("end not found")
}
h.tracker = str[begin+len("8001]") : end]
}
return nil
}

func (h *oomCapturer) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if h.Enabled(e.Level) {
return ce.AddCore(e, h)
}
return ce
}
1 change: 1 addition & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, "TableReaderDistSQLTracker").
Build()
if err != nil {
return nil, errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -209,6 +210,8 @@ type Request struct {
// Streaming indicates using streaming API for this request, result in that one Next()
// call would not corresponds to a whole region result.
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
}

// ResultSubset represents a result subset from a single storage unit.
Expand All @@ -220,6 +223,8 @@ type ResultSubset interface {
GetStartKey() Key
// GetExecDetails gets the detail information.
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
}

// Response represents the response returned from KV layer.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func NewSessionVars() *SessionVars {
MemQuotaIndexLookupReader: DefTiDBMemQuotaIndexLookupReader,
MemQuotaIndexLookupJoin: DefTiDBMemQuotaIndexLookupJoin,
MemQuotaNestedLoopApply: DefTiDBMemQuotaNestedLoopApply,
MemQuotaDistSQL: DefTiDBMemQuotaDistSQL,
}
vars.BatchSize = BatchSize{
IndexJoinBatchSize: DefIndexJoinBatchSize,
Expand Down Expand Up @@ -732,6 +733,8 @@ type MemQuota struct {
MemQuotaIndexLookupJoin int64
// MemQuotaNestedLoopApply defines the memory quota for a nested loop apply executor.
MemQuotaNestedLoopApply int64
// MemQuotaDistSQL defines the memory quota for all operators in DistSQL layer like co-processor and selectResult.
MemQuotaDistSQL int64
}

// BatchSize defines batch size values.
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ const (
DefTiDBMemQuotaIndexLookupReader = 32 << 30 // 32GB.
DefTiDBMemQuotaIndexLookupJoin = 32 << 30 // 32GB.
DefTiDBMemQuotaNestedLoopApply = 32 << 30 // 32GB.
DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBRetryLimit = 10
DefTiDBDisableTxnAutoRetry = false
Expand Down
Loading