diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index c0054e84d45..52b3fdc229f 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -844,7 +844,7 @@ func (p *processor) getTableName(ctx context.Context, tableID model.TableID) str retry.WithIsRetryableErr(cerror.IsRetryableError)) if tableName == nil { - log.Warn("failed to get table name for metric") + log.Warn("failed to get table name for metric", zap.Any("tableID", tableID)) return strconv.Itoa(int(tableID)) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 9a33d460bbf..0ec7c7a671b 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -16,6 +16,7 @@ package replication import ( "bytes" "container/heap" + "fmt" "math" "time" @@ -51,12 +52,30 @@ type BurstBalance struct { MoveTables []MoveTable } +func (b BurstBalance) String() string { + if len(b.AddTables) != 0 { + return fmt.Sprintf("BurstBalance, add tables: %v", b.AddTables) + } + if len(b.RemoveTables) != 0 { + return fmt.Sprintf("BurstBalance, remove tables: %v", b.RemoveTables) + } + if len(b.MoveTables) != 0 { + return fmt.Sprintf("BurstBalance, move tables: %v", b.MoveTables) + } + return "BurstBalance, no tables" +} + // MoveTable is a schedule task for moving a table. type MoveTable struct { Span tablepb.Span DestCapture model.CaptureID } +func (t MoveTable) String() string { + return fmt.Sprintf("MoveTable, span: %s, dest: %s", + t.Span.String(), t.DestCapture) +} + // AddTable is a schedule task for adding a table. 
type AddTable struct { Span tablepb.Span @@ -64,12 +83,22 @@ type AddTable struct { CheckpointTs model.Ts } +func (t AddTable) String() string { + return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d", + t.Span.String(), t.CaptureID, t.CheckpointTs) +} + // RemoveTable is a schedule task for removing a table. type RemoveTable struct { Span tablepb.Span CaptureID model.CaptureID } +func (t RemoveTable) String() string { + return fmt.Sprintf("RemoveTable, span: %s, capture: %s", + t.Span.String(), t.CaptureID) +} + // ScheduleTask is a schedule task that wraps add/move/remove table tasks. type ScheduleTask struct { //nolint:revive MoveTable *MoveTable @@ -94,6 +123,22 @@ func (s *ScheduleTask) Name() string { return "unknown" } +func (s *ScheduleTask) String() string { + if s.MoveTable != nil { + return s.MoveTable.String() + } + if s.AddTable != nil { + return s.AddTable.String() + } + if s.RemoveTable != nil { + return s.RemoveTable.String() + } + if s.BurstBalance != nil { + return s.BurstBalance.String() + } + return "" +} + // Manager manages replications and running scheduling tasks. type Manager struct { //nolint:revive spans *spanz.BtreeMap[*ReplicationSet] @@ -295,7 +340,7 @@ func (r *Manager) HandleTasks( tasks []*ScheduleTask, ) ([]*schedulepb.Message, error) { // Check if a running task is finished. 
- toBeDeleted := []tablepb.Span{} + var toBeDeleted []tablepb.Span r.runningTasks.Ascend(func(span tablepb.Span, task *ScheduleTask) bool { if table, ok := r.spans.Get(span); ok { // If table is back to Replicating or Removed, @@ -687,7 +732,7 @@ func (r *Manager) logSlowTableInfo(currentPDTime time.Time) { zap.String("namespace", r.changefeedID.Namespace), zap.String("changefeed", r.changefeedID.ID), zap.Int64("tableID", table.Span.TableID), - zap.String("tableStatus", table.Stats.String()), + zap.String("tableStatus", table.State.String()), zap.Uint64("checkpointTs", table.Checkpoint.CheckpointTs), zap.Uint64("resolvedTs", table.Checkpoint.ResolvedTs), zap.Duration("checkpointLag", currentPDTime. diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index cdcf933597c..fec432499a8 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -200,8 +200,10 @@ func NewReplicationSet( // We need to wait its state becomes Stopped or Absent before // proceeding further scheduling. log.Warn("schedulerv3: found a stopping capture during initializing", - zap.Any("replicationSet", r), + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Int64("tableID", table.Span.TableID), + zap.Any("replicationSet", r), zap.Any("status", tableStatus)) err := r.setCapture(captureID, RoleUndetermined) if err != nil { @@ -213,8 +215,10 @@ func NewReplicationSet( // Ignore stop state. 
default: log.Warn("schedulerv3: unknown table state", - zap.Any("replicationSet", r), + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Int64("tableID", table.Span.TableID), + zap.Any("replicationSet", r), zap.Any("status", tableStatus)) } } @@ -238,6 +242,8 @@ func NewReplicationSet( r.State = ReplicationSetStateRemoving } log.Info("schedulerv3: initialize replication set", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r)) return r, nil @@ -289,7 +295,10 @@ func (r *ReplicationSet) clearCapture(captureID model.CaptureID, role Role) erro func (r *ReplicationSet) promoteSecondary(captureID model.CaptureID) error { if r.Primary == captureID { - log.Warn("schedulerv3: capture is already promoted", + log.Warn("schedulerv3: capture is already promoted as the primary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.String("captureID", captureID), zap.Any("replicationSet", r)) return nil } @@ -317,6 +326,8 @@ func (r *ReplicationSet) inconsistentError( input *tablepb.TableStatus, captureID model.CaptureID, msg string, fields ...zap.Field, ) error { fields = append(fields, []zap.Field{ + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.String("captureID", captureID), zap.Stringer("tableState", input), zap.Any("replicationSet", r), @@ -330,6 +341,8 @@ func (r *ReplicationSet) multiplePrimaryError( input *tablepb.TableStatus, captureID model.CaptureID, msg string, fields ...zap.Field, ) error { fields = append(fields, []zap.Field{ + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.String("captureID", captureID), zap.Stringer("tableState", input), zap.Any("replicationSet", r), @@ -392,7 +405,7 @@ func (r *ReplicationSet) poll( var msg *schedulepb.Message switch r.State { case ReplicationSetStateAbsent: 
- msg, stateChanged, err = r.pollOnAbsent(input, captureID) + stateChanged, err = r.pollOnAbsent(input, captureID) case ReplicationSetStatePrepare: msg, stateChanged, err = r.pollOnPrepare(input, captureID) case ReplicationSetStateCommit: @@ -413,12 +426,12 @@ func (r *ReplicationSet) poll( } if stateChanged { log.Info("schedulerv3: replication state transition, poll", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Stringer("old", oldState), - zap.Stringer("new", r.State), - zap.String("namespace", r.Changefeed.Namespace), - zap.String("changefeed", r.Changefeed.ID)) + zap.Stringer("new", r.State)) } } @@ -428,26 +441,28 @@ func (r *ReplicationSet) poll( //nolint:unparam func (r *ReplicationSet) pollOnAbsent( input *tablepb.TableStatus, captureID model.CaptureID, -) (*schedulepb.Message, bool, error) { +) (bool, error) { switch input.State { case tablepb.TableStateAbsent: r.State = ReplicationSetStatePrepare err := r.setCapture(captureID, RoleSecondary) - return nil, true, errors.Trace(err) + return true, errors.Trace(err) case tablepb.TableStateStopped: // Ignore stopped table state as a capture may shutdown unexpectedly. - return nil, false, nil + return false, nil case tablepb.TableStatePreparing, tablepb.TableStatePrepared, tablepb.TableStateReplicating, tablepb.TableStateStopping: } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) - return nil, false, nil + return false, nil } func (r *ReplicationSet) pollOnPrepare( @@ -491,6 +506,8 @@ func (r *ReplicationSet) pollOnPrepare( // Primary is stopped, but we may still has secondary. // Clear primary and promote secondary when it's prepared. 
log.Info("schedulerv3: primary is stopped during Prepare", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -499,6 +516,8 @@ func (r *ReplicationSet) pollOnPrepare( } if r.isInRole(captureID, RoleSecondary) { log.Info("schedulerv3: capture is stopped during Prepare", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -519,6 +538,8 @@ func (r *ReplicationSet) pollOnPrepare( } } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -552,6 +573,8 @@ func (r *ReplicationSet) pollOnCommit( // before promoting the secondary, otherwise there may be two // primary that write data and lead to data inconsistency. log.Info("schedulerv3: there are unknown captures during commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID)) @@ -564,6 +587,8 @@ func (r *ReplicationSet) pollOnCommit( } log.Info("schedulerv3: promote secondary, no primary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("captureID", captureID)) @@ -593,6 +618,8 @@ func (r *ReplicationSet) pollOnCommit( if !r.hasRole(RoleSecondary) { // If there is no secondary, transit to Absent. 
log.Info("schedulerv3: primary is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -606,6 +633,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, false, errors.Trace(err) } log.Info("schedulerv3: replication state promote secondary", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("tableState", input), zap.String("original", original), @@ -628,6 +657,8 @@ func (r *ReplicationSet) pollOnCommit( // upon entering Commit state. Do not change state and wait // the original primary reports its table. log.Info("schedulerv3: secondary is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -642,6 +673,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, true, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopped during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -690,6 +723,8 @@ func (r *ReplicationSet) pollOnCommit( return nil, false, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopping during Commit", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -699,6 +734,8 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStatePreparing: } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", 
r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -728,6 +765,8 @@ func (r *ReplicationSet) pollOnReplicating( // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("schedulerv3: primary is stopped during Replicating", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -737,6 +776,8 @@ func (r *ReplicationSet) pollOnReplicating( } } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -763,26 +804,30 @@ func (r *ReplicationSet) pollOnRemoving( }, }, false, nil case tablepb.TableStateAbsent, tablepb.TableStateStopped: - errField := zap.Skip() + var err error if r.Primary == captureID { r.clearPrimary() } else if r.isInRole(captureID, RoleSecondary) { - err := r.clearCapture(captureID, RoleSecondary) - errField = zap.Error(err) + err = r.clearCapture(captureID, RoleSecondary) } else { - err := r.clearCapture(captureID, RoleUndetermined) - errField = zap.Error(err) + err = r.clearCapture(captureID, RoleUndetermined) + } + if err != nil { + log.Warn("schedulerv3: replication state remove capture with error", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Any("replicationSet", r), + zap.Stringer("tableState", input), + zap.String("captureID", captureID), + zap.Error(err)) } - log.Info("schedulerv3: replication state remove capture", - zap.Any("replicationSet", r), - zap.Stringer("tableState", input), - zap.String("captureID", captureID), - errField) return nil, false, 
nil case tablepb.TableStateStopping: return nil, false, nil } log.Warn("schedulerv3: ignore input, unexpected replication set state", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -801,7 +846,10 @@ func (r *ReplicationSet) handleAddTable( // Ignore add table if it's not in Absent state. if r.State != ReplicationSetStateAbsent { log.Warn("schedulerv3: add table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } err := r.setCapture(captureID, RoleSecondary) @@ -820,6 +868,8 @@ func (r *ReplicationSet) handleAddTable( } log.Info("schedulerv3: replication state transition, add table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), zap.Any("replicationSet", r), zap.Stringer("old", oldState), zap.Stringer("new", r.State)) return msgs, nil @@ -831,7 +881,10 @@ func (r *ReplicationSet) handleMoveTable( // Ignore move table if it has been removed already. if r.hasRemoved() { log.Warn("schedulerv3: move table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } // Ignore move table if @@ -839,7 +892,10 @@ func (r *ReplicationSet) handleMoveTable( // 2) the dest capture is the primary. 
if r.State != ReplicationSetStateReplicating || r.Primary == dest { log.Warn("schedulerv3: move table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } oldState := r.State @@ -849,8 +905,11 @@ func (r *ReplicationSet) handleMoveTable( return nil, errors.Trace(err) } log.Info("schedulerv3: replication state transition, move table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Stringer("new", r.State), zap.Any("replicationSet", r), - zap.Stringer("old", oldState), zap.Stringer("new", r.State)) + zap.Stringer("old", oldState)) status := tablepb.TableStatus{ Span: r.Span, State: tablepb.TableStateAbsent, @@ -863,20 +922,29 @@ func (r *ReplicationSet) handleRemoveTable() ([]*schedulepb.Message, error) { // Ignore remove table if it has been removed already. if r.hasRemoved() { log.Warn("schedulerv3: remove table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } // Ignore remove table if it's not in Replicating state. 
if r.State != ReplicationSetStateReplicating { log.Warn("schedulerv3: remove table is ignored", - zap.Any("replicationSet", r), zap.Int64("tableID", r.Span.TableID)) + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r)) return nil, nil } oldState := r.State r.State = ReplicationSetStateRemoving log.Info("schedulerv3: replication state transition, remove table", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), - zap.Stringer("old", oldState), zap.Stringer("new", r.State)) + zap.Stringer("old", oldState)) status := tablepb.TableStatus{ Span: r.Span, State: tablepb.TableStateReplicating, @@ -910,7 +978,14 @@ func (r *ReplicationSet) handleCaptureShutdown( Span: r.Span, State: tablepb.TableStateStopped, } + oldState := r.State msgs, err := r.poll(&status, captureID) + log.Info("schedulerv3: replication state transition, capture shutdown", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), + zap.Any("replicationSet", r), + zap.Stringer("old", oldState), zap.Stringer("new", r.State)) return msgs, true, errors.Trace(err) } @@ -919,6 +994,9 @@ func (r *ReplicationSet) updateCheckpointAndStats( ) { if checkpoint.ResolvedTs < checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", + zap.String("namespace", r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), zap.Any("checkpoint", checkpoint)) @@ -935,6 +1013,9 @@ func (r *ReplicationSet) updateCheckpointAndStats( } if r.Checkpoint.ResolvedTs < r.Checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", + zap.String("namespace", 
r.Changefeed.Namespace), + zap.String("changefeed", r.Changefeed.ID), + zap.Int64("tableID", r.Span.TableID), zap.Any("replicationSet", r), zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go index f98ca68a0be..2efffdbda7a 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go @@ -101,13 +101,8 @@ func (b *basicScheduler) Schedule( zap.Any("allCaptureStatus", captures)) return tasks } - log.Info("schedulerv3: burst add table", - zap.String("namespace", b.changefeedID.Namespace), - zap.String("changefeed", b.changefeedID.ID), - zap.Strings("captureIDs", captureIDs), - zap.Int("tableCount", len(newSpans))) tasks = append( - tasks, newBurstAddTables(checkpointTs, newSpans, captureIDs)) + tasks, newBurstAddTables(b.changefeedID, checkpointTs, newSpans, captureIDs)) } // Build remove table tasks. @@ -116,15 +111,14 @@ func (b *basicScheduler) Schedule( // Fast path for check whether two sets are identical: // If the length of currentTables and replications are equal, // and for all tables in currentTables have a record in replications. - if !tablesLenEqual || !tablesAllFind { + if !(tablesLenEqual && tablesAllFind) { // The two sets are not identical. We need to find removed tables. 
intersectionTable := spanz.NewBtreeMap[struct{}]() for _, span := range currentSpans { _, ok := replications.Get(span) - if !ok { - continue + if ok { + intersectionTable.ReplaceOrInsert(span, struct{}{}) } - intersectionTable.ReplaceOrInsert(span, struct{}{}) } rmSpans := make([]tablepb.Span, 0) replications.Ascend(func(span tablepb.Span, value *replication.ReplicationSet) bool { @@ -134,13 +128,10 @@ func (b *basicScheduler) Schedule( } return true }) - if len(rmSpans) > 0 { - log.Info("schedulerv3: burst remove table", - zap.String("namespace", b.changefeedID.Namespace), - zap.String("changefeed", b.changefeedID.ID), - zap.Int("tableCount", len(newSpans))) - tasks = append(tasks, - newBurstRemoveTables(rmSpans, replications, b.changefeedID)) + + removeTableTasks := newBurstRemoveTables(rmSpans, replications, b.changefeedID) + if removeTableTasks != nil { + tasks = append(tasks, removeTableTasks) } } return tasks @@ -148,24 +139,34 @@ func (b *basicScheduler) Schedule( // newBurstAddTables add each new table to captures in a round-robin way. 
func newBurstAddTables( + changefeedID model.ChangeFeedID, checkpointTs model.Ts, newSpans []tablepb.Span, captureIDs []model.CaptureID, ) *replication.ScheduleTask { idx := 0 tables := make([]replication.AddTable, 0, len(newSpans)) for _, span := range newSpans { + targetCapture := captureIDs[idx] tables = append(tables, replication.AddTable{ Span: span, - CaptureID: captureIDs[idx], + CaptureID: targetCapture, CheckpointTs: checkpointTs, }) + log.Info("schedulerv3: burst add table", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("captureID", targetCapture), + zap.Any("tableID", span.TableID)) + idx++ if idx >= len(captureIDs) { idx = 0 } } - return &replication.ScheduleTask{BurstBalance: &replication.BurstBalance{ - AddTables: tables, - }} + return &replication.ScheduleTask{ + BurstBalance: &replication.BurstBalance{ + AddTables: tables, + }, + } } func newBurstRemoveTables( @@ -181,7 +182,8 @@ func newBurstRemoveTables( break } if captureID == "" { - log.Warn("schedulerv3: primary or secondary not found for removed table", + log.Warn("schedulerv3: primary or secondary not found for removed table,"+ + " this may happen if the capture shut down", zap.String("namespace", changefeedID.Namespace), zap.String("changefeed", changefeedID.ID), zap.Any("table", rep)) @@ -191,8 +193,20 @@ func newBurstRemoveTables( Span: span, CaptureID: captureID, }) + log.Info("schedulerv3: burst remove table", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("captureID", captureID), + zap.Any("tableID", span.TableID)) + } + + if len(tables) == 0 { + return nil + } + + return &replication.ScheduleTask{ + BurstBalance: &replication.BurstBalance{ + RemoveTables: tables, + }, } - return &replication.ScheduleTask{BurstBalance: &replication.BurstBalance{ - RemoveTables: tables, - }} } diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go 
b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go index f45fd6100c9..76817b344f6 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go @@ -88,12 +88,6 @@ func (sm *Manager) Schedule( sm.tasksCounter[name]++ } if len(tasks) != 0 { - log.Info("schedulerv3: new schedule task", - zap.String("namespace", sm.changefeedID.Namespace), - zap.String("changefeed", sm.changefeedID.ID), - zap.Int("taskNumber", len(tasks)), - zap.Any("task", tasks), - zap.String("scheduler", scheduler.Name())) return tasks } } diff --git a/pkg/p2p/grpc_client.go b/pkg/p2p/grpc_client.go index 96a61ffcf04..23fc36518c5 100644 --- a/pkg/p2p/grpc_client.go +++ b/pkg/p2p/grpc_client.go @@ -91,11 +91,6 @@ func (c *grpcMessageClient) Run( defer func() { c.isClosed.Store(true) close(c.closeCh) - - log.Info("peer message client exited", - zap.String("addr", addr), - zap.String("captureID", receiverID), - zap.Error(ret)) }() metricsClientCount := clientCount.With(prometheus.Labels{ diff --git a/pkg/p2p/message_router.go b/pkg/p2p/message_router.go index 791a593a0be..56576190561 100644 --- a/pkg/p2p/message_router.go +++ b/pkg/p2p/message_router.go @@ -140,11 +140,16 @@ func (m *messageRouterImpl) GetClient(target NodeID) MessageClient { defer m.wg.Done() defer cancel() err := client.Run(ctx, "tcp", addr, target, m.credentials) - log.Warn("p2p client exited with error", - zap.String("addr", addr), - zap.String("targetCapture", target), - zap.Error(err)) - + if err != nil { + log.Warn("p2p client exited with error", + zap.String("addr", addr), + zap.String("targetCapture", target), + zap.Error(err)) + } else { + log.Info("peer message client exited", + zap.String("addr", addr), + zap.String("targetCapture", target)) + } if errors.Cause(err) != context.Canceled { // Send the error to the error channel. select {