diff --git a/go/vt/vtadmin/api_test.go b/go/vt/vtadmin/api_test.go index e321d717e15..9b4b9cebca9 100644 --- a/go/vt/vtadmin/api_test.go +++ b/go/vt/vtadmin/api_test.go @@ -727,7 +727,7 @@ func TestGetSchemas(t *testing.T) { for _, tablet := range cts { // AddTablet also adds the keyspace + shard for us. - testutil.AddTablet(context.Background(), t, toposerver, tablet.Tablet) + testutil.AddTablet(context.Background(), t, toposerver, tablet.Tablet, nil) // Adds each SchemaDefinition to the fake TabletManagerClient, or nil // if there are no schemas for that tablet. (All tablet aliases must diff --git a/go/vt/vtctl/grpcvtctldserver/server_test.go b/go/vt/vtctl/grpcvtctldserver/server_test.go index 6bcf28642e2..9861c14ee30 100644 --- a/go/vt/vtctl/grpcvtctldserver/server_test.go +++ b/go/vt/vtctl/grpcvtctldserver/server_test.go @@ -209,9 +209,7 @@ func TestChangeTabletType(t *testing.T) { vtctld := NewVtctldServer(ts) testutil.TabletManagerClient.Topo = ts - for _, tablet := range tt.tablets { - testutil.AddTablet(ctx, t, ts, tablet) - } + testutil.AddTablets(ctx, t, ts, nil, tt.tablets...) resp, err := vtctld.ChangeTabletType(ctx, tt.req) if tt.shouldErr { @@ -257,7 +255,7 @@ func TestChangeTabletType(t *testing.T) { Uid: 100, }, Type: topodatapb.TabletType_REPLICA, - }) + }, nil) _, err := vtctld.ChangeTabletType(ctx, &vtctldatapb.ChangeTabletTypeRequest{ TabletAlias: &topodatapb.TabletAlias{ @@ -1364,7 +1362,7 @@ func TestDeleteShards(t *testing.T) { vtctld := NewVtctldServer(ts) testutil.AddShards(ctx, t, ts, tt.shards...) - testutil.AddTablets(ctx, t, ts, tt.tablets...) + testutil.AddTablets(ctx, t, ts, nil, tt.tablets...) testutil.SetupReplicationGraphs(ctx, t, ts, tt.replicationGraphs...) 
testutil.UpdateSrvKeyspaces(ctx, t, ts, tt.srvKeyspaces) @@ -1853,9 +1851,7 @@ func TestDeleteTablets(t *testing.T) { vtctld := NewVtctldServer(ts) // Setup tablets and shards - for _, tablet := range tt.tablets { - testutil.AddTablet(ctx, t, ts, tablet) - } + testutil.AddTablets(ctx, t, ts, nil, tt.tablets...) for key, updateFn := range tt.shardFieldUpdates { ks, shard, err := topoproto.ParseKeyspaceShard(key) @@ -2166,7 +2162,7 @@ func TestGetTablet(t *testing.T) { Type: topodatapb.TabletType_REPLICA, } - testutil.AddTablet(ctx, t, ts, tablet) + testutil.AddTablet(ctx, t, ts, tablet, nil) resp, err := vtctld.GetTablet(ctx, &vtctldatapb.GetTabletRequest{ TabletAlias: &topodatapb.TabletAlias{ @@ -2198,14 +2194,14 @@ func TestGetSchema(t *testing.T) { } testutil.AddTablet(ctx, t, ts, &topodatapb.Tablet{ Alias: validAlias, - }) + }, nil) otherAlias := &topodatapb.TabletAlias{ Cell: "zone1", Uid: 101, } testutil.AddTablet(ctx, t, ts, &topodatapb.Tablet{ Alias: otherAlias, - }) + }, nil) // we need to run this on each test case or they will pollute each other setupSchema := func() { @@ -2869,9 +2865,7 @@ func TestGetTablets(t *testing.T) { ts := memorytopo.NewServer(tt.cells...) vtctld := NewVtctldServer(ts) - for _, tablet := range tt.tablets { - testutil.AddTablet(ctx, t, ts, tablet) - } + testutil.AddTablets(ctx, t, ts, nil, tt.tablets...) 
resp, err := vtctld.GetTablets(ctx, tt.req) if tt.shouldErr { diff --git a/go/vt/vtctl/grpcvtctldserver/testutil/util.go b/go/vt/vtctl/grpcvtctldserver/testutil/util.go index c6298c86556..290e153fe45 100644 --- a/go/vt/vtctl/grpcvtctldserver/testutil/util.go +++ b/go/vt/vtctl/grpcvtctldserver/testutil/util.go @@ -20,6 +20,7 @@ package testutil import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -27,6 +28,7 @@ import ( "google.golang.org/grpc" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/vtctldclient" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -77,6 +79,19 @@ func AddKeyspaces(ctx context.Context, t *testing.T, ts *topo.Server, keyspaces } } +// AddTabletOptions is a container for different behaviors tests need from +// AddTablet. +type AddTabletOptions struct { + // AlsoSetShardMaster is an option to control additional setup to take when + // AddTablet receives a tablet of type MASTER. When set, AddTablet will also + // update the shard record to make that tablet the primary, and fail the + // test if the shard record has a serving primary already. + AlsoSetShardMaster bool + // SkipShardCreation, when set, makes AddTablet never attempt to create a + // shard record in the topo under any circumstances. + SkipShardCreation bool +} + // AddTablet adds a tablet to the topology, failing a test if that tablet record // could not be created. It shallow copies to prevent XXX_ fields from changing, // including nested proto message fields. @@ -84,14 +99,28 @@ func AddKeyspaces(ctx context.Context, t *testing.T, ts *topo.Server, keyspaces // AddTablet also optionally adds empty keyspace and shard records to the // topology, if they are set on the tablet record and they cannot be retrieved // from the topo server without error. 
-func AddTablet(ctx context.Context, t *testing.T, ts *topo.Server, tablet *topodatapb.Tablet) { +// +// If AddTablet receives a tablet record with a keyspace and shard set, and that +// tablet's type is MASTER, and opts.AlsoSetShardMaster is set, then AddTablet +// will update the shard record to make that tablet the shard master and set the +// shard to serving. If that shard record already has a serving primary, then +// AddTablet will fail the test. +func AddTablet(ctx context.Context, t *testing.T, ts *topo.Server, tablet *topodatapb.Tablet, opts *AddTabletOptions) { in := *tablet alias := *tablet.Alias in.Alias = &alias + if opts == nil { + opts = &AddTabletOptions{} + } + err := ts.CreateTablet(ctx, &in) require.NoError(t, err, "CreateTablet(%+v)", &in) + if opts.SkipShardCreation { + return + } + if tablet.Keyspace != "" { if _, err := ts.GetKeyspace(ctx, tablet.Keyspace); err != nil { err := ts.CreateKeyspace(ctx, tablet.Keyspace, &topodatapb.Keyspace{}) @@ -103,15 +132,30 @@ func AddTablet(ctx context.Context, t *testing.T, ts *topo.Server, tablet *topod err := ts.CreateShard(ctx, tablet.Keyspace, tablet.Shard) require.NoError(t, err, "CreateShard(%s, %s)", tablet.Keyspace, tablet.Shard) } + + if tablet.Type == topodatapb.TabletType_MASTER && opts.AlsoSetShardMaster { + _, err := ts.UpdateShardFields(ctx, tablet.Keyspace, tablet.Shard, func(si *topo.ShardInfo) error { + if si.IsMasterServing && si.MasterAlias != nil { + return fmt.Errorf("shard %v/%v already has a serving master (%v)", tablet.Keyspace, tablet.Shard, topoproto.TabletAliasString(si.MasterAlias)) + } + + si.MasterAlias = tablet.Alias + si.IsMasterServing = true + si.MasterTermStartTime = tablet.MasterTermStartTime + + return nil + }) + require.NoError(t, err, "UpdateShardFields(%s, %s) to set %s as serving primary failed", tablet.Keyspace, tablet.Shard, topoproto.TabletAliasString(tablet.Alias)) + } } } } // AddTablets adds a list of tablets to the topology. 
See AddTablet for more // details. -func AddTablets(ctx context.Context, t *testing.T, ts *topo.Server, tablets ...*topodatapb.Tablet) { +func AddTablets(ctx context.Context, t *testing.T, ts *topo.Server, opts *AddTabletOptions, tablets ...*topodatapb.Tablet) { for _, tablet := range tablets { - AddTablet(ctx, t, ts, tablet) + AddTablet(ctx, t, ts, tablet, opts) } } @@ -130,6 +174,15 @@ func AddShards(ctx context.Context, t *testing.T, ts *topo.Server, shards ...*vt err := ts.CreateShard(ctx, shard.Keyspace, shard.Name) require.NoError(t, err, "CreateShard(%s/%s)", shard.Keyspace, shard.Name) + + if shard.Shard != nil { + _, err := ts.UpdateShardFields(ctx, shard.Keyspace, shard.Name, func(si *topo.ShardInfo) error { + si.Shard = shard.Shard + + return nil + }) + require.NoError(t, err, "UpdateShardFields(%s/%s, %v)", shard.Keyspace, shard.Name, shard.Shard) + } } } diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter.go b/go/vt/vtctl/reparentutil/emergency_reparenter.go new file mode 100644 index 00000000000..6d5716b1f12 --- /dev/null +++ b/go/vt/vtctl/reparentutil/emergency_reparenter.go @@ -0,0 +1,314 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package reparentutil + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + + "vitess.io/vitess/go/event" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + logutilpb "vitess.io/vitess/go/vt/proto/logutil" + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// EmergencyReparenter performs EmergencyReparentShard operations. +type EmergencyReparenter struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + logger logutil.Logger +} + +// EmergencyReparentOptions provides optional parameters to +// EmergencyReparentShard operations. Options are passed by value, so it is safe +// for callers to mutate and reuse options structs for multiple calls. +type EmergencyReparentOptions struct { + NewPrimaryAlias *topodatapb.TabletAlias + IgnoreReplicas sets.String + WaitReplicasTimeout time.Duration + + // Private options managed internally. We use value passing to avoid leaking + // these details back out. + + lockAction string +} + +// NewEmergencyReparenter returns a new EmergencyReparenter object, ready to +// perform EmergencyReparentShard operations using the given topo.Server, +// TabletManagerClient, and logger. +// +// Providing a nil logger instance is allowed. +func NewEmergencyReparenter(ts *topo.Server, tmc tmclient.TabletManagerClient, logger logutil.Logger) *EmergencyReparenter { + erp := EmergencyReparenter{ + ts: ts, + tmc: tmc, + logger: logger, + } + + if erp.logger == nil { + // Create a no-op logger so we can call functions on erp.logger without + // needing to constantly check for non-nil. 
+ erp.logger = logutil.NewCallbackLogger(func(*logutilpb.Event) {}) + } + + return &erp +} + +// ReparentShard performs the EmergencyReparentShard operation on the given +// keyspace and shard. +func (erp *EmergencyReparenter) ReparentShard(ctx context.Context, keyspace string, shard string, opts EmergencyReparentOptions) (*events.Reparent, error) { + opts.lockAction = erp.getLockAction(opts.NewPrimaryAlias) + + ctx, unlock, err := erp.ts.LockShard(ctx, keyspace, shard, opts.lockAction) + if err != nil { + return nil, err + } + + defer unlock(&err) + + ev := &events.Reparent{} + defer func() { + switch err { + case nil: + event.DispatchUpdate(ev, "finished EmergencyReparentShard") + default: + event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) + } + }() + + err = erp.reparentShardLocked(ctx, ev, keyspace, shard, opts) + + return ev, err +} + +func (erp *EmergencyReparenter) getLockAction(newPrimaryAlias *topodatapb.TabletAlias) string { + action := "EmergencyReparentShard" + + if newPrimaryAlias != nil { + action += fmt.Sprintf("(%v)", topoproto.TabletAliasString(newPrimaryAlias)) + } + + return action +} + +func (erp *EmergencyReparenter) promoteNewPrimary( + ctx context.Context, + ev *events.Reparent, + keyspace string, + shard string, + newPrimaryTabletAlias string, + tabletMap map[string]*topo.TabletInfo, + statusMap map[string]*replicationdatapb.StopReplicationStatus, + opts EmergencyReparentOptions, +) error { + erp.logger.Infof("promoting tablet %v to master", newPrimaryTabletAlias) + event.DispatchUpdate(ev, "promoting replica") + + newPrimaryTabletInfo, ok := tabletMap[newPrimaryTabletAlias] + if !ok { + return vterrors.Errorf(vtrpc.Code_INTERNAL, "attempted to promote master-elect %v that was not in the tablet map; this an impossible situation", newPrimaryTabletAlias) + } + + rp, err := erp.tmc.PromoteReplica(ctx, newPrimaryTabletInfo.Tablet) + if err != nil { + return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded 
to master: %v", newPrimaryTabletAlias, err) + } + + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) + } + + replCtx, replCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout) + defer replCancel() + + event.DispatchUpdate(ev, "reparenting all tablets") + + // (@ajm188) - A question while migrating: Is this by design? By my read, + // there's nothing consuming that error channel, meaning any replica that + // fails to SetMaster will actually block trying to send to the errCh. In + // addition, the only way an operator will ever notice these errors will be + // in the logs on the tablet, and not from any error propagation in + // vtctl/wrangler, so a shard will continue to attempt to serve (probably?) + // after a partially-failed ERS. + now := time.Now().UnixNano() + errCh := make(chan error) + + handlePrimary := func(alias string, ti *topo.TabletInfo) error { + erp.logger.Infof("populating reparent journal on new master %v", alias) + return erp.tmc.PopulateReparentJournal(replCtx, ti.Tablet, now, opts.lockAction, newPrimaryTabletInfo.Alias, rp) + } + + handleReplica := func(alias string, ti *topo.TabletInfo) { + erp.logger.Infof("setting new master on replica %v", alias) + + var err error + defer func() { errCh <- err }() + + forceStart := false + if status, ok := statusMap[alias]; ok { + forceStart = ReplicaWasRunning(status) + } + + err = erp.tmc.SetMaster(replCtx, ti.Tablet, newPrimaryTabletInfo.Alias, now, "", forceStart) + if err != nil { + err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) + } + } + + for alias, ti := range tabletMap { + switch { + case alias == newPrimaryTabletAlias: + continue + case !opts.IgnoreReplicas.Has(alias): + go handleReplica(alias, ti) + } + } + + primaryErr := handlePrimary(newPrimaryTabletAlias, newPrimaryTabletInfo) + if primaryErr != nil { + erp.logger.Warningf("master failed to PopulateReparentJournal") + 
replCancel() + + return vterrors.Wrapf(primaryErr, "failed to PopulateReparentJournal on master: %v", primaryErr) + } + + return nil +} + +func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace string, shard string, opts EmergencyReparentOptions) error { + shardInfo, err := erp.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return err + } + + ev.ShardInfo = *shardInfo + + event.DispatchUpdate(ev, "reading all tablets") + + tabletMap, err := erp.ts.GetTabletMapForShard(ctx, keyspace, shard) + if err != nil { + return vterrors.Wrapf(err, "failed to get tablet map for %v/%v: %v", keyspace, shard, err) + } + + statusMap, primaryStatusMap, err := StopReplicationAndBuildStatusMaps(ctx, erp.tmc, ev, tabletMap, opts.WaitReplicasTimeout, opts.IgnoreReplicas, erp.logger) + if err != nil { + return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) + } + + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) + } + + validCandidates, err := FindValidEmergencyReparentCandidates(statusMap, primaryStatusMap) + if err != nil { + return err + } else if len(validCandidates) == 0 { + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") + } + + // Wait for all candidates to apply relay logs + if err := erp.waitForAllRelayLogsToApply(ctx, validCandidates, tabletMap, statusMap, opts); err != nil { + return err + } + + // Elect the candidate with the most up-to-date position. 
+ var ( + winningPosition mysql.Position + winningPrimaryTabletAliasStr string + ) + + for alias, position := range validCandidates { + if winningPosition.IsZero() || position.AtLeast(winningPosition) { + winningPosition = position + winningPrimaryTabletAliasStr = alias + } + } + + // If we were requested to elect a particular primary, verify it's a valid + // candidate (non-zero position, no errant GTIDs) and is at least as + // advanced as the winning position. + if opts.NewPrimaryAlias != nil { + winningPrimaryTabletAliasStr = topoproto.TabletAliasString(opts.NewPrimaryAlias) + pos, ok := validCandidates[winningPrimaryTabletAliasStr] + switch { + case !ok: + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", winningPrimaryTabletAliasStr) + case !pos.AtLeast(winningPosition): + return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "master elect %v at position %v is not fully caught up. Winning position: %v", winningPrimaryTabletAliasStr, pos, winningPosition) + } + } + + // Check (again) we still have the topology lock. + if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { + return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) + } + + // Do the promotion. 
+ if err := erp.promoteNewPrimary(ctx, ev, keyspace, shard, winningPrimaryTabletAliasStr, tabletMap, statusMap, opts); err != nil { + return err + } + + return nil +} + +func (erp *EmergencyReparenter) waitForAllRelayLogsToApply( + ctx context.Context, + validCandidates map[string]mysql.Position, + tabletMap map[string]*topo.TabletInfo, + statusMap map[string]*replicationdatapb.StopReplicationStatus, + opts EmergencyReparentOptions, +) error { + errCh := make(chan error) + defer close(errCh) + + groupCtx, groupCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout) + defer groupCancel() + + for candidate := range validCandidates { + go func(alias string) { + var err error + defer func() { errCh <- err }() + err = WaitForRelayLogsToApply(groupCtx, erp.tmc, tabletMap[alias], statusMap[alias]) + }(candidate) + } + + errgroup := concurrency.ErrorGroup{ + NumGoroutines: len(validCandidates), + NumRequiredSuccesses: len(validCandidates), + NumAllowedErrors: 0, + } + rec := errgroup.Wait(groupCancel, errCh) + + if len(rec.Errors) != 0 { + return vterrors.Wrapf(rec.Error(), "could not apply all relay logs within the provided WaitReplicasTimeout (%s): %v", opts.WaitReplicasTimeout, rec.Error()) + } + + return nil +} diff --git a/go/vt/vtctl/reparentutil/emergency_reparenter_test.go b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go new file mode 100644 index 00000000000..d47cee02f38 --- /dev/null +++ b/go/vt/vtctl/reparentutil/emergency_reparenter_test.go @@ -0,0 +1,1651 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reparentutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/sets" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools/events" + "vitess.io/vitess/go/vt/vtctl/grpcvtctldserver/testutil" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +func TestNewEmergencyReparenter(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + logger logutil.Logger + }{ + { + name: "default case", + logger: logutil.NewMemoryLogger(), + }, + { + name: "overrides nil logger with no-op", + logger: nil, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + er := NewEmergencyReparenter(nil, nil, tt.logger) + assert.NotNil(t, er.logger, "NewEmergencyReparenter should never result in a nil logger instance on the EmergencyReparenter") + }) + } +} + +func TestEmergencyReparenter_getLockAction(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + alias *topodatapb.TabletAlias + expected string + msg string + }{ + { + name: "explicit new primary specified", + alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + expected: "EmergencyReparentShard(zone1-0000000100)", + msg: "lockAction should include tablet alias", + }, + { + name: "user did not specify new primary elect", + alias: nil, + expected: "EmergencyReparentShard", + msg: "lockAction should omit parens when no primary elect passed", + }, + } + + erp := &EmergencyReparenter{} + + 
for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + actual := erp.getLockAction(tt.alias) + assert.Equal(t, tt.expected, actual, tt.msg) + }) + } +} + +func TestEmergencyReparenter_reparentShardLocked(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + // setup + ts *topo.Server + tmc *emergencyReparenterTestTMClient + unlockTopo bool + shards []*vtctldatapb.Shard + tablets []*topodatapb.Tablet + // params + keyspace string + shard string + opts EmergencyReparentOptions + // results + shouldErr bool + }{ + { + name: "success", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000102": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000102": { + Result: "ok", + Error: nil, + }, + }, + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-26", + }, + }, + }, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + 
"zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-26": nil, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "most up-to-date position, wins election", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: false, + }, + { + // Here, all our tablets are tied, so we're going to explicitly pick + // zone1-101. + name: "success with requested primary-elect", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000101": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000101": { + Result: "ok", + Error: nil, + }, + }, + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: 
&replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{ + NewPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + shouldErr: false, + }, + { + name: "shard not found", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{}, + unlockTopo: true, // we shouldn't try to lock the nonexistent shard + shards: nil, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "cannot stop replication", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + // We actually need >1 to fail here. 
+ "zone1-0000000100": { + Error: assert.AnError, + }, + "zone1-0000000101": { + Error: assert.AnError, + }, + "zone1-0000000102": { + Error: assert.AnError, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "lost topo lock", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{}, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{}, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{}, + }, + }, + }, + unlockTopo: true, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "cannot get reparent candidates", + ts: memorytopo.NewServer("zone1"), + tmc: 
&emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{}, + }, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "has a zero relay log position", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "zero valid reparent candidates", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{}, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "error waiting for relay logs to apply", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status 
*replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + }, + WaitForPositionDelays: map[string]time.Duration{ + "zone1-0000000101": time.Minute, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": assert.AnError, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "slow to apply relay logs", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "fails to apply relay logs", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{ + WaitReplicasTimeout: time.Millisecond * 50, // 
one replica is going to take a minute to apply relay logs + }, + shouldErr: true, + }, + { + name: "requested primary-elect is not in tablet map", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + }, + keyspace: 
"testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{ + NewPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 200, + }, + }, + shouldErr: true, + }, + { + name: "requested primary-elect is not winning primary-elect", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-20", + }, + }, + }, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-20": nil, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: []*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: 
"zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "not most up-to-date position", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{ + NewPrimaryAlias: &topodatapb.TabletAlias{ // we're requesting a tablet that's behind in replication + Cell: "zone1", + Uid: 102, + }, + }, + shouldErr: true, + }, + { + name: "cannot promote new primary", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000102": { + Error: assert.AnError, + }, + }, + StopReplicationAndGetStatusResults: map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + }{ + "zone1-0000000100": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000101": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + "zone1-0000000102": { + StopStatus: &replicationdatapb.StopReplicationStatus{ + After: &replicationdatapb.Status{ + MasterUuid: "3E11FA47-71CA-11E1-9E33-C80AA9429562", + RelayLogPosition: "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21", + }, + }, + }, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000101": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + "zone1-0000000102": { + "MySQL56/3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21": nil, + }, + }, + }, + shards: []*vtctldatapb.Shard{ + { + Keyspace: "testkeyspace", + Name: "-", + }, + }, + tablets: 
[]*topodatapb.Tablet{ + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + Keyspace: "testkeyspace", + Shard: "-", + }, + { + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Keyspace: "testkeyspace", + Shard: "-", + Hostname: "not most up-to-date position", + }, + }, + keyspace: "testkeyspace", + shard: "-", + opts: EmergencyReparentOptions{ + // We're explicitly requesting a primary-elect in this test case + // because we don't care about the correctness of the selection + // code (it's covered by other test cases), and it simplifies + // the error mocking. + NewPrimaryAlias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := logutil.NewMemoryLogger() + ev := &events.Reparent{} + + testutil.AddShards(ctx, t, tt.ts, tt.shards...) + testutil.AddTablets(ctx, t, tt.ts, nil, tt.tablets...) 
+ + if !tt.unlockTopo { + lctx, unlock, lerr := tt.ts.LockShard(ctx, tt.keyspace, tt.shard, "test lock") + require.NoError(t, lerr, "could not lock %s/%s for testing", tt.keyspace, tt.shard) + + defer func() { + unlock(&lerr) + require.NoError(t, lerr, "could not unlock %s/%s after test", tt.keyspace, tt.shard) + }() + + ctx = lctx // make the reparentShardLocked call use the lock ctx + } + + erp := NewEmergencyReparenter(tt.ts, tt.tmc, logger) + + err := erp.reparentShardLocked(ctx, ev, tt.keyspace, tt.shard, tt.opts) + if tt.shouldErr { + assert.Error(t, err) + + return + } + + assert.NoError(t, err) + }) + } +} + +func TestEmergencyReparenter_promoteNewPrimary(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + ts *topo.Server + tmc *emergencyReparenterTestTMClient + unlockTopo bool + keyspace string + shard string + newPrimaryTabletAlias string + tabletMap map[string]*topo.TabletInfo + statusMap map[string]*replicationdatapb.StopReplicationStatus + opts EmergencyReparentOptions + shouldErr bool + }{ + { + name: "success", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + SetMasterResults: map[string]error{ + "zone1-0000000101": nil, + "zone1-0000000102": nil, + "zone1-0000000404": assert.AnError, // okay, because we're ignoring it. 
+ }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + Hostname: "primary-elect", + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + "zone1-0000000102": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + Hostname: "requires force start", + }, + }, + "zone1-0000000404": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 404, + }, + Hostname: "ignored tablet", + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000101": { // forceStart = false + Before: &replicationdatapb.Status{ + IoThreadRunning: false, + SqlThreadRunning: false, + }, + }, + "zone1-0000000102": { // forceStart = true + Before: &replicationdatapb.Status{ + IoThreadRunning: true, + SqlThreadRunning: true, + }, + }, + }, + opts: EmergencyReparentOptions{ + IgnoreReplicas: sets.NewString("zone1-0000000404"), + }, + shouldErr: false, + }, + { + name: "primary not in tablet map", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{}, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone2-0000000200", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": {}, + "zone1-0000000101": {}, + }, + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "PromoteReplica error", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: assert.AnError, + }, + }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap:
map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "lost topology lock", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + }, + unlockTopo: true, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "cannot repopulate reparent journal on new primary", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": assert.AnError, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: 
map[string]*replicationdatapb.StopReplicationStatus{}, + opts: EmergencyReparentOptions{}, + shouldErr: true, + }, + { + name: "replicas failing to SetMaster does not fail the promotion", + ts: memorytopo.NewServer("zone1"), + tmc: &emergencyReparenterTestTMClient{ + PopulateReparentJournalResults: map[string]error{ + "zone1-0000000100": nil, + }, + PromoteReplicaResults: map[string]struct { + Result string + Error error + }{ + "zone1-0000000100": { + Error: nil, + }, + }, + SetMasterResults: map[string]error{ + // everyone fails, who cares?! + "zone1-0000000101": assert.AnError, + "zone1-0000000102": assert.AnError, + }, + }, + keyspace: "testkeyspace", + shard: "-", + newPrimaryTabletAlias: "zone1-0000000100", + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + "zone1-0000000102": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{}, + opts: EmergencyReparentOptions{}, + shouldErr: false, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + if tt.ts == nil { + t.Skip("toposerver is nil, assuming we're working on other test cases") + } + + ctx := context.Background() + logger := logutil.NewMemoryLogger() + ev := &events.Reparent{} + + testutil.AddShards(ctx, t, tt.ts, &vtctldatapb.Shard{ + Keyspace: tt.keyspace, + Name: tt.shard, + }) + + if !tt.unlockTopo { + var ( + unlock func(*error) + lerr error + ) + + ctx, unlock, lerr = tt.ts.LockShard(ctx, tt.keyspace, tt.shard, "test lock") + require.NoError(t, lerr, "could not lock %s/%s for test", tt.keyspace, tt.shard) + + defer func() { + unlock(&lerr) + require.NoError(t, lerr,
"could not unlock %s/%s after test", tt.keyspace, tt.shard) + }() + } + + erp := NewEmergencyReparenter(tt.ts, tt.tmc, logger) + + err := erp.promoteNewPrimary(ctx, ev, tt.keyspace, tt.shard, tt.newPrimaryTabletAlias, tt.tabletMap, tt.statusMap, tt.opts) + if tt.shouldErr { + assert.Error(t, err) + + return + } + + assert.NoError(t, err) + }) + } +} + +func TestEmergencyReparenter_waitForAllRelayLogsToApply(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := logutil.NewMemoryLogger() + opts := EmergencyReparentOptions{ + WaitReplicasTimeout: time.Millisecond * 50, + } + tests := []struct { + name string + tmc *emergencyReparenterTestTMClient + candidates map[string]mysql.Position + tabletMap map[string]*topo.TabletInfo + statusMap map[string]*replicationdatapb.StopReplicationStatus + shouldErr bool + }{ + { + name: "all tablet pass", + tmc: &emergencyReparenterTestTMClient{ + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "position1": nil, + }, + "zone1-0000000101": { + "position1": nil, + }, + }, + }, + candidates: map[string]mysql.Position{ + "zone1-0000000100": {}, + "zone1-0000000101": {}, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000100": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + "zone1-0000000101": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + }, + shouldErr: false, + }, + { + name: "one tablet fails", + tmc: &emergencyReparenterTestTMClient{ + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "position1": nil, + }, + "zone1-0000000101": { + "position1": 
nil, + }, + }, + }, + candidates: map[string]mysql.Position{ + "zone1-0000000100": {}, + "zone1-0000000101": {}, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000100": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + "zone1-0000000101": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position2", // cannot wait for the desired "position1", so we fail + }, + }, + }, + shouldErr: true, + }, + { + name: "multiple tablets fail", + tmc: &emergencyReparenterTestTMClient{ + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "position1": nil, + }, + "zone1-0000000101": { + "position2": nil, + }, + "zone1-0000000102": { + "position3": nil, + }, + }, + }, + candidates: map[string]mysql.Position{ + "zone1-0000000100": {}, + "zone1-0000000101": {}, + "zone1-0000000102": {}, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + "zone1-0000000102": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 102, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000100": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + "zone1-0000000101": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + "zone1-0000000102": { + After: &replicationdatapb.Status{ + RelayLogPosition: 
"position1", + }, + }, + }, + shouldErr: true, + }, + { + name: "one slow tablet", + tmc: &emergencyReparenterTestTMClient{ + WaitForPositionDelays: map[string]time.Duration{ + "zone1-0000000101": time.Minute, + }, + WaitForPositionResults: map[string]map[string]error{ + "zone1-0000000100": { + "position1": nil, + }, + "zone1-0000000101": { + "position1": nil, + }, + }, + }, + candidates: map[string]mysql.Position{ + "zone1-0000000100": {}, + "zone1-0000000101": {}, + }, + tabletMap: map[string]*topo.TabletInfo{ + "zone1-0000000100": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + }, + }, + "zone1-0000000101": { + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 101, + }, + }, + }, + }, + statusMap: map[string]*replicationdatapb.StopReplicationStatus{ + "zone1-0000000100": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + "zone1-0000000101": { + After: &replicationdatapb.Status{ + RelayLogPosition: "position1", + }, + }, + }, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + erp := NewEmergencyReparenter(nil, tt.tmc, logger) + err := erp.waitForAllRelayLogsToApply(ctx, tt.candidates, tt.tabletMap, tt.statusMap, opts) + if tt.shouldErr { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + }) + } +} + +type emergencyReparenterTestTMClient struct { + tmclient.TabletManagerClient + // keyed by tablet alias + PopulateReparentJournalResults map[string]error + // keyed by tablet alias. + PromoteReplicaResults map[string]struct { + Result string + Error error + } + // keyed by tablet alias. + SetMasterResults map[string]error + // keyed by tablet alias. + StopReplicationAndGetStatusDelays map[string]time.Duration + // keyed by tablet alias. 
+ StopReplicationAndGetStatusResults map[string]struct { + Status *replicationdatapb.Status + StopStatus *replicationdatapb.StopReplicationStatus + Error error + } + // keyed by tablet alias. + WaitForPositionDelays map[string]time.Duration + // WaitForPosition(tablet *topodatapb.Tablet, position string) error, so we + // key by tablet alias and then by position. + WaitForPositionResults map[string]map[string]error +} + +func (fake *emergencyReparenterTestTMClient) PopulateReparentJournal(ctx context.Context, tablet *topodatapb.Tablet, timeCreatedNS int64, actionName string, primaryAlias *topodatapb.TabletAlias, pos string) error { + if fake.PopulateReparentJournalResults == nil { + return assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + if result, ok := fake.PopulateReparentJournalResults[key]; ok { + return result + } + + return assert.AnError +} + +func (fake *emergencyReparenterTestTMClient) PromoteReplica(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + if fake.PromoteReplicaResults == nil { + return "", assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + if result, ok := fake.PromoteReplicaResults[key]; ok { + return result.Result, result.Error + } + + return "", assert.AnError +} + +func (fake *emergencyReparenterTestTMClient) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error { + if fake.SetMasterResults == nil { + return assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + if result, ok := fake.SetMasterResults[key]; ok { + return result + } + + return assert.AnError +} + +func (fake *emergencyReparenterTestTMClient) StopReplicationAndGetStatus(ctx context.Context, tablet *topodatapb.Tablet, mode replicationdatapb.StopReplicationMode) (*replicationdatapb.Status, *replicationdatapb.StopReplicationStatus, error) { + if fake.StopReplicationAndGetStatusResults 
== nil { + return nil, nil, assert.AnError + } + + if tablet.Alias == nil { + return nil, nil, assert.AnError + } + + key := topoproto.TabletAliasString(tablet.Alias) + + if fake.StopReplicationAndGetStatusDelays != nil { + if delay, ok := fake.StopReplicationAndGetStatusDelays[key]; ok { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case <-time.After(delay): + // proceed to results + } + } + } + + if result, ok := fake.StopReplicationAndGetStatusResults[key]; ok { + return result.Status, result.StopStatus, result.Error + } + + return nil, nil, assert.AnError +} + +func (fake *emergencyReparenterTestTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, position string) error { + tabletKey := topoproto.TabletAliasString(tablet.Alias) + + if fake.WaitForPositionDelays != nil { + if delay, ok := fake.WaitForPositionDelays[tabletKey]; ok { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // proceed to results + } + } + } + + if fake.WaitForPositionResults == nil { + return assert.AnError + } + + tabletResultsByPosition, ok := fake.WaitForPositionResults[tabletKey] + if !ok { + return assert.AnError + } + + result, ok := tabletResultsByPosition[position] + if !ok { + return assert.AnError + } + + return result +} diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 57ebf3aefef..1a7bfe779e3 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -48,7 +48,7 @@ import ( const ( plannedReparentShardOperation = "PlannedReparentShard" - emergencyReparentShardOperation = "EmergencyReparentShard" + emergencyReparentShardOperation = "EmergencyReparentShard" //nolint tabletExternallyReparentedOperation = "TabletExternallyReparented" //nolint ) @@ -541,182 +541,20 @@ func (wr *Wrangler) plannedReparentShardLocked(ctx context.Context, ev *events.R // EmergencyReparentShard will make the provided tablet the master for // the shard, when the old master is completely 
unreachable. func (wr *Wrangler) EmergencyReparentShard(ctx context.Context, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) (err error) { - // lock the shard - actionMsg := emergencyReparentShardOperation - if masterElectTabletAlias != nil { - actionMsg += fmt.Sprintf("(%v)", topoproto.TabletAliasString(masterElectTabletAlias)) - } - ctx, unlock, lockErr := wr.ts.LockShard(ctx, keyspace, shard, actionMsg) - if lockErr != nil { - return lockErr - } - defer unlock(&err) + _, err = reparentutil.NewEmergencyReparenter(wr.ts, wr.tmc, wr.logger).ReparentShard( + ctx, + keyspace, + shard, + reparentutil.EmergencyReparentOptions{ + NewPrimaryAlias: masterElectTabletAlias, + WaitReplicasTimeout: waitReplicasTimeout, + IgnoreReplicas: ignoredTablets, + }, + ) - // Create reusable Reparent event with available info - ev := &events.Reparent{} - - // do the work - err = wr.emergencyReparentShardLocked(ctx, ev, keyspace, shard, masterElectTabletAlias, waitReplicasTimeout, ignoredTablets) - if err != nil { - event.DispatchUpdate(ev, "failed EmergencyReparentShard: "+err.Error()) - } else { - event.DispatchUpdate(ev, "finished EmergencyReparentShard") - } return err } -func (wr *Wrangler) emergencyReparentShardLocked(ctx context.Context, ev *events.Reparent, keyspace, shard string, masterElectTabletAlias *topodatapb.TabletAlias, waitReplicasTimeout time.Duration, ignoredTablets sets.String) error { - shardInfo, err := wr.ts.GetShard(ctx, keyspace, shard) - if err != nil { - return err - } - ev.ShardInfo = *shardInfo - - event.DispatchUpdate(ev, "reading all tablets") - tabletMap, err := wr.ts.GetTabletMapForShard(ctx, keyspace, shard) - if err != nil { - return vterrors.Wrapf(err, "failed to get tablet map for shard %v in keyspace %v: %v", shard, keyspace, err) - } - - statusMap, masterStatusMap, err := reparentutil.StopReplicationAndBuildStatusMaps(ctx, wr.tmc, ev, tabletMap, 
waitReplicasTimeout, ignoredTablets, wr.logger) - if err != nil { - return vterrors.Wrapf(err, "failed to stop replication and build status maps: %v", err) - } - - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - validCandidates, err := reparentutil.FindValidEmergencyReparentCandidates(statusMap, masterStatusMap) - if err != nil { - return err - } - if len(validCandidates) == 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no valid candidates for emergency reparent") - } - - errChan := make(chan error) - rec := &concurrency.AllErrorRecorder{} - groupCtx, groupCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer groupCancel() - for candidate := range validCandidates { - go func(alias string) { - var err error - defer func() { errChan <- err }() - err = reparentutil.WaitForRelayLogsToApply(groupCtx, wr.tmc, tabletMap[alias], statusMap[alias]) - }(candidate) - } - - resultCounter := 0 - for waitErr := range errChan { - resultCounter++ - if waitErr != nil { - rec.RecordError(waitErr) - groupCancel() - } - if resultCounter == len(validCandidates) { - break - } - } - if len(rec.Errors) != 0 { - return vterrors.Wrapf(rec.Error(), "could not apply all relay logs within the provided wait_replicas_timeout: %v", rec.Error()) - } - - var winningPosition mysql.Position - var newMasterTabletAliasStr string - for alias, position := range validCandidates { - if winningPosition.IsZero() { - winningPosition = position - newMasterTabletAliasStr = alias - continue - } - if position.AtLeast(winningPosition) { - winningPosition = position - newMasterTabletAliasStr = alias - } - } - - if masterElectTabletAlias != nil { - newMasterTabletAliasStr = topoproto.TabletAliasString(masterElectTabletAlias) - masterPos, ok := validCandidates[newMasterTabletAliasStr] - if !ok { - return 
vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect %v has errant GTIDs", newMasterTabletAliasStr) - } - if !masterPos.AtLeast(winningPosition) { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "master elect: %v at position %v, is not fully caught up. Winning position: %v", newMasterTabletAliasStr, masterPos, winningPosition) - } - } - - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - // Promote the masterElect - wr.logger.Infof("promote tablet %v to master", newMasterTabletAliasStr) - event.DispatchUpdate(ev, "promoting replica") - rp, err := wr.tmc.PromoteReplica(ctx, tabletMap[newMasterTabletAliasStr].Tablet) - if err != nil { - return vterrors.Wrapf(err, "master-elect tablet %v failed to be upgraded to master: %v", newMasterTabletAliasStr, err) - } - - // Check we still have the topology lock. - if err := topo.CheckShardLocked(ctx, keyspace, shard); err != nil { - return vterrors.Wrapf(err, "lost topology lock, aborting: %v", err) - } - - // Create a cancelable context for the following RPCs. - // If error conditions happen, we can cancel all outgoing RPCs. - replCtx, replCancel := context.WithTimeout(ctx, waitReplicasTimeout) - defer replCancel() - - // Reset replication on all replicas to point to the new master, and - // insert test row in the new master. 
- // Go through all the tablets: - // - new master: populate the reparent journal - // - everybody else: reparent to new master, wait for row - event.DispatchUpdate(ev, "reparenting all tablets") - now := time.Now().UnixNano() - errChan = make(chan error) - - handleMaster := func(alias string, tabletInfo *topo.TabletInfo) error { - wr.logger.Infof("populating reparent journal on new master %v", alias) - return wr.tmc.PopulateReparentJournal(replCtx, tabletInfo.Tablet, now, emergencyReparentShardOperation, tabletMap[newMasterTabletAliasStr].Alias, rp) - } - handleReplica := func(alias string, tabletInfo *topo.TabletInfo) { - var err error - defer func() { errChan <- err }() - - wr.logger.Infof("setting new master on replica %v", alias) - forceStart := false - if status, ok := statusMap[alias]; ok { - forceStart = reparentutil.ReplicaWasRunning(status) - } - err = wr.tmc.SetMaster(replCtx, tabletInfo.Tablet, tabletMap[newMasterTabletAliasStr].Alias, now, "", forceStart) - if err != nil { - err = vterrors.Wrapf(err, "tablet %v SetMaster failed: %v", alias, err) - } - } - - for alias, tabletInfo := range tabletMap { - if alias == newMasterTabletAliasStr { - continue - } else if !ignoredTablets.Has(alias) { - go handleReplica(alias, tabletInfo) - } - } - - masterErr := handleMaster(newMasterTabletAliasStr, tabletMap[newMasterTabletAliasStr]) - if masterErr != nil { - wr.logger.Warningf("master failed to PopulateReparentJournal") - replCancel() - return vterrors.Wrapf(masterErr, "failed to PopulateReparentJournal on master: %v", masterErr) - } - - return nil -} - // TabletExternallyReparented changes the type of new master for this shard to MASTER // and updates it's tablet record in the topo. Updating the shard record is handled // by the new master tablet