From 89295b59d80a6f5ef57b06c5ff19cbf06651699d Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Wed, 9 Aug 2023 10:01:15 -0400
Subject: [PATCH] Improve and unify error handling when switching reads/writes

Signed-off-by: Matt Lord
---
 go/vt/vtctl/workflow/server.go     | 114 ++++++++++++---------------
 go/vt/wrangler/traffic_switcher.go | 122 ++++++++++++-----------------
 2 files changed, 98 insertions(+), 138 deletions(-)

diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 5c1436dae6f..92e22e34f50 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -2323,16 +2323,23 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 		}
 	}
 
+	// Consistently handle errors by logging and returning them.
+	handleError := func(message string, err error) (*[]string, error) {
+		werr := vterrors.Wrapf(err, message)
+		ts.Logger().Error(werr)
+		return nil, werr
+	}
+
 	log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String())
 	if !switchReplica && !switchRdonly {
-		return nil, fmt.Errorf("tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)
+		return handleError("invalid tablet types", fmt.Errorf("tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
 	}
 	if !ts.isPartialMigration { // shard level traffic switching is all or nothing
 		if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
-			return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
+			return handleError("invalid request", fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
 		}
 		if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
-			return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+			return handleError("invalid request", fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
 		}
 	}
 	var cells []string = req.Cells
@@ -2359,8 +2366,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 	// If journals exist notify user and fail.
 	journalsExist, _, err := ts.checkJournals(ctx)
 	if err != nil {
-		ts.Logger().Errorf("checkJournals failed: %v", err)
-		return nil, err
+		return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 	if journalsExist {
 		log.Infof("Found a previous journal entry for %d", ts.id)
@@ -2373,15 +2379,13 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 	}
 
 	if err := ts.validate(ctx); err != nil {
-		ts.Logger().Errorf("validate failed: %v", err)
-		return nil, err
+		return handleError("workflow validation failed", err)
 	}
 
 	// For reads, locking the source keyspace is sufficient.
 	ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads")
 	if lockErr != nil {
-		ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-		return nil, lockErr
+		return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 	}
 	defer unlock(&err)
 
@@ -2389,23 +2393,20 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
 		if ts.isPartialMigration {
 			ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
 		} else if err := sw.switchTableReads(ctx, cells, req.TabletTypes, direction); err != nil {
-			ts.Logger().Errorf("switchTableReads failed: %v", err)
-			return nil, err
+			return handleError("failed to switch read traffic for the tables", err)
 		}
 		return sw.logs(), nil
 	}
 	ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction)
 	if err := sw.switchShardReads(ctx, cells, req.TabletTypes, direction); err != nil {
-		ts.Logger().Errorf("switchShardReads failed: %v", err)
-		return nil, err
+		return handleError("failed to switch read traffic for the shards", err)
 	}
 
 	ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction)
 	if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil {
-		err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
+		err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
 			ts.targetKeyspace, strings.Join(cells, ","))
-		ts.Logger().Errorf("%w", err2)
-		return nil, err2
+		return handleError("failed to validate SrvKeyspace record", err2)
 	}
 	return sw.logs(), nil
 }
@@ -2421,36 +2422,39 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		sw = &switcher{ts: ts, s: s}
 	}
 
+	// Consistently handle errors by logging and returning them.
+	handleError := func(message string, err error) (int64, *[]string, error) {
+		werr := vterrors.Wrapf(err, message)
+		ts.Logger().Error(werr)
+		return 0, nil, werr
+	}
+
 	if ts.frozen {
 		ts.Logger().Warningf("Writes have already been switched for workflow %s, nothing to do here", ts.WorkflowName())
 		return 0, sw.logs(), nil
 	}
 
 	if err := ts.validate(ctx); err != nil {
-		ts.Logger().Errorf("validate failed: %v", err)
-		return 0, nil, err
+		return handleError("workflow validation failed", err)
 	}
 
 	if req.EnableReverseReplication {
-		err := areTabletsAvailableToStreamFrom(ctx, req, ts, ts.TargetKeyspaceName(), ts.TargetShards())
-		if err != nil {
-			return 0, nil, err
+		if err := areTabletsAvailableToStreamFrom(ctx, req, ts, ts.TargetKeyspaceName(), ts.TargetShards()); err != nil {
+			return handleError(fmt.Sprintf("no tablets were available to stream from in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 	}
 
 	// Need to lock both source and target keyspaces.
 	tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites")
 	if lockErr != nil {
-		ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-		return 0, nil, lockErr
+		return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 	}
 	ctx = tctx
 	defer sourceUnlock(&err)
 	if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
 		tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites")
 		if lockErr != nil {
-			ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-			return 0, nil, lockErr
+			return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
 		}
 		ctx = tctx
 		defer targetUnlock(&err)
@@ -2467,24 +2471,20 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		!ts.SourceKeyspaceSchema().Keyspace.Sharded {
 		sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx)
 		if err != nil {
-			werr := vterrors.Wrapf(err, "getSequenceMetadata failed")
-			ts.Logger().Error(werr)
-			return 0, nil, werr
+			return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
 
 	// If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams.
 	journalsExist, sourceWorkflows, err := ts.checkJournals(ctx)
 	if err != nil {
-		ts.Logger().Errorf("checkJournals failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 	if !journalsExist {
 		ts.Logger().Infof("No previous journals were found. Proceeding normally.")
 		sm, err := BuildStreamMigrator(ctx, ts, cancel)
 		if err != nil {
-			ts.Logger().Errorf("buildStreamMigrater failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to migrate the workflow streams", err)
 		}
 		if cancel {
 			sw.cancelMigration(ctx, sm)
@@ -2494,21 +2494,19 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		ts.Logger().Infof("Stopping streams")
 		sourceWorkflows, err = sw.stopStreams(ctx, sm)
 		if err != nil {
-			ts.Logger().Errorf("stopStreams failed: %v", err)
 			for key, streams := range sm.Streams() {
 				for _, stream := range streams {
 					ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
 				}
 			}
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to stop the workflow streams", err)
 		}
 
 		ts.Logger().Infof("Stopping source writes")
 		if err := sw.stopSourceWrites(ctx); err != nil {
-			ts.Logger().Errorf("stopSourceWrites failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 
 		if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
@@ -2517,9 +2515,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 			// the tablet's deny list check and the first mysqld side table lock.
 			for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 				if err := ts.executeLockTablesOnSource(ctx); err != nil {
-					ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err)
 					sw.cancelMigration(ctx, sm)
-					return 0, nil, err
+					return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 				}
 				// No need to UNLOCK the tables as the connection was closed once the locks were acquired
 				// and thus the locks released.
@@ -2529,50 +2526,42 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		ts.Logger().Infof("Waiting for streams to catchup")
 		if err := sw.waitForCatchup(ctx, timeout); err != nil {
-			ts.Logger().Errorf("waitForCatchup failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to sync up replication between the source and target", err)
 		}
 
 		ts.Logger().Infof("Migrating streams")
 		if err := sw.migrateStreams(ctx, sm); err != nil {
-			ts.Logger().Errorf("migrateStreams failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to migrate the workflow streams", err)
 		}
 
 		ts.Logger().Infof("Resetting sequences")
 		if err := sw.resetSequences(ctx); err != nil {
-			ts.Logger().Errorf("resetSequences failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to reset the sequences", err)
 		}
 
 		ts.Logger().Infof("Creating reverse streams")
 		if err := sw.createReverseVReplication(ctx); err != nil {
-			ts.Logger().Errorf("createReverseVReplication failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to create the reverse vreplication streams", err)
 		}
 	} else {
 		if cancel {
-			err := fmt.Errorf("traffic switching has reached the point of no return, cannot cancel")
-			ts.Logger().Errorf("%v", err)
-			return 0, nil, err
+			return handleError("invalid cancel", fmt.Errorf("traffic switching has reached the point of no return, cannot cancel"))
 		}
 		ts.Logger().Infof("Journals were found. Completing the left over steps.")
 		// Need to gather positions in case all journals were not created.
 		if err := ts.gatherPositions(ctx); err != nil {
-			ts.Logger().Errorf("gatherPositions failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to gather replication positions", err)
 		}
 	}
 
 	// This is the point of no return. Once a journal is created,
 	// traffic can be redirected to target shards.
 	if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
-		ts.Logger().Errorf("createJournals failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to create the journal", err)
 	}
 
 	// Initialize any target sequences, if there are any, before allowing new writes.
 	if req.InitializeTargetSequences && len(sequenceMetadata) > 0 {
@@ -2582,33 +2571,26 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
 		initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
 		defer cancel()
 		if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-			werr := vterrors.Wrapf(err, "initializeTargetSequences failed")
-			ts.Logger().Error(werr)
-			return 0, nil, werr
+			return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
 
 	if err := sw.allowTargetWrites(ctx); err != nil {
-		ts.Logger().Errorf("allowTargetWrites failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
 	}
 	if err := sw.changeRouting(ctx); err != nil {
-		ts.Logger().Errorf("changeRouting failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to update the routing rules", err)
 	}
 	if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil {
-		ts.Logger().Errorf("finalize failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to finalize the traffic switch", err)
 	}
 	if req.EnableReverseReplication {
 		if err := sw.startReverseVReplication(ctx); err != nil {
-			ts.Logger().Errorf("startReverseVReplication failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to start the reverse workflow", err)
 		}
 	}
 	if err := sw.freezeTargetVReplication(ctx); err != nil {
-		ts.Logger().Errorf("deleteTargetVReplication failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err)
 	}
 
 	return ts.id, sw.logs(), nil
diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go
index 8a557ed9f05..6ece05c3f7e 100644
--- a/go/vt/wrangler/traffic_switcher.go
+++ b/go/vt/wrangler/traffic_switcher.go
@@ -331,29 +331,32 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
 // SwitchReads is a generic way of switching read traffic for a resharding workflow.
 func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowName string, servedTypes []topodatapb.TabletType,
 	cells []string, direction workflow.TrafficSwitchDirection, dryRun bool) (*[]string, error) {
+	// Consistently handle errors by logging and returning them.
+	handleError := func(message string, err error) (*[]string, error) {
+		werr := vterrors.Wrapf(err, message)
+		wr.Logger().Error(werr)
+		return nil, werr
+	}
+
 	ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflowName)
 	if err != nil {
-		wr.Logger().Errorf("getWorkflowState failed: %v", err)
-		return nil, err
+		return handleError("failed to get the current state of the workflow", err)
 	}
 	if ts == nil {
 		errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflowName, targetKeyspace)
-		wr.Logger().Errorf(errorMsg)
-		return nil, fmt.Errorf(errorMsg)
+		return handleError("failed to get the current state of the workflow", fmt.Errorf(errorMsg))
 	}
 
 	log.Infof("Switching reads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflowName, servedTypes, cells, ws)
 	var switchReplicas, switchRdonly bool
 	for _, servedType := range servedTypes {
 		if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
-			return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
+			return handleError("invalid tablet type", fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType))
 		}
 		if !ts.isPartialMigration { // shard level traffic switching is all or nothing
 			if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
-				return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
+				return handleError("invalid request", fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
 			}
 			if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
-				return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
+				return handleError("invalid request", fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
			}
 		}
 		switch servedType {
@@ -382,8 +385,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
 	// If journals exist notify user and fail.
 	journalsExist, _, err := ts.checkJournals(ctx)
 	if err != nil {
-		wr.Logger().Errorf("checkJournals failed: %v", err)
-		return nil, err
+		return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 	if journalsExist {
 		log.Infof("Found a previous journal entry for %d", ts.id)
@@ -396,15 +398,13 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
 	}
 
 	if err := ts.validate(ctx); err != nil {
-		ts.Logger().Errorf("validate failed: %v", err)
-		return nil, err
+		return handleError("workflow validation failed", err)
 	}
 
 	// For reads, locking the source keyspace is sufficient.
 	ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads")
 	if lockErr != nil {
-		ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-		return nil, lockErr
+		return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 	}
 	defer unlock(&err)
 
@@ -412,23 +412,20 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
 		if ts.isPartialMigration {
 			ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
 		} else if err := sw.switchTableReads(ctx, cells, servedTypes, direction); err != nil {
-			ts.Logger().Errorf("switchTableReads failed: %v", err)
-			return nil, err
+			return handleError("failed to switch read traffic for the tables", err)
 		}
 		return sw.logs(), nil
 	}
 	wr.Logger().Infof("About to switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction)
 	if err := sw.switchShardReads(ctx, cells, servedTypes, direction); err != nil {
-		ts.Logger().Errorf("switchShardReads failed: %v", err)
-		return nil, err
+		return handleError("failed to switch read traffic for the shards", err)
 	}
 
 	wr.Logger().Infof("switchShardReads Completed: %+v, %+v, %+v", cells, servedTypes, direction)
 	if err := wr.ts.ValidateSrvKeyspace(ctx, targetKeyspace, strings.Join(cells, ",")); err != nil {
 		err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
 			targetKeyspace, strings.Join(cells, ","))
-		log.Errorf("%w", err2)
-		return nil, err2
+		return handleError("failed to validate SrvKeyspace record", err2)
 	}
 	return sw.logs(), nil
 }
@@ -481,14 +478,20 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 	cancel, reverse, reverseReplication bool, dryRun, initializeTargetSequences bool) (journalID int64, dryRunResults *[]string, err error) {
 	ts, ws, err := wr.getWorkflowState(ctx, targetKeyspace, workflowName)
 	_ = ws
+
+	// Consistently handle errors by logging and returning them.
+	handleError := func(message string, err error) (int64, *[]string, error) {
+		werr := vterrors.Wrapf(err, message)
+		wr.Logger().Error(werr)
+		return 0, nil, werr
+	}
+
 	if err != nil {
-		wr.Logger().Errorf("getWorkflowState failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to get the current workflow state", err)
 	}
 	if ts == nil {
 		errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflowName, targetKeyspace)
-		wr.Logger().Errorf(errorMsg)
-		return 0, nil, fmt.Errorf(errorMsg)
+		return handleError("failed to get the current workflow state", fmt.Errorf(errorMsg))
 	}
 
 	var sw iswitcher
@@ -505,30 +508,27 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 	ts.Logger().Infof("Built switching metadata: %+v", ts)
 	if err := ts.validate(ctx); err != nil {
-		ts.Logger().Errorf("validate failed: %v", err)
-		return 0, nil, err
+		return handleError("workflow validation failed", err)
 	}
 
 	if reverseReplication {
 		err := wr.areTabletsAvailableToStreamFrom(ctx, ts, ts.TargetKeyspaceName(), ts.TargetShards())
 		if err != nil {
-			return 0, nil, err
+			return handleError(fmt.Sprintf("no tablets were available to stream from in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 	}
 
 	// Need to lock both source and target keyspaces.
 	tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites")
 	if lockErr != nil {
-		ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-		return 0, nil, lockErr
+		return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
 	}
 	ctx = tctx
 	defer sourceUnlock(&err)
 	if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
 		tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites")
 		if lockErr != nil {
-			ts.Logger().Errorf("LockKeyspace failed: %v", lockErr)
-			return 0, nil, lockErr
+			return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
 		}
 		ctx = tctx
 		defer targetUnlock(&err)
@@ -545,24 +545,20 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 		!ts.SourceKeyspaceSchema().Keyspace.Sharded {
 		sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx)
 		if err != nil {
-			werr := vterrors.Wrapf(err, "getTargetSequenceMetadata failed")
-			ts.Logger().Error(werr)
-			return 0, nil, werr
+			return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
 
 	// If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams.
 	journalsExist, sourceWorkflows, err := ts.checkJournals(ctx)
 	if err != nil {
-		ts.Logger().Errorf("checkJournals failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
 	}
 	if !journalsExist {
 		ts.Logger().Infof("No previous journals were found. Proceeding normally.")
 		sm, err := workflow.BuildStreamMigrator(ctx, ts, cancel)
 		if err != nil {
-			ts.Logger().Errorf("buildStreamMigrater failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to migrate the workflow streams", err)
 		}
 		if cancel {
 			sw.cancelMigration(ctx, sm)
@@ -572,21 +568,19 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 		ts.Logger().Infof("Stopping streams")
 		sourceWorkflows, err = sw.stopStreams(ctx, sm)
 		if err != nil {
-			ts.Logger().Errorf("stopStreams failed: %v", err)
 			for key, streams := range sm.Streams() {
 				for _, stream := range streams {
 					ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
 				}
 			}
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to stop the workflow streams", err)
 		}
 
 		ts.Logger().Infof("Stopping source writes")
 		if err := sw.stopSourceWrites(ctx); err != nil {
-			ts.Logger().Errorf("stopSourceWrites failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
 		}
 
 		if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
@@ -595,9 +589,8 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 			// the tablet's deny list check and the first mysqld side table lock.
 			for cnt := 1; cnt <= lockTablesCycles; cnt++ {
 				if err := ts.executeLockTablesOnSource(ctx); err != nil {
-					ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err)
 					sw.cancelMigration(ctx, sm)
-					return 0, nil, err
+					return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
 				}
 				// No need to UNLOCK the tables as the connection was closed once the locks were acquired
 				// and thus the locks released.
@@ -607,50 +600,42 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 		ts.Logger().Infof("Waiting for streams to catchup")
 		if err := sw.waitForCatchup(ctx, timeout); err != nil {
-			ts.Logger().Errorf("waitForCatchup failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to sync up replication between the source and target", err)
 		}
 
 		ts.Logger().Infof("Migrating streams")
 		if err := sw.migrateStreams(ctx, sm); err != nil {
-			ts.Logger().Errorf("migrateStreams failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to migrate the workflow streams", err)
 		}
 
 		ts.Logger().Infof("Resetting sequences")
 		if err := sw.resetSequences(ctx); err != nil {
-			ts.Logger().Errorf("resetSequences failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to reset the sequences", err)
 		}
 
 		ts.Logger().Infof("Creating reverse streams")
 		if err := sw.createReverseVReplication(ctx); err != nil {
-			ts.Logger().Errorf("createReverseVReplication failed: %v", err)
 			sw.cancelMigration(ctx, sm)
-			return 0, nil, err
+			return handleError("failed to create the reverse vreplication streams", err)
 		}
 	} else {
 		if cancel {
-			err := fmt.Errorf("traffic switching has reached the point of no return, cannot cancel")
-			ts.Logger().Errorf("%v", err)
-			return 0, nil, err
+			return handleError("invalid cancel", fmt.Errorf("traffic switching has reached the point of no return, cannot cancel"))
 		}
 		ts.Logger().Infof("Journals were found. Completing the left over steps.")
 		// Need to gather positions in case all journals were not created.
 		if err := ts.gatherPositions(ctx); err != nil {
-			ts.Logger().Errorf("gatherPositions failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to gather replication positions", err)
 		}
 	}
 
 	// This is the point of no return. Once a journal is created,
 	// traffic can be redirected to target shards.
 	if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
-		ts.Logger().Errorf("createJournals failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to create the journal", err)
 	}
 
 	// Initialize any target sequences, if there are any, before allowing new writes.
 	if initializeTargetSequences && len(sequenceMetadata) > 0 {
@@ -660,33 +645,26 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
 		initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
 		defer cancel()
 		if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
-			werr := vterrors.Wrapf(err, "initializeTargetSequences failed")
-			ts.Logger().Error(werr)
-			return 0, nil, werr
+			return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
 		}
 	}
 
 	if err := sw.allowTargetWrites(ctx); err != nil {
-		ts.Logger().Errorf("allowTargetWrites failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
 	}
 	if err := sw.changeRouting(ctx); err != nil {
-		ts.Logger().Errorf("changeRouting failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to update the routing rules", err)
 	}
 	if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil {
-		ts.Logger().Errorf("finalize failed: %v", err)
-		return 0, nil, err
+		return handleError("failed to finalize the traffic switch", err)
 	}
 	if reverseReplication {
 		if err := sw.startReverseVReplication(ctx); err != nil {
-			ts.Logger().Errorf("startReverseVReplication failed: %v", err)
-			return 0, nil, err
+			return handleError("failed to start the reverse workflow", err)
 		}
 	}
 	if err := sw.freezeTargetVReplication(ctx); err != nil {
-		ts.Logger().Errorf("deleteTargetVReplication failed: %v", err)
-		return 0, nil, err
+		return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err)
 	}
 
 	return ts.id, sw.logs(), nil