Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't freeze with no serving shards if reversing vreplication fails d… #4449

Merged
merged 2 commits into from
Jan 4, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

const (
Expand Down Expand Up @@ -475,13 +474,13 @@ func (wr *Wrangler) replicaMigrateServedType(ctx context.Context, keyspace strin
// Check and update all source shard records.
// Enable query service if needed
event.DispatchUpdate(ev, "updating shards to migrate from")
if err = wr.updateShardRecords(ctx, fromShards, cells, servedType, true); err != nil {
if err = wr.updateShardRecords(ctx, fromShards, cells, servedType, true, false); err != nil {
return err
}

// Do the same for destination shards
event.DispatchUpdate(ev, "updating shards to migrate to")
if err = wr.updateShardRecords(ctx, toShards, cells, servedType, false); err != nil {
if err = wr.updateShardRecords(ctx, toShards, cells, servedType, false, false); err != nil {
return err
}

Expand Down Expand Up @@ -521,7 +520,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
// - wait for filtered replication to catch up
// - mark source shards as frozen
event.DispatchUpdate(ev, "disabling query service on all source masters")
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, true); err != nil {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, true, false); err != nil {
wr.cancelMasterMigrateServedTypes(ctx, sourceShards)
return err
}
Expand Down Expand Up @@ -553,6 +552,13 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
// Always setup reverse replication. We'll start it later if reverseReplication was specified.
// This will allow someone to reverse the replication later if they change their mind.
if err := wr.setupReverseReplication(ctx, sourceShards, destinationShards); err != nil {
// It's safe to unfreeze if reverse replication setup fails.
wr.cancelMasterMigrateServedTypes(ctx, sourceShards)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's one more corner case to handle here: if setupReverseReplication partially succeeded, it's possible that it would have setup source shards for the reverse. If so, there will be no easy way to tell which way the replication is going, because both sides will have source shards.

If you look in findSourceDest, the presence a frozen flag is used as the tie breaker to disambiguate.

This means that we have to 'remove the source shards from the source shards' (so confusing) before unfreezing. I think we can do this inside the cancelMasterMigrateServedTypes function. It currently calls updateShardRecords. I recommend expanding that out within the function and additionally setting si.SourceShards to nil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I think

unfreezeErr := wr.updateFrozenFlag(ctx, sourceShards, false)
if unfreezeErr != nil {
wr.Logger().Errorf("Problem recovering for failed reverse replication: %v", unfreezeErr)
}

return err
}

Expand Down Expand Up @@ -606,7 +612,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

func (wr *Wrangler) cancelMasterMigrateServedTypes(ctx context.Context, sourceShards []*topo.ShardInfo) {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, false); err != nil {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, false, true); err != nil {
wr.Logger().Errorf2(err, "failed to re-enable source masters")
return
}
Expand Down Expand Up @@ -692,9 +698,12 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []
}

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool) (err error) {
func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) {
for i, si := range shards {
shards[i], err = wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
if clearSourceShards {
si.SourceShards = nil
}
if err := si.UpdateServedTypesMap(servedType, cells, isFrom /* remove */); err != nil {
return err
}
Expand Down