Skip to content

Commit

Permalink
Keep function semantics the same but provide details if wanted
Browse files Browse the repository at this point in the history
This keeps the previous behavior of ignoring partial refresh related
errors, while providing the defails of WHY we had a partial refresh
for anyone that's interested.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Apr 19, 2022
1 parent abe6c2f commit 18f9573
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 26 deletions.
23 changes: 11 additions & 12 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package topotools
import (
"context"
"fmt"
"strings"
"sync"
"time"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand All @@ -37,20 +37,23 @@ import (
// any tablets without a .Hostname set in the topology are skipped.
//
// However, on partial errors from the topology, or errors from a RefreshState
// RPC will cause a boolean flag to be returned indicating only partial success.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, err error) {
// RPC will cause a boolean flag to be returned indicating only partial success
// along with a string detailing why we had a partial refresh.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) (isPartialRefresh bool, partialRefreshDetails string, err error) {
logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())
refreshErrors := &concurrency.AllErrorRecorder{}
// Causes and details if we have a partial refresh
prd := strings.Builder{}

tabletMap, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
case err == nil:
// keep going
case topo.IsErrType(err, topo.PartialResult):
logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
prd.WriteString(fmt.Sprintf("got partial results from topo server for shard %v/%v: %v", si.Keyspace(), si.ShardName(), err))
isPartialRefresh = true
default:
return false, err
return false, "", err
}

// Any errors from this point onward are ignored.
Expand Down Expand Up @@ -85,20 +88,16 @@ func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.Ta

if err := tmc.RefreshState(grctx, ti.Tablet); err != nil {
logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
refreshErrors.RecordError(fmt.Errorf("failed to refresh %v: %v", ti.AliasString(), err))
m.Lock()
prd.WriteString(fmt.Sprintf("failed to refresh tablet %v: %v", ti.AliasString(), err))
isPartialRefresh = true
m.Unlock()
}
}(ti)
}
wg.Wait()

if refreshErrors.HasErrors() {
err = refreshErrors.Error()
}

return isPartialRefresh, err
return isPartialRefresh, prd.String(), err
}

// UpdateShardRecords updates the shard records based on 'from' or 'to'
Expand Down Expand Up @@ -138,7 +137,7 @@ func UpdateShardRecords(
// For 'to' shards, refresh to make them serve. The 'from' shards will
// be refreshed after traffic has migrated.
if !isFrom {
if _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
if _, _, err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", si.Keyspace(), si.ShardName(), cells, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ func (s *VtctldServer) RefreshStateByShard(ctx context.Context, req *vtctldatapb
return nil, fmt.Errorf("Failed to get shard %s/%s/: %w", req.Keyspace, req.Shard, err)
}

isPartial, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
isPartial, _, err := topotools.RefreshTabletsByShard(ctx, s.ts, s.tmc, si, req.Cells, logutil.NewCallbackLogger(func(e *logutilpb.Event) {
switch e.Level {
case logutilpb.Level_WARNING:
log.Warningf(e.Value)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (wr *Wrangler) cancelHorizontalResharding(ctx context.Context, keyspace, sh

destinationShards[i] = updatedShard

if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
return err
}
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
refreshShards = destinationShards
}
for _, si := range refreshShards {
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
rec.RecordError(err)
}
return rec.Error()
Expand Down Expand Up @@ -792,7 +792,7 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}

for _, si := range destinationShards {
if _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
if _, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, nil, wr.Logger()); err != nil {
return err
}
}
Expand Down Expand Up @@ -1226,7 +1226,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp

// Now refresh the source servers so they reload the denylist
event.DispatchUpdate(ev, "refreshing sources tablets state so they update their denied tables")
_, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
_, _, err := topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, sourceShard, cells, wr.Logger())
return err
}

Expand Down
13 changes: 7 additions & 6 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ const (
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

// How long to wait when refreshing the state of each tablet in a shard. Note that these
// are refreshed in parallel, non-topo errors are ignored and we may only do a partial
// refresh. Because it's unsafe to switch the traffic if some tablets do not refresh, we
// need to look for partial results and produce an error if we see them.
// are refreshed in parallel, non-topo errors are ignored (in the error handling) and we
// may only do a partial refresh. Because in some cases it's unsafe to switch the traffic
// if some tablets do not refresh, we may need to look for partial results and produce
// an error (with the provided details of WHY) if we see them.
// Side note: the default lock/lease TTL in etcd is 60s so the default tablet refresh
// timeout of 60s can cause us to lose our keyspace lock before completing the
// operation too.
Expand Down Expand Up @@ -1026,7 +1027,7 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
return err
})
}
Expand Down Expand Up @@ -1296,7 +1297,7 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
}
Expand Down Expand Up @@ -1404,7 +1405,7 @@ func (ts *trafficSwitcher) dropSourceDeniedTables(ctx context.Context) error {
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger())
return err
})
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,10 @@ func (vrw *VReplicationWorkflow) canSwitch(keyspace, workflowName string) (reaso
refreshTablets := func(shards []*topo.ShardInfo, stype string) {
defer wg.Done()
for _, si := range shards {
if partial, err := topotools.RefreshTabletsByShard(rtbsCtx, vrw.wr.ts, vrw.wr.tmc, si, nil, vrw.wr.Logger()); err != nil || partial {
if partial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, vrw.wr.ts, vrw.wr.tmc, si, nil, vrw.wr.Logger()); err != nil || partial {
m.Lock()
refreshErrors.WriteString(fmt.Sprintf("Failed to successfully refresh all tablets in the %s/%s %s shard:\n %v\n\n",
si.Keyspace(), si.ShardName(), stype, err))
refreshErrors.WriteString(fmt.Sprintf("Failed to successfully refresh all tablets in the %s/%s %s shard (%v):\n %v\n",
si.Keyspace(), si.ShardName(), stype, err, partialDetails))
m.Unlock()
}
}
Expand Down

0 comments on commit 18f9573

Please sign in to comment.