From 98f5610a42c8bb507551ceae1118919c874c83e3 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 10 Aug 2023 15:30:27 +0100 Subject: [PATCH 1/6] This is an automated cherry-pick of #45877 Signed-off-by: ti-chi-bot --- ddl/db_partition_test.go | 118 +++++++++++++++++++++++++++++++ ddl/ddl_api.go | 1 - ddl/ddl_worker.go | 36 +++++----- ddl/failtest/fail_db_test.go | 2 - ddl/partition.go | 124 +++++++++++++++++---------------- ddl/placement_policy_test.go | 73 ++++++++++--------- ddl/rollingback.go | 32 +++++++++ executor/insert_common.go | 5 ++ executor/write.go | 2 +- infoschema/builder.go | 56 +++++++++++---- parser/model/model.go | 3 +- table/tables/partition_test.go | 107 ++++++++++++++++++++++++++++ 12 files changed, 433 insertions(+), 126 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 5c53a7ee5d649..c0d010940eaa1 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2606,6 +2606,124 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } +func TestExchangePartitionMultiTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk1 := testkit.NewTestKit(t, store) + + dbName := "ExchangeMultiTable" + tk1.MustExec(`create schema ` + dbName) + tk1.MustExec(`use ` + dbName) + tk1.MustExec(`CREATE TABLE t1 (a int)`) + tk1.MustExec(`CREATE TABLE t2 (a int)`) + tk1.MustExec(`CREATE TABLE tp (a int) partition by hash(a) partitions 3`) + tk1.MustExec(`insert into t1 values (0)`) + tk1.MustExec(`insert into t2 values (3)`) + tk1.MustExec(`insert into tp values (6)`) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use ` + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use ` + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use ` + dbName) + waitFor := func(col int, tableName, s string) { + for { + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use test`) + sql := `admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'` + res := tk4.MustQuery(sql).Rows() + if len(res) == 1 && res[0][col] == s { + break + } + time.Sleep(10 * time.Millisecond) + } + } + alterChan1 := make(chan error) + alterChan2 := make(chan error) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into tp values (1)`) + go func() { + alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`) + }() + waitFor(11, "t1", "running") + go func() { + alterChan2 <- tk2.ExecToErr(`alter table tp exchange partition p0 with table t2`) + }() + waitFor(11, "t2", "queueing") + tk3.MustExec(`rollback`) + require.NoError(t, <-alterChan1) + err := <-alterChan2 + tk3.MustQuery(`select * from t1`).Check(testkit.Rows("6")) + tk3.MustQuery(`select * from t2`).Check(testkit.Rows("0")) + tk3.MustQuery(`select * from tp`).Check(testkit.Rows("3")) + require.NoError(t, err) +} + +func TestExchangePartitionValidation(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + dbName := "ExchangeValidation" + tk.MustExec(`create schema ` + dbName) + tk.MustExec(`use ` + dbName) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name))`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`insert into t1 values ("2023-08-06","0001")`) +} + +func TestExchangePartitionPlacementPolicy(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create schema ExchangePartWithPolicy`) + tk.MustExec(`use ExchangePartWithPolicy`) + tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`) + tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule1"`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule2" + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`, + "[ddl:1736]Tables have different definitions") + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) +} + func TestExchangePartitionHook(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 4509aa0d954e9..50791879fc59f 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4522,7 +4522,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name)) } - // NOTE: if nt is temporary table, it should be checked return nil } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 30782fa694985..dba28a43a30b0 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1366,24 +1366,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: - var ( - ptSchemaID int64 - ptTableID int64 - partName string - withValidation bool - ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } diff.OldTableID = job.TableID - affects := make([]*model.AffectedOption, 1) - affects[0] = &model.AffectedOption{ - SchemaID: ptSchemaID, - TableID: ptTableID, - OldTableID: ptTableID, + diff.OldSchemaID = job.SchemaID + if job.SchemaState != model.StatePublic { + diff.TableID = job.TableID + diff.SchemaID = job.SchemaID + } else { + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 // Not needed, will reload the whole table + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + diff.SchemaID = ptSchemaID + diff.TableID = ptTableID } - diff.AffectedOpts = affects case model.ActionTruncateTablePartition: diff.TableID = job.TableID if len(job.CtxVars) > 0 { diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 8a0c4c41e9ecd..3fdcda0dddc75 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -135,8 +135,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustExec("insert into pt values(1), (3), (5)") tk.MustExec("create table nt(a int)") tk.MustExec("insert into nt values(7)") - tk.MustExec("set @@tidb_enable_exchange_partition=1") - defer tk.MustExec("set @@tidb_enable_exchange_partition=0") err = tk.ExecToErr("alter table pt exchange partition p1 with table nt") require.Error(t, err) diff --git a/ddl/partition.go b/ddl/partition.go index 612a7e4d4165f..6cb52a9db02ac 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2116,6 +2116,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if job.IsRollingback() { + return rollbackExchangeTablePartition(d, t, job, nt) + } pt, err := getTableInfo(t, ptID, ptSchemaID) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { @@ -2124,35 +2127,49 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - if pt.State != model.StatePublic { - job.State = model.JobStateCancelled - return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) - } - - err = checkExchangePartition(pt, nt) + index, partDef, err := getPartitionDef(pt, partName) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + if job.SchemaState == model.StateNone { + if pt.State != model.StatePublic { + job.State = model.JobStateCancelled + return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) + } + err = checkExchangePartition(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - err = checkTableDefCompatible(pt, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + err = checkTableDefCompatible(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - index, _, err := getPartitionDef(pt, partName) - if err != nil { - return ver, errors.Trace(err) - } - if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ - ExchangePartitionFlag: true, ExchangePartitionID: ptID, ExchangePartitionDefID: defID, } + // We need an interim schema version, + // so there are no non-matching rows inserted + // into the table using the schema version + // before the exchange is made. + job.SchemaState = model.StateWriteOnly return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } + // From now on, nt (the non-partitioned table) has + // ExchangePartitionInfo set, meaning it is restricted + // to only allow writes that would match the + // partition to be exchange with. + // So we need to rollback that change, instead of just cancelling. if d.lease > 0 { delayForAsyncCommit() @@ -2161,7 +2178,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { - job.State = model.JobStateCancelled + job.State = model.JobStateRollingback return ver, errors.Trace(err) } } @@ -2169,19 +2186,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // partition table auto IDs. ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } // non-partition table auto IDs. ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get() if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2194,35 +2203,32 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } - // exchange table meta id - partDef.ID, nt.ID = nt.ID, partDef.ID - - err = t.UpdateTable(ptSchemaID, pt) + // Recreate non-partition table meta info, + // by first delete it with the old table id + err = t.DropTableOrView(job.SchemaID, nt.ID) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } - failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { - if val.(bool) { - job.State = model.JobStateCancelled - failpoint.Return(ver, errors.New("occur an error after updating partition id")) - } - }) + // exchange table meta id + partDef.ID, nt.ID = nt.ID, partDef.ID - // recreate non-partition table meta info - err = t.DropTableOrView(job.SchemaID, partDef.ID) + err = t.UpdateTable(ptSchemaID, pt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.CreateTableOrView(job.SchemaID, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ver, errors.New("occur an error after updating partition id")) + } + }) + // Set both tables to the maximum auto IDs between normal table and partitioned table. newAutoIDs := meta.AutoIDGroup{ RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), @@ -2231,12 +2237,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2255,23 +2259,15 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } }) - err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) + bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } @@ -2280,7 +2276,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID}) if err != nil { - job.State = model.JobStateCancelled return 0, errors.Wrapf(err, "failed to get PD the label rules") } @@ -2307,10 +2302,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo patch := label.NewRulePatch(setRules, deleteRules) err = infosync.UpdateLabelRules(context.TODO(), patch) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") } + job.SchemaState = model.StatePublic nt.ExchangePartitionInfo = nil ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { @@ -2959,7 +2954,11 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) return nil } +<<<<<<< HEAD func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { +======= +func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { +>>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) bundles := make([]*placement.Bundle, 0, 3) ptBundle, err := placement.NewTableBundle(t, pt) @@ -3057,16 +3056,21 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { - if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { + partitionPPRef := partPPRef + if partitionPPRef == nil { + partitionPPRef = ptPPRef + } + + if ntPPRef == nil && partitionPPRef == nil { return nil } - if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + if ntPPRef == nil || partitionPPRef == nil { return dbterror.ErrTablesDifferentMetadata } - ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) - ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index f7a29c4e711e9..91c8ffb0dfbb5 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -2087,61 +2087,48 @@ func TestExchangePartitionWithPlacement(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) // clearAllBundles(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set @@tidb_enable_exchange_partition=1") tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2, tp") - tk.MustExec("drop placement policy if exists p1") - tk.MustExec("drop placement policy if exists p2") - tk.MustExec("drop placement policy if exists p3") - - tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy p1") - - tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") - defer tk.MustExec("drop placement policy p2") - tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") - defer tk.MustExec("drop placement policy p3") + tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1'") + tk.MustExec("create placement policy pp2 primary_region='r2' regions='r2'") + tk.MustExec("create placement policy pp3 primary_region='r3' regions='r3'") - policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) + policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("pp1")) require.True(t, ok) - tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) - defer tk.MustExec("drop table t1") - + tk.MustExec(`CREATE TABLE t1 (id INT) placement policy pp1`) tk.MustExec(`CREATE TABLE t2 (id INT)`) - defer tk.MustExec("drop table t2") + tk.MustExec(`CREATE TABLE t3 (id INT) placement policy pp3`) t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) t1ID := t1.Meta().ID - tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p1, - PARTITION p1 VALUES LESS THAN (1000) placement policy p2, - PARTITION p2 VALUES LESS THAN (10000) - );`) - defer tk.MustExec("drop table tp") + tk.MustExec(`CREATE TABLE tp (id INT) placement policy pp3 PARTITION BY RANGE (id) ( + PARTITION p1 VALUES LESS THAN (100) placement policy pp1, + PARTITION p2 VALUES LESS THAN (1000) placement policy pp2, + PARTITION p3 VALUES LESS THAN (10000) + )`) tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) tpID := tp.Meta().ID par0ID := tp.Meta().Partition.Definitions[0].ID - // exchange par0, t1 - tk.MustExec("alter table tp exchange partition p0 with table t1") + // exchange par1, t1 + tk.MustExec("alter table tp exchange partition p1 with table t1") tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */")) tk.MustQuery("show create table tp").Check(testkit.Rows("" + "tp CREATE TABLE `tp` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp3` */\n" + "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `p2` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p3` VALUES LESS THAN (10000))")) tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tpID, tp.Meta().ID) @@ -2153,11 +2140,31 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID) checkExistTableBundlesInPD(t, dom, "test", "tp") - // exchange par0, t2 - tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata) + // exchange par2, t1 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t1", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t1 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t1", mysql.ErrTablesDifferentMetadata) // exchange par1, t2 tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t2 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t2 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par1, t3 + tk.MustGetErrCode("alter table tp exchange partition p1 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t3 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t3 + tk.MustExec("alter table tp exchange partition p3 with table t3") + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkExistTableBundlesInPD(t, dom, "test", "t3") } func TestPDFail(t *testing.T) { diff --git a/ddl/rollingback.go b/ddl/rollingback.go index c3ea2c920ca25..e2647e790266d 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -263,6 +263,30 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool { return false } +// rollbackExchangeTablePartition will clear the non-partitioned +// table's ExchangePartitionInfo state. +func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) { + tblInfo.ExchangePartitionInfo = nil + job.State = model.JobStateRollbackDone + job.SchemaState = model.StatePublic + return updateVersionAndTableInfo(d, t, job, tblInfo, true) +} + +func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateNone { + // Nothing is changed + job.State = model.JobStateCancelled + return ver, dbterror.ErrCancelledDDLJob + } + var nt *model.TableInfo + nt, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + ver, err = rollbackExchangeTablePartition(d, t, job, nt) + return ver, errors.Trace(err) +} + func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) @@ -377,6 +401,7 @@ func rollingbackReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ve } // addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly. + // TODO: Test this with reorganize partition p1 into (partition p1 ...)! return convertAddTablePartitionJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob, tblInfo) } @@ -409,6 +434,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) err = rollingbackDropTableOrView(t, job) case model.ActionDropTablePartition: ver, err = rollingbackDropTablePartition(t, job) + case model.ActionExchangeTablePartition: + ver, err = rollingbackExchangeTablePartition(d, t, job) case model.ActionDropSchema: err = rollingbackDropSchema(t, job) case model.ActionRenameIndex: @@ -424,8 +451,13 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition, model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, +<<<<<<< HEAD model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement, model.ActionRecoverSchema: +======= + model.ActionModifySchemaDefaultPlacement, + model.ActionRecoverSchema, model.ActionAlterCheckConstraint: +>>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) ver, err = cancelOnlyNotHandledJob(job, model.StateNone) case model.ActionMultiSchemaChange: err = rollingBackMultiSchemaChange(job) diff --git a/executor/insert_common.go b/executor/insert_common.go index 36c187c13ef9f..1eb64be775e97 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -687,8 +687,13 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition +<<<<<<< HEAD if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) +======= + if tbl.ExchangePartitionInfo != nil { + is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema) +>>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { return nil, errors.Errorf("exchange partition process table by id failed") diff --git a/executor/write.go b/executor/write.go index 41e1e1d322206..ff1e31e5deee6 100644 --- a/executor/write.go +++ b/executor/write.go @@ -78,7 +78,7 @@ func updateRecord( // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/infoschema/builder.go b/infoschema/builder.go index 7ac4b9528ab9a..e2c78387eb9bf 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -227,6 +227,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyCreateTables(m, diff) case model.ActionReorganizePartition: return b.applyReorganizePartition(m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangeTablePartition(m, diff) case model.ActionFlashbackCluster: return []int64{-1}, nil default: @@ -309,6 +311,47 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + // The partitioned table is not affected until the last stage + if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { + return b.applyTableUpdate(m, diff) + } + ntSchemaID := diff.OldSchemaID + ntID := diff.OldTableID + ptSchemaID := diff.SchemaID + ptID := diff.TableID + if len(diff.AffectedOpts) > 0 { + // From old version + ptID = diff.AffectedOpts[0].TableID + ptSchemaID = diff.AffectedOpts[0].SchemaID + } + // The normal table needs to be updated first: + // Just update the tables separately + currDiff := &model.SchemaDiff{ + Version: diff.Version, + TableID: ntID, + SchemaID: ntSchemaID, + } + ntIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markPartitionBundleShouldUpdate(ntID) + // Then the partitioned table + currDiff.TableID = ptID + currDiff.SchemaID = ptSchemaID + ptIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markTableBundleShouldUpdate(ptID) + err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil +} + func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -376,14 +419,6 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) - - if diff.Type == model.ActionExchangeTablePartition { - // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) - if err != nil { - return nil, errors.Trace(err) - } - } } return tblIDs, nil @@ -415,7 +450,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: + case model.ActionTruncateTable, model.ActionCreateView: oldTableID = diff.OldTableID newTableID = diff.TableID default: @@ -433,8 +468,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 b.markTableBundleShouldUpdate(newTableID) case model.ActionRecoverTable: b.markTableBundleShouldUpdate(newTableID) - case model.ActionExchangeTablePartition: - b.markPartitionBundleShouldUpdate(newTableID) case model.ActionAlterTablePlacement: b.markTableBundleShouldUpdate(newTableID) } @@ -445,7 +478,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 var allocs autoid.Allocators if tableIDIsValid(oldTableID) { if oldTableID == newTableID && (diff.Type != model.ActionRenameTable && diff.Type != model.ActionRenameTables) && - diff.Type != model.ActionExchangeTablePartition && // For repairing table in TiDB cluster, given 2 normal node and 1 repair node. // For normal node's information schema, repaired table is existed. // For repair node's information schema, repaired table is filtered (couldn't find it in `is`). diff --git a/parser/model/model.go b/parser/model/model.go index aa48c817f5de1..233468edcc0d3 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1142,9 +1142,10 @@ func (p PartitionType) String() string { // ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { - ExchangePartitionFlag bool `json:"exchange_partition_flag"` ExchangePartitionID int64 `json:"exchange_partition_id"` ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` + // Deprecated, not used + XXXExchangePartitionFlag bool `json:"exchange_partition_flag"` } // PartitionInfo provides table partition info. diff --git a/table/tables/partition_test.go b/table/tables/partition_test.go index e0bc678057700..7867d0272ae3c 100644 --- a/table/tables/partition_test.go +++ b/table/tables/partition_test.go @@ -709,6 +709,113 @@ func TestIssue31629(t *testing.T) { } } +func TestExchangePartitionStates(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + dbName := "partSchemaVer" + tk.MustExec("create database " + dbName) + tk.MustExec("use " + dbName) + tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`) + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use " + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use " + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec("use " + dbName) + tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`) + tk.MustExec(`create table tp (a int primary key, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000))`) + tk.MustExec(`insert into t values (1, "1")`) + tk.MustExec(`insert into tp values (2, "2")`) + tk.MustExec(`analyze table t,tp`) + tk.MustExec("BEGIN") + tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1")) + tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2")) + alterChan := make(chan error) + go func() { + // WITH VALIDATION is the default + err := tk2.ExecToErr(`alter table tp exchange partition p0 with table t`) + alterChan <- err + }() + waitFor := func(tableName, s string, pos int) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows() + if len(res) == 1 && res[0][pos] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + gotime.Sleep(50 * gotime.Millisecond) + } + } + waitFor("t", "write only", 4) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into t values (4,"4")`) + tk3.MustContainErrMsg(`insert into t values (1000004,"1000004")`, "[table:1748]Found a row not matching the given partition set") + tk.MustExec(`insert into t values (5,"5")`) + // This should fail the alter table! + tk.MustExec(`insert into t values (1000005,"1000005")`) + + // MDL will block the alter to not continue until all clients + // are in StateWriteOnly, which tk is blocking until it commits + tk.MustExec(`COMMIT`) + waitFor("t", "rollback done", 11) + // MDL will block the alter from finish, tk is in 'rollbacked' schema version + // but the alter is still waiting for tk3 to commit, before continuing + tk.MustExec("BEGIN") + tk.MustExec(`insert into t values (1000006,"1000006")`) + tk.MustExec(`insert into t values (6,"6")`) + tk3.MustExec(`insert into t values (7,"7")`) + tk3.MustContainErrMsg(`insert into t values (1000007,"1000007")`, + "[table:1748]Found a row not matching the given partition set") + tk3.MustExec("COMMIT") + require.ErrorContains(t, <-alterChan, + "[ddl:1737]Found a row that does not match the partition") + tk3.MustExec(`BEGIN`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "1000006 1000006", "5 5", "6 6")) + tk.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "4 4", "5 5", "7 7")) + tk3.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk.MustContainErrMsg(`insert into t values (7,"7")`, + "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + tk.MustExec(`insert into t values (8,"8")`) + tk.MustExec(`insert into t values (1000008,"1000008")`) + tk.MustExec(`insert into tp values (9,"9")`) + tk.MustExec(`insert into tp values (1000009,"1000009")`) + tk3.MustExec(`insert into t values (10,"10")`) + tk3.MustExec(`insert into t values (1000010,"1000010")`) + + tk3.MustExec(`COMMIT`) + tk.MustQuery(`show create table tp`).Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (1000000),\n" + + " PARTITION `p1M` VALUES LESS THAN (2000000))")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec(`commit`) + tk.MustExec(`insert into t values (11,"11")`) + tk.MustExec(`insert into t values (1000011,"1000011")`) + tk.MustExec(`insert into tp values (12,"12")`) + tk.MustExec(`insert into tp values (1000012,"1000012")`) +} + func TestAddKeyPartitionStates(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) From be821d283992e1caa0f9d8995bdf2380343e33b4 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 17 Aug 2023 13:23:09 +0200 Subject: [PATCH 2/6] This is an automated cherry-pick of #46126 Signed-off-by: ti-chi-bot --- ddl/ddl_worker.go | 37 +++++++++++++++++++++++++ ddl/partition.go | 49 +++++++++++++++++++++++++++++++++ infoschema/builder.go | 64 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 30782fa694985..7ffd56eaea0f4 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1366,6 +1366,7 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: +<<<<<<< HEAD var ( ptSchemaID int64 ptTableID int64 @@ -1375,6 +1376,42 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) if err != nil { return 0, errors.Trace(err) +======= + // From start of function: diff.SchemaID = job.SchemaID + // Old is original non partitioned table + diff.OldTableID = job.TableID + diff.OldSchemaID = job.SchemaID + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + // This is needed for not crashing TiFlash! + // TODO: Update TiFlash, to handle StateWriteOnly + diff.AffectedOpts = []*model.AffectedOption{{ + TableID: ptTableID, + }} + if job.SchemaState != model.StatePublic { + // No change, just to refresh the non-partitioned table + // with its new ExchangePartitionInfo. + diff.TableID = job.TableID + // Keep this as Schema ID of non-partitioned table + // to avoid trigger early rename in TiFlash + diff.AffectedOpts[0].SchemaID = job.SchemaID + } else { + // Swap + diff.TableID = ptDefID + // Also add correct SchemaID in case different schemas + diff.AffectedOpts[0].SchemaID = ptSchemaID +>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) } diff.OldTableID = job.TableID affects := make([]*model.AffectedOption, 1) diff --git a/ddl/partition.go b/ddl/partition.go index 612a7e4d4165f..d27cd0fe94c0d 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2145,7 +2145,43 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if err != nil { return ver, errors.Trace(err) } +<<<<<<< HEAD if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { +======= + if job.SchemaState == model.StateNone { + if pt.State != model.StatePublic { + job.State = model.JobStateCancelled + return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) + } + err = checkExchangePartition(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkTableDefCompatible(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + if defID != partDef.ID { + logutil.BgLogger().Info("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } +>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionFlag: true, ExchangePartitionID: ptID, @@ -2158,6 +2194,18 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo delayForAsyncCommit() } + if defID != partDef.ID { + // Should never happen, should have been updated above, in previous state! + logutil.BgLogger().Error("Exchange partition id changed, updating to actual id", zap.String("category", "ddl"), + zap.String("job", job.String()), zap.Int64("defID", defID), zap.Int64("partDef.ID", partDef.ID)) + job.Args[0] = partDef.ID + defID = partDef.ID + err = updateDDLJob2Table(w.sess, job, true) + if err != nil { + return ver, errors.Trace(err) + } + } + if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { @@ -2287,6 +2335,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo ntr := rules[ntrID] ptr := rules[ptrID] + // This must be a bug, nt cannot be partitioned! partIDs := getPartitionIDs(nt) var setRules []*label.Rule diff --git a/infoschema/builder.go b/infoschema/builder.go index 7ac4b9528ab9a..a373c62f05135 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -309,6 +309,65 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } +<<<<<<< HEAD +======= +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + // The partitioned table is not affected until the last stage + if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { + return b.applyTableUpdate(m, diff) + } + ntSchemaID := diff.OldSchemaID + ntID := diff.OldTableID + ptSchemaID := diff.SchemaID + ptID := diff.TableID + partID := diff.TableID + if len(diff.AffectedOpts) > 0 { + ptID = diff.AffectedOpts[0].TableID + if diff.AffectedOpts[0].SchemaID != 0 { + ptSchemaID = diff.AffectedOpts[0].SchemaID + } + } + // The normal table needs to be updated first: + // Just update the tables separately + currDiff := &model.SchemaDiff{ + // This is only for the case since https://github.com/pingcap/tidb/pull/45877 + // Fixed now, by adding back the AffectedOpts + // to carry the partitioned Table ID. + Type: diff.Type, + Version: diff.Version, + TableID: ntID, + SchemaID: ntSchemaID, + } + if ptID != partID { + currDiff.TableID = partID + currDiff.OldTableID = ntID + currDiff.OldSchemaID = ntSchemaID + } + ntIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + // partID is the new id for the non-partitioned table! + b.markTableBundleShouldUpdate(partID) + // Then the partitioned table, will re-read the whole table, including all partitions! + currDiff.TableID = ptID + currDiff.SchemaID = ptSchemaID + currDiff.OldTableID = ptID + currDiff.OldSchemaID = ptSchemaID + ptIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + // ntID is the new id for the partition! + b.markPartitionBundleShouldUpdate(ntID) + err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil +} + +>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -415,7 +474,12 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID +<<<<<<< HEAD case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: +======= + case model.ActionTruncateTable, model.ActionCreateView, + model.ActionExchangeTablePartition: +>>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) oldTableID = diff.OldTableID newTableID = diff.TableID default: From 5a9d680332f77552e69765bd7935a9d0b444eead Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 3 Oct 2023 15:00:37 +0200 Subject: [PATCH 3/6] Manual merge resolve --- ddl/partition.go | 4 ---- ddl/rollingback.go | 5 ----- executor/insert_common.go | 7 +------ 3 files changed, 1 insertion(+), 15 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index 6cb52a9db02ac..07b0c4b286251 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2954,11 +2954,7 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) return nil } -<<<<<<< HEAD -func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { -======= func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { ->>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) bundles := make([]*placement.Bundle, 0, 3) ptBundle, err := placement.NewTableBundle(t, pt) diff --git a/ddl/rollingback.go b/ddl/rollingback.go index e2647e790266d..4e4f8c2c31137 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -451,13 +451,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition, model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, -<<<<<<< HEAD - model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement, - model.ActionRecoverSchema: -======= model.ActionModifySchemaDefaultPlacement, model.ActionRecoverSchema, model.ActionAlterCheckConstraint: ->>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) ver, err = cancelOnlyNotHandledJob(job, model.StateNone) case model.ActionMultiSchemaChange: err = rollingBackMultiSchemaChange(job) diff --git a/executor/insert_common.go b/executor/insert_common.go index 1eb64be775e97..f738829262a60 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -687,13 +687,8 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition -<<<<<<< HEAD - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { - is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) -======= if tbl.ExchangePartitionInfo != nil { - is := e.Ctx().GetDomainInfoSchema().(infoschema.InfoSchema) ->>>>>>> c7c7000165a (ddl: Exchange partition rollback (#45877)) + is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { return nil, errors.Errorf("exchange partition process table by id failed") From 623b592c6f774ddf6c667100c2e6434c97451782 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 3 Oct 2023 15:18:50 +0200 Subject: [PATCH 4/6] Manual merge fixes --- ddl/ddl_worker.go | 13 ------------- ddl/partition.go | 28 +++++++++++----------------- infoschema/builder.go | 7 ------- 3 files changed, 11 insertions(+), 37 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 7ffd56eaea0f4..add27c44469d0 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1366,17 +1366,6 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: -<<<<<<< HEAD - var ( - ptSchemaID int64 - ptTableID int64 - partName string - withValidation bool - ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) -======= // From start of function: diff.SchemaID = job.SchemaID // Old is original non partitioned table diff.OldTableID = job.TableID @@ -1395,7 +1384,6 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... return 0, errors.Trace(err) } // This is needed for not crashing TiFlash! - // TODO: Update TiFlash, to handle StateWriteOnly diff.AffectedOpts = []*model.AffectedOption{{ TableID: ptTableID, }} @@ -1411,7 +1399,6 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.TableID = ptDefID // Also add correct SchemaID in case different schemas diff.AffectedOpts[0].SchemaID = ptSchemaID ->>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) } diff.OldTableID = job.TableID affects := make([]*model.AffectedOption, 1) diff --git a/ddl/partition.go b/ddl/partition.go index d27cd0fe94c0d..d82dc293741df 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2141,13 +2141,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - index, _, err := getPartitionDef(pt, partName) + index, partDef, err := getPartitionDef(pt, partName) if err != nil { return ver, errors.Trace(err) } -<<<<<<< HEAD - if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { -======= if job.SchemaState == model.StateNone { if pt.State != model.StatePublic { job.State = model.JobStateCancelled @@ -2181,7 +2178,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } } ->>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ ExchangePartitionFlag: true, ExchangePartitionID: ptID, @@ -2227,12 +2223,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - if pt.TiFlashReplica != nil { for i, id := range pt.TiFlashReplica.AvailablePartitionIDs { if id == partDef.ID { @@ -2303,7 +2293,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } }) - err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) + err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -3106,16 +3096,20 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { - if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { + partitionPPRef := partPPRef + if partitionPPRef == nil { + partitionPPRef = ptPPRef + } + if ntPPRef == nil && partitionPPRef == nil { return nil } - if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + if ntPPRef == nil || partitionPPRef == nil { return dbterror.ErrTablesDifferentMetadata } - ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) - ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } diff --git a/infoschema/builder.go b/infoschema/builder.go index a373c62f05135..e6c81352d66d1 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -309,8 +309,6 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } -<<<<<<< HEAD -======= func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { // The partitioned table is not affected until the last stage if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { @@ -367,7 +365,6 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi return append(ptIDs, ntIDs...), nil } ->>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -474,12 +471,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID -<<<<<<< HEAD - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: -======= case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: ->>>>>>> 48e22971729 (ddl: Exchange part schema load fix (#46126)) oldTableID = diff.OldTableID newTableID = diff.TableID default: From 9bc71de7cbd89c6ad069043fe9e47fa9e60f7f1a Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 5 Oct 2023 23:12:54 +0200 Subject: [PATCH 5/6] Including changes from #46126 --- ddl/ddl_worker.go | 44 +++++++++++++++++++++++++++---------------- infoschema/builder.go | 24 ++++++++++++++++++----- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index dba28a43a30b0..3863f196f4aa4 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1366,27 +1366,39 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: + // From start of function: diff.SchemaID = job.SchemaID + // Old is original non partitioned table diff.OldTableID = job.TableID diff.OldSchemaID = job.SchemaID + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + // This is needed for not crashing TiFlash! + diff.AffectedOpts = []*model.AffectedOption{{ + TableID: ptTableID, + }} if job.SchemaState != model.StatePublic { + // No change, just to refresh the non-partitioned table + // with its new ExchangePartitionInfo. diff.TableID = job.TableID - diff.SchemaID = job.SchemaID + // Keep this as Schema ID of non-partitioned table + // to avoid trigger early rename in TiFlash + diff.AffectedOpts[0].SchemaID = job.SchemaID } else { - // Update the partitioned table (it is only done in the last state) - var ( - ptSchemaID int64 - ptTableID int64 - ptDefID int64 // Not needed, will reload the whole table - partName string // Not used - withValidation bool // Not used - ) - // See ddl.ExchangeTablePartition - err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } - diff.SchemaID = ptSchemaID - diff.TableID = ptTableID + // Swap + diff.TableID = ptDefID + // Also add correct SchemaID in case different schemas + diff.AffectedOpts[0].SchemaID = ptSchemaID } case model.ActionTruncateTablePartition: diff.TableID = job.TableID diff --git a/infoschema/builder.go b/infoschema/builder.go index e2c78387eb9bf..75ac943dd0641 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -320,31 +320,45 @@ func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDi ntID := diff.OldTableID ptSchemaID := diff.SchemaID ptID := diff.TableID + partID := diff.TableID if len(diff.AffectedOpts) > 0 { - // From old version + // should always have len == 1 ptID = diff.AffectedOpts[0].TableID - ptSchemaID = diff.AffectedOpts[0].SchemaID + if diff.AffectedOpts[0].SchemaID != 0 { + ptSchemaID = diff.AffectedOpts[0].SchemaID + } } // The normal table needs to be updated first: // Just update the tables separately currDiff := &model.SchemaDiff{ + Type: diff.Type, Version: diff.Version, TableID: ntID, SchemaID: ntSchemaID, } + if ptID != partID { + currDiff.TableID = partID + currDiff.OldTableID = ntID + currDiff.OldSchemaID = ntSchemaID + } ntIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markPartitionBundleShouldUpdate(ntID) - // Then the partitioned table + // partID is the new id for the non-partitioned table! + b.markTableBundleShouldUpdate(partID) + //b.markPartitionBundleShouldUpdate(partID) + // Then the partitioned table, will re-read the whole table, including all partitions! currDiff.TableID = ptID currDiff.SchemaID = ptSchemaID + currDiff.OldTableID = ptID + currDiff.OldSchemaID = ptSchemaID ptIDs, err := b.applyTableUpdate(m, currDiff) if err != nil { return nil, errors.Trace(err) } - b.markTableBundleShouldUpdate(ptID) + // ntID is the new id for the partition! + b.markPartitionBundleShouldUpdate(ntID) err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) From 2ccffd5b98766678b910fcf6865587936f5ec78a Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 9 Oct 2023 15:05:55 +0200 Subject: [PATCH 6/6] Adding back case removed by mistake. --- infoschema/builder.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/infoschema/builder.go b/infoschema/builder.go index 121aad74985bd..dc41c1d0d1d96 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -461,6 +461,8 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 // Since the cluster-index feature also has similar problem, we chose to prevent DDL execution during the upgrade process to avoid this issue. oldTableID = diff.OldTableID newTableID = diff.TableID + case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: + oldTableID = diff.TableID case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: oldTableID = diff.OldTableID