Skip to content

Commit

Permalink
*: add explicit partition pruning to index joins (#32007) (#32093)
Browse files Browse the repository at this point in the history
close #32007
  • Loading branch information
mjonss authored Mar 9, 2022
1 parent a1d8f2f commit b9bd5a7
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 49 deletions.
4 changes: 2 additions & 2 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ func TestAlterTableTruncatePartitionByList(t *testing.T) {
tk.MustExec(`insert into t values (1),(3),(5),(null)`)
oldTbl := tk.GetTableByName("test", "t")
tk.MustExec(`alter table t truncate partition p1`)
tk.MustQuery("select * from t").Check(testkit.Rows("1", "5", "<nil>"))
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1", "5", "<nil>"))
tbl := tk.GetTableByName("test", "t")
require.NotNil(t, tbl.Meta().Partition)
part := tbl.Meta().Partition
Expand Down Expand Up @@ -1329,7 +1329,7 @@ func TestAlterTableTruncatePartitionByListColumns(t *testing.T) {
tk.MustExec(`insert into t values (1,'a'),(3,'a'),(5,'a'),(null,null)`)
oldTbl := tk.GetTableByName("test", "t")
tk.MustExec(`alter table t truncate partition p1`)
tk.MustQuery("select * from t").Check(testkit.Rows("1 a", "5 a", "<nil> <nil>"))
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1 a", "5 a", "<nil> <nil>"))
tbl := tk.GetTableByName("test", "t")
require.NotNil(t, tbl.Meta().Partition)
part := tbl.Meta().Partition
Expand Down
80 changes: 35 additions & 45 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3896,26 +3896,37 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
tbInfo := e.table.Meta()
if v.IsCommonHandle {
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if v.IsCommonHandle {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
var kvRanges []kv.KeyRange
handles, _ := dedupHandles(lookUpContents)
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
usedPartitions := make(map[int64]table.PhysicalTable, len(usedPartitionList))
for _, p := range usedPartitionList {
usedPartitions[p.GetPhysicalID()] = p
}
var kvRanges []kv.KeyRange
if v.IsCommonHandle {
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
// In this case we can use dynamic partition pruning.
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
for _, content := range lookUpContents {
Expand All @@ -3927,22 +3938,19 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
pid := p.GetPhysicalID()
if _, ok := usedPartitions[pid]; !ok {
continue
}
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal)
if err != nil {
return nil, err
}
kvRanges = append(kvRanges, tmp...)
}
} else {
partitionInfo := &v.PartitionInfo
partitions, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
kvRanges = make([]kv.KeyRange, 0, len(partitions)*len(lookUpContents))
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
kvRanges = make([]kv.KeyRange, 0, len(usedPartitions)*len(lookUpContents))
for _, p := range usedPartitionList {
tmp, err := buildKvRangesForIndexJoin(e.ctx, p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -3953,22 +3961,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}

handles, lookUpContents := dedupHandles(lookUpContents)
if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}

tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
var kvRanges []kv.KeyRange
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
Expand All @@ -3981,19 +3974,16 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
pid := p.GetPhysicalID()
if _, ok := usedPartitions[pid]; !ok {
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
}
} else {
partitionInfo := &v.PartitionInfo
partitions, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp := distsql.TableHandlesToKVRanges(pid, handles)
for _, p := range usedPartitionList {
tmp := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ type IndexLookUpExecutor struct {
partitionTableMode bool // if this executor is accessing a partition table
prunedPartitions []table.PhysicalTable // partition tables need to access
partitionRangeMap map[int64][]*ranger.Range
partitionKVRanges [][]kv.KeyRange // kvRanges of each partition table
partitionKVRanges [][]kv.KeyRange // kvRanges of each prunedPartitions

// All fields above are immutable.

Expand Down
1 change: 1 addition & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
Columns: ds.TblCols,
ColumnNames: ds.names,
}
ts.PartitionInfo = copTask.partitionInfo
selStats := ts.stats.Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
t := copTask.convertToRootTask(ds.ctx)
Expand Down
24 changes: 23 additions & 1 deletion planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func TestIssue22898(t *testing.T) {
tk.MustExec("CREATE TABLE NT_RP3763 (COL1 TINYINT(8) SIGNED COMMENT \"NUMERIC NO INDEX\" DEFAULT 41,COL2 VARCHAR(20),COL3 DATETIME,COL4 BIGINT,COL5 FLOAT) PARTITION BY RANGE (COL1 * COL3) (PARTITION P0 VALUES LESS THAN (0),PARTITION P1 VALUES LESS THAN (10),PARTITION P2 VALUES LESS THAN (20),PARTITION P3 VALUES LESS THAN (30),PARTITION P4 VALUES LESS THAN (40),PARTITION P5 VALUES LESS THAN (50),PARTITION PMX VALUES LESS THAN MAXVALUE);")
tk.MustExec("insert into NT_RP3763 (COL1,COL2,COL3,COL4,COL5) values(-82,\"夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇\",\"5748\\-06\\-26\\ 20:48:49\",-3133527360541070260,-2.624880003397658e+38);")
tk.MustExec("insert into NT_RP3763 (COL1,COL2,COL3,COL4,COL5) values(48,\"簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢\",\"7228\\-12\\-13\\ 02:59:54\",-6181009269190017937,2.7731105531290494e+38);")
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (10, 48, -82);").Check(testkit.Rows("-82 夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇 5748-06-26 20:48:49 -3133527360541070260 -262488000000000000000000000000000000000", "48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (10, 48, -82);").Sort().Check(testkit.Rows("-82 夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇 5748-06-26 20:48:49 -3133527360541070260 -262488000000000000000000000000000000000", "48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (48);").Check(testkit.Rows("48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
}

Expand Down Expand Up @@ -696,3 +696,25 @@ func TestHashPartitionPruning(t *testing.T) {
tk.MustExec("insert into t(col1, col3) values(0, 3522101843073676459);")
tk.MustQuery("SELECT col1, COL3 FROM t WHERE COL1 IN (0,14158354938390,0) AND COL3 IN (3522101843073676459,-2846203247576845955,838395691793635638);").Check(testkit.Rows("0 3522101843073676459"))
}

func TestIssue32007(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database Issue32007")
tk.MustExec("USE Issue32007")
tk.MustExec("create table t1 (a int, b tinyint, primary key (a)) partition by range (a) (" +
"partition p0 values less than (5)," +
"partition p1 values less than (20)," +
"partition p2 values less than (30)," +
"partition p3 values less than (40)," +
"partition p4 values less than MAXVALUE)")
tk.MustExec("insert into t1 values (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (20, 20), (21, 21), (22, 22), (23, 23), (24, 24), (25, 25), (30, 30), (31, 31), (32, 32), (33, 33), (34, 34), (35, 35), (36, 36), (40, 40), (50, 50), (80, 80), (90, 90), (100, 100)")
tk.MustExec("create table t3 (a int, b mediumint, primary key (a))")
tk.MustExec("insert into t3 values (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (16, 16), (17, 17), (18, 18), (19, 19), (20, 20), (21, 21), (22, 22), (23, 23)")

tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustQuery("select * from t3 where t3.a <> ALL (select t1.a from t1 partition (p0)) order by t3.a").Sort().Check(testkit.Rows("10 10", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "17 17", "18 18", "19 19", "20 20", "21 21", "22 22", "23 23", "5 5", "6 6", "7 7", "8 8", "9 9"))
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustQuery("select * from t3 where t3.a <> ALL (select t1.a from t1 partition (p0)) order by t3.a").Sort().Check(testkit.Rows("10 10", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "17 17", "18 18", "19 19", "20 20", "21 21", "22 22", "23 23", "5 5", "6 6", "7 7", "8 8", "9 9"))
}

0 comments on commit b9bd5a7

Please sign in to comment.