From f4995ce031bc4217616b49e7d42a434174553aff Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Sun, 20 Jan 2019 16:12:01 +0800 Subject: [PATCH 1/3] statistics: compute and store column order correlation with handle --- executor/analyze.go | 10 +++-- executor/analyze_test.go | 4 +- executor/builder.go | 3 -- executor/show_stats.go | 1 + executor/show_stats_test.go | 8 ++-- planner/core/cbo_test.go | 8 ++-- planner/core/planbuilder.go | 23 +++--------- session/bootstrap.go | 10 +++++ statistics/bootstrap.go | 3 +- statistics/builder.go | 32 +++++++++++++--- statistics/fmsketch_test.go | 4 +- statistics/handle_test.go | 65 ++++++++++++++++++++++++++++++++ statistics/histogram.go | 9 ++++- statistics/sample.go | 71 +++++++++++++++++++++++++++++++---- statistics/statistics_test.go | 22 ++++++----- statistics/table.go | 3 +- 16 files changed, 213 insertions(+), 63 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 2650bb7f1e59c..67497220fa959 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -254,7 +254,6 @@ type AnalyzeColumnsExec struct { pkInfo *model.ColumnInfo concurrency int priority int - keepOrder bool analyzePB *tipb.AnalyzeReq resultHandler *tableResultHandler maxNumBuckets uint64 @@ -268,7 +267,7 @@ func (e *AnalyzeColumnsExec) open() error { ranges = ranger.FullIntRange(false) } e.resultHandler = &tableResultHandler{} - firstPartRanges, secondPartRanges := splitRanges(ranges, e.keepOrder) + firstPartRanges, secondPartRanges := splitRanges(ranges, true) firstResult, err := e.buildResp(firstPartRanges) if err != nil { return errors.Trace(err) @@ -289,9 +288,11 @@ func (e *AnalyzeColumnsExec) open() error { func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) { var builder distsql.RequestBuilder + // Always set KeepOrder of the request to be true, in order to compute + // correct `correlation` of columns. kvReq, err := builder.SetTableRanges(e.physicalTableID, ranges, nil). SetAnalyzeRequest(e.analyzePB). - SetKeepOrder(e.keepOrder). + SetKeepOrder(true). SetConcurrency(e.concurrency). Build() if err != nil { @@ -363,7 +364,8 @@ func (e *AnalyzeColumnsExec) buildStats() (hists []*statistics.Histogram, cms [] } for i, col := range e.colsInfo { for j, s := range collectors[i].Samples { - collectors[i].Samples[j], err = tablecodec.DecodeColumnValue(s.GetBytes(), &col.FieldType, timeZone) + collectors[i].Samples[j].Ordinal = j + collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 5c0199b8bf9b2..40aa42ee79bae 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -54,7 +54,7 @@ PARTITION BY RANGE ( a ) ( for _, def := range pi.Definitions { statsTbl := handle.GetPartitionStats(table.Meta(), def.ID) c.Assert(statsTbl.Pseudo, IsFalse) - c.Assert(len(statsTbl.Columns), Equals, 2) + c.Assert(len(statsTbl.Columns), Equals, 3) c.Assert(len(statsTbl.Indices), Equals, 1) for _, col := range statsTbl.Columns { c.Assert(col.Len(), Greater, 0) @@ -80,7 +80,7 @@ PARTITION BY RANGE ( a ) ( statsTbl := handle.GetPartitionStats(table.Meta(), def.ID) if i == 0 { c.Assert(statsTbl.Pseudo, IsFalse) - c.Assert(len(statsTbl.Columns), Equals, 2) + c.Assert(len(statsTbl.Columns), Equals, 3) c.Assert(len(statsTbl.Indices), Equals, 1) } else { c.Assert(statsTbl.Pseudo, IsTrue) diff --git a/executor/builder.go b/executor/builder.go index 50dab43cb722d..dc363dd4e6827 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1360,9 +1360,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, maxNumBuckets uint64) *AnalyzeColumnsExec { cols := task.ColsInfo - keepOrder := false if task.PKInfo != nil { - keepOrder = true cols = append([]*model.ColumnInfo{task.PKInfo}, cols...) } @@ -1373,7 +1371,6 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo colsInfo: task.ColsInfo, pkInfo: task.PKInfo, concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency, - keepOrder: keepOrder, analyzePB: &tipb.AnalyzeReq{ Tp: tipb.AnalyzeType_TypeColumn, StartTs: math.MaxUint64, diff --git a/executor/show_stats.go b/executor/show_stats.go index 893f068cdb832..c63286e6a8337 100644 --- a/executor/show_stats.go +++ b/executor/show_stats.go @@ -100,6 +100,7 @@ func (e *ShowExec) histogramToRow(dbName, tblName, partitionName, colName string hist.NDV, hist.NullCount, avgColSize, + hist.Correlation, }) } diff --git a/executor/show_stats_test.go b/executor/show_stats_test.go index 05987b5935203..dcb9139073b72 100644 --- a/executor/show_stats_test.go +++ b/executor/show_stats_test.go @@ -90,14 +90,16 @@ func (s *testSuite1) TestShowPartitionStats(c *C) { c.Assert(result.Rows()[0][2], Equals, "p0") result = tk.MustQuery("show stats_histograms").Sort() - c.Assert(len(result.Rows()), Equals, 2) + c.Assert(len(result.Rows()), Equals, 3) c.Assert(result.Rows()[0][2], Equals, "p0") c.Assert(result.Rows()[0][3], Equals, "a") c.Assert(result.Rows()[1][2], Equals, "p0") - c.Assert(result.Rows()[1][3], Equals, "idx") + c.Assert(result.Rows()[1][3], Equals, "b") + c.Assert(result.Rows()[2][2], Equals, "p0") + c.Assert(result.Rows()[2][3], Equals, "idx") result = tk.MustQuery("show stats_buckets").Sort() - result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1")) + result.Check(testkit.Rows("test t p0 a 0 0 1 1 1 1", "test t p0 b 0 0 1 1 1 1", "test t p0 idx 1 0 1 1 1 1")) result = tk.MustQuery("show stats_healthy") result.Check(testkit.Rows("test t p0 100")) diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index fee08d6866ed0..e0af81af42168 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -483,7 +483,7 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { }{ { sql: "analyze table t3", - best: "Analyze{Index(a),Table(b)}", + best: "Analyze{Index(a),Table(a, b)}", }, // Test analyze full table. { @@ -708,10 +708,10 @@ func (s *testAnalyzeSuite) TestCorrelatedEstimation(c *C) { " ├─TableReader_12 10.00 root data:TableScan_11", " │ └─TableScan_11 10.00 cop table:t, range:[-inf,+inf], keep order:false", " └─MaxOneRow_13 1.00 root ", - " └─Projection_14 0.00 root concat(cast(t1.a), \",\", cast(t1.b))", - " └─IndexLookUp_21 0.00 root ", + " └─Projection_14 0.10 root concat(cast(t1.a), \",\", cast(t1.b))", + " └─IndexLookUp_21 0.10 root ", " ├─IndexScan_18 1.00 cop table:t1, index:c, range: decided by [eq(t1.c, test.t.c)], keep order:false", - " └─Selection_20 0.00 cop eq(t1.a, test.t.a)", + " └─Selection_20 0.10 cop eq(t1.a, test.t.a)", " └─TableScan_19 1.00 cop table:t, keep order:false", )) } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 7683bd33f1701..6205d243bc650 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -645,13 +645,9 @@ func (b *PlanBuilder) buildCheckIndexSchema(tn *ast.TableName, indexName string) // getColsInfo returns the info of index columns, normal columns and primary key. func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo []*model.ColumnInfo, pkCol *model.ColumnInfo) { tbl := tn.TableInfo - // idxNames contains all the normal columns that can be analyzed more effectively, because those columns occur as index - // columns or primary key columns with integer type. - var idxNames []string if tbl.PKIsHandle { for _, col := range tbl.Columns { if mysql.HasPriKeyFlag(col.Flag) { - idxNames = append(idxNames, col.Name.L) pkCol = col } } @@ -659,22 +655,13 @@ func getColsInfo(tn *ast.TableName) (indicesInfo []*model.IndexInfo, colsInfo [] for _, idx := range tn.TableInfo.Indices { if idx.State == model.StatePublic { indicesInfo = append(indicesInfo, idx) - if len(idx.Columns) == 1 { - idxNames = append(idxNames, idx.Columns[0].Name.L) - } } } for _, col := range tbl.Columns { - isIndexCol := false - for _, idx := range idxNames { - if idx == col.Name.L { - isIndexCol = true - break - } - } - if !isIndexCol { - colsInfo = append(colsInfo, col) + if col == pkCol { + continue } + colsInfo = append(colsInfo, col) } return } @@ -1782,9 +1769,9 @@ func buildShowSchema(s *ast.ShowStmt, isView bool) (schema *expression.Schema) { names = []string{"Db_name", "Table_name", "Partition_name", "Update_time", "Modify_count", "Row_count"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeLonglong} case ast.ShowStatsHistograms: - names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size"} + names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Update_time", "Distinct_count", "Null_count", "Avg_col_size", "Correlation"} ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeTiny, mysql.TypeDatetime, - mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble} + mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeDouble, mysql.TypeDouble} case ast.ShowStatsBuckets: names = []string{"Db_name", "Table_name", "Partition_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"} diff --git a/session/bootstrap.go b/session/bootstrap.go index ba11e06ec4b18..beec5befa8b75 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -168,6 +168,7 @@ const ( cm_sketch blob, stats_ver bigint(64) NOT NULL DEFAULT 0, flag bigint(64) NOT NULL DEFAULT 0, + correlation double NOT NULL DEFAULT 0, unique index tbl(table_id, is_index, hist_id) );` @@ -286,6 +287,7 @@ const ( version24 = 24 version25 = 25 version26 = 26 + version27 = 27 ) func checkBootstrapped(s Session) (bool, error) { @@ -448,6 +450,10 @@ func upgrade(s Session) { upgradeToVer26(s) } + if ver < version27 { + upgradeToVer27(s) + } + updateBootstrapVer(s) _, err = s.Execute(context.Background(), "COMMIT") @@ -719,6 +725,10 @@ func upgradeToVer26(s Session) { mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Create_role_priv='Y',Drop_role_priv='Y'") } +func upgradeToVer27(s Session) { + doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `correlation` double NOT NULL DEFAULT 0", infoschema.ErrColumnExists) +} + // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table. func updateBootstrapVer(s Session) { // Update bootstrap version. diff --git a/statistics/bootstrap.go b/statistics/bootstrap.go index 19842303cfeb6..61064c71003d5 100644 --- a/statistics/bootstrap.go +++ b/statistics/bootstrap.go @@ -120,6 +120,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat continue } hist := NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize) + hist.Correlation = row.GetFloat64(9) table.Columns[hist.ID] = &Column{ Histogram: *hist, PhysicalID: table.PhysicalID, @@ -134,7 +135,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables stat func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache) error { h.mu.Lock() defer h.mu.Unlock() - sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver from mysql.stats_histograms" + sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms" rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { defer terror.Call(rc[0].Close) diff --git a/statistics/builder.go b/statistics/builder.go index c9a66e057b501..e077137c3ad70 100644 --- a/statistics/builder.go +++ b/statistics/builder.go @@ -105,9 +105,9 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample } sc := ctx.GetSessionVars().StmtCtx samples := collector.Samples - err := types.SortDatums(sc, samples) + err := SortSampleItems(sc, samples) if err != nil { - return nil, errors.Trace(err) + return nil, err } hg := NewHistogram(id, ndv, collector.NullCount, 0, tp, int(numBuckets), collector.TotalSize) @@ -124,9 +124,11 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample } bucketIdx := 0 var lastCount int64 - hg.AppendBucket(&samples[0], &samples[0], int64(sampleFactor), int64(ndvFactor)) + var corrXYSum float64 + hg.AppendBucket(&samples[0].Value, &samples[0].Value, int64(sampleFactor), int64(ndvFactor)) for i := int64(1); i < sampleNum; i++ { - cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i]) + corrXYSum += float64(i) * float64(samples[i].Ordinal) + cmp, err := hg.GetUpper(bucketIdx).CompareDatum(sc, &samples[i].Value) if err != nil { return nil, errors.Trace(err) } @@ -143,14 +145,32 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample } } else if totalCount-float64(lastCount) <= valuesPerBucket { // The bucket still have room to store a new item, update the bucket. - hg.updateLastBucket(&samples[i], int64(totalCount), int64(ndvFactor)) + hg.updateLastBucket(&samples[i].Value, int64(totalCount), int64(ndvFactor)) } else { lastCount = hg.Buckets[bucketIdx].Count // The bucket is full, store the item in the next bucket. bucketIdx++ - hg.AppendBucket(&samples[i], &samples[i], int64(totalCount), int64(ndvFactor)) + hg.AppendBucket(&samples[i].Value, &samples[i].Value, int64(totalCount), int64(ndvFactor)) } } + // Compute column order correlation with handle. + if sampleNum == 1 { + hg.Correlation = 1 + return hg, nil + } + // X means the ordinal of the item in original sequence, Y means the oridnal of the item in the + // sorted sequence, we know that X and Y value sets are both: + // 0, 1, ..., sampleNum-1 + // we can simply compute sum(X) = sum(Y) = + // (sampleNum-1)*sampleNum / 2 + // and sum(X^2) = sum(Y^2) = + // (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6 + // The formula for computing correlation is borrowed from PostgreSQL. + // Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1. + itemsCount := float64(sampleNum) + corrXSum := (itemsCount - 1) * itemsCount / 2.0 + corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0 + hg.Correlation = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum) return hg, nil } diff --git a/statistics/fmsketch_test.go b/statistics/fmsketch_test.go index 556f4e41ed440..fbc4086434724 100644 --- a/statistics/fmsketch_test.go +++ b/statistics/fmsketch_test.go @@ -23,7 +23,7 @@ import ( func (s *testStatisticsSuite) TestSketch(c *C) { sc := &stmtctx.StatementContext{TimeZone: time.Local} maxSize := 1000 - sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize) + sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize) c.Check(err, IsNil) c.Check(ndv, Equals, int64(6232)) @@ -51,7 +51,7 @@ func (s *testStatisticsSuite) TestSketch(c *C) { func (s *testStatisticsSuite) TestSketchProtoConversion(c *C) { sc := &stmtctx.StatementContext{TimeZone: time.Local} maxSize := 1000 - sampleSketch, ndv, err := buildFMSketch(sc, s.samples, maxSize) + sampleSketch, ndv, err := buildFMSketch(sc, extractSampleItemsDatums(s.samples), maxSize) c.Check(err, IsNil) c.Check(ndv, Equals, int64(6232)) diff --git a/statistics/handle_test.go b/statistics/handle_test.go index 9b71310504a32..cd0339d2b2cc0 100644 --- a/statistics/handle_test.go +++ b/statistics/handle_test.go @@ -437,3 +437,68 @@ func newStoreWithBootstrap(statsLease time.Duration) (kv.Storage, *domain.Domain do.SetStatsUpdating(true) return store, do, errors.Trace(err) } + +func (s *testStatsSuite) TestCorrelation(c *C) { + defer cleanEnv(c, s.store, s.do) + testKit := testkit.NewTestKit(c, s.store) + testKit.MustExec("use test") + testKit.MustExec("create table t(c1 int primary key, c2 int)") + testKit.MustExec("insert into t values(1,1),(3,12),(4,20),(2,7),(5,21)") + testKit.MustExec("analyze table t") + result := testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "0") + c.Assert(result.Rows()[1][9], Equals, "1") + testKit.MustExec("insert into t values(8,18)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "0") + c.Assert(result.Rows()[1][9], Equals, "0.828571") + + testKit.MustExec("truncate table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 0) + testKit.MustExec("insert into t values(1,21),(3,12),(4,7),(2,20),(5,1)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "0") + c.Assert(result.Rows()[1][9], Equals, "-1") + testKit.MustExec("insert into t values(8,4)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "0") + c.Assert(result.Rows()[1][9], Equals, "-0.942857") + + testKit.MustExec("drop table t") + testKit.MustExec("create table t(c1 int, c2 int)") + testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(4,20),(5,21),(8,18)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "1") + c.Assert(result.Rows()[1][9], Equals, "0.828571") + + testKit.MustExec("truncate table t") + testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + c.Assert(len(result.Rows()), Equals, 2) + c.Assert(result.Rows()[0][9], Equals, "0.828571") + c.Assert(result.Rows()[1][9], Equals, "1") + + testKit.MustExec("drop table t") + testKit.MustExec("create table t(c1 int primary key, c2 int, c3 int, key idx_c2(c2))") + testKit.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3)") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 0").Sort() + c.Assert(len(result.Rows()), Equals, 3) + c.Assert(result.Rows()[0][9], Equals, "0") + c.Assert(result.Rows()[1][9], Equals, "1") + c.Assert(result.Rows()[2][9], Equals, "1") + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 1").Sort() + c.Assert(len(result.Rows()), Equals, 1) + c.Assert(result.Rows()[0][9], Equals, "0") +} diff --git a/statistics/histogram.go b/statistics/histogram.go index c594efc338b2f..77b29c1294c71 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -64,6 +64,11 @@ type Histogram struct { scalars []scalar // TotColSize is the total column size for the histogram. TotColSize int64 + + // Correlation is the statistical correlation between physical row ordering and logical ordering of + // the column values. This ranges from -1 to +1, and it is only valid for Column histogram, not for + // Index histogram. + Correlation float64 } // Bucket store the bucket count and repeat. @@ -236,8 +241,8 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if isAnalyzed == 1 { flag = analyzeFlag } - replaceSQL := fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d)", - tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, curStatsVersion, flag) + replaceSQL := fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d, %f)", + tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, curStatsVersion, flag, hg.Correlation) _, err = exec.Execute(ctx, replaceSQL) if err != nil { return diff --git a/statistics/sample.go b/statistics/sample.go index f454678478187..f4a7acbe195c2 100644 --- a/statistics/sample.go +++ b/statistics/sample.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math/rand" + "sort" "github.com/pingcap/errors" "github.com/pingcap/parser/ast" @@ -29,9 +30,58 @@ import ( "github.com/pingcap/tipb/go-tipb" ) +// SampleItem is an item of sampled column value. +type SampleItem struct { + // Value is the sampled column value. + Value types.Datum + // Ordinal is original position of this item in SampleCollector before sorting. This + // is used for computing correlation. + Ordinal int +} + +// SortSampleItems sorts a slice of SampleItem. +func SortSampleItems(sc *stmtctx.StatementContext, items []*SampleItem) error { + sorter := sampleItemSorter{items: items, sc: sc} + sort.Sort(&sorter) + return sorter.err +} + +// extractSampleItemsDatums is for test purpose only to extract Datum slice +// from SampleItem slice. +func extractSampleItemsDatums(items []*SampleItem) []types.Datum { + datums := make([]types.Datum, len(items)) + for i, item := range items { + datums[i] = item.Value + } + return datums +} + +type sampleItemSorter struct { + items []*SampleItem + sc *stmtctx.StatementContext + err error +} + +func (s *sampleItemSorter) Len() int { + return len(s.items) +} + +func (s *sampleItemSorter) Less(i, j int) bool { + var cmp int + cmp, s.err = s.items[i].Value.CompareDatum(s.sc, &s.items[j].Value) + if s.err != nil { + return true + } + return cmp < 0 +} + +func (s *sampleItemSorter) Swap(i, j int) { + s.items[i], s.items[j] = s.items[j], s.items[i] +} + // SampleCollector will collect Samples and calculate the count and ndv of an attribute. type SampleCollector struct { - Samples []types.Datum + Samples []*SampleItem seenValues int64 // seenValues is the current seen values. IsMerger bool NullCount int64 @@ -52,8 +102,8 @@ func (c *SampleCollector) MergeSampleCollector(sc *stmtctx.StatementContext, rc err := c.CMSketch.MergeCMSketch(rc.CMSketch) terror.Log(errors.Trace(err)) } - for _, val := range rc.Samples { - err := c.collect(sc, val) + for _, item := range rc.Samples { + err := c.collect(sc, item.Value) terror.Log(errors.Trace(err)) } } @@ -69,8 +119,8 @@ func SampleCollectorToProto(c *SampleCollector) *tipb.SampleCollector { if c.CMSketch != nil { collector.CmSketch = CMSketchToProto(c.CMSketch) } - for _, sample := range c.Samples { - collector.Samples = append(collector.Samples, sample.GetBytes()) + for _, item := range c.Samples { + collector.Samples = append(collector.Samples, item.Value.GetBytes()) } return collector } @@ -91,7 +141,8 @@ func SampleCollectorFromProto(collector *tipb.SampleCollector) *SampleCollector for _, val := range collector.Samples { // When store the histogram bucket boundaries to kv, we need to limit the length of the value. if len(val) <= maxSampleValueLength { - s.Samples = append(s.Samples, types.NewBytesDatum(val)) + item := &SampleItem{Value: types.NewBytesDatum(val)} + s.Samples = append(s.Samples, item) } } return s @@ -118,12 +169,16 @@ func (c *SampleCollector) collect(sc *stmtctx.StatementContext, d types.Datum) e // to the underlying slice, GC can't free them which lead to memory leak eventually. // TODO: Refactor the proto to avoid copying here. if len(c.Samples) < int(c.MaxSampleSize) { - c.Samples = append(c.Samples, types.CopyDatum(d)) + newItem := &SampleItem{Value: types.CopyDatum(d)} + c.Samples = append(c.Samples, newItem) } else { shouldAdd := rand.Int63n(c.seenValues) < c.MaxSampleSize if shouldAdd { idx := rand.Intn(int(c.MaxSampleSize)) - c.Samples[idx] = types.CopyDatum(d) + newItem := &SampleItem{Value: types.CopyDatum(d)} + // To keep the order of the elements, we use delete and append, not direct replacement. + c.Samples = append(c.Samples[:idx], c.Samples[idx+1:]...) + c.Samples = append(c.Samples, newItem) } } return nil diff --git a/statistics/statistics_test.go b/statistics/statistics_test.go index cd9761457f14c..cfd2f4eb51a12 100644 --- a/statistics/statistics_test.go +++ b/statistics/statistics_test.go @@ -43,7 +43,7 @@ var _ = Suite(&testStatisticsSuite{}) type testStatisticsSuite struct { count int - samples []types.Datum + samples []*SampleItem rc sqlexec.RecordSet pk sqlexec.RecordSet } @@ -109,23 +109,26 @@ func (r *recordSet) Close() error { func (s *testStatisticsSuite) SetUpSuite(c *C) { s.count = 100000 - samples := make([]types.Datum, 10000) + samples := make([]*SampleItem, 10000) + for i := 0; i < len(samples); i++ { + samples[i] = &SampleItem{} + } start := 1000 - samples[0].SetInt64(0) + samples[0].Value.SetInt64(0) for i := 1; i < start; i++ { - samples[i].SetInt64(2) + samples[i].Value.SetInt64(2) } for i := start; i < len(samples); i++ { - samples[i].SetInt64(int64(i)) + samples[i].Value.SetInt64(int64(i)) } for i := start; i < len(samples); i += 3 { - samples[i].SetInt64(samples[i].GetInt64() + 1) + samples[i].Value.SetInt64(samples[i].Value.GetInt64() + 1) } for i := start; i < len(samples); i += 5 { - samples[i].SetInt64(samples[i].GetInt64() + 2) + samples[i].Value.SetInt64(samples[i].Value.GetInt64() + 2) } sc := new(stmtctx.StatementContext) - err := types.SortDatums(sc, samples) + err := SortSampleItems(sc, samples) c.Check(err, IsNil) s.samples = samples @@ -324,10 +327,11 @@ func (s *testStatisticsSuite) TestBuild(c *C) { datum := types.Datum{} datum.SetMysqlJSON(json.BinaryJSON{TypeCode: json.TypeCodeLiteral}) + item := &SampleItem{Value: datum} collector = &SampleCollector{ Count: 1, NullCount: 0, - Samples: []types.Datum{datum}, + Samples: []*SampleItem{item}, FMSketch: sketch, } col, err = BuildColumn(ctx, bucketCount, 2, collector, types.NewFieldType(mysql.TypeJSON)) diff --git a/statistics/table.go b/statistics/table.go index 0e0953a85e8a0..7f59926fd1d56 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -214,6 +214,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *Table, tableInfo * break } if col != nil { + col.Histogram.Correlation = row.GetFloat64(9) table.Columns[col.ID] = col } else { // If we didn't find a Column or Index in tableInfo, we won't load the histogram for it. @@ -244,7 +245,7 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in table = table.copy() } table.Pseudo = false - selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag from mysql.stats_histograms where table_id = %d", physicalID) + selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation from mysql.stats_histograms where table_id = %d", physicalID) rows, _, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL) if err != nil { return nil, errors.Trace(err) From 2cc6b0543d68d93e6a23135b70c13a2f8ab4b222 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Sat, 23 Feb 2019 15:25:40 +0800 Subject: [PATCH 2/3] move extractSampleItemsDatums to _test.go --- statistics/fmsketch_test.go | 11 +++++++++++ statistics/sample.go | 10 ---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/statistics/fmsketch_test.go b/statistics/fmsketch_test.go index fbc4086434724..fe6a38472b94c 100644 --- a/statistics/fmsketch_test.go +++ b/statistics/fmsketch_test.go @@ -18,8 +18,19 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/types" ) +// extractSampleItemsDatums is for test purpose only to extract Datum slice +// from SampleItem slice. +func extractSampleItemsDatums(items []*SampleItem) []types.Datum { + datums := make([]types.Datum, len(items)) + for i, item := range items { + datums[i] = item.Value + } + return datums +} + func (s *testStatisticsSuite) TestSketch(c *C) { sc := &stmtctx.StatementContext{TimeZone: time.Local} maxSize := 1000 diff --git a/statistics/sample.go b/statistics/sample.go index f4a7acbe195c2..fe14f145382f1 100644 --- a/statistics/sample.go +++ b/statistics/sample.go @@ -46,16 +46,6 @@ func SortSampleItems(sc *stmtctx.StatementContext, items []*SampleItem) error { return sorter.err } -// extractSampleItemsDatums is for test purpose only to extract Datum slice -// from SampleItem slice. -func extractSampleItemsDatums(items []*SampleItem) []types.Datum { - datums := make([]types.Datum, len(items)) - for i, item := range items { - datums[i] = item.Value - } - return datums -} - type sampleItemSorter struct { items []*SampleItem sc *stmtctx.StatementContext From e8ec929615d97d46bc2e322777a59283510aeb24 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Sat, 23 Feb 2019 18:05:30 +0800 Subject: [PATCH 3/3] update comment --- statistics/builder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/statistics/builder.go b/statistics/builder.go index e077137c3ad70..6ef3a966b4cfc 100644 --- a/statistics/builder.go +++ b/statistics/builder.go @@ -165,7 +165,8 @@ func BuildColumn(ctx sessionctx.Context, numBuckets, id int64, collector *Sample // (sampleNum-1)*sampleNum / 2 // and sum(X^2) = sum(Y^2) = // (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6 - // The formula for computing correlation is borrowed from PostgreSQL. + // We use "Pearson correlation coefficient" to compute the order correlation of columns, + // the formula is based on https://en.wikipedia.org/wiki/Pearson_correlation_coefficient. // Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1. itemsCount := float64(sampleNum) corrXSum := (itemsCount - 1) * itemsCount / 2.0