Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: handle placement rule cache for drop/truncate/recover/flashback table #20622

Merged
merged 15 commits into from
Nov 2, 2020
48 changes: 28 additions & 20 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,21 @@ func (w *worker) waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Durat
w.waitSchemaChanged(ctx, d, waitTime, latestSchemaVersion, job)
}

func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
if len(oldIDs) == 0 {
return nil
}

affects := make([]*model.AffectedOption, len(oldIDs))
for i := 0; i < len(oldIDs); i++ {
affects[i] = &model.AffectedOption{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about schemaID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not used anyway. Affects are processed by L141-165, then continue the loop. It is never passed to ApplyDiff or whatever function that required schemaID.

OldTableID: oldIDs[i],
TableID: newIDs[i],
}
}
return affects
}

// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff.
func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
schemaVersion, err := t.GenSchemaVersion()
Expand All @@ -832,6 +847,13 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
return 0, errors.Trace(err)
}
diff.OldTableID = job.TableID

// affects are used to update placement rule cache
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
case model.ActionCreateView:
tbInfo := &model.TableInfo{}
var orReplace bool
Expand Down Expand Up @@ -869,32 +891,18 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) {
}
diff.AffectedOpts = affects
case model.ActionTruncateTablePartition:
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.TableID = job.TableID
affects := make([]*model.AffectedOption, len(oldIDs))
for i := 0; i < len(oldIDs); i++ {
affects[i] = &model.AffectedOption{
SchemaID: job.SchemaID,
TableID: newIDs[i],
OldTableID: oldIDs[i],
}
if len(job.CtxVars) > 0 {
oldIDs := job.CtxVars[0].([]int64)
newIDs := job.CtxVars[1].([]int64)
diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
}
diff.AffectedOpts = affects
case model.ActionDropTablePartition:
case model.ActionDropTablePartition, model.ActionRecoverTable, model.ActionDropTable:
// affects are used to update placement rule cache
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
if oldIDs, ok := job.CtxVars[0].([]int64); ok {
affects := make([]*model.AffectedOption, len(oldIDs))
for i := 0; i < len(oldIDs); i++ {
affects[i] = &model.AffectedOption{
SchemaID: job.SchemaID,
TableID: oldIDs[i],
OldTableID: oldIDs[i],
}
}
diff.AffectedOpts = affects
diff.AffectedOpts = buildPlacementAffects(oldIDs, oldIDs)
}
}
case model.ActionAlterTableAlterPartition:
Expand Down
15 changes: 8 additions & 7 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ func getTableInfoWithDroppingPartitions(t *model.TableInfo) *model.TableInfo {
}

func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error {
if d.infoHandle != nil {
if d.infoHandle != nil && d.infoHandle.IsValid() {
bundles := make([]*placement.Bundle, 0, len(physicalTableIDs))
for _, ID := range physicalTableIDs {
oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(ID))
Expand Down Expand Up @@ -1166,7 +1166,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
}

newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
newIDs := make([]int64, 0, len(oldIDs))
for _, oldID := range oldIDs {
for i := 0; i < len(pi.Definitions); i++ {
def := &pi.Definitions[i]
Expand All @@ -1178,7 +1177,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
def.ID = pid
// Shallow copy only use the def.ID in event handle.
newPartitions = append(newPartitions, *def)
newIDs = append(newIDs, pid)
break
}
}
Expand All @@ -1203,16 +1201,21 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
}
}

if d.infoHandle != nil {
if d.infoHandle != nil && d.infoHandle.IsValid() {
bundles := make([]*placement.Bundle, 0, len(oldIDs))

yoldIDs := make([]int64, 0, len(oldIDs))
newIDs := make([]int64, 0, len(oldIDs))
for i, oldID := range oldIDs {
oldBundle, ok := d.infoHandle.Get().BundleByName(placement.GroupID(oldID))
if ok && !oldBundle.IsEmpty() {
yoldIDs = append(yoldIDs, oldID)
newIDs = append(newIDs, newIDs[i])
bundles = append(bundles, placement.BuildPlacementDropBundle(oldID))
bundles = append(bundles, placement.BuildPlacementTruncateBundle(oldBundle, newIDs[i]))
bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newIDs[i]))
}
}
job.CtxVars = []interface{}{yoldIDs, newIDs}

err = infosync.PutRuleBundles(nil, bundles)
if err != nil {
Expand All @@ -1221,8 +1224,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
}
}

// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{oldIDs, newIDs}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/placement/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func BuildPlacementDropBundle(partitionID int64) *Bundle {
}
}

// BuildPlacementTruncateBundle builds the bundle to copy placement rules from old id to new id.
func BuildPlacementTruncateBundle(oldBundle *Bundle, newID int64) *Bundle {
// BuildPlacementCopyBundle copies a new bundle from the old, with a new name and a new key range.
func BuildPlacementCopyBundle(oldBundle *Bundle, newID int64) *Bundle {
newBundle := oldBundle.Clone()
newBundle.ID = GroupID(newID)
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(newID)))
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (s *testPlacementSuite) TestPlacementBuildTruncate(c *C) {
},
}
for _, t := range tests {
out := placement.BuildPlacementTruncateBundle(bundle, t.input)
out := placement.BuildPlacementCopyBundle(bundle, t.input)
c.Assert(t.output, DeepEquals, out)
c.Assert(bundle.ID, Equals, placement.GroupID(-1))
c.Assert(bundle.Rules, HasLen, 1)
Expand Down
38 changes: 38 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
package ddl_test

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -271,3 +275,37 @@ add placement policy
replicas=3`)
c.Assert(ddl.ErrPartitionMgmtOnNonpartitioned.Equal(err), IsTrue)
}

func (s *testDBSuite1) TestPlacementPolicyCache(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

bundles := make(map[string]*placement.Bundle)

tk.MustExec("create table t1(id int) partition by hash(id) partitions 2")

is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
partDefs := tb.Meta().GetPartitionInfo().Definitions

rows := []string{}
for _, v := range partDefs {
ptID := placement.GroupID(v.ID)
bundles[ptID] = &placement.Bundle{
ID: ptID,
Rules: []*placement.Rule{
{ID: "default"},
},
}
rows = append(rows, fmt.Sprintf("%s 0 default test t1 %s <nil> 0 ", ptID, v.Name.L))
}

tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows(rows...))
tk.MustExec("drop table t1")
xhebox marked this conversation as resolved.
Show resolved Hide resolved
tk.MustQuery("select * from information_schema.placement_policy").Check(testkit.Rows())
}
37 changes: 36 additions & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
field_types "github.com/pingcap/parser/types"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -183,6 +185,8 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State)
case model.StateDeleteOnly:
tblInfo.State = model.StateNone
oldIDs := getPartitionIDs(tblInfo)
job.CtxVars = []interface{}{oldIDs}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != tblInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -199,7 +203,7 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(job.TableID)
job.Args = append(job.Args, startKey, getPartitionIDs(tblInfo))
job.Args = append(job.Args, startKey, oldIDs)
default:
err = ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State)
}
Expand Down Expand Up @@ -321,6 +325,7 @@ func (w *worker) onRecoverTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
}
})

job.CtxVars = []interface{}{tids}
xhebox marked this conversation as resolved.
Show resolved Hide resolved
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -474,6 +479,36 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
}
}

if d.infoHandle != nil && d.infoHandle.IsValid() {
is := d.infoHandle.Get()

bundles := make([]*placement.Bundle, 0, len(oldPartitionIDs)+1)
if oldBundle, ok := is.BundleByName(placement.GroupID(tableID)); ok {
bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newTableID))
}

if pi := tblInfo.GetPartitionInfo(); pi != nil {
oldIDs := make([]int64, 0, len(oldPartitionIDs))
newIDs := make([]int64, 0, len(oldPartitionIDs))
newDefs := pi.Definitions
for i := range oldPartitionIDs {
newID := newDefs[i].ID
if oldBundle, ok := is.BundleByName(placement.GroupID(oldPartitionIDs[i])); ok && !oldBundle.IsEmpty() {
oldIDs = append(oldIDs, oldPartitionIDs[i])
newIDs = append(newIDs, newID)
bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newID))
}
}
job.CtxVars = []interface{}{oldIDs, newIDs}
}

err = infosync.PutRuleBundles(nil, bundles)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
}
}

// Clear the tiflash replica available status.
if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.AvailablePartitionIDs = nil
Expand Down
29 changes: 28 additions & 1 deletion infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
oldTableID = diff.TableID
newTableID = diff.TableID
}
// handle placement rule cache
switch diff.Type {
case model.ActionDropTable:
b.applyPlacementDelete(placement.GroupID(oldTableID))
case model.ActionTruncateTable:
b.applyPlacementDelete(placement.GroupID(oldTableID))
if err := b.applyPlacementUpdate(placement.GroupID(newTableID)); err != nil {
return nil, errors.Trace(err)
}
case model.ActionRecoverTable:
if err := b.applyPlacementUpdate(placement.GroupID(newTableID)); err != nil {
return nil, errors.Trace(err)
}
}
xhebox marked this conversation as resolved.
Show resolved Hide resolved
dbInfo := b.copySchemaTables(roDBInfo.Name.L)
b.copySortedTables(oldTableID, newTableID)

Expand Down Expand Up @@ -134,8 +148,21 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
return nil, errors.Trace(err)
}
continue
case model.ActionDropTablePartition:
case model.ActionDropTable, model.ActionDropTablePartition:
b.applyPlacementDelete(placement.GroupID(opt.OldTableID))
continue
case model.ActionTruncateTable:
b.applyPlacementDelete(placement.GroupID(opt.OldTableID))
err := b.applyPlacementUpdate(placement.GroupID(opt.TableID))
if err != nil {
return nil, errors.Trace(err)
}
continue
case model.ActionRecoverTable:
err := b.applyPlacementUpdate(placement.GroupID(opt.TableID))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
var err error
Expand Down