Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveTables Cancel: drop denied tables on target when dropping source/target tables #14008

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions go/test/endtoend/vreplication/partial_movetables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"testing"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

Expand All @@ -32,43 +34,46 @@ import (
// Before canceling, we first switch traffic to the target keyspace and then reverse it back to the source keyspace.
// This tests that artifacts are being properly cleaned up when a MoveTables ia canceled.
func testCancel(t *testing.T) {
wfName := "partial80DashForCancel"
sourceKs := "customer"
targetKs := "customer2"
shard := "80-"
ksWf := fmt.Sprintf("%s.%s", targetKs, wfName)
targetKeyspace := "customer2"
sourceKeyspace := "customer"
workflowName := "partial80DashForCancel"
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
// We use a different table in this MoveTables than the subsequent one, so that setting up of the artifacts
// while creating MoveTables do not paper over any issues with cleaning up artifacts when MoveTables is canceled.
// Ref: https://github.com/vitessio/vitess/issues/13998
table := "customer2"
shard := "80-"
// start the partial movetables for 80-
err := tstWorkflowExec(t, defaultCellName, wfName, sourceKs, targetKs,
table, workflowActionCreate, "", shard, "", false)
require.NoError(t, err)

targetTab1 = vc.getPrimaryTablet(t, targetKs, shard)
catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2")
vdiff1(t, ksWf, "")
mt := newMoveTables(vc, &moveTables{
workflowName: workflowName,
targetKeyspace: targetKeyspace,
sourceKeyspace: sourceKeyspace,
tables: table,
sourceShards: shard,
}, moveTablesFlavorRandom)
Comment on lines +47 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 🙂

mt.Create()

checkDenyList := func(keyspace string, expected bool) {
validateTableInDenyList(t, vc, fmt.Sprintf("%s:%s", keyspace, shard), table, expected)
}

waitForRowCount(t, vtgateConn, "customer", table, 3) // customer: all shards
waitForRowCount(t, vtgateConn, "customer2", table, 3) // customer2: all shards
waitForRowCount(t, vtgateConn, "customer2:80-", table, 2) // customer2: 80-
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())

validateTableInDenyList(t, vc, "customer2:80-", table, false)
validateTableInDenyList(t, vc, "customer:-80", table, false)
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false))
mt.SwitchReadsAndWrites()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, true)

validateTableInDenyList(t, vc, "customer2:80-", table, false)
validateTableInDenyList(t, vc, "customer:80-", table, true)
mt.ReverseReadsAndWrites()
checkDenyList(targetKeyspace, true)
checkDenyList(sourceKeyspace, false)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionReverseTraffic, "", "", "", false))
validateTableInDenyList(t, vc, "customer2:80-", table, true)
validateTableInDenyList(t, vc, "customer:80-", table, false)
mt.Cancel()
checkDenyList(targetKeyspace, false)
checkDenyList(sourceKeyspace, false)

require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionCancel, "", "", "", false))
validateTableInDenyList(t, vc, "customer2:80-", table, false)
validateTableInDenyList(t, vc, "customer:80-", table, false)
}

// TestPartialMoveTablesBasic tests partial move tables by moving each
Expand Down
25 changes: 20 additions & 5 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type moveTables struct {
sourceKeyspace string
tables string
atomicCopy bool
sourceShards string
}

type iMoveTables interface {
Expand All @@ -53,6 +54,7 @@ type iMoveTables interface {
SwitchReads()
SwitchWrites()
SwitchReadsAndWrites()
ReverseReadsAndWrites()
Cancel()
Complete()
Flavor() string
Expand Down Expand Up @@ -91,7 +93,7 @@ func newVtctlMoveTables(mt *moveTables) *VtctlMoveTables {
func (vmt *VtctlMoveTables) Create() {
log.Infof("vmt is %+v", vmt.vc, vmt.tables)
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCreate, "", "", "", vmt.atomicCopy)
vmt.tables, workflowActionCreate, "", vmt.sourceShards, "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

Expand All @@ -101,6 +103,12 @@ func (vmt *VtctlMoveTables) SwitchReadsAndWrites() {
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) ReverseReadsAndWrites() {
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionReverseTraffic, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -117,8 +125,9 @@ func (vmt *VtctlMoveTables) SwitchWrites() {
}

func (vmt *VtctlMoveTables) Cancel() {
//TODO implement me
panic("implement me")
err := tstWorkflowExec(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace,
vmt.tables, workflowActionCancel, "", "", "", vmt.atomicCopy)
require.NoError(vmt.vc.t, err)
}

func (vmt *VtctlMoveTables) Complete() {
Expand Down Expand Up @@ -158,13 +167,20 @@ func (v VtctldMoveTables) Create() {
if v.atomicCopy {
args = append(args, "--atomic-copy="+strconv.FormatBool(v.atomicCopy))
}
if v.sourceShards != "" {
args = append(args, "--source-shards="+v.sourceShards)
}
v.exec(args...)
}

func (v VtctldMoveTables) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
}

func (v VtctldMoveTables) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldMoveTables) Show() {
//TODO implement me
panic("implement me")
Expand All @@ -181,8 +197,7 @@ func (v VtctldMoveTables) SwitchWrites() {
}

func (v VtctldMoveTables) Cancel() {
//TODO implement me
panic("implement me")
v.exec("Cancel")
}

func (v VtctldMoveTables) Complete() {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,9 @@ func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow strin
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}
case binlogdatapb.MigrationType_SHARDS:
if err := sw.dropTargetShards(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -2074,6 +2077,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtctl/workflow/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (r *switcher) dropSourceDeniedTables(ctx context.Context) error {
return r.ts.dropSourceDeniedTables(ctx)
}

func (r *switcher) dropTargetDeniedTables(ctx context.Context) error {
return r.ts.dropTargetDeniedTables(ctx)
}

func (r *switcher) validateWorkflowHasCompleted(ctx context.Context) error {
return r.ts.validateWorkflowHasCompleted(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,17 @@ func (dr *switcherDryRun) dropSourceDeniedTables(ctx context.Context) error {
return nil
}

func (dr *switcherDryRun) dropTargetDeniedTables(ctx context.Context) error {
logs := make([]string, 0)
for _, si := range dr.ts.TargetShards() {
logs = append(logs, fmt.Sprintf("keyspace:%s;shard:%s;tablet:%d", si.Keyspace(), si.ShardName(), si.PrimaryAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Logf("Denied tables records on [%s] will be removed from: [%s]", strings.Join(dr.ts.Tables(), ","), strings.Join(logs, ","))
}
return nil
}

func (dr *switcherDryRun) logs() *[]string {
return &dr.drLog.logs
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type iswitcher interface {
removeSourceTables(ctx context.Context, removalType TableRemovalType) error
dropSourceShards(ctx context.Context) error
dropSourceDeniedTables(ctx context.Context) error
dropTargetDeniedTables(ctx context.Context) error
freezeTargetVReplication(ctx context.Context) error
dropSourceReverseVReplicationStreams(ctx context.Context) error
dropTargetVReplicationStreams(ctx context.Context) error
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,20 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
})
}

func (ts *trafficSwitcher) dropTargetDeniedTables(ctx context.Context) error {
return ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, true, ts.Tables())
}); err != nil {
return err
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
}

func (ts *trafficSwitcher) validateWorkflowHasCompleted(ctx context.Context) error {
return doValidateWorkflowHasCompleted(ctx, ts)
}
Expand Down