
planner, executor: support fast analyze in planner and executor's builder. #10040

Merged · 18 commits · Apr 10, 2019
80 changes: 80 additions & 0 deletions executor/analyze.go
@@ -17,6 +17,7 @@ import (
"context"
"runtime"
"strconv"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
@@ -27,6 +28,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
@@ -108,12 +111,14 @@ type taskType int
const (
colTask taskType = iota
idxTask
fastTask
)

type analyzeTask struct {
taskType taskType
idxExec *AnalyzeIndexExec
colExec *AnalyzeColumnsExec
fastExec *AnalyzeFastExec
}

var errAnalyzeWorkerPanic = errors.New("analyze worker panic")
@@ -137,6 +142,10 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<-
resultCh <- analyzeColumnsPushdown(task.colExec)
case idxTask:
resultCh <- analyzeIndexPushdown(task.idxExec)
case fastTask:
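// A fast task fans out into several results: analyzeFastExec returns one
// result per analyzed index plus one combined result for the PK and columns.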
for _, result := range analyzeFastExec(task.fastExec) {
resultCh <- result
}
}
}
}
@@ -435,3 +444,74 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms []
}
return hists, cms, nil
}

func analyzeFastExec(exec *AnalyzeFastExec) []statistics.AnalyzeResult {
hists, cms, err := exec.buildStats()
if err != nil {
return []statistics.AnalyzeResult{{Err: err}}
}
var results []statistics.AnalyzeResult
hasIdxInfo := len(exec.idxsInfo)
hasPKInfo := 0
if exec.pkInfo != nil {
hasPKInfo = 1
}
if hasIdxInfo > 0 {
for i := hasPKInfo + len(exec.colsInfo); i < len(hists); i++ {
idxResult := statistics.AnalyzeResult{
PhysicalTableID: exec.PhysicalTableID,
Hist: []*statistics.Histogram{hists[i]},
Cms: []*statistics.CMSketch{cms[i]},
IsIndex: 1,
Count: hists[i].NullCount,
}
if hists[i].Len() > 0 {
idxResult.Count += hists[i].Buckets[hists[i].Len()-1].Count
}
results = append(results, idxResult)
}
}
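// Note: buildStats is still a stub in this PR; once implemented it must
// return at least hasPKInfo+len(exec.colsInfo) histograms, otherwise the
// hists[0] access below panics.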
hist := hists[0]
colResult := statistics.AnalyzeResult{
PhysicalTableID: exec.PhysicalTableID,
Hist: hists[:hasPKInfo+len(exec.colsInfo)],
Cms: cms[:hasPKInfo+len(exec.colsInfo)],
Count: hist.NullCount,
}
if hist.Len() > 0 {
colResult.Count += hist.Buckets[hist.Len()-1].Count
}
results = append(results, colResult)
return results
}
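As a reading aid, here is a minimal, runnable sketch of the slice layout analyzeFastExec assumes; the counts are hypothetical, and the ordering (PK histogram first, then columns, then indexes) is inferred from the slicing above:

package main

import "fmt"

func main() {
	// Hypothetical table: one integer PK, two columns, two indexes.
	hasPKInfo, numCols := 1, 2
	hists := []string{"pk", "col0", "col1", "idx0", "idx1"}

	// The combined PK/column result takes the prefix of the slice...
	fmt.Println("column result:", hists[:hasPKInfo+numCols]) // [pk col0 col1]
	// ...and every remaining histogram becomes its own index result.
	fmt.Println("index results:", hists[hasPKInfo+numCols:]) // [idx0 idx1]
}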

// AnalyzeFastTask is the task for building stats.
type AnalyzeFastTask struct {
Location *tikv.KeyLocation
SampSize uint64
LRowCount uint64
RRowCount uint64
}

// AnalyzeFastExec represents the fast analyze executor.
type AnalyzeFastExec struct {
ctx sessionctx.Context
PhysicalTableID int64
pkInfo *model.ColumnInfo
colsInfo []*model.ColumnInfo
idxsInfo []*model.IndexInfo
concurrency int
maxNumBuckets uint64
table table.Table
cache *tikv.RegionCache
wg *sync.WaitGroup
sampLocs chan *tikv.KeyLocation
sampLocRowCount uint64
tasks chan *AnalyzeFastTask
scanTasks []*tikv.KeyLocation
}
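The wg, sampLocs, and concurrency fields suggest a worker pool fanned out over region locations. Since buildStats is still a TODO below, the following is only a hedged sketch of how those fields could cooperate, not this PR's implementation (runSampleWorkersSketch is a hypothetical name):

func (e *AnalyzeFastExec) runSampleWorkersSketch() {
	for i := 0; i < e.concurrency; i++ {
		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			// A real worker would draw a row sample from each region range
			// [loc.StartKey, loc.EndKey) and feed per-region counts into e.tasks.
			for loc := range e.sampLocs {
				_ = loc
			}
		}()
	}
	e.wg.Wait()
}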

func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*statistics.CMSketch, err error) {
// TODO: do fast analyze.
return nil, nil, nil
}
84 changes: 76 additions & 8 deletions executor/builder.go
@@ -1390,25 +1390,93 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo
return e
}

func (b *executorBuilder) buildAnalyzeFastColumn(e *AnalyzeExec, task plannercore.AnalyzeColumnsTask, maxNumBuckets uint64) {
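// A column task and an index task on the same physical table share one
// AnalyzeFastExec, so first look for an existing fast task to merge into.
// (With fast analyze enabled, every entry in e.tasks is a fastTask, so the
// fastExec accessed below is non-nil.)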
findTask := false
for _, eTask := range e.tasks {
if eTask.fastExec.PhysicalTableID == task.PhysicalTableID {
eTask.fastExec.colsInfo = append(eTask.fastExec.colsInfo, task.ColsInfo...)
findTask = true
break
}
}
if !findTask {
var concurrency int
concurrency, b.err = getBuildStatsConcurrency(e.ctx)
if b.err != nil {
return
}
e.tasks = append(e.tasks, &analyzeTask{
taskType: fastTask,
fastExec: &AnalyzeFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
maxNumBuckets: maxNumBuckets,
table: task.Table,
concurrency: concurrency,
},
})
}
}

func (b *executorBuilder) buildAnalyzeFastIndex(e *AnalyzeExec, task plannercore.AnalyzeIndexTask, maxNumBuckets uint64) {
findTask := false
for _, eTask := range e.tasks {
if eTask.fastExec.PhysicalTableID == task.PhysicalTableID {
eTask.fastExec.idxsInfo = append(eTask.fastExec.idxsInfo, task.IndexInfo)
findTask = true
break
}
}
if !findTask {
var concurrency int
concurrency, b.err = getBuildStatsConcurrency(e.ctx)
if b.err != nil {
return
}
e.tasks = append(e.tasks, &analyzeTask{
taskType: fastTask,
fastExec: &AnalyzeFastExec{
ctx: b.ctx,
PhysicalTableID: task.PhysicalTableID,
idxsInfo: []*model.IndexInfo{task.IndexInfo},
maxNumBuckets: maxNumBuckets,
table: task.Table,
concurrency: concurrency,
},
})
}
}

func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) Executor {
e := &AnalyzeExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
tasks: make([]*analyzeTask, 0, len(v.ColTasks)+len(v.IdxTasks)),
}
enableFastAnalyze := b.ctx.GetSessionVars().EnableFastAnalyze
for _, task := range v.ColTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
if enableFastAnalyze {
b.buildAnalyzeFastColumn(e, task, v.MaxNumBuckets)
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
colExec: b.buildAnalyzeColumnsPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
return nil
}
}
for _, task := range v.IdxTasks {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
if enableFastAnalyze {
b.buildAnalyzeFastIndex(e, task, v.MaxNumBuckets)
} else {
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
idxExec: b.buildAnalyzeIndexPushdown(task, v.MaxNumBuckets),
})
}
if b.err != nil {
return nil
}
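For context, a sketch of how this path would be exercised end to end. The tidb_enable_fast_analyze session variable backing EnableFastAnalyze and the DSN are assumptions, not part of this diff; the session API matches TiDB of this era:

store, _ := tikv.Driver{}.Open("tikv://pd-host:2379") // hypothetical PD address
se, _ := session.CreateSession(store)
ctx := context.Background()
se.Execute(ctx, "set @@session.tidb_enable_fast_analyze = 1")
se.Execute(ctx, "analyze table test.t") // now builds fastTask entries instead of pushdown tasks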
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
@@ -396,13 +396,15 @@ type AnalyzeColumnsTask struct {
PhysicalTableID int64
PKInfo *model.ColumnInfo
ColsInfo []*model.ColumnInfo
Table table.Table
}

// AnalyzeIndexTask is used for analyze index.
type AnalyzeIndexTask struct {
// PhysicalTableID is the id for a partition or a table.
PhysicalTableID int64
IndexInfo *model.IndexInfo
Table table.Table
}

// Analyze represents an analyze plan
22 changes: 20 additions & 2 deletions planner/core/planbuilder.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
@@ -771,14 +772,27 @@ func (b *PlanBuilder) buildAnalyzeTable(as *ast.AnalyzeTableStmt) (Plan, error)
if err != nil {
return nil, err
}
tb, ok := b.is.TableByID(tbl.TableInfo.ID)
if !ok {
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(tbl.DBInfo.Name.O, tbl.TableInfo.Name.O)
}
for _, idx := range idxInfo {
for _, id := range physicalIDs {
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{PhysicalTableID: id, IndexInfo: idx})
p.IdxTasks = append(p.IdxTasks, AnalyzeIndexTask{
PhysicalTableID: id,
IndexInfo: idx,
Table: tb,
})
}
}
if len(colInfo) > 0 || pkInfo != nil {
for _, id := range physicalIDs {
p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{PhysicalTableID: id, PKInfo: pkInfo, ColsInfo: colInfo})
p.ColTasks = append(p.ColTasks, AnalyzeColumnsTask{
PhysicalTableID: id,
PKInfo: pkInfo,
ColsInfo: colInfo,
Table: tb,
})
}
}
}
@@ -827,6 +841,10 @@
)

func (b *PlanBuilder) buildAnalyze(as *ast.AnalyzeTableStmt) (Plan, error) {
// If fast analyze is enabled, the store must be a tikv.Storage.
if _, isTiKVStorage := b.ctx.GetStore().(tikv.Storage); !isTiKVStorage && b.ctx.GetSessionVars().EnableFastAnalyze {
return nil, errors.Errorf("fast analyze is only supported on the TiKV storage engine")
}
for _, tbl := range as.TableNames {
user := b.ctx.GetSessionVars().User
var insertErr, selectErr error
11 changes: 11 additions & 0 deletions store/mockstore/mocktikv/rpc.go
@@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -817,6 +818,16 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.SplitRegion = handler.handleSplitRegion(r)
// DebugGetRegionProperties is for fast analyze in mock tikv.
case tikvrpc.CmdDebugGetRegionProperties:
r := req.DebugGetRegionProperties
region, _ := c.Cluster.GetRegionByID(r.RegionId)
// Approximate the region's row count by scanning all KV pairs in its range.
scanResp := handler.handleKvScan(&kvrpcpb.ScanRequest{StartKey: region.StartKey, EndKey: region.EndKey})
resp.DebugGetRegionProperties = &debugpb.GetRegionPropertiesResponse{
Props: []*debugpb.Property{{
Name: "num_rows",
// strconv.Itoa, not string(int): string(len(...)) would yield the rune
// with that code point rather than a decimal string. Requires "strconv"
// in the import block above.
Value: strconv.Itoa(len(scanResp.Pairs)),
}}}
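// Fast analyze's client side is expected to parse this decimal string back
// into a row count (e.g. with strconv.ParseUint); that consuming code is
// not part of this diff.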
default:
return nil, errors.Errorf("unsupport this request type %v", req.Type)
}