From 4ddf595a1f0a3ecacd2c4ae4585012e6ddfefe57 Mon Sep 17 00:00:00 2001 From: xhe Date: Sun, 25 Oct 2020 17:00:07 +0800 Subject: [PATCH 01/12] ddl: handle drop/truncate placement rule cache Signed-off-by: xhe --- ddl/ddl_worker.go | 44 +++++++++++++++++++++++-------------------- ddl/table.go | 29 +++++++++++++++++++++++++++- infoschema/builder.go | 29 +++++++++++++++++++++++++++- 3 files changed, 80 insertions(+), 22 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index d09170a89bbb7..b578970900be1 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -813,6 +813,21 @@ func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Durat w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job) } +func buildPlacementAffects(oldIDs []int64, newIDs []in64) []*model.AffectedOption { + affects := make([]*model.AffectedOption, len(oldIDs)) + for i := 0; i < len(oldIDs); i++ { + affects[i] = &model.AffectedOption{ + OldTableID: oldIDs[i], + } + if newIDs != nil { + affects[i].TableID = newIDs[i] + } else { + affects[i].TableID = oldIDs[i] + } + } + return affects +} + // updateSchemaVersion increments the schema version by 1 and sets SchemaDiff. func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { schemaVersion, err := t.GenSchemaVersion() @@ -832,6 +847,11 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { return 0, errors.Trace(err) } diff.OldTableID = job.TableID + + // affects are used to update placement rule cache + oldIDs := job.CtxVars[0].([]int64) + newIDs := job.CtxVars[1].([]int64) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) case model.ActionCreateView: tbInfo := &model.TableInfo{} var orReplace bool @@ -869,32 +889,16 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { } diff.AffectedOpts = affects case model.ActionTruncateTablePartition: + diff.TableID = job.TableID oldIDs := job.CtxVars[0].([]int64) newIDs := job.CtxVars[1].([]int64) - diff.TableID = job.TableID - affects := make([]*model.AffectedOption, len(oldIDs)) - for i := 0; i < len(oldIDs); i++ { - affects[i] = &model.AffectedOption{ - SchemaID: job.SchemaID, - TableID: newIDs[i], - OldTableID: oldIDs[i], - } - } - diff.AffectedOpts = affects - case model.ActionDropTablePartition: + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable: // affects are used to update placement rule cache diff.TableID = job.TableID if len(job.CtxVars) > 0 { if oldIDs, ok := job.CtxVars[0].([]int64); ok { - affects := make([]*model.AffectedOption, len(oldIDs)) - for i := 0; i < len(oldIDs); i++ { - affects[i] = &model.AffectedOption{ - SchemaID: job.SchemaID, - TableID: oldIDs[i], - OldTableID: oldIDs[i], - } - } - diff.AffectedOpts = affects + diff.AffectedOpts = buildPlacementAffects(oldIDs, nil) } } case model.ActionAlterTableAlterPartition: diff --git a/ddl/table.go b/ddl/table.go index bbd29a546def7..e5f53654959f1 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -25,7 +25,9 @@ import ( "github.com/pingcap/parser/charset" "github.com/pingcap/parser/model" field_types "github.com/pingcap/parser/types" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -183,6 +185,8 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) { ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State) case model.StateDeleteOnly: tblInfo.State = model.StateNone + oldIDs := getPartitionIDs(tblInfo) + job.CtxVars = []interface{}{oldIDs} ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State) if err != nil { return ver, errors.Trace(err) @@ -199,7 +203,7 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) { // Finish this job. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) startKey := tablecodec.EncodeTablePrefix(job.TableID) - job.Args = append(job.Args, startKey, getPartitionIDs(tblInfo)) + job.Args = append(job.Args, startKey, oldIDs) default: err = ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State) } @@ -321,6 +325,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in } }) + job.CtxVars = []interface{}{tids} ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -474,6 +479,27 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } + bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) + if oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(tableID)); ok { + bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newTableID)) + } + + newDefs := tblInfo.Partition.Definitions + newIDs := make([]int64, 0, len(oldPartitionIDs)) + for i := range oldPartitionIDs { + if oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldPartitionIDs[i])); ok { + newID := newDefs[i].ID + newIDs = append(newIDs, newID) + bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newID)) + } + } + + err = infosync.PutRuleBundles(nil, bundles) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Wrapf(err, "failed to notify PD the placement rules") + } + // Clear the tiflash replica available status. if tblInfo.TiFlashReplica != nil { tblInfo.TiFlashReplica.AvailablePartitionIDs = nil @@ -493,6 +519,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } }) + job.CtxVars = []interface{}{oldPartitionIDs, newIDs} ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) diff --git a/infoschema/builder.go b/infoschema/builder.go index 4573d1ee2a01e..bb0e59d31b54b 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -73,6 +73,20 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro oldTableID = diff.TableID newTableID = diff.TableID } + // handle placement rule cache + switch diff.Type { + case model.ActionDropTable: + b.applyPlacementDelete(placement.GroupID(oldTableID)) + case model.ActionTruncateTable: + b.applyPlacementDelete(placement.GroupID(oldTableID)) + if err := b.applyPlacementUpdate(placement.GroupID(newTableID)); err != nil { + return nil, errors.Trace(err) + } + case model.ActionRecoverTable: + if err := b.applyPlacementUpdate(placement.GroupID(newTableID)); err != nil { + return nil, errors.Trace(err) + } + } dbInfo := b.copySchemaTables(roDBInfo.Name.L) b.copySortedTables(oldTableID, newTableID) @@ -134,8 +148,21 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return nil, errors.Trace(err) } continue - case model.ActionDropTablePartition: + case model.ActionDropTable, model.ActionDropTablePartition: + b.applyPlacementDelete(placement.GroupID(opt.OldTableID)) + continue + case model.ActionTruncateTable: b.applyPlacementDelete(placement.GroupID(opt.OldTableID)) + err := b.applyPlacementUpdate(placement.GroupID(opt.TableID)) + if err != nil { + return nil, errors.Trace(err) + } + continue + case model.ActionRecoverTable: + err := b.applyPlacementUpdate(placement.GroupID(opt.TableID)) + if err != nil { + return nil, errors.Trace(err) + } continue } var err error From a002c14f892ef6c7380f822858b89a216a88a1ca Mon Sep 17 00:00:00 2001 From: xhe Date: Sun, 25 Oct 2020 17:15:01 +0800 Subject: [PATCH 02/12] ddl: fix format & build Signed-off-by: xhe --- ddl/ddl_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index b578970900be1..c4f3df9ffac24 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -813,7 +813,7 @@ func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Durat w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job) } -func buildPlacementAffects(oldIDs []int64, newIDs []in64) []*model.AffectedOption { +func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption { affects := make([]*model.AffectedOption, len(oldIDs)) for i := 0; i < len(oldIDs); i++ { affects[i] = &model.AffectedOption{ From fe1cf53bac503ee044ae0128f443596be86ebb5b Mon Sep 17 00:00:00 2001 From: xhe Date: Sun, 25 Oct 2020 19:10:40 +0800 Subject: [PATCH 03/12] ddl: fix pointer check Signed-off-by: xhe --- ddl/partition.go | 4 ++-- ddl/table.go | 38 ++++++++++++++++++++++---------------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index 3537f4f9be4bd..edd5bc7d992b3 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1018,7 +1018,7 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo { } func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error { - if d.infoHandle != nil { + if d.infoHandle != nil && d.infoHandle.IsValid() { bundles := make([]*placement.Bundle, 0, len(physicalTableIDs)) for _, ID := range physicalTableIDs { oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID)) @@ -1203,7 +1203,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - if d.infoHandle != nil { + if d.infoHandle != nil && d.infoHandle.IsValid() { bundles := make([]*placement.Bundle, 0, len(oldIDs)) for i, oldID := range oldIDs { diff --git a/ddl/table.go b/ddl/table.go index e5f53654959f1..fc1fcdce783d3 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -479,25 +479,31 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) - if oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(tableID)); ok { - bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newTableID)) - } - - newDefs := tblInfo.Partition.Definitions newIDs := make([]int64, 0, len(oldPartitionIDs)) - for i := range oldPartitionIDs { - if oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldPartitionIDs[i])); ok { - newID := newDefs[i].ID - newIDs = append(newIDs, newID) - bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newID)) + if d.infoHandle != nil && d.infoHandle.IsValid() { + is := d.infoHandle.Get() + + bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) + if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { + bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newTableID)) } - } - err = infosync.PutRuleBundles(nil, bundles) - if err != nil { - job.State = model.JobStateCancelled - return 0, errors.Wrapf(err, "failed to notify PD the placement rules") + if pi := tblInfo.GetPartitionInfo(); pi != nil { + newDefs := pi.Definitions + for i := range oldPartitionIDs { + if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok { + newID := newDefs[i].ID + newIDs = append(newIDs, newID) + bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newID)) + } + } + } + + err = infosync.PutRuleBundles(nil, bundles) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Wrapf(err, "failed to notify PD the placement rules") + } } // Clear the tiflash replica available status. From 2da602ba127cb300c45bee0ca86560c4334fcf48 Mon Sep 17 00:00:00 2001 From: xhe Date: Sun, 25 Oct 2020 19:44:58 +0800 Subject: [PATCH 04/12] ddl: truncate unpartitioned table may get null oldIDs Signed-off-by: xhe --- ddl/ddl_worker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index c4f3df9ffac24..97b207fbaf83b 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -814,12 +814,16 @@ func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Durat } func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption { + if len(oldIDs) == 0 { + return nil + } + affects := make([]*model.AffectedOption, len(oldIDs)) for i := 0; i < len(oldIDs); i++ { affects[i] = &model.AffectedOption{ OldTableID: oldIDs[i], } - if newIDs != nil { + if i < len(newIDs) { affects[i].TableID = newIDs[i] } else { affects[i].TableID = oldIDs[i] From a640cfc3563e2df67914ed4dd1edd622672fa09d Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Oct 2020 16:51:06 +0800 Subject: [PATCH 05/12] ddl: address comment Signed-off-by: xhe --- ddl/ddl_worker.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 97b207fbaf83b..8442575dcd3f3 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -823,11 +823,7 @@ func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOpti affects[i] = &model.AffectedOption{ OldTableID: oldIDs[i], } - if i < len(newIDs) { - affects[i].TableID = newIDs[i] - } else { - affects[i].TableID = oldIDs[i] - } + affects[i].TableID = newIDs[i] } return affects } @@ -902,7 +898,7 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { diff.TableID = job.TableID if len(job.CtxVars) > 0 { if oldIDs, ok := job.CtxVars[0].([]int64); ok { - diff.AffectedOpts = buildPlacementAffects(oldIDs, nil) + diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs) } } case model.ActionAlterTableAlterPartition: From 882af9dc466449ea36402d3209b203232cdac574 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Oct 2020 16:55:06 +0800 Subject: [PATCH 06/12] ddl: refine the name Signed-off-by: xhe --- ddl/partition.go | 2 +- ddl/placement/utils.go | 4 ++-- ddl/placement_rule_test.go | 2 +- ddl/table.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ddl/partition.go b/ddl/partition.go index edd5bc7d992b3..8afd7c97702ea 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1210,7 +1210,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID)) if ok && !oldBundle.IsEmpty() { bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) - bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newIDs[i])) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newIDs[i])) } } diff --git a/ddl/placement/utils.go b/ddl/placement/utils.go index dac822d012bc8..10c08c8db5be9 100644 --- a/ddl/placement/utils.go +++ b/ddl/placement/utils.go @@ -118,8 +118,8 @@ func BuildPlacementDropBundle(partitionID int64) *Bundle { } } -// BuildPlacementTruncateBundle builds the bundle to copy placement rules from old id to new id. -func BuildPlacementTruncateBundle(oldBundle *Bundle, newID int64) *Bundle { +// BuildPlacementCopyBundle copy a new bundle from the old, with a new name, and new keyranges. +func BuildPlacementCopyBundle(oldBundle *Bundle, newID int64) *Bundle { newBundle := oldBundle.Clone() newBundle.ID = GroupID(newID) startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID))) diff --git a/ddl/placement_rule_test.go b/ddl/placement_rule_test.go index a23a514b41577..37760c521e62f 100644 --- a/ddl/placement_rule_test.go +++ b/ddl/placement_rule_test.go @@ -300,7 +300,7 @@ func (s *testPlacementSuite) TestPlacementBuildTruncate(c *C) { }, } for _, t := range tests { - out := placement.BuildPlacementTruncateBundle(bundle, t.input) + out := placement.BuildPlacementCopyBundle(bundle, t.input) c.Assert(t.output, DeepEquals, out) c.Assert(bundle.ID, Equals, placement.GroupID(-1)) c.Assert(bundle.Rules, HasLen, 1) diff --git a/ddl/table.go b/ddl/table.go index fc1fcdce783d3..1b7dada1e757e 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -485,7 +485,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1) if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok { - bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newTableID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID)) } if pi := tblInfo.GetPartitionInfo(); pi != nil { @@ -494,7 +494,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok { newID := newDefs[i].ID newIDs = append(newIDs, newID) - bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) } } } From f732a0c3f8db409123ee800dc53e8e5eba095f15 Mon Sep 17 00:00:00 2001 From: xhe Date: Tue, 27 Oct 2020 16:57:53 +0800 Subject: [PATCH 07/12] ddl: fix Signed-off-by: xhe --- ddl/ddl_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 8442575dcd3f3..7f44d2b4006f6 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -822,8 +822,8 @@ func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOpti for i := 0; i < len(oldIDs); i++ { affects[i] = &model.AffectedOption{ OldTableID: oldIDs[i], + TableID: newIDs[i], } - affects[i].TableID = newIDs[i] } return affects } From cb9afd712c3158b456b00fcc7155a9cdc8c2d4ac Mon Sep 17 00:00:00 2001 From: xhe Date: Wed, 28 Oct 2020 11:39:23 +0800 Subject: [PATCH 08/12] ddl: Update ddl/placement/utils.go Co-authored-by: djshow832 --- ddl/placement/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/placement/utils.go b/ddl/placement/utils.go index 10c08c8db5be9..037a0c6e22be8 100644 --- a/ddl/placement/utils.go +++ b/ddl/placement/utils.go @@ -118,7 +118,7 @@ func BuildPlacementDropBundle(partitionID int64) *Bundle { } } -// BuildPlacementCopyBundle copy a new bundle from the old, with a new name, and new keyranges. +// BuildPlacementCopyBundle copies a new bundle from the old, with a new name and a new key range. func BuildPlacementCopyBundle(oldBundle *Bundle, newID int64) *Bundle { newBundle := oldBundle.Clone() newBundle.ID = GroupID(newID) From 05b00450e9eb9565dbcc7b9bd5423b7435c58352 Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 29 Oct 2020 20:59:17 +0800 Subject: [PATCH 09/12] ddl: add test case Signed-off-by: xhe --- ddl/placement_sql_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index f8482603d4006..065bc52746dfd 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -14,8 +14,12 @@ package ddl_test import ( + "fmt" + . "github.com/pingcap/check" + "github.com/pingcap/parser/model" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/util/testkit" ) @@ -271,3 +275,37 @@ add placement policy replicas=3`) c.Assert(ddl.ErrPartitionMgmtOnNonpartitioned.Equal(err), IsTrue) } + +func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + defer tk.MustExec("drop table if exists t1") + + bundles := make(map[string]*placement.Bundle) + + tk.MustExec("create table t1(id int) partition by hash(id) partitions 2") + + is := s.dom.InfoSchema() + is.MockBundles(bundles) + + tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + partDefs := tb.Meta().GetPartitionInfo().Definitions + + rows := []string{} + for _, v := range partDefs { + ptID := placement.GroupID(v.ID) + bundles[ptID] = &placement.Bundle{ + ID: ptID, + Rules: []*placement.Rule{ + {ID: "default"}, + }, + } + rows = append(rows, fmt.Sprintf("%s 0 default test t1 %s 0 ", ptID, v.Name.L)) + } + + tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows(rows...)) + tk.MustExec("drop table t1") + tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows()) +} From 4417a586249882c2dd135e5ba25df2870f7b18ca Mon Sep 17 00:00:00 2001 From: xhe Date: Thu, 29 Oct 2020 22:14:01 +0800 Subject: [PATCH 10/12] ddl: lazily update rule cache when truncate not every partition needs to update, some partitions do not have a non-empty bundle. Signed-off-by: xhe --- ddl/ddl_worker.go | 16 ++++++++++------ ddl/partition.go | 9 +++++---- ddl/table.go | 10 ++++++---- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 7f44d2b4006f6..a8d82a4f0908d 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -849,9 +849,11 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { diff.OldTableID = job.TableID // affects are used to update placement rule cache - oldIDs := job.CtxVars[0].([]int64) - newIDs := job.CtxVars[1].([]int64) - diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + if len(job.CtxVars) > 0 { + oldIDs := job.CtxVars[0].([]int64) + newIDs := job.CtxVars[1].([]int64) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } case model.ActionCreateView: tbInfo := &model.TableInfo{} var orReplace bool @@ -890,9 +892,11 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { diff.AffectedOpts = affects case model.ActionTruncateTablePartition: diff.TableID = job.TableID - oldIDs := job.CtxVars[0].([]int64) - newIDs := job.CtxVars[1].([]int64) - diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + if len(job.CtxVars) > 0 { + oldIDs := job.CtxVars[0].([]int64) + newIDs := job.CtxVars[1].([]int64) + diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs) + } case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable: // affects are used to update placement rule cache diff.TableID = job.TableID diff --git a/ddl/partition.go b/ddl/partition.go index 8afd7c97702ea..b08d22bc54bea 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1166,7 +1166,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs)) - newIDs := make([]int64, 0, len(oldIDs)) for _, oldID := range oldIDs { for i := 0; i < len(pi.Definitions); i++ { def := &pi.Definitions[i] @@ -1178,7 +1177,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e def.ID = pid // Shallow copy only use the def.ID in event handle. newPartitions = append(newPartitions, *def) - newIDs = append(newIDs, pid) break } } @@ -1206,13 +1204,18 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e if d.infoHandle != nil && d.infoHandle.IsValid() { bundles := make([]*placement.Bundle, 0, len(oldIDs)) + yoldIDs := make([]int64, 0, len(oldIDs)) + newIDs := make([]int64, 0, len(oldIDs)) for i, oldID := range oldIDs { oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID)) if ok && !oldBundle.IsEmpty() { + yoldIDs = append(yoldIDs, oldID) + newIDs = append(newIDs, newIDs[i]) bundles = append(bundles, placement.BuildPlacementDropBundle(oldID)) bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newIDs[i])) } } + job.CtxVars = []interface{}{yoldIDs, newIDs} err = infosync.PutRuleBundles(nil, bundles) if err != nil { @@ -1221,8 +1224,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e } } - // used by ApplyDiff in updateSchemaVersion - job.CtxVars = []interface{}{oldIDs, newIDs} ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/ddl/table.go b/ddl/table.go index 1b7dada1e757e..e6a49dbf34035 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -479,7 +479,6 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } } - newIDs := make([]int64, 0, len(oldPartitionIDs)) if d.infoHandle != nil && d.infoHandle.IsValid() { is := d.infoHandle.Get() @@ -489,14 +488,18 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } if pi := tblInfo.GetPartitionInfo(); pi != nil { + oldIDs := make([]int64, 0, len(oldPartitionIDs)) + newIDs := make([]int64, 0, len(oldPartitionIDs)) newDefs := pi.Definitions for i := range oldPartitionIDs { - if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok { - newID := newDefs[i].ID + newID := newDefs[i].ID + if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() { + oldIDs = append(oldIDs, oldPartitionIDs[i]) newIDs = append(newIDs, newID) bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID)) } } + job.CtxVars = []interface{}{oldIDs, newIDs} } err = infosync.PutRuleBundles(nil, bundles) @@ -525,7 +528,6 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro } }) - job.CtxVars = []interface{}{oldPartitionIDs, newIDs} ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) From 3f9e989572c92f5847f63d71fea43d7df7674fd3 Mon Sep 17 00:00:00 2001 From: xhe Date: Fri, 30 Oct 2020 13:58:59 +0800 Subject: [PATCH 11/12] ddl: add truncate test Signed-off-by: xhe --- ddl/placement_sql_test.go | 50 +++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index 065bc52746dfd..98f3b39b3353f 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -279,33 +279,41 @@ add placement policy func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t1") defer tk.MustExec("drop table if exists t1") - bundles := make(map[string]*placement.Bundle) - - tk.MustExec("create table t1(id int) partition by hash(id) partitions 2") - - is := s.dom.InfoSchema() - is.MockBundles(bundles) - - tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) - c.Assert(err, IsNil) - partDefs := tb.Meta().GetPartitionInfo().Definitions - - rows := []string{} - for _, v := range partDefs { - ptID := placement.GroupID(v.ID) - bundles[ptID] = &placement.Bundle{ - ID: ptID, - Rules: []*placement.Rule{ - {ID: "default"}, - }, + initTable := func() []string { + bundles := make(map[string]*placement.Bundle) + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(id int) partition by hash(id) partitions 2") + + is := s.dom.InfoSchema() + is.MockBundles(bundles) + + tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + c.Assert(err, IsNil) + partDefs := tb.Meta().GetPartitionInfo().Definitions + + rows := []string{} + for _, v := range partDefs { + ptID := placement.GroupID(v.ID) + bundles[ptID] = &placement.Bundle{ + ID: ptID, + Rules: []*placement.Rule{{ID: "default"}}, + } + rows = append(rows, fmt.Sprintf("%s 0 default test t1 %s 0 ", ptID, v.Name.L)) } - rows = append(rows, fmt.Sprintf("%s 0 default test t1 %s 0 ", ptID, v.Name.L)) + return rows } + // test drop + rows := initTable() tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows(rows...)) tk.MustExec("drop table t1") tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows()) + + // test truncate + rows = initTable() + tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows(rows...)) + tk.MustExec("truncate table t1") + tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows()) } From ef54450fd30d2430728327649ba2b1ae95b748b9 Mon Sep 17 00:00:00 2001 From: xhe Date: Fri, 30 Oct 2020 14:06:59 +0800 Subject: [PATCH 12/12] ddl: format fix Signed-off-by: xhe --- ddl/placement_sql_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index 98f3b39b3353f..70f5e1942cb8a 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -297,7 +297,7 @@ func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { for _, v := range partDefs { ptID := placement.GroupID(v.ID) bundles[ptID] = &placement.Bundle{ - ID: ptID, + ID: ptID, Rules: []*placement.Rule{{ID: "default"}}, } rows = append(rows, fmt.Sprintf("%s 0 default test t1 %s 0 ", ptID, v.Name.L))