Skip to content

Commit

Permalink
Merge pull request #4449 from dweitzman/fix_reverse_failure
Browse files Browse the repository at this point in the history
Don't freeze with no serving shards if reversing vreplication fails d…
  • Loading branch information
sougou authored Jan 4, 2019
2 parents 6cf46db + 96e7c5d commit c5c2c02
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions go/vt/wrangler/keyspace.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,14 +29,13 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/topotools/events"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

const (
Expand Down Expand Up @@ -475,13 +474,13 @@ func (wr *Wrangler) replicaMigrateServedType(ctx context.Context, keyspace strin
// Check and update all source shard records.
// Enable query service if needed
event.DispatchUpdate(ev, "updating shards to migrate from")
if err = wr.updateShardRecords(ctx, fromShards, cells, servedType, true); err != nil {
if err = wr.updateShardRecords(ctx, fromShards, cells, servedType, true, false); err != nil {
return err
}

// Do the same for destination shards
event.DispatchUpdate(ev, "updating shards to migrate to")
if err = wr.updateShardRecords(ctx, toShards, cells, servedType, false); err != nil {
if err = wr.updateShardRecords(ctx, toShards, cells, servedType, false, false); err != nil {
return err
}

Expand Down Expand Up @@ -521,7 +520,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
// - wait for filtered replication to catch up
// - mark source shards as frozen
event.DispatchUpdate(ev, "disabling query service on all source masters")
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, true); err != nil {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, true, false); err != nil {
wr.cancelMasterMigrateServedTypes(ctx, sourceShards)
return err
}
Expand Down Expand Up @@ -553,6 +552,13 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
// Always set up reverse replication. We'll start it later if reverseReplication was specified.
// This will allow someone to reverse the replication later if they change their mind.
if err := wr.setupReverseReplication(ctx, sourceShards, destinationShards); err != nil {
// It's safe to unfreeze if reverse replication setup fails.
wr.cancelMasterMigrateServedTypes(ctx, sourceShards)
unfreezeErr := wr.updateFrozenFlag(ctx, sourceShards, false)
if unfreezeErr != nil {
wr.Logger().Errorf("Problem recovering for failed reverse replication: %v", unfreezeErr)
}

return err
}

Expand Down Expand Up @@ -606,7 +612,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

func (wr *Wrangler) cancelMasterMigrateServedTypes(ctx context.Context, sourceShards []*topo.ShardInfo) {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, false); err != nil {
if err := wr.updateShardRecords(ctx, sourceShards, nil, topodatapb.TabletType_MASTER, false, true); err != nil {
wr.Logger().Errorf2(err, "failed to re-enable source masters")
return
}
Expand Down Expand Up @@ -692,9 +698,12 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []
}

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool) (err error) {
func (wr *Wrangler) updateShardRecords(ctx context.Context, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) {
for i, si := range shards {
shards[i], err = wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
if clearSourceShards {
si.SourceShards = nil
}
if err := si.UpdateServedTypesMap(servedType, cells, isFrom /* remove */); err != nil {
return err
}
Expand Down

0 comments on commit c5c2c02

Please sign in to comment.