Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to PRS #13623

Merged
merged 11 commits into from
Aug 2, 2023
31 changes: 31 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ type TabletManagerClient struct {
Position *replicationdatapb.Status
Error error
}
PrimaryStatusDelays map[string]time.Duration
PrimaryStatusResults map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}
RestoreFromBackupResults map[string]struct {
Events []*logutilpb.Event
EventInterval time.Duration
Expand Down Expand Up @@ -870,6 +875,32 @@ func (fake *TabletManagerClient) ReplicationStatus(ctx context.Context, tablet *
return nil, assert.AnError
}

// PrimaryStatus is part of the tmclient.TabletManagerClient interface.
//
// The fake answers from PrimaryStatusResults, keyed by the tablet's alias
// string. When PrimaryStatusResults is unset, or holds no entry for the
// tablet, assert.AnError is returned. If PrimaryStatusDelays has an entry for
// the tablet, the call first waits for that duration, returning ctx.Err()
// early if the context finishes before the delay elapses.
func (fake *TabletManagerClient) PrimaryStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) {
	if fake.PrimaryStatusResults == nil {
		return nil, assert.AnError
	}

	alias := topoproto.TabletAliasString(tablet.Alias)

	// Indexing a nil map simply reports "not found", so the delays map needs
	// no separate nil guard.
	if wait, hasDelay := fake.PrimaryStatusDelays[alias]; hasDelay {
		select {
		case <-time.After(wait):
			// Delay elapsed; continue on to the canned result.
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	canned, found := fake.PrimaryStatusResults[alias]
	if !found {
		return nil, assert.AnError
	}

	return canned.Status, canned.Error
}

type backupRestoreStreamAdapter struct {
*grpcshim.BidiStream
ch chan *logutilpb.Event
Expand Down
33 changes: 31 additions & 2 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,11 @@ func (pr *PlannedReparenter) reparentShardLocked(
return err
}

err = pr.verifyAllTabletsReachable(ctx, tabletMap)
if err != nil {
return err
}
Comment on lines +522 to +525
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can run this even before we acquire the shard lock... I don't have strong opinions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though we don't need the lock, holding it prevents something else from changing things out from under us while we verify reachability, so I'm in favor of keeping it here.


// Check invariants that PlannedReparentShard depends on.
if isNoop, err := pr.preflightChecks(ctx, ev, keyspace, shard, tabletMap, &opts); err != nil {
return err
Expand Down Expand Up @@ -572,12 +577,12 @@ func (pr *PlannedReparenter) reparentShardLocked(
// inserted in the new primary's journal, so we can use it below to check
// that all the replicas have attached to new primary successfully.
switch {
case currentPrimary == nil && ev.ShardInfo.PrimaryAlias == nil:
case currentPrimary == nil && ev.ShardInfo.PrimaryTermStartTime == nil:
// Case (1): no primary has been elected ever. Initialize
// the primary-elect tablet
reparentJournalPos, err = pr.performInitialPromotion(ctx, ev.NewPrimary, opts)
needsRefresh = true
case currentPrimary == nil && ev.ShardInfo.PrimaryAlias != nil:
case currentPrimary == nil && ev.ShardInfo.PrimaryTermStartTime != nil:
// Case (2): no clear current primary. Try to find a safe promotion
// candidate, and promote to it.
err = pr.performPotentialPromotion(ctx, keyspace, shard, ev.NewPrimary, tabletMap, opts)
Expand Down Expand Up @@ -713,3 +718,27 @@ func (pr *PlannedReparenter) reparentTablets(

return nil
}

// verifyAllTabletsReachable verifies that all the tablets are reachable when running PRS.
//
// It issues a PrimaryStatus RPC to every tablet in tabletMap concurrently,
// waits for all of them to finish, and returns whatever error the recorder
// has accumulated from those calls.
func (pr *PlannedReparenter) verifyAllTabletsReachable(ctx context.Context, tabletMap map[string]*topo.TabletInfo) error {
	// A single deadline, derived from topo.RemoteOperationTimeout, bounds the
	// whole fan-out rather than each RPC individually.
	rpcCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
	defer cancel()

	var errs concurrency.AllErrorRecorder
	var pending sync.WaitGroup

	pending.Add(len(tabletMap))
	for _, ti := range tabletMap {
		// Resolve the tablet before spawning so each goroutine works on its
		// own per-iteration value.
		tablet := ti.Tablet
		go func() {
			defer pending.Done()
			_, err := pr.tmc.PrimaryStatus(rpcCtx, tablet)
			errs.RecordError(err)
		}()
	}
	pending.Wait()

	return errs.Error()
}
182 changes: 182 additions & 0 deletions go/vt/vtctl/reparentutil/planned_reparenter_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3638,3 +3638,185 @@ func AssertReparentEventsEqual(t *testing.T, expected *events.Reparent, actual *

AssertReparentEventsEqualWithMessage(t, expected, actual, "")
}

// TestPlannedReparenter_verifyAllTabletsReachable tests the functionality of verifyAllTabletsReachable.
func TestPlannedReparenter_verifyAllTabletsReachable(t *testing.T) {
tests := []struct {
name string
ts *topo.Server
tmc tmclient.TabletManagerClient
tabletMap map[string]*topo.TabletInfo
remoteOpTime time.Duration
wantErr string
}{
{
name: "Success",
tmc: &testutil.TabletManagerClient{
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000201": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_PRIMARY,
},
},
"zone1-0000000200": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Type: topodatapb.TabletType_REPLICA,
},
},
"zone1-0000000201": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 201,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
}, {
name: "Failure",
tmc: &testutil.TabletManagerClient{
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Error: fmt.Errorf("primary status failed"),
},
"zone1-0000000201": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
},
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_PRIMARY,
},
},
"zone1-0000000200": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Type: topodatapb.TabletType_REPLICA,
},
},
"zone1-0000000201": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 201,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
wantErr: "primary status failed",
}, {
name: "Timeout",
tmc: &testutil.TabletManagerClient{
PrimaryStatusDelays: map[string]time.Duration{
"zone1-0000000100": 20 * time.Second,
},
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000201": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
},
remoteOpTime: 100 * time.Millisecond,
tabletMap: map[string]*topo.TabletInfo{
"zone1-0000000100": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
Type: topodatapb.TabletType_PRIMARY,
},
},
"zone1-0000000200": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 200,
},
Type: topodatapb.TabletType_REPLICA,
},
},
"zone1-0000000201": {
Tablet: &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 201,
},
Type: topodatapb.TabletType_REPLICA,
},
},
},
wantErr: "context deadline exceeded",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pr := &PlannedReparenter{
ts: tt.ts,
tmc: tt.tmc,
}
if tt.remoteOpTime != 0 {
oldTime := topo.RemoteOperationTimeout
topo.RemoteOperationTimeout = tt.remoteOpTime
defer func() {
topo.RemoteOperationTimeout = oldTime
}()
}
err := pr.verifyAllTabletsReachable(context.Background(), tt.tabletMap)
if tt.wantErr == "" {
require.NoError(t, err)
return
}
require.ErrorContains(t, err, tt.wantErr)
})
}
}
6 changes: 6 additions & 0 deletions go/vt/vtctl/reparentutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func ChooseNewPrimary(
// tablets that are possible candidates to be the new primary and their positions
validTablets []*topodatapb.Tablet
tabletPositions []mysql.Position
rec concurrency.AllErrorRecorder
)

for _, tablet := range tabletMap {
Expand All @@ -90,6 +91,7 @@ func ChooseNewPrimary(
defer wg.Done()
// find and store the positions for the tablet
pos, err := findPositionForTablet(ctx, tablet, logger, tmc, waitReplicasTimeout)
rec.RecordError(err)
mu.Lock()
defer mu.Unlock()
if err == nil {
Expand All @@ -106,6 +108,10 @@ func ChooseNewPrimary(
return nil, nil
}

if rec.HasErrors() {
return nil, rec.Error()
}

// sort the tablets for finding the best primary
err := sortTabletsForReparent(validTablets, tabletPositions, durability)
if err != nil {
Expand Down