Commit

samplerate to float64
winoros committed Oct 13, 2021
1 parent 515a3e3 commit 074dd42
Showing 13 changed files with 9,243 additions and 9,209 deletions.
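Note on the change shown below: the analyze-option map is widened from `map[ast.AnalyzeOptionType]uint64` to `map[ast.AnalyzeOptionType]interface{}`, so the sample rate can be carried as a `float64` (see the `GetSampleRate()` call sites) while the count-style options (buckets, Top-N, CMSketch depth/width, sample count) stay `uint64` and are read back with `.(uint64)` type assertions. A minimal, standalone Go sketch of that pattern follows; the option-key names and values here are hypothetical stand-ins, not TiDB's:

```go
package main

import "fmt"

// Hypothetical option keys, standing in for ast.AnalyzeOptionType.
type analyzeOption int

const (
	optNumBuckets analyzeOption = iota
	optSampleRate
)

func main() {
	// Mixed-type options: counts stay uint64, the sample rate is a float64.
	opts := map[analyzeOption]interface{}{
		optNumBuckets: uint64(256),
		optSampleRate: 0.1,
	}

	// Reading a value back requires a type assertion on the expected type,
	// mirroring the e.opts[...].(uint64) calls added throughout this diff.
	buckets := int(opts[optNumBuckets].(uint64))
	rate := opts[optSampleRate].(float64)

	fmt.Println(buckets, rate) // 256 0.1
}
```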
82 changes: 41 additions & 41 deletions executor/analyze.go
@@ -69,7 +69,7 @@ type AnalyzeExec struct {
baseExecutor
tasks []*analyzeTask
wg *sync.WaitGroup
- opts map[ast.AnalyzeOptionType]uint64
+ opts map[ast.AnalyzeOptionType]interface{}
}

var (
@@ -327,7 +327,7 @@ type baseAnalyzeExec struct {
tableID statistics.AnalyzeTableID
concurrency int
analyzePB *tipb.AnalyzeReq
- opts map[ast.AnalyzeOptionType]uint64
+ opts map[ast.AnalyzeOptionType]interface{}
job *statistics.AnalyzeJob
snapshot uint64
}
@@ -480,8 +480,8 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
var cms *statistics.CMSketch
var topn *statistics.TopN
if needCMS {
- cms = statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]))
- topn = statistics.NewTopN(int(e.opts[ast.AnalyzeOptNumTopN]))
+ cms = statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64)))
+ topn = statistics.NewTopN(int(e.opts[ast.AnalyzeOptNumTopN].(uint64)))
}
fms := statistics.NewFMSketch(maxSketchSize)
statsVer := statistics.Version1
@@ -502,7 +502,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
return nil, nil, nil, nil, err
}
hist, cms, fms, topn, err = updateIndexResult(e.ctx.GetSessionVars().StmtCtx, resp, e.job, hist, cms, fms, topn,
- e.idxInfo, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), statsVer)
+ e.idxInfo, int(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), int(e.opts[ast.AnalyzeOptNumTopN].(uint64)), statsVer)
if err != nil {
return nil, nil, nil, nil, err
}
@@ -827,7 +827,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}()

l := len(e.analyzePB.ColReq.ColumnsInfo) + len(e.analyzePB.ColReq.ColumnGroups)
- rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), int(e.analyzePB.ColReq.GetSampleRate()), l)
+ rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
@@ -1110,12 +1110,12 @@ func (e *AnalyzeColumnsExec) buildSubIndexJobForSpecialIndex(indexInfos []*model
isCommonHandle: e.tableInfo.IsCommonHandle,
idxInfo: indexInfo,
}
- idxExec.opts = make(map[ast.AnalyzeOptionType]uint64, len(ast.AnalyzeOptionString))
- idxExec.opts[ast.AnalyzeOptNumTopN] = 0
- idxExec.opts[ast.AnalyzeOptCMSketchDepth] = 0
- idxExec.opts[ast.AnalyzeOptCMSketchWidth] = 0
- idxExec.opts[ast.AnalyzeOptNumSamples] = 0
- idxExec.opts[ast.AnalyzeOptNumBuckets] = 1
+ idxExec.opts = make(map[ast.AnalyzeOptionType]interface{}, len(ast.AnalyzeOptionString))
+ idxExec.opts[ast.AnalyzeOptNumTopN] = uint64(0)
+ idxExec.opts[ast.AnalyzeOptCMSketchDepth] = uint64(0)
+ idxExec.opts[ast.AnalyzeOptCMSketchWidth] = uint64(0)
+ idxExec.opts[ast.AnalyzeOptNumSamples] = uint64(0)
+ idxExec.opts[ast.AnalyzeOptNumBuckets] = uint64(1)
statsVersion := new(int32)
*statsVersion = statistics.Version1
// No Top-N
@@ -1182,7 +1182,7 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult
failpoint.Inject("mockAnalyzeSamplingMergeWorkerPanic", func() {
panic("failpoint triggered")
})
- retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), int(e.analyzePB.ColReq.GetSampleRate()), l)
+ retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
@@ -1316,7 +1316,7 @@ workLoop:
if task.isColumn {
collectors[task.slicePos] = collector
}
- hist, topn, err := statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), task.id, collector, task.tp, task.isColumn)
+ hist, topn, err := statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), int(e.opts[ast.AnalyzeOptNumTopN].(uint64)), task.id, collector, task.tp, task.isColumn)
if err != nil {
resultCh <- err
continue
@@ -1346,8 +1346,8 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
statsVer := statistics.Version1
if e.analyzePB.Tp == tipb.AnalyzeType_TypeMixed {
handleHist = &statistics.Histogram{}
- handleCms = statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]))
- handleTopn = statistics.NewTopN(int(e.opts[ast.AnalyzeOptNumTopN]))
+ handleCms = statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64)))
+ handleTopn = statistics.NewTopN(int(e.opts[ast.AnalyzeOptNumTopN].(uint64)))
handleFms = statistics.NewFMSketch(maxSketchSize)
if e.analyzePB.IdxReq.Version != nil {
statsVer = int(*e.analyzePB.IdxReq.Version)
@@ -1359,8 +1359,8 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
collectors[i] = &statistics.SampleCollector{
IsMerger: true,
FMSketch: statistics.NewFMSketch(maxSketchSize),
- MaxSampleSize: int64(e.opts[ast.AnalyzeOptNumSamples]),
- CMSketch: statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth])),
+ MaxSampleSize: int64(e.opts[ast.AnalyzeOptNumSamples].(uint64)),
+ CMSketch: statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64))),
}
}
for {
@@ -1381,8 +1381,8 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
}
colResp = resp.ColumnsResp
handleHist, handleCms, handleFms, handleTopn, err = updateIndexResult(sc, resp.IndexResp, nil, handleHist,
- handleCms, handleFms, handleTopn, e.commonHandle, int(e.opts[ast.AnalyzeOptNumBuckets]),
- int(e.opts[ast.AnalyzeOptNumTopN]), statsVer)
+ handleCms, handleFms, handleTopn, e.commonHandle, int(e.opts[ast.AnalyzeOptNumBuckets].(uint64)),
+ int(e.opts[ast.AnalyzeOptNumTopN].(uint64)), statsVer)

if err != nil {
return nil, nil, nil, nil, nil, err
@@ -1395,7 +1395,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
if hasPkHist(e.handleCols) {
respHist := statistics.HistogramFromProto(colResp.PkHist)
rowCount = int64(respHist.TotalRowCount())
- pkHist, err = statistics.MergeHistograms(sc, pkHist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1)
+ pkHist, err = statistics.MergeHistograms(sc, pkHist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), statistics.Version1)
if err != nil {
return nil, nil, nil, nil, nil, err
}
@@ -1423,7 +1423,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
for i, col := range e.colsInfo {
if e.StatsVersion < 2 {
// In analyze version 2, we don't collect TopN this way. We will collect TopN from samples in `BuildColumnHistAndTopN()` below.
- err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
+ err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN].(uint64)), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, nil, err
}
@@ -1445,9 +1445,9 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
var err error
var topn *statistics.TopN
if e.StatsVersion < 2 {
- hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), col.ID, collectors[i], &col.FieldType)
+ hg, err = statistics.BuildColumn(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), col.ID, collectors[i], &col.FieldType)
} else {
- hg, topn, err = statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), col.ID, collectors[i], &col.FieldType, true)
+ hg, topn, err = statistics.BuildHistAndTopN(e.ctx, int(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), int(e.opts[ast.AnalyzeOptNumTopN].(uint64)), col.ID, collectors[i], &col.FieldType, true)
topNs = append(topNs, topn)
}
if err != nil {
@@ -1619,7 +1619,7 @@ func (e *AnalyzeFastExec) calculateEstimateSampleStep() (err error) {
e.rowCount = chk.GetRow(0).GetInt64(0)
historyRowCount = uint64(e.rowCount)
}
- totalSampSize := e.opts[ast.AnalyzeOptNumSamples]
+ totalSampSize := e.opts[ast.AnalyzeOptNumSamples].(uint64)
e.estSampStep = uint32(historyRowCount / totalSampSize)
return
}
@@ -1799,9 +1799,9 @@ func (e *AnalyzeFastExec) handleBatchSeekResponse(kvMap map[string][]byte) (err
newCursor := atomic.AddInt32(&e.sampCursor, length)
samplePos := newCursor - length
for sKey, sValue := range kvMap {
- exceedNeededSampleCounts := uint64(samplePos) >= e.opts[ast.AnalyzeOptNumSamples]
+ exceedNeededSampleCounts := uint64(samplePos) >= e.opts[ast.AnalyzeOptNumSamples].(uint64)
if exceedNeededSampleCounts {
- atomic.StoreInt32(&e.sampCursor, int32(e.opts[ast.AnalyzeOptNumSamples]))
+ atomic.StoreInt32(&e.sampCursor, int32(e.opts[ast.AnalyzeOptNumSamples].(uint64)))
break
}
err = e.updateCollectorSamples(sValue, kv.Key(sKey), samplePos)
@@ -1815,7 +1815,7 @@ func (e *AnalyzeFastExec) handleBatchSeekResponse(kvMap map[string][]byte) (err

func (e *AnalyzeFastExec) handleScanIter(iter kv.Iterator) (scanKeysSize int, err error) {
rander := rand.New(rand.NewSource(e.randSeed)) // #nosec G404
- sampleSize := int64(e.opts[ast.AnalyzeOptNumSamples])
+ sampleSize := int64(e.opts[ast.AnalyzeOptNumSamples].(uint64))
for ; iter.Valid() && err == nil; err = iter.Next() {
// reservoir sampling
scanKeysSize++
@@ -1925,10 +1925,10 @@ func (e *AnalyzeFastExec) buildColumnStats(ID int64, collector *statistics.Sampl
data = append(data, valBytes)
}
// Build CMSketch.
- cmSketch, topN, ndv, scaleRatio := statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]), data, uint32(e.opts[ast.AnalyzeOptNumTopN]), uint64(rowCount))
+ cmSketch, topN, ndv, scaleRatio := statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64)), data, uint32(e.opts[ast.AnalyzeOptNumTopN].(uint64)), uint64(rowCount))
// Build Histogram.
collector.Samples = notNullSamples
- hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
+ hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
return hist, cmSketch, topN, fmSketch, err
}

@@ -1949,22 +1949,22 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
data[i] = append(data[i], sample.Value.GetBytes()[:preLen])
}
}
- numTop := uint32(e.opts[ast.AnalyzeOptNumTopN])
- cmSketch, topN, ndv, scaleRatio := statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]), data[0], numTop, uint64(rowCount))
+ numTop := uint32(e.opts[ast.AnalyzeOptNumTopN].(uint64))
+ cmSketch, topN, ndv, scaleRatio := statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64)), data[0], numTop, uint64(rowCount))
// Build CM Sketch for each prefix and merge them into one.
for i := 1; i < len(idxInfo.Columns); i++ {
var curCMSketch *statistics.CMSketch
var curTopN *statistics.TopN
// `ndv` should be the ndv of full index, so just rewrite it here.
- curCMSketch, curTopN, ndv, scaleRatio = statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]), data[i], numTop, uint64(rowCount))
+ curCMSketch, curTopN, ndv, scaleRatio = statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth].(uint64)), int32(e.opts[ast.AnalyzeOptCMSketchWidth].(uint64)), data[i], numTop, uint64(rowCount))
err := cmSketch.MergeCMSketch(curCMSketch)
if err != nil {
return nil, nil, nil, err
}
statistics.MergeTopNAndUpdateCMSketch(topN, curTopN, cmSketch, numTop)
}
// Build Histogram.
- hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
+ hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets].(uint64)), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
return hist, cmSketch, topN, err
}

@@ -1976,8 +1976,8 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
e.collectors = make([]*statistics.SampleCollector, length)
for i := range e.collectors {
e.collectors[i] = &statistics.SampleCollector{
- MaxSampleSize: int64(e.opts[ast.AnalyzeOptNumSamples]),
- Samples: make([]*statistics.SampleItem, e.opts[ast.AnalyzeOptNumSamples]),
+ MaxSampleSize: int64(e.opts[ast.AnalyzeOptNumSamples].(uint64)),
+ Samples: make([]*statistics.SampleItem, e.opts[ast.AnalyzeOptNumSamples].(uint64)),
}
}

@@ -2064,7 +2064,7 @@ type AnalyzeTestFastExec struct {
Concurrency int
Collectors []*statistics.SampleCollector
TblInfo *model.TableInfo
- Opts map[ast.AnalyzeOptionType]uint64
+ Opts map[ast.AnalyzeOptionType]interface{}
Snapshot uint64
}

@@ -2113,19 +2113,19 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) *statistics.A
if err != nil {
return &statistics.AnalyzeResults{Err: err, Job: idxExec.job}
}
- hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.opts[ast.AnalyzeOptNumBuckets]), statsVer)
+ hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.opts[ast.AnalyzeOptNumBuckets].(uint64)), statsVer)
if err != nil {
return &statistics.AnalyzeResults{Err: err, Job: idxExec.job}
}
if idxExec.oldCMS != nil && cms != nil {
- err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
+ err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS, uint32(idxExec.opts[ast.AnalyzeOptNumTopN].(uint64)))
if err != nil {
return &statistics.AnalyzeResults{Err: err, Job: idxExec.job}
}
cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
}
if statsVer >= statistics.Version2 {
- poped := statistics.MergeTopNAndUpdateCMSketch(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
+ poped := statistics.MergeTopNAndUpdateCMSketch(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN].(uint64)))
hist.AddIdxVals(poped)
}
result := &statistics.AnalyzeResult{
@@ -2169,7 +2169,7 @@ func analyzePKIncremental(colExec *analyzePKIncrementalExec) *statistics.Analyze
return &statistics.AnalyzeResults{Err: err, Job: colExec.job}
}
hist := hists[0]
- hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1)
+ hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.opts[ast.AnalyzeOptNumBuckets].(uint64)), statistics.Version1)
if err != nil {
return &statistics.AnalyzeResults{Err: err, Job: colExec.job}
}
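One property of the bare `.(uint64)` assertions used throughout this diff: if a value with a different dynamic type is ever stored under one of these keys, the assertion panics at runtime. Go also offers a comma-ok form that reports the mismatch instead; a purely illustrative, standalone sketch (not part of this commit, and using hypothetical keys):

```go
package main

import "fmt"

func main() {
	opts := map[string]interface{}{
		"buckets": uint64(256),
		"rate":    0.1, // stored as float64
	}

	// A bare assertion like opts["rate"].(uint64) would panic here.
	// The comma-ok form reports the mismatch instead of panicking.
	if n, ok := opts["buckets"].(uint64); ok {
		fmt.Println("buckets:", n)
	}
	if _, ok := opts["rate"].(uint64); !ok {
		fmt.Printf("rate is not a uint64; it is %T\n", opts["rate"])
	}
}
```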