Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Tablet Refresh Behavior in VReplication Traffic Switch Handling #10058

Merged
merged 14 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
918 changes: 465 additions & 453 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 29 additions & 13 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package topotools

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -35,26 +37,40 @@ import (
// any tablets without a .Hostname set in the topology are skipped.
//
// However, partial errors from the topology, or errors from a RefreshState
// RPC will cause a boolean flag to be returned indicating only partial success.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, err error) {
// RPC will cause a boolean flag to be returned indicating only partial success
// along with a string detailing why we had a partial refresh.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, partialRefreshDetails string, err error) {
logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())
// Causes and details if we have a partial refresh
prd := strings.Builder{}

tabletMap, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
case err == nil:
// keep going
case topo.IsErrType(err, topo.PartialResult):
logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
prd.WriteString(fmt.Sprintf("got partial results from topo server for shard %v/%v: %v", si.Keyspace(), si.ShardName(), err))
isPartialRefresh = true
default:
return false, err
return false, "", err
}

// Any errors from this point onward are ignored.
var (
m sync.Mutex
wg sync.WaitGroup
m sync.Mutex
wg sync.WaitGroup
refreshTimeout = 60 * time.Second
)

// If there's a timeout set on the context, use what's left of it instead of the 60s default.
if deadline, ok := ctx.Deadline(); ok {
timeLeft := time.Until(deadline)
if timeLeft > 0 {
refreshTimeout = time.Until(deadline)
}
}

for _, ti := range tabletMap {
if ti.Hostname == "" {
// The tablet is not running, we don't have the host
Expand All @@ -66,22 +82,22 @@ func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.Ta
wg.Add(1)
go func(ti *topo.TabletInfo) {
defer wg.Done()
logger.Infof("Calling RefreshState on tablet %v", ti.AliasString())
grctx, grcancel := context.WithTimeout(ctx, refreshTimeout)
defer grcancel()
logger.Infof("Calling RefreshState on tablet %v with a timeout of %v", ti.AliasString(), refreshTimeout)

ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

if err := tmc.RefreshState(ctx, ti.Tablet); err != nil {
if err := tmc.RefreshState(grctx, ti.Tablet); err != nil {
logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
m.Lock()
prd.WriteString(fmt.Sprintf("failed to refresh tablet %v: %v", ti.AliasString(), err))
isPartialRefresh = true
m.Unlock()
}
}(ti)
}

wg.Wait()
return isPartialRefresh, nil

return isPartialRefresh, prd.String(), err
}

// UpdateShardRecords updates the shard records based on 'from' or 'to'
Expand Down Expand Up @@ -121,7 +137,7 @@ func UpdateShardRecords(
// For 'to' shards, refresh to make them serve. The 'from' shards will
// be refreshed after traffic has migrated.
if !isFrom {
if _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
if _, _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from a number of places via Wrangler.updateShardRecords(). I don't think that we can change the partial handling here w/o other changes.

logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", si.Keyspace(), si.ShardName(), cells, err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ func (s *VtctldServer) RefreshStateByShard(ctx context.Context, req *vtctldatapb
return nil, fmt.Errorf("Failed to get shard %s/%s/: %w", req.Keyspace, req.Shard, err)
}

isPartial, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
switch e.Level {
case logutilpb.Level_WARNING:
log.Warningf(e.Value)
Expand All @@ -2079,7 +2079,8 @@ func (s *VtctldServer) RefreshStateByShard(ctx context.Context, req *vtctldatapb
}

return &vtctldatapb.RefreshStateByShardResponse{
IsPartialRefresh: isPartial,
IsPartialRefresh: isPartial,
PartialRefreshDetails: partialDetails,
}, nil
}

Expand Down
6 changes: 4 additions & 2 deletions go/vt/vtctl/grpcvtctldserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6496,7 +6496,8 @@ func TestRefreshStateByShard(t *testing.T) {
Cells: []string{"zone1"}, // If we didn't filter, we would get IsPartialRefresh=true because of the failure in zone2.
},
expected: &vtctldatapb.RefreshStateByShardResponse{
IsPartialRefresh: false,
IsPartialRefresh: false,
PartialRefreshDetails: "",
},
shouldErr: false,
},
Expand Down Expand Up @@ -6532,7 +6533,8 @@ func TestRefreshStateByShard(t *testing.T) {
Shard: "-",
},
expected: &vtctldatapb.RefreshStateByShardResponse{
IsPartialRefresh: true,
IsPartialRefresh: true,
PartialRefreshDetails: "failed to refresh tablet zone2-0000000100: assert.AnError general error for testing: RefreshState failed on zone2-100",
},
shouldErr: false,
},
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (wr *Wrangler) cancelHorizontalResharding(ctx context.Context, keyspace, sh

destinationShards[i] = updatedShard

if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't make sense to return an error when canceling.

return err
}
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
refreshShards = destinationShards
}
for _, si := range refreshShards {
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

rec.RecordError(err)
}
return rec.Error()
Expand Down Expand Up @@ -792,7 +792,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

for _, si := range destinationShards {
if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

return err
}
}
Expand Down Expand Up @@ -1226,7 +1226,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp

// Now refresh the source servers so they reload the denylist
event.DispatchUpdate(ev, "refreshing sources tablets state so they update their denied tables")
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

return err
}

Expand Down
26 changes: 23 additions & 3 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ const (
lockTablesCycles = 2
// time to wait between LOCK TABLES cycles on the sources during SwitchWrites
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

// How long to wait when refreshing the state of each tablet in a shard. Note that these
// are refreshed in parallel, non-topo errors are ignored (in the error handling) and we
// may only do a partial refresh. Because in some cases it's unsafe to switch the traffic
// if some tablets do not refresh, we may need to look for partial results and produce
// an error (with the provided details of WHY) if we see them.
// Side note: the default lock/lease TTL in etcd is 60s so the default tablet refresh
// timeout of 60s can cause us to lose our keyspace lock before completing the
// operation too.
shardTabletRefreshTimeout = time.Duration(30 * time.Second)
)

// trafficSwitcher contains the metadata for switching read and write traffic
Expand Down Expand Up @@ -1015,7 +1025,13 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
if isPartial {
err = fmt.Errorf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v",
source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails)
}
return err
})
}
Expand Down Expand Up @@ -1283,7 +1299,9 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is after the point of no return.

return err
})
}
Expand Down Expand Up @@ -1389,7 +1407,9 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called when canceling.

return err
})
}
Expand Down
39 changes: 35 additions & 4 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -468,10 +471,11 @@ type TableCopyProgress struct {
type CopyProgress map[string]*TableCopyProgress

const (
cannotSwitchError = "workflow has errors"
cannotSwitchCopyIncomplete = "copy is still in progress"
cannotSwitchHighLag = "replication lag %ds is higher than allowed lag %ds"
cannotSwitchFrozen = "workflow is frozen"
cannotSwitchError = "workflow has errors"
cannotSwitchCopyIncomplete = "copy is still in progress"
cannotSwitchHighLag = "replication lag %ds is higher than allowed lag %ds"
cannotSwitchFailedTabletRefresh = "could not refresh all of the tablets involved in the operation:\n%s"
cannotSwitchFrozen = "workflow is frozen"
)

func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reason string, err error) {
Expand Down Expand Up @@ -506,6 +510,33 @@ func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reaso
if result.MaxVReplicationTransactionLag > vrw.params.MaxAllowedTransactionLagSeconds {
return fmt.Sprintf(cannotSwitchHighLag, result.MaxVReplicationTransactionLag, vrw.params.MaxAllowedTransactionLagSeconds), nil
}

// Ensure that the tablets on both sides are in good shape as we make this same call in the process
// and an error will cause us to back out
refreshErrors := strings.Builder{}
var m sync.Mutex
var wg sync.WaitGroup
rtbsCtx, cancel := context.WithTimeout(vrw.ctx, shardTabletRefreshTimeout)
defer cancel()
refreshTablets := func(shards []*topo.ShardInfo, stype string) {
defer wg.Done()
for _, si := range shards {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, vrw.wr.ts, vrw.wr.tmc, si, nil, vrw.wr.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
}
}
}
wg.Add(1)
go refreshTablets(vrw.ts.SourceShards(), "source")
wg.Add(1)
go refreshTablets(vrw.ts.TargetShards(), "target")
wg.Wait()
if refreshErrors.Len() > 0 {
return fmt.Sprintf(cannotSwitchFailedTabletRefresh, refreshErrors.String()), nil
}
return "", nil
}

Expand Down
2 changes: 2 additions & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ message RefreshStateByShardRequest {

message RefreshStateByShardResponse {
bool is_partial_refresh = 1;
// This explains why we had a partial refresh (if we did)
string partial_refresh_details = 2;
}

message ReloadSchemaRequest {
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38469,6 +38469,9 @@ export namespace vtctldata {

/** RefreshStateByShardResponse is_partial_refresh */
is_partial_refresh?: (boolean|null);

/** RefreshStateByShardResponse partial_refresh_details */
partial_refresh_details?: (string|null);
}

/** Represents a RefreshStateByShardResponse. */
Expand All @@ -38483,6 +38486,9 @@ export namespace vtctldata {
/** RefreshStateByShardResponse is_partial_refresh. */
public is_partial_refresh: boolean;

/** RefreshStateByShardResponse partial_refresh_details. */
public partial_refresh_details: string;

/**
* Creates a new RefreshStateByShardResponse instance using the specified properties.
* @param [properties] Properties to set
Expand Down
Loading