Improve and unify error handling when switching reads/writes
Signed-off-by: Matt Lord <[email protected]>
mattlord committed Aug 9, 2023
1 parent c00925c commit 89295b5
Showing 2 changed files with 98 additions and 138 deletions.
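
For readers skimming the diff below: the change replaces per-call-site logging and bare returns with a per-function handleError closure that wraps the error with context, logs it, and returns it, so every early return goes through one path. The following is a minimal, self-contained Go sketch of that pattern, not the Vitess code itself: the simplified switchReads, the plain []string return (the real function returns *[]string), fmt.Errorf with %w (in place of vterrors.Wrapf), and the standard log package (in place of ts.Logger()) are all stand-ins.

package main

import (
	"errors"
	"fmt"
	"log"
)

// switchReads mirrors the shape of the functions touched by this commit: one
// handleError closure per function wraps the error with context, logs it, and
// returns it, so each early return becomes a one-liner.
func switchReads(keyspace string) ([]string, error) {
	// Consistently handle errors by logging and returning them.
	handleError := func(message string, err error) ([]string, error) {
		werr := fmt.Errorf("%s: %w", message, err) // stand-in for vterrors.Wrapf
		log.Print(werr)                            // stand-in for ts.Logger().Error
		return nil, werr
	}

	if keyspace == "" {
		return handleError("invalid request", errors.New("keyspace must be specified"))
	}
	// The real function would validate the workflow, lock the keyspace, and
	// switch shard or table reads here, funneling every failure through
	// handleError in the same way.
	return []string{"switched reads for " + keyspace}, nil
}

func main() {
	if _, err := switchReads(""); err != nil {
		fmt.Println("caller sees:", err)
	}
}
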
114 changes: 48 additions & 66 deletions go/vt/vtctl/workflow/server.go
@@ -2323,16 +2323,23 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 }
 }
 
+// Consistently handle errors by logging and returning them.
+handleError := func(message string, err error) (*[]string, error) {
+werr := vterrors.Wrapf(err, message)
+ts.Logger().Error(werr)
+return nil, werr
+}
+
 log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String())
 if !switchReplica && !switchRdonly {
-return nil, fmt.Errorf("tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)
+return handleError("invalid tablet types", fmt.Errorf("tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
 }
 if !ts.isPartialMigration { // shard level traffic switching is all or nothing
 if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
-return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
+return handleError("invalid request", fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
 }
 if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
-return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+return handleError("invalid request", fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
 }
 }
 var cells []string = req.Cells
@@ -2359,8 +2366,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 // If journals exist notify user and fail.
 journalsExist, _, err := ts.checkJournals(ctx)
 if err != nil {
-ts.Logger().Errorf("checkJournals failed: %v", err)
-return nil, err
+return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 }
 if journalsExist {
 log.Infof("Found a previous journal entry for %d", ts.id)
@@ -2373,39 +2379,34 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 }
 
 if err := ts.validate(ctx); err != nil {
-ts.Logger().Errorf("validate failed: %v", err)
-return nil, err
+return handleError("workflow validation failed", err)
 }
 
 // For reads, locking the source keyspace is sufficient.
 ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads")
 if lockErr != nil {
-ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-return nil, lockErr
+return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 }
 defer unlock(&err)
 
 if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
 if ts.isPartialMigration {
 ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
 } else if err := sw.switchTableReads(ctx, cells, req.TabletTypes, direction); err != nil {
-ts.Logger().Errorf("switchTableReads failed: %v", err)
-return nil, err
+return handleError("failed to switch read traffic for the tables", err)
 }
 return sw.logs(), nil
 }
 ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction)
 if err := sw.switchShardReads(ctx, cells, req.TabletTypes, direction); err != nil {
-ts.Logger().Errorf("switchShardReads failed: %v", err)
-return nil, err
+return handleError("failed to switch read traffic for the shards", err)
 }
 
 ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction)
 if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil {
-err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
+err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
 ts.targetKeyspace, strings.Join(cells, ","))
-ts.Logger().Errorf("%w", err2)
-return nil, err2
+return handleError("failed to validate SrvKeyspace record", err2)
 }
 return sw.logs(), nil
 }
@@ -2421,36 +2422,39 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 sw = &switcher{ts: ts, s: s}
 }
 
+// Consistently handle errors by logging and returning them.
+handleError := func(message string, err error) (int64, *[]string, error) {
+werr := vterrors.Wrapf(err, message)
+ts.Logger().Error(werr)
+return 0, nil, werr
+}
+
 if ts.frozen {
 ts.Logger().Warningf("Writes have already been switched for workflow %s, nothing to do here", ts.WorkflowName())
 return 0, sw.logs(), nil
 }
 
 if err := ts.validate(ctx); err != nil {
-ts.Logger().Errorf("validate failed: %v", err)
-return 0, nil, err
+return handleError("workflow validation failed", err)
 }
 
 if req.EnableReverseReplication {
-err := areTabletsAvailableToStreamFrom(ctx, req, ts, ts.TargetKeyspaceName(), ts.TargetShards())
-if err != nil {
-return 0, nil, err
+if err := areTabletsAvailableToStreamFrom(ctx, req, ts, ts.TargetKeyspaceName(), ts.TargetShards()); err != nil {
+return handleError(fmt.Sprintf("no tablets were available to stream from in the %s keyspace", ts.SourceKeyspaceName()), err)
 }
 }
 
 // Need to lock both source and target keyspaces.
 tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites")
 if lockErr != nil {
-ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-return 0, nil, lockErr
+return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 }
 ctx = tctx
 defer sourceUnlock(&err)
 if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
 tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites")
 if lockErr != nil {
-ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-return 0, nil, lockErr
+return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
 }
 ctx = tctx
 defer targetUnlock(&err)
@@ -2467,24 +2471,20 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 !ts.SourceKeyspaceSchema().Keyspace.Sharded {
 sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx)
 if err != nil {
-werr := vterrors.Wrapf(err, "getSequenceMetadata failed")
-ts.Logger().Error(werr)
-return 0, nil, werr
+return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err)
 }
 }
 
 // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams.
 journalsExist, sourceWorkflows, err := ts.checkJournals(ctx)
 if err != nil {
-ts.Logger().Errorf("checkJournals failed: %v", err)
-return 0, nil, err
+return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 }
 if !journalsExist {
 ts.Logger().Infof("No previous journals were found. Proceeding normally.")
 sm, err := BuildStreamMigrator(ctx, ts, cancel)
 if err != nil {
-ts.Logger().Errorf("buildStreamMigrater failed: %v", err)
-return 0, nil, err
+return handleError("failed to migrate the workflow streams", err)
 }
 if cancel {
 sw.cancelMigration(ctx, sm)
@@ -2494,21 +2494,19 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 ts.Logger().Infof("Stopping streams")
 sourceWorkflows, err = sw.stopStreams(ctx, sm)
 if err != nil {
-ts.Logger().Errorf("stopStreams failed: %v", err)
 for key, streams := range sm.Streams() {
 for _, stream := range streams {
 ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
 }
 }
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError("failed to stop the workflow streams", err)
 }
 
 ts.Logger().Infof("Stopping source writes")
 if err := sw.stopSourceWrites(ctx); err != nil {
-ts.Logger().Errorf("stopSourceWrites failed: %v", err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 }
 
 if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
@@ -2517,9 +2515,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 // the tablet's deny list check and the first mysqld side table lock.
 for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 if err := ts.executeLockTablesOnSource(ctx); err != nil {
-ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 }
 // No need to UNLOCK the tables as the connection was closed once the locks were acquired
 // and thus the locks released.
@@ -2529,50 +2526,42 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 
 ts.Logger().Infof("Waiting for streams to catchup")
 if err := sw.waitForCatchup(ctx, timeout); err != nil {
-ts.Logger().Errorf("waitForCatchup failed: %v", err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError("failed to sync up replication between the source and target", err)
 }
 
 ts.Logger().Infof("Migrating streams")
 if err := sw.migrateStreams(ctx, sm); err != nil {
-ts.Logger().Errorf("migrateStreams failed: %v", err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError("failed to migrate the workflow streams", err)
 }
 
 ts.Logger().Infof("Resetting sequences")
 if err := sw.resetSequences(ctx); err != nil {
-ts.Logger().Errorf("resetSequences failed: %v", err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError("failed to reset the sequences", err)
 }
 
 ts.Logger().Infof("Creating reverse streams")
 if err := sw.createReverseVReplication(ctx); err != nil {
-ts.Logger().Errorf("createReverseVReplication failed: %v", err)
 sw.cancelMigration(ctx, sm)
-return 0, nil, err
+return handleError("failed to create the reverse vreplication streams", err)
 }
 } else {
 if cancel {
-err := fmt.Errorf("traffic switching has reached the point of no return, cannot cancel")
-ts.Logger().Errorf("%v", err)
-return 0, nil, err
+return handleError("invalid cancel", fmt.Errorf("traffic switching has reached the point of no return, cannot cancel"))
 }
 ts.Logger().Infof("Journals were found. Completing the left over steps.")
 // Need to gather positions in case all journals were not created.
 if err := ts.gatherPositions(ctx); err != nil {
-ts.Logger().Errorf("gatherPositions failed: %v", err)
-return 0, nil, err
+return handleError("failed to gather replication positions", err)
 }
 }
 
 // This is the point of no return. Once a journal is created,
 // traffic can be redirected to target shards.
 if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
-ts.Logger().Errorf("createJournals failed: %v", err)
-return 0, nil, err
+return handleError("failed to create the journal", err)
 }
 // Initialize any target sequences, if there are any, before allowing new writes.
 if req.InitializeTargetSequences && len(sequenceMetadata) > 0 {
@@ -2582,33 +2571,26 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
 defer cancel()
 if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-werr := vterrors.Wrapf(err, "initializeTargetSequences failed")
-ts.Logger().Error(werr)
-return 0, nil, werr
+return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 }
 }
 if err := sw.allowTargetWrites(ctx); err != nil {
-ts.Logger().Errorf("allowTargetWrites failed: %v", err)
-return 0, nil, err
+return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
 }
 if err := sw.changeRouting(ctx); err != nil {
-ts.Logger().Errorf("changeRouting failed: %v", err)
-return 0, nil, err
+return handleError("failed to update the routing rules", err)
 }
 if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil {
-ts.Logger().Errorf("finalize failed: %v", err)
-return 0, nil, err
+return handleError("failed to finalize the traffic switch", err)
 }
 if req.EnableReverseReplication {
 if err := sw.startReverseVReplication(ctx); err != nil {
-ts.Logger().Errorf("startReverseVReplication failed: %v", err)
-return 0, nil, err
+return handleError("failed to start the reverse workflow", err)
 }
 }
 
 if err := sw.freezeTargetVReplication(ctx); err != nil {
-ts.Logger().Errorf("deleteTargetVReplication failed: %v", err)
-return 0, nil, err
+return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err)
 }
 
 return ts.id, sw.logs(), nil