From 4a72171ffb55b9dcf80d2dea8788ab62aff22690 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 20 Dec 2022 09:44:54 +0100 Subject: [PATCH 01/11] *: Fix issue 39999, used wrong column id list for checking partitions (#40003) close pingcap/tidb#39999 --- executor/builder.go | 85 +++++++++++++----------------- executor/index_lookup_join_test.go | 20 +++---- executor/partition_table_test.go | 69 ++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 56 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index b33c57d3de234..d4270397eecd0 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3511,17 +3511,39 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta return nextRange, nil } -func keyColumnsIncludeAllPartitionColumns(keyColumns []int, pe *tables.PartitionExpr) bool { - tmp := make(map[int]struct{}, len(keyColumns)) - for _, offset := range keyColumns { - tmp[offset] = struct{}{} +func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []int { + keyColOffsets := make([]int, len(keyColIDs)) + for i, colID := range keyColIDs { + offset := -1 + for j, col := range pt.Cols() { + if colID == col.ID { + offset = j + break + } + } + if offset == -1 { + return nil + } + keyColOffsets[i] = offset + } + + pe, err := pt.(interface { + PartitionExpr() (*tables.PartitionExpr, error) + }).PartitionExpr() + if err != nil { + return nil + } + + offsetMap := make(map[int]struct{}) + for _, offset := range keyColOffsets { + offsetMap[offset] = struct{}{} } for _, offset := range pe.ColumnOffset { - if _, ok := tmp[offset]; !ok { - return false + if _, ok := offsetMap[offset]; !ok { + return nil } } - return true + return keyColOffsets } func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, schema *expression.Schema, partitionInfo *plannercore.PartitionInfo, @@ -3536,15 +3558,6 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table return nil, false, nil, err } - // check whether can runtime prune. - type partitionExpr interface { - PartitionExpr() (*tables.PartitionExpr, error) - } - pe, err := tbl.(partitionExpr).PartitionExpr() - if err != nil { - return nil, false, nil, err - } - // recalculate key column offsets if len(lookUpContent) == 0 { return nil, false, nil, nil @@ -3552,29 +3565,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table if lookUpContent[0].keyColIDs == nil { return nil, false, nil, plannercore.ErrInternal.GenWithStack("cannot get column IDs when dynamic pruning") } - keyColOffsets := make([]int, len(lookUpContent[0].keyColIDs)) - for i, colID := range lookUpContent[0].keyColIDs { - offset := -1 - for j, col := range partitionTbl.Cols() { - if colID == col.ID { - offset = j - break - } - } - if offset == -1 { - return nil, false, nil, plannercore.ErrInternal.GenWithStack("invalid column offset when dynamic pruning") - } - keyColOffsets[i] = offset - } - - offsetMap := make(map[int]bool) - for _, offset := range keyColOffsets { - offsetMap[offset] = true - } - for _, offset := range pe.ColumnOffset { - if _, ok := offsetMap[offset]; !ok { - return condPruneResult, false, nil, nil - } + keyColOffsets := getPartitionKeyColOffsets(lookUpContent[0].keyColIDs, partitionTbl) + if len(keyColOffsets) == 0 { + return condPruneResult, false, nil, nil } locateKey := make([]types.Datum, len(partitionTbl.Cols())) @@ -4149,12 +4142,6 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte } tbl, _ := builder.is.TableByID(tbInfo.ID) pt := tbl.(table.PartitionedTable) - pe, err := tbl.(interface { - PartitionExpr() (*tables.PartitionExpr, error) - }).PartitionExpr() - if err != nil { - return nil, err - } partitionInfo := &v.PartitionInfo usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames) if err != nil { @@ -4165,8 +4152,12 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte usedPartitions[p.GetPhysicalID()] = p } var kvRanges []kv.KeyRange + var keyColOffsets []int + if len(lookUpContents) > 0 { + keyColOffsets = getPartitionKeyColOffsets(lookUpContents[0].keyColIDs, pt) + } if v.IsCommonHandle { - if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { + if len(keyColOffsets) > 0 { locateKey := make([]types.Datum, e.Schema().Len()) kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) // lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged. @@ -4212,7 +4203,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte handles, lookUpContents := dedupHandles(lookUpContents) - if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) { + if len(keyColOffsets) > 0 { locateKey := make([]types.Datum, e.Schema().Len()) kvRanges = make([]kv.KeyRange, 0, len(lookUpContents)) for _, content := range lookUpContents { diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index 9a021568b20ee..600f052b1225e 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -428,17 +428,19 @@ PARTITIONS 1`) // Why does the t2.prefiller need be at least 2^32 ? If smaller the bug will not appear!?! tk.MustExec("insert into t2 values ( pow(2,32), 1, 1), ( pow(2,32)+1, 2, 0)") + tk.MustExec(`analyze table t1`) + tk.MustExec(`analyze table t2`) // Why must it be = 1 and not 2? - tk.MustQuery("explain select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows("" + - "IndexJoin_15 10.00 root inner join, inner:TableReader_14, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)]\n" + - "[├─HashAgg_25(Build) 8.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller]\n" + - "[│ └─TableReader_26 8.00 root data:HashAgg_20]\n" + - "[│ └─HashAgg_20 8.00 cop[tikv] group by:test.t2.prefiller, ]\n" + - "[│ └─Selection_24 10.00 cop[tikv] eq(test.t2.postfiller, 1)]\n" + - "[│ └─TableFullScan_23 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo]\n" + - "[└─TableReader_14(Probe) 8.00 root partition:all data:TableRangeScan_13]\n" + - "[ └─TableRangeScan_13 8.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo")) + tk.MustQuery("explain format='brief' select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows(""+ + `IndexJoin 1.25 root inner join, inner:TableReader, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)`, + `├─HashAgg(Build) 1.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller`, + `│ └─TableReader 1.00 root data:HashAgg`, + `│ └─HashAgg 1.00 cop[tikv] group by:test.t2.prefiller, `, + `│ └─Selection 1.00 cop[tikv] eq(test.t2.postfiller, 1)`, + `│ └─TableFullScan 2.00 cop[tikv] table:t2 keep order:false`, + `└─TableReader(Probe) 1.00 root partition:all data:TableRangeScan`, + ` └─TableRangeScan 1.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo`)) tk.MustQuery("show warnings").Check(testkit.Rows()) // without fix it fails with: "runtime error: index out of range [0] with length 0" tk.MustQuery("select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows()) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 15d2c2872ca9c..5696b56f6f730 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -3831,3 +3831,72 @@ func TestIssue21732(t *testing.T) { }) } } + +func TestIssue39999(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create schema test39999`) + tk.MustExec(`use test39999`) + tk.MustExec(`drop table if exists c, t`) + tk.MustExec("CREATE TABLE `c` (" + + "`serial_id` varchar(24)," + + "`occur_trade_date` date," + + "`txt_account_id` varchar(24)," + + "`capital_sub_class` varchar(10)," + + "`occur_amount` decimal(16,2)," + + "`broker` varchar(10)," + + "PRIMARY KEY (`txt_account_id`,`occur_trade_date`,`serial_id`) /*T![clustered_index] CLUSTERED */," + + "KEY `idx_serial_id` (`serial_id`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci " + + "PARTITION BY RANGE COLUMNS(`serial_id`) (" + + "PARTITION `p202209` VALUES LESS THAN ('20221001')," + + "PARTITION `p202210` VALUES LESS THAN ('20221101')," + + "PARTITION `p202211` VALUES LESS THAN ('20221201')" + + ")") + + tk.MustExec("CREATE TABLE `t` ( " + + "`txn_account_id` varchar(24), " + + "`account_id` varchar(32), " + + "`broker` varchar(10), " + + "PRIMARY KEY (`txn_account_id`) /*T![clustered_index] CLUSTERED */ " + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci") + + tk.MustExec("INSERT INTO `c` (serial_id, txt_account_id, capital_sub_class, occur_trade_date, occur_amount, broker) VALUES ('2022111700196920','04482786','CUST','2022-11-17',-2.01,'0009')") + tk.MustExec("INSERT INTO `t` VALUES ('04482786','1142927','0009')") + + tk.MustExec(`set tidb_partition_prune_mode='dynamic'`) + tk.MustExec(`analyze table c`) + tk.MustExec(`analyze table t`) + query := `select + /*+ inl_join(c) */ + c.occur_amount +from + c + join t on c.txt_account_id = t.txn_account_id + and t.broker = '0009' + and c.occur_trade_date = '2022-11-17'` + tk.MustQuery("explain " + query).Check(testkit.Rows(""+ + "IndexJoin_22 1.00 root inner join, inner:TableReader_21, outer key:test39999.t.txn_account_id, inner key:test39999.c.txt_account_id, equal cond:eq(test39999.t.txn_account_id, test39999.c.txt_account_id)", + "├─TableReader_27(Build) 1.00 root data:Selection_26", + "│ └─Selection_26 1.00 cop[tikv] eq(test39999.t.broker, \"0009\")", + "│ └─TableFullScan_25 1.00 cop[tikv] table:t keep order:false", + "└─TableReader_21(Probe) 1.00 root partition:all data:Selection_20", + " └─Selection_20 1.00 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)", + " └─TableRangeScan_19 1.00 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false")) + tk.MustQuery(query).Check(testkit.Rows("-2.01")) + + // Add the missing partition key part. + tk.MustExec(`alter table t add column serial_id varchar(24) default '2022111700196920'`) + query += ` and c.serial_id = t.serial_id` + tk.MustQuery(query).Check(testkit.Rows("-2.01")) + tk.MustQuery("explain " + query).Check(testkit.Rows(""+ + `IndexJoin_20 0.80 root inner join, inner:TableReader_19, outer key:test39999.t.txn_account_id, test39999.t.serial_id, inner key:test39999.c.txt_account_id, test39999.c.serial_id, equal cond:eq(test39999.t.serial_id, test39999.c.serial_id), eq(test39999.t.txn_account_id, test39999.c.txt_account_id)`, + `├─TableReader_25(Build) 0.80 root data:Selection_24`, + `│ └─Selection_24 0.80 cop[tikv] eq(test39999.t.broker, "0009"), not(isnull(test39999.t.serial_id))`, + `│ └─TableFullScan_23 1.00 cop[tikv] table:t keep order:false`, + `└─TableReader_19(Probe) 0.80 root partition:all data:Selection_18`, + ` └─Selection_18 0.80 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)`, + ` └─TableRangeScan_17 0.80 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.serial_id, test39999.t.serial_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false`)) +} From 6ab38033b857a1de7ffb8fdd0e9b1f57fab26133 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Tue, 20 Dec 2022 17:38:55 +0800 Subject: [PATCH 02/11] oomtest: add whitelist for oom test to fix flaky test (#40055) close pingcap/tidb#40054 --- executor/oomtest/oom_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/oomtest/oom_test.go b/executor/oomtest/oom_test.go index 5b348f5c238de..fc95bb47ceab8 100644 --- a/executor/oomtest/oom_test.go +++ b/executor/oomtest/oom_test.go @@ -223,7 +223,8 @@ func (h *oomCapture) Write(entry zapcore.Entry, fields []zapcore.Field) error { return nil } // They are just common background task and not related to the oom. - if entry.Message == "SetTiFlashGroupConfig" { + if entry.Message == "SetTiFlashGroupConfig" || + entry.Message == "record table item load status failed due to not finding item" { return nil } From 017901d1d6daa1f07d8ad33f70ddbb7a286fb086 Mon Sep 17 00:00:00 2001 From: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com> Date: Tue, 20 Dec 2022 18:00:55 +0800 Subject: [PATCH 03/11] *: extend skyline pruning diagnostic info and add extra warnings (#39894) ref pingcap/tidb#39893 --- expression/expression.go | 23 ++- infoschema/tables_test.go | 160 +++++++++++++++++- planner/core/common_plans.go | 10 -- planner/core/exhaust_physical_plans.go | 5 +- planner/core/find_best_task.go | 10 +- planner/core/stats.go | 52 +++--- planner/core/task.go | 9 +- .../core/testdata/integration_suite_out.json | 7 +- planner/optimize.go | 2 + sessionctx/stmtctx/stmtctx.go | 54 +++++- sessionctx/variable/session.go | 7 +- 11 files changed, 278 insertions(+), 61 deletions(-) diff --git a/expression/expression.go b/expression/expression.go index 352f105c52d65..6d7eb080b29fc 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -1358,12 +1358,15 @@ func canScalarFuncPushDown(scalarFunc *ScalarFunction, pc PbConverter, storeType panic(errors.Errorf("unspecified PbCode: %T", scalarFunc.Function)) }) } + storageName := storeType.Name() + if storeType == kv.UnSpecified { + storageName = "storage layer" + } + warnErr := errors.New("Scalar function '" + scalarFunc.FuncName.L + "'(signature: " + scalarFunc.Function.PbCode().String() + ", return type: " + scalarFunc.RetType.CompactStr() + ") is not supported to push down to " + storageName + " now.") if pc.sc.InExplainStmt { - storageName := storeType.Name() - if storeType == kv.UnSpecified { - storageName = "storage layer" - } - pc.sc.AppendWarning(errors.New("Scalar function '" + scalarFunc.FuncName.L + "'(signature: " + scalarFunc.Function.PbCode().String() + ", return type: " + scalarFunc.RetType.CompactStr() + ") is not supported to push down to " + storageName + " now.")) + pc.sc.AppendWarning(warnErr) + } else { + pc.sc.AppendExtraWarning(warnErr) } return false } @@ -1393,14 +1396,20 @@ func canExprPushDown(expr Expression, pc PbConverter, storeType kv.StoreType, ca if expr.GetType().GetType() == mysql.TypeEnum && canEnumPush { break } + warnErr := errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains unsupported calculation of type '" + types.TypeStr(expr.GetType().GetType()) + "'.") if pc.sc.InExplainStmt { - pc.sc.AppendWarning(errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains unsupported calculation of type '" + types.TypeStr(expr.GetType().GetType()) + "'.")) + pc.sc.AppendWarning(warnErr) + } else { + pc.sc.AppendExtraWarning(warnErr) } return false case mysql.TypeNewDecimal: if !expr.GetType().IsDecimalValid() { + warnErr := errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains invalid decimal('" + strconv.Itoa(expr.GetType().GetFlen()) + "','" + strconv.Itoa(expr.GetType().GetDecimal()) + "').") if pc.sc.InExplainStmt { - pc.sc.AppendWarning(errors.New("Expression about '" + expr.String() + "' can not be pushed to TiFlash because it contains invalid decimal('" + strconv.Itoa(expr.GetType().GetFlen()) + "','" + strconv.Itoa(expr.GetType().GetDecimal()) + "').")) + pc.sc.AppendWarning(warnErr) + } else { + pc.sc.AppendExtraWarning(warnErr) } return false } diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index ebf3e9b535893..d4d4c4fa588f7 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -531,18 +531,166 @@ func TestSlowQuery(t *testing.T) { slowLogFileName := "tidb_slow.log" prepareSlowLogfile(t, slowLogFileName) defer func() { require.NoError(t, os.Remove(slowLogFileName)) }() + expectedRes := [][]interface{}{ + {"2019-02-12 19:33:56.571953", + "406315658548871171", + "root", + "localhost", + "6", + "57", + "0.12", + "4.895492", + "0.4", + "0.2", + "0.000000003", + "2", + "0.000000002", + "0.00000001", + "0.000000003", + "0.19", + "0.21", + "0.01", + "0", + "0.18", + "[txnLock]", + "0.03", + "0", + "15", + "480", + "1", + "8", + "0.3824278", + "0.161", + "0.101", + "0.092", + "1.71", + "1", + "100001", + "100000", + "100", + "10", + "10", + "10", + "100", + "test", + "", + "0", + "42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772", + "t1:1,t2:2", + "0.1", + "0.2", + "0.03", + "127.0.0.1:20160", + "0.05", + "0.6", + "0.8", + "0.0.0.0:20160", + "70724", + "65536", + "0", + "0", + "0", + "0", + "10", + "", + "0", + "1", + "0", + "0", + "1", + "0", + "0", + "abcd", + "60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4", + "", + "update t set i = 2;", + "select * from t_slim;"}, + {"2021-09-08 14:39:54.506967", + "427578666238083075", + "root", + "172.16.0.0", + "40507", + "0", + "0", + "25.571605962", + "0.002923536", + "0.006800973", + "0.002100764", + "0", + "0", + "0", + "0.000015801", + "25.542014572", + "0", + "0.002294647", + "0.000605473", + "12.483", + "[tikvRPC regionMiss tikvRPC regionMiss regionMiss]", + "0", + "0", + "624", + "172064", + "60", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "rtdb", + "", + "0", + "124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc", + "", + "0", + "0", + "0", + "", + "0", + "0", + "0", + "", + "856544", + "0", + "86.635049185", + "0.015486658", + "100.054", + "0", + "0", + "", + "0", + "1", + "0", + "0", + "0", + "0", + "0", + "", + "", + "", + "", + "INSERT INTO ...;", + }, + } tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", slowLogFileName)) tk.MustExec("set time_zone = '+08:00';") re := tk.MustQuery("select * from information_schema.slow_query") - re.Check(testkit.RowsWithSep("|", "2019-02-12 19:33:56.571953|406315658548871171|root|localhost|6|57|0.12|4.895492|0.4|0.2|0.000000003|2|0.000000002|0.00000001|0.000000003|0.19|0.21|0.01|0|0.18|[txnLock]|0.03|0|15|480|1|8|0.3824278|0.161|0.101|0.092|1.71|1|100001|100000|100|10|10|10|100|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|65536|0|0|0|0|10||0|1|0|0|1|0|0|abcd|60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4||update t set i = 2;|select * from t_slim;", - "2021-09-08|14:39:54.506967|427578666238083075|root|172.16.0.0|40507|0|0|25.571605962|0.002923536|0.006800973|0.002100764|0|0|0|0.000015801|25.542014572|0|0.002294647|0.000605473|12.483|[tikvRPC regionMiss tikvRPC regionMiss regionMiss]|0|0|624|172064|60|0|0|0|0|0|0|0|0|0|0|0|0|0|0|rtdb||0|124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc||0|0|0||0|0|0||856544|0|86.635049185|0.015486658|100.054|0|0||0|1|0|0|0|0|0|||||INSERT INTO ...;", - )) + re.Check(expectedRes) + tk.MustExec("set time_zone = '+00:00';") re = tk.MustQuery("select * from information_schema.slow_query") - re.Check(testkit.RowsWithSep("|", "2019-02-12 11:33:56.571953|406315658548871171|root|localhost|6|57|0.12|4.895492|0.4|0.2|0.000000003|2|0.000000002|0.00000001|0.000000003|0.19|0.21|0.01|0|0.18|[txnLock]|0.03|0|15|480|1|8|0.3824278|0.161|0.101|0.092|1.71|1|100001|100000|100|10|10|10|100|test||0|42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772|t1:1,t2:2|0.1|0.2|0.03|127.0.0.1:20160|0.05|0.6|0.8|0.0.0.0:20160|70724|65536|0|0|0|0|10||0|1|0|0|1|0|0|abcd|60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4||update t set i = 2;|select * from t_slim;", - "2021-09-08|06:39:54.506967|427578666238083075|root|172.16.0.0|40507|0|0|25.571605962|0.002923536|0.006800973|0.002100764|0|0|0|0.000015801|25.542014572|0|0.002294647|0.000605473|12.483|[tikvRPC regionMiss tikvRPC regionMiss regionMiss]|0|0|624|172064|60|0|0|0|0|0|0|0|0|0|0|0|0|0|0|rtdb||0|124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc||0|0|0||0|0|0||856544|0|86.635049185|0.015486658|100.054|0|0||0|1|0|0|0|0|0|||||INSERT INTO ...;", - )) + expectedRes[0][0] = "2019-02-12 11:33:56.571953" + expectedRes[1][0] = "2021-09-08 06:39:54.506967" + re.Check(expectedRes) // Test for long query. f, err := os.OpenFile(slowLogFileName, os.O_CREATE|os.O_WRONLY, 0644) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index f11f60b95cfe5..8feb357745853 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -914,11 +914,6 @@ func (e *Explain) explainOpRecursivelyInJSONFormat(flatOp *FlatOperator, flats F textTreeExplainID := texttree.PrettyIdentifier(explainID, flatOp.TextTreeIndent, flatOp.IsLastChild) cur := e.prepareOperatorInfoForJSONFormat(flatOp.Origin, taskTp, textTreeExplainID, explainID) - if e.ctx != nil && e.ctx.GetSessionVars() != nil && e.ctx.GetSessionVars().StmtCtx != nil { - if optimInfo, ok := e.ctx.GetSessionVars().StmtCtx.OptimInfo[flatOp.Origin.ID()]; ok { - e.ctx.GetSessionVars().StmtCtx.AppendNote(errors.New(optimInfo)) - } - } for _, idx := range flatOp.ChildrenIdx { cur.SubOperators = append(cur.SubOperators, @@ -938,11 +933,6 @@ func (e *Explain) explainFlatOpInRowFormat(flatOp *FlatOperator) { flatOp.TextTreeIndent, flatOp.IsLastChild) e.prepareOperatorInfo(flatOp.Origin, taskTp, textTreeExplainID) - if e.ctx != nil && e.ctx.GetSessionVars() != nil && e.ctx.GetSessionVars().StmtCtx != nil { - if optimInfo, ok := e.ctx.GetSessionVars().StmtCtx.OptimInfo[flatOp.Origin.ID()]; ok { - e.ctx.GetSessionVars().StmtCtx.AppendNote(errors.New(optimInfo)) - } - } } func getRuntimeInfoStr(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetails.RuntimeStatsColl) (actRows, analyzeInfo, memoryInfo, diskInfo string) { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 768a8c20fc0b5..2817f370ffcec 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2730,8 +2730,11 @@ func (la *LogicalAggregation) checkCanPushDownToMPP() bool { } } if hasUnsupportedDistinct { + warnErr := errors.New("Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct") if la.ctx.GetSessionVars().StmtCtx.InExplainStmt { - la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct")) + la.ctx.GetSessionVars().StmtCtx.AppendWarning(warnErr) + } else { + la.ctx.GetSessionVars().StmtCtx.AppendExtraWarning(warnErr) } return false } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index afc5223b9be94..639bc15dbdc98 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -742,7 +742,7 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida } func (ds *DataSource) getPruningInfo(candidates []*candidatePath, prop *property.PhysicalProperty) string { - if !ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain || len(candidates) == len(ds.possibleAccessPaths) { + if len(candidates) == len(ds.possibleAccessPaths) { return "" } if len(candidates) == 1 && len(candidates[0].path.Ranges) == 0 { @@ -889,10 +889,12 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter pruningInfo := ds.getPruningInfo(candidates, prop) defer func() { if err == nil && t != nil && !t.invalid() && pruningInfo != "" { - if ds.ctx.GetSessionVars().StmtCtx.OptimInfo == nil { - ds.ctx.GetSessionVars().StmtCtx.OptimInfo = make(map[int]string) + warnErr := errors.New(pruningInfo) + if ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain { + ds.ctx.GetSessionVars().StmtCtx.AppendNote(warnErr) + } else { + ds.ctx.GetSessionVars().StmtCtx.AppendExtraNote(warnErr) } - ds.ctx.GetSessionVars().StmtCtx.OptimInfo[t.plan().ID()] = pruningInfo } }() diff --git a/planner/core/stats.go b/planner/core/stats.go index f377feac91030..71e1037c52c76 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -343,35 +343,37 @@ func (ds *DataSource) derivePathStatsAndTryHeuristics() error { if selected != nil { ds.possibleAccessPaths[0] = selected ds.possibleAccessPaths = ds.possibleAccessPaths[:1] - if ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain { - var tableName string - if ds.TableAsName.O == "" { - tableName = ds.tableInfo.Name.O + var tableName string + if ds.TableAsName.O == "" { + tableName = ds.tableInfo.Name.O + } else { + tableName = ds.TableAsName.O + } + var sb strings.Builder + if selected.IsTablePath() { + // TODO: primary key / handle / real name? + sb.WriteString(fmt.Sprintf("handle of %s is selected since the path only has point ranges", tableName)) + } else { + if selected.Index.Unique { + sb.WriteString("unique ") + } + sb.WriteString(fmt.Sprintf("index %s of %s is selected since the path", selected.Index.Name.O, tableName)) + if isRefinedPath { + sb.WriteString(" only fetches limited number of rows") } else { - tableName = ds.TableAsName.O + sb.WriteString(" only has point ranges") } - if selected.IsTablePath() { - // TODO: primary key / handle / real name? - ds.ctx.GetSessionVars().StmtCtx.AppendNote(fmt.Errorf("handle of %s is selected since the path only has point ranges", tableName)) + if selected.IsSingleScan { + sb.WriteString(" with single scan") } else { - var sb strings.Builder - if selected.Index.Unique { - sb.WriteString("unique ") - } - sb.WriteString(fmt.Sprintf("index %s of %s is selected since the path", selected.Index.Name.O, tableName)) - if isRefinedPath { - sb.WriteString(" only fetches limited number of rows") - } else { - sb.WriteString(" only has point ranges") - } - if selected.IsSingleScan { - sb.WriteString(" with single scan") - } else { - sb.WriteString(" with double scan") - } - ds.ctx.GetSessionVars().StmtCtx.AppendNote(errors.New(sb.String())) + sb.WriteString(" with double scan") } } + if ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain { + ds.ctx.GetSessionVars().StmtCtx.AppendNote(errors.New(sb.String())) + } else { + ds.ctx.GetSessionVars().StmtCtx.AppendExtraNote(errors.New(sb.String())) + } } return nil } @@ -435,8 +437,10 @@ func (ds *DataSource) DeriveStats(_ []*property.StatsInfo, _ *expression.Schema, if needConsiderIndexMerge { // PushDownExprs() will append extra warnings, which is annoying. So we reset warnings here. warnings := stmtCtx.GetWarnings() + extraWarnings := stmtCtx.GetExtraWarnings() _, remaining := expression.PushDownExprs(stmtCtx, indexMergeConds, ds.ctx.GetClient(), kv.UnSpecified) stmtCtx.SetWarnings(warnings) + stmtCtx.SetExtraWarnings(extraWarnings) if len(remaining) != 0 { needConsiderIndexMerge = false } diff --git a/planner/core/task.go b/planner/core/task.go index cc27029d83c8e..99952038688fe 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1239,12 +1239,17 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc ret = false } - if !ret && sc.InExplainStmt { + if !ret { storageName := storeType.Name() if storeType == kv.UnSpecified { storageName = "storage layer" } - sc.AppendWarning(errors.New("Aggregation can not be pushed to " + storageName + " because " + reason)) + warnErr := errors.New("Aggregation can not be pushed to " + storageName + " because " + reason) + if sc.InExplainStmt { + sc.AppendWarning(warnErr) + } else { + sc.AppendExtraWarning(warnErr) + } } return ret } diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 14c04c6cfb0ab..d61124de927bd 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -1969,7 +1969,8 @@ " └─TableRangeScan_8 3333.33 923531.15 cop[tikv] table:t range:(1,+inf], keep order:false, stats:pseudo" ], "Warnings": [ - "Note 1105 [t] remain after pruning paths for t given Prop{SortItems: [], TaskTp: rootTask}" + "Note 1105 [t] remain after pruning paths for t given Prop{SortItems: [], TaskTp: rootTask}", + "Note 1105 [t,f,f_g] remain after pruning paths for t given Prop{SortItems: [{test.t.f asc}], TaskTp: rootTask}" ] }, { @@ -2014,7 +2015,8 @@ " └─TableRowIDScan_12(Probe) 10.00 2770.59 cop[tikv] table:t keep order:false, stats:pseudo" ], "Warnings": [ - "Note 1105 [t,g] remain after pruning paths for t given Prop{SortItems: [], TaskTp: rootTask}" + "Note 1105 [t,g] remain after pruning paths for t given Prop{SortItems: [], TaskTp: rootTask}", + "Note 1105 [t,f_g,g] remain after pruning paths for t given Prop{SortItems: [{test.t.f asc}], TaskTp: rootTask}" ] }, { @@ -2026,6 +2028,7 @@ "└─TableRowIDScan_13(Probe) 10.00 2770.59 cop[tikv] table:t keep order:false, stats:pseudo" ], "Warnings": [ + "Note 1105 [t] remain after pruning paths for t given Prop{SortItems: [], TaskTp: rootTask}", "Note 1105 [t,c_d_e] remain after pruning paths for t given Prop{SortItems: [{test.t.c asc} {test.t.e asc}], TaskTp: rootTask}" ] } diff --git a/planner/optimize.go b/planner/optimize.go index 0f37a59788a5c..1acd83d312516 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -234,6 +234,8 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in sessVars.FoundInBinding = true if sessVars.StmtCtx.InVerboseExplain { sessVars.StmtCtx.AppendNote(errors.Errorf("Using the bindSQL: %v", chosenBinding.BindSQL)) + } else { + sessVars.StmtCtx.AppendExtraNote(errors.Errorf("Using the bindSQL: %v", chosenBinding.BindSQL)) } } // Restore the hint to avoid changing the stmt node. diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 6f9a276691149..47159dc8f8a60 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -209,8 +209,15 @@ type StatementContext struct { copied uint64 touched uint64 - message string - warnings []SQLWarn + message string + warnings []SQLWarn + // extraWarnings record the extra warnings and are only used by the slow log only now. + // If a warning is expected to be output only under some conditions (like in EXPLAIN or EXPLAIN VERBOSE) but it's + // not under such conditions now, it is considered as an extra warning. + // extraWarnings would not be printed through SHOW WARNINGS, but we want to always output them through the slow + // log to help diagnostics, so we store them here separately. + extraWarnings []SQLWarn + execDetails execdetails.ExecDetails allExecDetails []*execdetails.DetailsNeedP90 } @@ -299,8 +306,6 @@ type StatementContext struct { LogOnExceed [2]memory.LogOnExceed } - // OptimInfo maps Plan.ID() to optimization information when generating Plan. - OptimInfo map[int]string // InVerboseExplain indicates the statement is "explain format='verbose' ...". InVerboseExplain bool @@ -812,6 +817,47 @@ func (sc *StatementContext) AppendError(warn error) { } } +// GetExtraWarnings gets extra warnings. +func (sc *StatementContext) GetExtraWarnings() []SQLWarn { + sc.mu.Lock() + defer sc.mu.Unlock() + return sc.mu.extraWarnings +} + +// SetExtraWarnings sets extra warnings. +func (sc *StatementContext) SetExtraWarnings(warns []SQLWarn) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.mu.extraWarnings = warns +} + +// AppendExtraWarning appends an extra warning with level 'Warning'. +func (sc *StatementContext) AppendExtraWarning(warn error) { + sc.mu.Lock() + defer sc.mu.Unlock() + if len(sc.mu.extraWarnings) < math.MaxUint16 { + sc.mu.extraWarnings = append(sc.mu.extraWarnings, SQLWarn{WarnLevelWarning, warn}) + } +} + +// AppendExtraNote appends an extra warning with level 'Note'. +func (sc *StatementContext) AppendExtraNote(warn error) { + sc.mu.Lock() + defer sc.mu.Unlock() + if len(sc.mu.extraWarnings) < math.MaxUint16 { + sc.mu.extraWarnings = append(sc.mu.extraWarnings, SQLWarn{WarnLevelNote, warn}) + } +} + +// AppendExtraError appends an extra warning with level 'Error'. +func (sc *StatementContext) AppendExtraError(warn error) { + sc.mu.Lock() + defer sc.mu.Unlock() + if len(sc.mu.extraWarnings) < math.MaxUint16 { + sc.mu.extraWarnings = append(sc.mu.extraWarnings, SQLWarn{WarnLevelError, warn}) + } +} + // HandleTruncate ignores or returns the error based on the StatementContext state. func (sc *StatementContext) HandleTruncate(err error) error { // TODO: At present we have not checked whether the error can be ignored or treated as warning. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index fe4972fb5ff3a..695d0ad48a5a5 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1482,8 +1482,13 @@ func (s *SessionVars) IsMPPEnforced() bool { // TODO: Confirm whether this function will be inlined and // omit the overhead of string construction when calling with false condition. func (s *SessionVars) RaiseWarningWhenMPPEnforced(warning string) { - if s.IsMPPEnforced() && s.StmtCtx.InExplainStmt { + if !s.IsMPPEnforced() { + return + } + if s.StmtCtx.InExplainStmt { s.StmtCtx.AppendWarning(errors.New(warning)) + } else { + s.StmtCtx.AppendExtraWarning(errors.New(warning)) } } From 0f3031e3da2c6169e2199be0422c28fb55b0388f Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 20 Dec 2022 19:08:55 +0800 Subject: [PATCH 04/11] ddl: add privilege check when alter table add foreign key (#40051) close pingcap/tidb#40050 --- ddl/fktest/foreign_key_test.go | 18 ++++++++++++++++++ planner/core/planbuilder.go | 8 ++++++++ 2 files changed, 26 insertions(+) diff --git a/ddl/fktest/foreign_key_test.go b/ddl/fktest/foreign_key_test.go index 911f86f49ec85..13701ad5b610d 100644 --- a/ddl/fktest/foreign_key_test.go +++ b/ddl/fktest/foreign_key_test.go @@ -319,6 +319,24 @@ func TestCreateTableWithForeignKeyPrivilegeCheck(t *testing.T) { tk2.MustExec("create table t4 (a int, foreign key fk(a) references t1(id), foreign key (a) references t3(id));") } +func TestAlterTableWithForeignKeyPrivilegeCheck(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create user 'u1'@'%' identified by '';") + tk.MustExec("grant create,alter on *.* to 'u1'@'%';") + tk.MustExec("create table t1 (id int key);") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.Session().Auth(&auth.UserIdentity{Username: "u1", Hostname: "localhost", CurrentUser: true, AuthUsername: "u1", AuthHostname: "%"}, nil, []byte("012345678901234567890")) + tk2.MustExec("create table t2 (a int)") + err := tk2.ExecToErr("alter table t2 add foreign key (a) references t1 (id) on update cascade") + require.Error(t, err) + require.Equal(t, "[planner:1142]REFERENCES command denied to user 'u1'@'%' for table 't1'", err.Error()) + tk.MustExec("grant references on test.t1 to 'u1'@'%';") + tk2.MustExec("alter table t2 add foreign key (a) references t1 (id) on update cascade") +} + func TestRenameTableWithForeignKeyMetaInfo(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index df7a0d893e4ed..26508814523d2 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -4522,6 +4522,14 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.UpdatePriv, mysql.SystemDB, "stats_extended", "", authErr) + } else if spec.Tp == ast.AlterTableAddConstraint { + if b.ctx.GetSessionVars().User != nil && spec.Constraint != nil && + spec.Constraint.Tp == ast.ConstraintForeignKey && spec.Constraint.Refer != nil { + authErr = ErrTableaccessDenied.GenWithStackByArgs("REFERENCES", b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, spec.Constraint.Refer.Table.Name.L) + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ReferencesPriv, spec.Constraint.Refer.Table.Schema.L, + spec.Constraint.Refer.Table.Name.L, "", authErr) + } } } case *ast.AlterSequenceStmt: From 5f1a739491f29b972522d40bada31e8ff7e66f56 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 20 Dec 2022 19:58:54 +0800 Subject: [PATCH 05/11] planner: add more test cases for non-prep plan cache (#40060) --- planner/core/plan_cache.go | 4 +++ planner/core/plan_cache_param.go | 11 ++++++-- planner/core/plan_cache_param_test.go | 5 ++-- planner/core/plan_cache_test.go | 40 +++++++++++++++++++++++++++ planner/core/plan_cache_utils.go | 4 +++ planner/optimize.go | 11 ++++++-- 6 files changed, 68 insertions(+), 7 deletions(-) diff --git a/planner/core/plan_cache.go b/planner/core/plan_cache.go index d9df658aaa46e..ab4eb4e4912ab 100644 --- a/planner/core/plan_cache.go +++ b/planner/core/plan_cache.go @@ -115,6 +115,10 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, params []expression.Expression) (plan Plan, names []*types.FieldName, err error) { + if v := ctx.Value("____GetPlanFromSessionPlanCacheErr"); v != nil { // for testing + return nil, nil, errors.New("____GetPlanFromSessionPlanCacheErr") + } + if err := planCachePreprocess(ctx, sctx, isNonPrepared, is, stmt, params); err != nil { return nil, nil, err } diff --git a/planner/core/plan_cache_param.go b/planner/core/plan_cache_param.go index 7c79b2a6416a0..9094edec621c0 100644 --- a/planner/core/plan_cache_param.go +++ b/planner/core/plan_cache_param.go @@ -15,6 +15,7 @@ package core import ( + "context" "errors" "strings" "sync" @@ -70,7 +71,7 @@ func (pr *paramReplacer) Reset() { pr.params = nil } // ParameterizeAST parameterizes this StmtNode. // e.g. `select * from t where a<10 and b<23` --> `select * from t where a `select * from t where a<10 and b<23`. -func RestoreASTWithParams(_ sessionctx.Context, stmt ast.StmtNode, params []*driver.ValueExpr) error { +func RestoreASTWithParams(ctx context.Context, _ sessionctx.Context, stmt ast.StmtNode, params []*driver.ValueExpr) error { + if v := ctx.Value("____RestoreASTWithParamsErr"); v != nil { + return errors.New("____RestoreASTWithParamsErr") + } + pr := paramRestorerPool.Get().(*paramRestorer) defer func() { pr.Reset() diff --git a/planner/core/plan_cache_param_test.go b/planner/core/plan_cache_param_test.go index 5c65b89767a60..ee4a8e9ae65c5 100644 --- a/planner/core/plan_cache_param_test.go +++ b/planner/core/plan_cache_param_test.go @@ -15,6 +15,7 @@ package core import ( + "context" "strings" "testing" @@ -61,7 +62,7 @@ func TestParameterize(t *testing.T) { for _, c := range cases { stmt, err := parser.New().ParseOneStmt(c.sql, "", "") require.Nil(t, err) - paramSQL, params, err := ParameterizeAST(sctx, stmt) + paramSQL, params, err := ParameterizeAST(context.Background(), sctx, stmt) require.Nil(t, err) require.Equal(t, c.paramSQL, paramSQL) require.Equal(t, len(c.params), len(params)) @@ -69,7 +70,7 @@ func TestParameterize(t *testing.T) { require.Equal(t, c.params[i], params[i].Datum.GetValue()) } - err = RestoreASTWithParams(sctx, stmt, params) + err = RestoreASTWithParams(context.Background(), sctx, stmt, params) require.Nil(t, err) var buf strings.Builder rCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &buf) diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go index 5c3c7c45b9702..7a4ac860d8593 100644 --- a/planner/core/plan_cache_test.go +++ b/planner/core/plan_cache_test.go @@ -15,6 +15,7 @@ package core_test import ( + "context" "errors" "fmt" "math/rand" @@ -112,6 +113,45 @@ func TestNonPreparedPlanCacheWithExplain(t *testing.T) { tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) } +func TestNonPreparedPlanCacheFallback(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(`use test`) + tk.MustExec(`create table t (a int)`) + for i := 0; i < 5; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%v)", i)) + } + tk.MustExec("set tidb_enable_non_prepared_plan_cache=1") + + // inject a fault to GeneratePlanCacheStmtWithAST + ctx := context.WithValue(context.Background(), "____GeneratePlanCacheStmtWithASTErr", struct{}{}) + tk.MustQueryWithContext(ctx, "select * from t where a in (1, 2)").Sort().Check(testkit.Rows("1", "2")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // cannot generate PlanCacheStmt + tk.MustQueryWithContext(ctx, "select * from t where a in (1, 3)").Sort().Check(testkit.Rows("1", "3")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // cannot generate PlanCacheStmt + tk.MustQuery("select * from t where a in (1, 2)").Sort().Check(testkit.Rows("1", "2")) + tk.MustQuery("select * from t where a in (1, 3)").Sort().Check(testkit.Rows("1", "3")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // no error + + // inject a fault to GetPlanFromSessionPlanCache + tk.MustQuery("select * from t where a=1").Check(testkit.Rows("1")) // cache this plan + tk.MustQuery("select * from t where a=2").Check(testkit.Rows("2")) // plan from cache + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + ctx = context.WithValue(context.Background(), "____GetPlanFromSessionPlanCacheErr", struct{}{}) + tk.MustQueryWithContext(ctx, "select * from t where a=3").Check(testkit.Rows("3")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // fallback to the normal opt-path + tk.MustQueryWithContext(ctx, "select * from t where a=4").Check(testkit.Rows("4")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // fallback to the normal opt-path + tk.MustQueryWithContext(context.Background(), "select * from t where a=0").Check(testkit.Rows("0")) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // use the cached plan if no error + + // inject a fault to RestoreASTWithParams + ctx = context.WithValue(context.Background(), "____GetPlanFromSessionPlanCacheErr", struct{}{}) + ctx = context.WithValue(ctx, "____RestoreASTWithParamsErr", struct{}{}) + _, err := tk.ExecWithContext(ctx, "select * from t where a=1") + require.NotNil(t, err) +} + func TestNonPreparedPlanCacheBasically(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/plan_cache_utils.go b/planner/core/plan_cache_utils.go index 1cb66d3cdeb6a..8dc867316207d 100644 --- a/planner/core/plan_cache_utils.go +++ b/planner/core/plan_cache_utils.go @@ -67,6 +67,10 @@ func (e *paramMarkerExtractor) Leave(in ast.Node) (ast.Node, bool) { // paramSQL is the corresponding parameterized sql like 'select * from t where a?'. // paramStmt is the Node of paramSQL. func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context, paramSQL string, paramStmt ast.StmtNode) (*PlanCacheStmt, Plan, int, error) { + if v := ctx.Value("____GeneratePlanCacheStmtWithASTErr"); v != nil { // for testing + return nil, nil, 0, errors.New("____GeneratePlanCacheStmtWithASTErr") + } + vars := sctx.GetSessionVars() var extractor paramMarkerExtractor paramStmt.Accept(&extractor) diff --git a/planner/optimize.go b/planner/optimize.go index 1acd83d312516..5e572d8485368 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -74,15 +74,22 @@ func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode) (bindRecord } // getPlanFromNonPreparedPlanCache tries to get an available cached plan from the NonPrepared Plan Cache for this stmt. -func getPlanFromNonPreparedPlanCache(ctx context.Context, sctx sessionctx.Context, stmt ast.StmtNode, is infoschema.InfoSchema) (core.Plan, types.NameSlice, bool, error) { +func getPlanFromNonPreparedPlanCache(ctx context.Context, sctx sessionctx.Context, stmt ast.StmtNode, is infoschema.InfoSchema) (p core.Plan, ns types.NameSlice, ok bool, err error) { if sctx.GetSessionVars().StmtCtx.InPreparedPlanBuilding || // already in cached plan rebuilding phase !core.NonPreparedPlanCacheableWithCtx(sctx, stmt, is) { return nil, nil, false, nil } - paramSQL, params, err := core.ParameterizeAST(sctx, stmt) + paramSQL, params, err := core.ParameterizeAST(ctx, sctx, stmt) if err != nil { return nil, nil, false, err } + defer func() { + if err != nil { + // keep the stmt unchanged if err so that it can fallback to the normal optimization path. + // TODO: add metrics + err = core.RestoreASTWithParams(ctx, sctx, stmt, params) + } + }() val := sctx.GetSessionVars().GetNonPreparedPlanCacheStmt(paramSQL) if val == nil { cachedStmt, _, _, err := core.GeneratePlanCacheStmtWithAST(ctx, sctx, paramSQL, stmt) From 806fcbfde9eaa28d73535e5f996bc2ff1296e678 Mon Sep 17 00:00:00 2001 From: Xiaoju Wu Date: Tue, 20 Dec 2022 20:50:54 +0800 Subject: [PATCH 06/11] sessionctx: disable gc aware memory track (#40019) close pingcap/tidb#39971 --- sessionctx/variable/tidb_vars.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 00137ca7608cf..c8da3ed5c10e6 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1107,7 +1107,7 @@ const ( DefAdaptiveClosestReadThreshold = 4096 DefTiDBEnableAnalyzeSnapshot = false DefTiDBGenerateBinaryPlan = true - DefEnableTiDBGCAwareMemoryTrack = true + DefEnableTiDBGCAwareMemoryTrack = false DefTiDBDefaultStrMatchSelectivity = 0.8 DefTiDBEnableTmpStorageOnOOM = true DefTiDBEnableMDL = true From fdf335e3e5217a6c625ee3908e1a95f60791060b Mon Sep 17 00:00:00 2001 From: Xiaoju Wu Date: Tue, 20 Dec 2022 21:56:55 +0800 Subject: [PATCH 07/11] *: make auto-analyze killable by global memory limit (#39978) ref pingcap/tidb#39971, close pingcap/tidb#39994 --- executor/analyzetest/BUILD.bazel | 2 +- executor/analyzetest/analyze_test.go | 114 +++++++++++++++++++++++++++ executor/executor.go | 1 + executor/executor_test.go | 32 -------- server/server.go | 5 ++ session/session.go | 2 + testkit/mocksessionmanager.go | 12 +++ util/memory/tracker.go | 11 +++ 8 files changed, 146 insertions(+), 33 deletions(-) diff --git a/executor/analyzetest/BUILD.bazel b/executor/analyzetest/BUILD.bazel index 53126213363a5..3112abe57c00f 100644 --- a/executor/analyzetest/BUILD.bazel +++ b/executor/analyzetest/BUILD.bazel @@ -8,7 +8,6 @@ go_test( "main_test.go", ], flaky = True, - race = "on", shard_count = 50, deps = [ "//domain", @@ -30,6 +29,7 @@ go_test( "//tablecodec", "//testkit", "//types", + "//util", "//util/codec", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/executor/analyzetest/analyze_test.go b/executor/analyzetest/analyze_test.go index e3bf9d51d9260..55f3ad9397be9 100644 --- a/executor/analyzetest/analyze_test.go +++ b/executor/analyzetest/analyze_test.go @@ -17,6 +17,7 @@ package analyzetest import ( "context" "fmt" + "runtime" "strconv" "strings" "testing" @@ -43,6 +44,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" @@ -3060,3 +3062,115 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseCount")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseModifyCount")) } + +func TestGlobalMemoryControlForAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk0 := testkit.NewTestKit(t, store) + tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") + tk0.MustExec("set global tidb_server_memory_limit = 512MB") + tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + + sm := &testkit.MockSessionManager{ + PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk0.MustExec("use test") + tk0.MustExec("create table t(a int)") + tk0.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk0.MustExec("insert into t select * from t") // 256 Lines + } + sql := "analyze table t with 1.0 samplerate;" // Need about 100MB + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + }() + _, err := tk0.Exec(sql) + require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) + runtime.GC() +} + +func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string) + tk.MustExec("set global tidb_mem_oom_action = 'cancel'") + //originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit = 512MB") + originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string) + tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") + defer func() { + tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1)) + //tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2)) + tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3)) + }() + + // clean child trackers + oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + for _, tracker := range oldChildTrackers { + tracker.Detach() + } + defer func() { + for _, tracker := range oldChildTrackers { + tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker) + } + }() + childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t select 1") + for i := 1; i <= 8; i++ { + tk.MustExec("insert into t select * from t") // 256 Lines + } + _, err0 := tk.Exec("analyze table t with 1.0 samplerate;") + require.NoError(t, err0) + rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + require.Len(t, rs0.Rows(), 0) + + h := dom.StatsHandle() + originalVal4 := handle.AutoAnalyzeMinCnt + originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) + handle.AutoAnalyzeMinCnt = 0 + tk.MustExec("set global tidb_auto_analyze_ratio = 0.001") + defer func() { + handle.AutoAnalyzeMinCnt = originalVal4 + tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5)) + }() + + sm := &testkit.MockSessionManager{ + Dom: dom, + PS: []*util.ProcessInfo{tk.Session().ShowProcess()}, + } + dom.ServerMemoryLimitHandle().SetSessionManager(sm) + go dom.ServerMemoryLimitHandle().Run() + + tk.MustExec("insert into t values(4),(5),(6)") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) + err := h.Update(dom.InfoSchema()) + require.NoError(t, err) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) + }() + tk.MustQuery("select 1") + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) + + h.HandleAutoAnalyze(dom.InfoSchema()) + rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") + failReason := rs.Rows()[0][0].(string) + require.True(t, strings.Contains(failReason, "Out Of Memory Quota!")) + + childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest() + require.Len(t, childTrackers, 0) +} diff --git a/executor/executor.go b/executor/executor.go index 90622ce52e527..2e5b5c4a0280f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1965,6 +1965,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow + vars.MemTracker.Detach() vars.MemTracker.UnbindActions() vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery) vars.MemTracker.ResetMaxConsumed() diff --git a/executor/executor_test.go b/executor/executor_test.go index bd64c39e5a134..122fbdbe7dd2f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6199,38 +6199,6 @@ func TestGlobalMemoryControl2(t *testing.T) { runtime.GC() } -func TestGlobalMemoryControlForAnalyze(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - - tk0 := testkit.NewTestKit(t, store) - tk0.MustExec("set global tidb_mem_oom_action = 'cancel'") - tk0.MustExec("set global tidb_server_memory_limit = 512MB") - tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128") - - sm := &testkit.MockSessionManager{ - PS: []*util.ProcessInfo{tk0.Session().ShowProcess()}, - } - dom.ServerMemoryLimitHandle().SetSessionManager(sm) - go dom.ServerMemoryLimitHandle().Run() - - tk0.MustExec("use test") - tk0.MustExec("create table t(a int)") - tk0.MustExec("insert into t select 1") - for i := 1; i <= 8; i++ { - tk0.MustExec("insert into t select * from t") // 256 Lines - } - sql := "analyze table t with 1.0 samplerate;" // Need about 100MB - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`)) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume")) - }() - _, err := tk0.Exec(sql) - require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!")) - runtime.GC() -} - func TestCompileOutOfMemoryQuota(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/server/server.go b/server/server.go index 09a20c8cb39c2..ba915c64f23cb 100644 --- a/server/server.go +++ b/server/server.go @@ -733,6 +733,11 @@ func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) { conn, ok := s.clients[id] s.rwlock.RUnlock() if !ok { + if s.dom != nil { + if pinfo, ok2 := s.dom.SysProcTracker().GetSysProcessList()[id]; ok2 { + return pinfo, true + } + } return &util.ProcessInfo{}, false } return conn.ctx.ShowProcess(), ok diff --git a/session/session.go b/session/session.go index 2c6aa0567fa66..63bc1c970fe08 100644 --- a/session/session.go +++ b/session/session.go @@ -1988,6 +1988,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu s.sessionVars.StmtCtx.OriginalSQL = prevSQL s.sessionVars.StmtCtx.StmtType = prevStmtType s.sessionVars.StmtCtx.Tables = prevTables + s.sessionVars.MemTracker.Detach() }, nil } @@ -2049,6 +2050,7 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f se.sessionVars.PartitionPruneMode.Store(prePruneMode) se.sessionVars.OptimizerUseInvisibleIndexes = false se.sessionVars.InspectionTableCache = nil + se.sessionVars.MemTracker.Detach() s.sysSessionPool().Put(tmp) }, nil } diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index a9e4d085dc34d..550ff69132d91 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "sync" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" @@ -31,6 +32,7 @@ type MockSessionManager struct { PSMu sync.RWMutex SerID uint64 TxnInfo []*txninfo.TxnInfo + Dom *domain.Domain conn map[uint64]session.Session mu sync.Mutex } @@ -68,6 +70,11 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret[connID] = pi.ShowProcess() } msm.mu.Unlock() + if msm.Dom != nil { + for connID, pi := range msm.Dom.SysProcTracker().GetSysProcessList() { + ret[connID] = pi + } + } return ret } @@ -85,6 +92,11 @@ func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, boo if sess := msm.conn[id]; sess != nil { return sess.ShowProcess(), true } + if msm.Dom != nil { + if pinfo, ok := msm.Dom.SysProcTracker().GetSysProcessList()[id]; ok { + return pinfo, true + } + } return &util.ProcessInfo{}, false } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 9c2adf31ace14..39261a45355a1 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -762,6 +762,17 @@ func (t *Tracker) CountAllChildrenMemUse() map[string]int64 { return trackerMemUseMap } +// GetChildrenForTest returns children trackers +func (t *Tracker) GetChildrenForTest() []*Tracker { + t.mu.Lock() + defer t.mu.Unlock() + trackers := make([]*Tracker, 0) + for _, list := range t.mu.children { + trackers = append(trackers, list...) + } + return trackers +} + func countChildMem(t *Tracker, familyTreeName string, trackerMemUseMap map[string]int64) { if len(familyTreeName) > 0 { familyTreeName += " <- " From ae2d551171392e2033be11585a0acee0e4fa3157 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 21 Dec 2022 10:38:56 +0800 Subject: [PATCH 08/11] cpu: fix ticker to avoid close early (#40036) ref pingcap/tidb#40029 --- util/cpu/BUILD.bazel | 1 + util/cpu/cpu.go | 8 +++++--- util/cpu/cpu_test.go | 7 ++++--- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/util/cpu/BUILD.bazel b/util/cpu/BUILD.bazel index 58bc047a332c4..08893520caaa0 100644 --- a/util/cpu/BUILD.bazel +++ b/util/cpu/BUILD.bazel @@ -20,5 +20,6 @@ go_test( name = "cpu_test", srcs = ["cpu_test.go"], embed = [":cpu"], + flaky = True, deps = ["@com_github_stretchr_testify//require"], ) diff --git a/util/cpu/cpu.go b/util/cpu/cpu.go index 416b3c3eaeb99..2803b4e106c49 100644 --- a/util/cpu/cpu.go +++ b/util/cpu/cpu.go @@ -56,11 +56,13 @@ func NewCPUObserver() *Observer { // Start starts the cpu observer. func (c *Observer) Start() { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() c.wg.Add(1) go func() { - defer c.wg.Done() + ticker := time.NewTicker(100 * time.Millisecond) + defer func() { + ticker.Stop() + c.wg.Done() + }() for { select { case <-ticker.C: diff --git a/util/cpu/cpu_test.go b/util/cpu/cpu_test.go index 6c7e863f9060a..cd330a11e5196 100644 --- a/util/cpu/cpu_test.go +++ b/util/cpu/cpu_test.go @@ -42,9 +42,10 @@ func TestCPUValue(t *testing.T) { } }() } - time.Sleep(30 * time.Second) - require.Greater(t, Observer.observe(), 0.0) - require.Less(t, Observer.observe(), 1.0) + Observer.Start() + time.Sleep(5 * time.Second) + require.GreaterOrEqual(t, GetCPUUsage(), 0.0) + require.Less(t, GetCPUUsage(), 1.0) Observer.Stop() close(exit) wg.Wait() From a2fa1875114144c4c92b4aeb1b67c8eeb4ab1b08 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 21 Dec 2022 11:58:54 +0800 Subject: [PATCH 09/11] build(deps): bump golang.org/x/time from 0.2.0 to 0.3.0 (#39912) --- DEPS.bzl | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index ac0a348ad55fe..dbd1edbaf98d6 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4467,8 +4467,8 @@ def go_deps(): name = "org_golang_x_time", build_file_proto_mode = "disable_global", importpath = "golang.org/x/time", - sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=", - version = "v0.2.0", + sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=", + version = "v0.3.0", ) go_repository( name = "org_golang_x_tools", diff --git a/go.mod b/go.mod index df18ccec64b18..f4d3fb93f9e98 100644 --- a/go.mod +++ b/go.mod @@ -116,7 +116,7 @@ require ( golang.org/x/sys v0.3.0 golang.org/x/term v0.3.0 golang.org/x/text v0.5.0 - golang.org/x/time v0.2.0 + golang.org/x/time v0.3.0 golang.org/x/tools v0.2.0 google.golang.org/api v0.74.0 google.golang.org/grpc v1.45.0 diff --git a/go.sum b/go.sum index b7de00223b8a3..8c21c0326306d 100644 --- a/go.sum +++ b/go.sum @@ -1338,8 +1338,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.2.0 h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE= -golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= From 51cce4578e6bd8a7b869eb829b605cf14d4f386f Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 21 Dec 2022 14:30:55 +0800 Subject: [PATCH 10/11] ddl: use latest ts to read record for adding index (#40081) close pingcap/tidb#40074 --- ddl/backfilling.go | 7 +------ ddl/export_test.go | 4 ++-- ddl/index_cop.go | 17 +++++++++++------ ddl/index_cop_test.go | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index d1035bad084bd..0f0910e1caf28 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() { logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) return } - ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope) - if err != nil { - logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) - return - } - b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver) + b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore()) } func (b *backfillScheduler) canSkipError(err error) bool { diff --git a/ddl/export_test.go b/ddl/export_test.go index 486390f9a6810..3ea26fb04290c 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -28,7 +28,7 @@ func SetBatchInsertDeleteRangeSize(i int) { var NewCopContext4Test = newCopContext -func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64, +func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage, batchSize int) ([]*indexRecord, bool, error) { variable.SetDDLReorgBatchSize(int32(batchSize)) task := &reorgBackfillTask{ @@ -36,7 +36,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS startKey: startKey, endKey: endKey, } - pool := newCopReqSenderPool(context.Background(), copCtx, startTS) + pool := newCopReqSenderPool(context.Background(), copCtx, store) pool.adjustSize(1) pool.tasksCh <- task idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 0a04ac63eb190..fab097727139b 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -103,9 +103,9 @@ type copReqSenderPool struct { resultsCh chan idxRecResult results generic.SyncMap[int, struct{}] - ctx context.Context - copCtx *copContext - startTS uint64 + ctx context.Context + copCtx *copContext + store kv.Storage senders []*copReqSender wg sync.WaitGroup @@ -139,7 +139,12 @@ func (c *copReqSender) run() { curTaskID = task.id logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", zap.Int("id", task.id), zap.String("task", task.String())) - rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) + ver, err := p.store.CurrentVersion(kv.GlobalTxnScope) + if err != nil { + p.resultsCh <- idxRecResult{id: task.id, err: err} + return + } + rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey()) if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} return @@ -167,7 +172,7 @@ func (c *copReqSender) run() { } } -func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { +func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool { poolSize := copReadChunkPoolSize() idxBufPool := make(chan []*indexRecord, poolSize) srcChkPool := make(chan *chunk.Chunk, poolSize) @@ -181,7 +186,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64 results: generic.NewSyncMap[int, struct{}](10), ctx: ctx, copCtx: copCtx, - startTS: startTS, + store: store, senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), wg: sync.WaitGroup{}, idxBufPool: idxBufPool, diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go index 80e37f6a74121..38bced0b6678d 100644 --- a/ddl/index_cop_test.go +++ b/ddl/index_cop_test.go @@ -43,7 +43,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { endKey := startKey.PrefixNext() txn, err := store.Begin() require.NoError(t, err) - idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10) + idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10) require.NoError(t, err) require.False(t, done) require.NoError(t, txn.Rollback()) From 2150c6b3a11c8acbfdbd3a10a8426effa25c5692 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 21 Dec 2022 15:52:55 +0800 Subject: [PATCH 11/11] executor: close recordset again (#40073) --- executor/admin_test.go | 36 +++++++++++++----------------------- testkit/asynctestkit.go | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/executor/admin_test.go b/executor/admin_test.go index 41b926cf2c377..0b2530e76d5a3 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -1095,9 +1095,7 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindInt64 1]\"", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindInt64 1]\"") hook.checkLogCount(t, 1) hook.logs[0].checkMsg(t, "admin check found data inconsistency") hook.logs[0].checkField(t, @@ -1119,9 +1117,7 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindString 10]\"", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindString 10]\"") hook.checkLogCount(t, 1) hook.logs[0].checkMsg(t, "admin check found data inconsistency") hook.logs[0].checkField(t, @@ -1143,9 +1139,8 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"handle: 1, values: [KindString 100 KindInt64 1]\" != record-values:\"\"", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", + "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"handle: 1, values: [KindString 100 KindInt64 1]\" != record-values:\"\"") hook.checkLogCount(t, 1) logEntry := hook.logs[0] logEntry.checkMsg(t, "admin check found data inconsistency") @@ -1188,9 +1183,8 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"handle: 1, values: [KindInt64 10 KindInt64 1]\" != record-values:\"\"", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", + "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"handle: 1, values: [KindInt64 10 KindInt64 1]\" != record-values:\"\"") hook.checkLogCount(t, 1) logEntry := hook.logs[0] logEntry.checkMsg(t, "admin check found data inconsistency") @@ -1233,9 +1227,8 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, err) require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[executor:8134]data inconsistency in table: admin_test, index: uk1, col: c2, handle: \"1\", index-values:\"KindInt64 20\" != record-values:\"KindInt64 10\", compare err:", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", + "[executor:8134]data inconsistency in table: admin_test, index: uk1, col: c2, handle: \"1\", index-values:\"KindInt64 20\" != record-values:\"KindInt64 10\", compare err:") hook.checkLogCount(t, 1) logEntry := hook.logs[0] logEntry.checkMsg(t, "admin check found data inconsistency") @@ -1261,9 +1254,8 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, err) require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, "[executor:8134]data inconsistency in table: admin_test, index: k2, col: c3, handle: \"1\", index-values:\"KindString 200\" != record-values:\"KindString 100\", compare err:", err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", + "[executor:8134]data inconsistency in table: admin_test, index: k2, col: c3, handle: \"1\", index-values:\"KindString 200\" != record-values:\"KindString 100\", compare err:") hook.checkLogCount(t, 1) logEntry := hook.logs[0] logEntry.checkMsg(t, "admin check found data inconsistency") @@ -1301,12 +1293,10 @@ func TestCheckFailReport(t *testing.T) { // TODO(tiancaiamao): admin check doesn't support the chunk protocol. // Remove this after https://github.com/pingcap/tidb/issues/35156 - _, err = tk.Exec(ctx, "set @@tidb_enable_chunk_rpc = off") - require.NoError(t, err) + tk.MustExec(ctx, "set @@tidb_enable_chunk_rpc = off") - _, err = tk.Exec(ctx, "admin check table admin_test") - require.Error(t, err) - require.Equal(t, `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`, err.Error()) + tk.MustGetErrMsg(ctx, "admin check table admin_test", + `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`) hook.checkLogCount(t, 1) logEntry := hook.logs[0] logEntry.checkMsg(t, "admin check found data inconsistency") diff --git a/testkit/asynctestkit.go b/testkit/asynctestkit.go index aa0f3fcadf8ef..a875088c82abf 100644 --- a/testkit/asynctestkit.go +++ b/testkit/asynctestkit.go @@ -183,6 +183,21 @@ func (tk *AsyncTestKit) MustExec(ctx context.Context, sql string, args ...interf } } +// MustGetErrMsg executes a sql statement and assert its error message. +func (tk *AsyncTestKit) MustGetErrMsg(ctx context.Context, sql string, errStr string) { + err := tk.ExecToErr(ctx, sql) + tk.require.EqualError(err, errStr) +} + +// ExecToErr executes a sql statement and discard results. +func (tk *AsyncTestKit) ExecToErr(ctx context.Context, sql string, args ...interface{}) error { + res, err := tk.Exec(ctx, sql, args...) + if res != nil { + tk.require.NoError(res.Close()) + } + return err +} + // MustQuery query the statements and returns result rows. // If expected result is set it asserts the query result equals expected result. func (tk *AsyncTestKit) MustQuery(ctx context.Context, sql string, args ...interface{}) *Result {