Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Tablet Refresh Behavior in VReplication Traffic Switch Handling #10058

Merged
merged 14 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package topotools

import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -35,26 +37,40 @@ import (
// any tablets without a .Hostname set in the topology are skipped.
//
// However, partial errors from the topology, or errors from a RefreshState
// RPC will cause a boolean flag to be returned indicating only partial success.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, err error) {
// RPC will cause a boolean flag to be returned indicating only partial success
// along with a string detailing why we had a partial refresh.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, partialRefreshDetails string, err error) {
logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())
// Causes and details if we have a partial refresh
prd := strings.Builder{}

tabletMap, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
case err == nil:
// keep going
case topo.IsErrType(err, topo.PartialResult):
logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
prd.WriteString(fmt.Sprintf("got partial results from topo server for shard %v/%v: %v", si.Keyspace(), si.ShardName(), err))
isPartialRefresh = true
default:
return false, err
return false, "", err
}

// Any errors from this point onward are ignored.
var (
m sync.Mutex
wg sync.WaitGroup
m sync.Mutex
wg sync.WaitGroup
refreshTimeout = 60 * time.Second
)

// If there's a timeout set on the context, use what's left of it instead of the 60s default.
if deadline, ok := ctx.Deadline(); ok {
timeLeft := time.Until(deadline)
if timeLeft > 0 {
refreshTimeout = time.Until(deadline)
}
}

for _, ti := range tabletMap {
if ti.Hostname == "" {
// The tablet is not running, we don't have the host
Expand All @@ -66,22 +82,22 @@ func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.Ta
wg.Add(1)
go func(ti *topo.TabletInfo) {
defer wg.Done()
logger.Infof("Calling RefreshState on tablet %v", ti.AliasString())
grctx, grcancel := context.WithTimeout(ctx, refreshTimeout)
defer grcancel()
logger.Infof("Calling RefreshState on tablet %v with a timeout of %v", ti.AliasString(), refreshTimeout)

ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

if err := tmc.RefreshState(ctx, ti.Tablet); err != nil {
if err := tmc.RefreshState(grctx, ti.Tablet); err != nil {
logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
m.Lock()
prd.WriteString(fmt.Sprintf("failed to refresh tablet %v: %v", ti.AliasString(), err))
isPartialRefresh = true
m.Unlock()
}
}(ti)
}

wg.Wait()
return isPartialRefresh, nil

return isPartialRefresh, prd.String(), err
}

// UpdateShardRecords updates the shard records based on 'from' or 'to'
Expand Down Expand Up @@ -121,7 +137,7 @@ func UpdateShardRecords(
// For 'to' shards, refresh to make them serve. The 'from' shards will
// be refreshed after traffic has migrated.
if !isFrom {
if _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
if _, _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from a number of places via Wrangler.updateShardRecords(). I don't think that we can change the partial handling here without other changes.

logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", si.Keyspace(), si.ShardName(), cells, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ func (s *VtctldServer) RefreshStateByShard(ctx context.Context, req *vtctldatapb
return nil, fmt.Errorf("Failed to get shard %s/%s/: %w", req.Keyspace, req.Shard, err)
}

isPartial, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
isPartial, _, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
switch e.Level {
case logutilpb.Level_WARNING:
log.Warningf(e.Value)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (wr *Wrangler) cancelHorizontalResharding(ctx context.Context, keyspace, sh

destinationShards[i] = updatedShard

if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't make sense to return an error when canceling.

return err
}
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
refreshShards = destinationShards
}
for _, si := range refreshShards {
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

rec.RecordError(err)
}
return rec.Error()
Expand Down Expand Up @@ -792,7 +792,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

for _, si := range destinationShards {
if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

return err
}
}
Expand Down Expand Up @@ -1226,7 +1226,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp

// Now refresh the source servers so they reload the denylist
event.DispatchUpdate(ev, "refreshing sources tablets state so they update their denied tables")
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is past the point of no return.

return err
}

Expand Down
22 changes: 19 additions & 3 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ const (
lockTablesCycles = 2
// time to wait between LOCK TABLES cycles on the sources during SwitchWrites
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

// How long to wait when refreshing the state of each tablet in a shard. Note that these
// are refreshed in parallel, non-topo errors are ignored (in the error handling) and we
// may only do a partial refresh. Because in some cases it's unsafe to switch the traffic
// if some tablets do not refresh, we may need to look for partial results and produce
// an error (with the provided details of WHY) if we see them.
// Side note: the default lock/lease TTL in etcd is 60s so the default tablet refresh
// timeout of 60s can cause us to lose our keyspace lock before completing the
// operation too.
shardTabletRefreshTimeout = time.Duration(30 * time.Second)
)

// trafficSwitcher contains the metadata for switching read and write traffic
Expand Down Expand Up @@ -1015,7 +1025,9 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
Copy link
Contributor Author

@mattlord mattlord Apr 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a safe/good place to return an error if we could not refresh all tablets as it's early on in the operation. I'll work on this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: 90f656f

return err
})
}
Expand Down Expand Up @@ -1283,7 +1295,9 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is after the point of no return.

return err
})
}
Expand Down Expand Up @@ -1389,7 +1403,9 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
}); err != nil {
return err
}
_, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called when canceling.

return err
})
}
Expand Down
39 changes: 35 additions & 4 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/evalengine"

Expand Down Expand Up @@ -468,10 +471,11 @@ type TableCopyProgress struct {
type CopyProgress map[string]*TableCopyProgress

const (
cannotSwitchError = "workflow has errors"
cannotSwitchCopyIncomplete = "copy is still in progress"
cannotSwitchHighLag = "replication lag %ds is higher than allowed lag %ds"
cannotSwitchFrozen = "workflow is frozen"
cannotSwitchError = "workflow has errors"
cannotSwitchCopyIncomplete = "copy is still in progress"
cannotSwitchHighLag = "replication lag %ds is higher than allowed lag %ds"
cannotSwitchFailedTabletRefresh = "could not refresh all of the tablets involved in the operation:\n%s"
cannotSwitchFrozen = "workflow is frozen"
)

func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reason string, err error) {
Expand Down Expand Up @@ -506,6 +510,33 @@ func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reaso
if result.MaxVReplicationTransactionLag > vrw.params.MaxAllowedTransactionLagSeconds {
return fmt.Sprintf(cannotSwitchHighLag, result.MaxVReplicationTransactionLag, vrw.params.MaxAllowedTransactionLagSeconds), nil
}

// Ensure that the tablets on both sides are in good shape as we make this same call in the process
// and an error will cause us to backout
refreshErrors := strings.Builder{}
var m sync.Mutex
var wg sync.WaitGroup
rtbsCtx, cancel := context.WithTimeout(vrw.ctx, shardTabletRefreshTimeout)
defer cancel()
refreshTablets := func(shards []*topo.ShardInfo, stype string) {
defer wg.Done()
for _, si := range shards {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, vrw.wr.ts, vrw.wr.tmc, si, nil, vrw.wr.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("Failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
}
}
}
wg.Add(1)
go refreshTablets(vrw.ts.SourceShards(), "source")
wg.Add(1)
go refreshTablets(vrw.ts.TargetShards(), "target")
wg.Wait()
if refreshErrors.Len() > 0 {
return fmt.Sprintf(cannotSwitchFailedTabletRefresh, refreshErrors.String()), nil
}
return "", nil
}

Expand Down