diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 32dca75e1e1..26d8f5432ad 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler" + "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator" @@ -43,17 +44,23 @@ import ( // This function is factored out to facilitate unit testing. func newSchedulerV2FromCtx( ctx cdcContext.Context, startTs uint64, -) (scheduler.Scheduler, error) { +) (ret scheduler.Scheduler, err error) { changeFeedID := ctx.ChangefeedVars().ID messageServer := ctx.GlobalVars().MessageServer messageRouter := ctx.GlobalVars().MessageRouter ownerRev := ctx.GlobalVars().OwnerRevision - ret, err := scheduler.NewScheduler( - ctx, changeFeedID, startTs, messageServer, messageRouter, ownerRev) - if err != nil { - return nil, errors.Trace(err) - } - return ret, nil + captureID := ctx.GlobalVars().CaptureInfo.ID + cfg := config.GetGlobalServerConfig().Debug + if cfg.EnableTwoPhaseScheduler { + ret, err = scheduler.NewTpScheduler( + ctx, captureID, changeFeedID, startTs, + messageServer, messageRouter, ownerRev, cfg.Scheduler) + } else { + ret, err = scheduler.NewScheduler( + ctx, captureID, changeFeedID, startTs, + messageServer, messageRouter, ownerRev, cfg.Scheduler) + } + return ret, errors.Trace(err) } func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 33fa3ec3a7f..fd0255a0cdf 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -371,12 +371,14 @@ func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta { table, ok := p.tables[tableID] if !ok { return pipeline.TableMeta{ + TableID: tableID, CheckpointTs: 0, ResolvedTs: 0, Status: pipeline.TableStateAbsent, } } return pipeline.TableMeta{ + TableID: tableID, CheckpointTs: table.CheckpointTs(), ResolvedTs: table.ResolvedTs(), Status: table.Status(), @@ -661,15 +663,20 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { return nil } -func (p *processor) newAgentImpl(ctx cdcContext.Context) (scheduler.Agent, error) { +func (p *processor) newAgentImpl(ctx cdcContext.Context) (ret scheduler.Agent, err error) { messageServer := ctx.GlobalVars().MessageServer messageRouter := ctx.GlobalVars().MessageRouter etcdClient := ctx.GlobalVars().EtcdClient - ret, err := scheduler.NewAgent(ctx, messageServer, messageRouter, etcdClient, p, p.changefeedID) - if err != nil { - return nil, errors.Trace(err) + captureID := ctx.GlobalVars().CaptureInfo.ID + cfg := config.GetGlobalServerConfig().Debug + if cfg.EnableTwoPhaseScheduler { + ret, err = scheduler.NewTpAgent( + ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID) + } else { + ret, err = scheduler.NewAgent( + ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID) } - return ret, nil + return ret, errors.Trace(err) } // handleErrorCh listen the error channel and throw the error if it is not expected. diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index c725f10afd2..e6239012120 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -671,3 +671,6 @@ func TestUpdateBarrierTs(t *testing.T) { tb = p.tables[model.TableID(1)].(*mockTablePipeline) require.Equal(t, tb.barrierTs, uint64(15)) } + +func TestGetTableMeta(t *testing.T) { +} diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 838fbdcc54f..3e743e1c70e 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -76,7 +76,7 @@ func NewAgent(ctx context.Context, tableExec: tableExecutor, runningTasks: make(map[model.TableID]*dispatchTableTask), } - trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter) + trans, err := newTransport(ctx, changeFeedID, agentRole, messageServer, messageRouter) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index 4bb4d5a250e..c8823f644b5 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -22,19 +22,19 @@ import ( // CaptureState is the state of a capture. // -// ┌──────────────┐ Heartbeat Resp ┌─────────────┐ -// │ Uninitialize ├───────────────>│ Initialized │ -// └──────┬───────┘ └──────┬──────┘ -// │ │ -// IsStopping │ ┌──────────┐ │ IsStopping -// └────────> │ Stopping │ <───────┘ +// ┌───────────────┐ Heartbeat Resp ┌─────────────┐ +// │ Uninitialized ├───────────────>│ Initialized │ +// └──────┬────────┘ └──────┬──────┘ +// │ │ +// IsStopping │ ┌──────────┐ │ IsStopping +// └────────> │ Stopping │ <────────┘ // └──────────┘ type CaptureState int const ( - // CaptureStateUninitialize means the capture status is unknown, + // CaptureStateUninitialized means the capture status is unknown, // no heartbeat response received yet. - CaptureStateUninitialize CaptureState = 1 + CaptureStateUninitialized CaptureState = 1 // CaptureStateInitialized means owner has received heartbeat response. CaptureStateInitialized CaptureState = 2 // CaptureStateStopping means the capture is removing, e.g., shutdown. @@ -50,14 +50,14 @@ type CaptureStatus struct { } func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus { - return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialize} + return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized} } func (c *CaptureStatus) handleHeartbeatResponse( resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch, ) { // Check epoch for initialized captures. - if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch { + if c.State != CaptureStateUninitialized && c.Epoch.Epoch != epoch.Epoch { log.Warn("tpscheduler: ignore heartbeat response", zap.String("epoch", c.Epoch.Epoch), zap.String("respEpoch", epoch.Epoch), @@ -65,7 +65,7 @@ func (c *CaptureStatus) handleHeartbeatResponse( return } - if c.State == CaptureStateUninitialize { + if c.State == CaptureStateUninitialized { c.Epoch = epoch c.State = CaptureStateInitialized } @@ -112,7 +112,10 @@ func (c *captureManager) CheckAllCaptureInitialized() bool { func (c *captureManager) checkAllCaptureInitialized() bool { for _, captureStatus := range c.Captures { - if captureStatus.State == CaptureStateUninitialize { + // CaptureStateStopping is also considered initialized, because when + // a capture shutdown, it becomes stopping, we need to move its tables + // to other captures. + if captureStatus.State == CaptureStateUninitialized { return false } } @@ -190,7 +193,7 @@ func (c *captureManager) HandleAliveCaptureUpdate( log.Info("tpscheduler: removed a capture", zap.String("capture", id)) delete(c.Captures, id) - // Only update changes after initializtion. + // Only update changes after initialization. if !c.initialized { continue } diff --git a/cdc/scheduler/internal/tp/capture_manager_test.go b/cdc/scheduler/internal/tp/capture_manager_test.go index bc5f65cbd51..dd64d627953 100644 --- a/cdc/scheduler/internal/tp/capture_manager_test.go +++ b/cdc/scheduler/internal/tp/capture_manager_test.go @@ -27,7 +27,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) { rev := schedulepb.OwnerRevision{Revision: 1} epoch := schedulepb.ProcessorEpoch{Epoch: "test"} c := newCaptureStatus(rev) - require.Equal(t, CaptureStateUninitialize, c.State) + require.Equal(t, CaptureStateUninitialized, c.State) // Uninitialize -> Initialized c.handleHeartbeatResponse(&schedulepb.HeartbeatResponse{}, epoch) diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go index f9787cc1e6d..12fbe75c652 100644 --- a/cdc/scheduler/internal/tp/coordinator.go +++ b/cdc/scheduler/internal/tp/coordinator.go @@ -44,6 +44,7 @@ var _ internal.Scheduler = (*coordinator)(nil) type coordinator struct { version string revision schedulepb.OwnerRevision + captureID model.CaptureID trans transport scheduler []scheduler replicationM *replicationManager @@ -53,6 +54,7 @@ type coordinator struct { // NewCoordinator returns a two phase scheduler. func NewCoordinator( ctx context.Context, + captureID model.CaptureID, changeFeedID model.ChangeFeedID, checkpointTs model.Ts, messageServer *p2p.MessageServer, @@ -60,7 +62,7 @@ func NewCoordinator( ownerRevision int64, cfg *config.SchedulerConfig, ) (internal.Scheduler, error) { - trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter) + trans, err := newTransport(ctx, changeFeedID, schedulerRole, messageServer, messageRouter) if err != nil { return nil, errors.Trace(err) } @@ -68,6 +70,7 @@ func NewCoordinator( return &coordinator{ version: version.ReleaseSemver(), revision: revision, + captureID: captureID, trans: trans, scheduler: []scheduler{newBalancer()}, replicationM: newReplicationManager(cfg.MaxTaskConcurrency), @@ -171,8 +174,8 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, erro n := 0 for _, val := range recvMsgs { - // Filter stale messages. - if val.Header.OwnerRevision == c.revision { + // Filter stale messages and lost messages. + if val.Header.OwnerRevision == c.revision && val.To == c.captureID { recvMsgs[n] = val n++ } @@ -183,15 +186,23 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, erro func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error { for i := range msgs { m := msgs[i] - m.Header = &schedulepb.Message_Header{ - Version: c.version, - OwnerRevision: c.revision, - } // Correctness check. if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown { log.Panic("invalid message no destination or unknown message type", zap.Any("message", m)) } + + epoch := schedulepb.ProcessorEpoch{} + if capture := c.captureM.Captures[m.To]; capture != nil { + epoch = capture.Epoch + } + m.Header = &schedulepb.Message_Header{ + Version: c.version, + OwnerRevision: c.revision, + ProcessorEpoch: epoch, + } + m.From = c.captureID + } return c.trans.Send(ctx, msgs) } diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index 4a7dabecbb7..9f3369627a7 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -67,20 +67,33 @@ func TestCoordinatorSendMsgs(t *testing.T) { t.Parallel() ctx := context.Background() trans := newMockTrans() - cood := coordinator{ - version: "6.2.0", - revision: schedulepb.OwnerRevision{Revision: 3}, - trans: trans, + coord := coordinator{ + version: "6.2.0", + revision: schedulepb.OwnerRevision{Revision: 3}, + captureID: "0", + trans: trans, } - cood.sendMsgs( + coord.captureM = newCaptureManager(coord.revision, 0) + coord.sendMsgs( + ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}}) + + coord.captureM.Captures["1"] = &CaptureStatus{Epoch: schedulepb.ProcessorEpoch{Epoch: "epoch"}} + coord.sendMsgs( ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}}) require.EqualValues(t, []*schedulepb.Message{{ Header: &schedulepb.Message_Header{ - Version: cood.version, - OwnerRevision: cood.revision, + Version: coord.version, + OwnerRevision: coord.revision, + }, + From: "0", To: "1", MsgType: schedulepb.MsgDispatchTableRequest, + }, { + Header: &schedulepb.Message_Header{ + Version: coord.version, + OwnerRevision: coord.revision, + ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "epoch"}, }, - To: "1", MsgType: schedulepb.MsgDispatchTableRequest, + From: "0", To: "1", MsgType: schedulepb.MsgDispatchTableRequest, }}, trans.sendBuffer) } @@ -90,9 +103,10 @@ func TestCoordinatorRecvMsgs(t *testing.T) { ctx := context.Background() trans := &mockTrans{} coord := coordinator{ - version: "6.2.0", - revision: schedulepb.OwnerRevision{Revision: 3}, - trans: trans, + version: "6.2.0", + revision: schedulepb.OwnerRevision{Revision: 3}, + captureID: "0", + trans: trans, } trans.recvBuffer = append(trans.recvBuffer, @@ -100,14 +114,21 @@ func TestCoordinatorRecvMsgs(t *testing.T) { Header: &schedulepb.Message_Header{ OwnerRevision: coord.revision, }, - From: "1", MsgType: schedulepb.MsgDispatchTableResponse, + From: "1", To: coord.captureID, MsgType: schedulepb.MsgDispatchTableResponse, }) trans.recvBuffer = append(trans.recvBuffer, &schedulepb.Message{ Header: &schedulepb.Message_Header{ OwnerRevision: schedulepb.OwnerRevision{Revision: 4}, }, - From: "2", MsgType: schedulepb.MsgDispatchTableResponse, + From: "2", To: coord.captureID, MsgType: schedulepb.MsgDispatchTableResponse, + }) + trans.recvBuffer = append(trans.recvBuffer, + &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + OwnerRevision: coord.revision, + }, + From: "3", To: "lost", MsgType: schedulepb.MsgDispatchTableResponse, }) msgs, err := coord.recvMsgs(ctx) @@ -116,7 +137,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) { Header: &schedulepb.Message_Header{ OwnerRevision: coord.revision, }, - From: "1", MsgType: schedulepb.MsgDispatchTableResponse, + From: "1", To: "0", MsgType: schedulepb.MsgDispatchTableResponse, }}, msgs) } diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go index 555b6b77aff..0e571c86b07 100644 --- a/cdc/scheduler/internal/tp/replication_manager.go +++ b/cdc/scheduler/internal/tp/replication_manager.go @@ -30,7 +30,7 @@ type callback func() type burstBalance struct { // Add tables to captures AddTables, RemoveTables map[model.TableID]model.CaptureID - checkpointTs model.Ts + CheckpointTs model.Ts } type moveTable struct { @@ -331,6 +331,7 @@ func (r *replicationManager) handleBurstBalanceTasks( fields = append(fields, zap.Int("total", len(task.AddTables)+len(task.RemoveTables))) log.Info("tpscheduler: handle burst balance task", fields...) + checkpointTs := task.CheckpointTs sentMsgs := make([]*schedulepb.Message, 0, len(task.AddTables)) for tableID, captureID := range task.AddTables { if r.runningTasks[tableID] != nil { @@ -338,7 +339,7 @@ func (r *replicationManager) handleBurstBalanceTasks( continue } msgs, err := r.handleAddTableTask(&addTable{ - TableID: tableID, CaptureID: captureID, + TableID: tableID, CaptureID: captureID, CheckpointTs: checkpointTs, }) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/scheduler/internal/tp/replication_manager_test.go b/cdc/scheduler/internal/tp/replication_manager_test.go index 04e7aae0988..0fccecada43 100644 --- a/cdc/scheduler/internal/tp/replication_manager_test.go +++ b/cdc/scheduler/internal/tp/replication_manager_test.go @@ -28,7 +28,7 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { addTableCh := make(chan int, 1) // Absent -> Prepare msgs, err := r.HandleTasks([]*scheduleTask{{ - addTable: &addTable{TableID: 1, CaptureID: "1"}, + addTable: &addTable{TableID: 1, CaptureID: "1", CheckpointTs: 1}, accept: func() { addTableCh <- 1 close(addTableCh) @@ -44,6 +44,7 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1}, }, }, }, @@ -84,6 +85,7 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: false, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1}, }, }, }, @@ -358,11 +360,14 @@ func TestReplicationManagerBurstBalance(t *testing.T) { // Burst balance is not limited by maxTaskConcurrency. msgs, err := r.HandleTasks([]*scheduleTask{{ - addTable: &addTable{TableID: 1, CaptureID: "0"}, + addTable: &addTable{TableID: 1, CaptureID: "0", CheckpointTs: 1}, }, { - burstBalance: &burstBalance{AddTables: map[int64]string{ - 1: "1", 2: "2", 3: "3", - }}, + burstBalance: &burstBalance{ + AddTables: map[int64]string{ + 1: "1", 2: "2", 3: "3", + }, + CheckpointTs: 1, + }, accept: func() { balanceTableCh <- 1 }, @@ -381,6 +386,7 @@ func TestReplicationManagerBurstBalance(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: tableID, IsSecondary: true, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1}, }, }, }, @@ -400,6 +406,7 @@ func TestReplicationManagerBurstBalance(t *testing.T) { burstBalance: &burstBalance{ AddTables: map[int64]string{4: "4", 1: "0"}, RemoveTables: map[int64]string{5: "5", 1: "0"}, + CheckpointTs: 2, }, accept: func() { balanceTableCh <- 1 @@ -416,6 +423,7 @@ func TestReplicationManagerBurstBalance(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 4, IsSecondary: true, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 2}, }, }, }, diff --git a/cdc/scheduler/internal/tp/replication_set.go b/cdc/scheduler/internal/tp/replication_set.go index 4d924c30adb..2bee9501689 100644 --- a/cdc/scheduler/internal/tp/replication_set.go +++ b/cdc/scheduler/internal/tp/replication_set.go @@ -381,6 +381,9 @@ func (r *ReplicationSet) pollOnCommit( zap.Stringer("tableState", input), zap.String("original", original), zap.String("captureID", captureID)) + } + // Secondary has been promoted, retry AddTableRequest. + if r.Primary == captureID && r.Secondary == "" { return &schedulepb.Message{ To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, @@ -395,6 +398,7 @@ func (r *ReplicationSet) pollOnCommit( }, }, false, nil } + case schedulepb.TableStateStopped, schedulepb.TableStateAbsent: if r.Primary == captureID { r.updateCheckpoint(input.Checkpoint) diff --git a/cdc/scheduler/internal/tp/replication_set_test.go b/cdc/scheduler/internal/tp/replication_set_test.go index 2d0d48da53f..16b2132d047 100644 --- a/cdc/scheduler/internal/tp/replication_set_test.go +++ b/cdc/scheduler/internal/tp/replication_set_test.go @@ -387,6 +387,29 @@ func TestReplicationSetAddTable(t *testing.T) { require.Equal(t, ReplicationSetStateCommit, r.State) require.Equal(t, from, r.Primary) require.Equal(t, "", r.Secondary) + // The secondary AddTable request may be lost. + msgs, err = r.handleTableStatus(from, &schedulepb.TableStatus{ + TableID: tableID, + State: schedulepb.TableStatePrepared, + }) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: from, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: r.Checkpoint, + }, + }, + }, + }, msgs[0]) + require.Equal(t, ReplicationSetStateCommit, r.State) + require.Equal(t, from, r.Primary) + require.Equal(t, "", r.Secondary) // Commit -> Replicating msgs, err = r.handleTableStatus(from, &schedulepb.TableStatus{ diff --git a/cdc/scheduler/internal/tp/scheduler_balance.go b/cdc/scheduler/internal/tp/scheduler_balance.go index bd5d73b833f..d25d2e86a50 100644 --- a/cdc/scheduler/internal/tp/scheduler_balance.go +++ b/cdc/scheduler/internal/tp/scheduler_balance.go @@ -72,20 +72,22 @@ func (b *balancer) Schedule( // TODO support table re-balance when adding a new capture. tasks := make([]*scheduleTask, 0) if len(newTables) != 0 { - tasks = append(tasks, newBurstBalanceAddTables(newTables, captureIDs)) + tasks = append( + tasks, newBurstBalanceAddTables(checkpointTs, newTables, captureIDs)) if len(newTables) == len(currentTables) { return tasks } } if len(rmTables) > 0 { - tasks = append(tasks, newBurstBalanceRemoveTables(rmTables, replications)) + tasks = append( + tasks, newBurstBalanceRemoveTables(checkpointTs, rmTables, replications)) } return tasks } func newBurstBalanceAddTables( - newTables []model.TableID, captureIDs []model.CaptureID, + checkpointTs model.Ts, newTables []model.TableID, captureIDs []model.CaptureID, ) *scheduleTask { idx := 0 tables := make(map[model.TableID]model.CaptureID) @@ -96,11 +98,14 @@ func newBurstBalanceAddTables( idx = 0 } } - return &scheduleTask{burstBalance: &burstBalance{AddTables: tables}} + return &scheduleTask{burstBalance: &burstBalance{ + AddTables: tables, + CheckpointTs: checkpointTs, + }} } func newBurstBalanceRemoveTables( - rmTables []model.TableID, replications map[model.TableID]*ReplicationSet, + checkpointTs model.Ts, rmTables []model.TableID, replications map[model.TableID]*ReplicationSet, ) *scheduleTask { tables := make(map[model.TableID]model.CaptureID) for _, tableID := range rmTables { @@ -115,5 +120,8 @@ func newBurstBalanceRemoveTables( continue } } - return &scheduleTask{burstBalance: &burstBalance{RemoveTables: tables}} + return &scheduleTask{burstBalance: &burstBalance{ + RemoveTables: tables, + CheckpointTs: checkpointTs, + }} } diff --git a/cdc/scheduler/internal/tp/scheduler_balance_test.go b/cdc/scheduler/internal/tp/scheduler_balance_test.go index 218237156dc..ebbfe90006e 100644 --- a/cdc/scheduler/internal/tp/scheduler_balance_test.go +++ b/cdc/scheduler/internal/tp/scheduler_balance_test.go @@ -44,9 +44,10 @@ func TestSchedulerBalance(t *testing.T) { 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, 4: {State: ReplicationSetStateAbsent}, } - tasks = b.Schedule(0, currentTables, captures, replications) + tasks = b.Schedule(1, currentTables, captures, replications) require.Len(t, tasks, 1) require.Contains(t, tasks[0].burstBalance.AddTables, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.CheckpointTs, model.Ts(1)) // AddTable 4, and RemoveTable 5. replications = map[model.TableID]*ReplicationSet{ @@ -55,14 +56,18 @@ func TestSchedulerBalance(t *testing.T) { 3: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, 5: {State: ReplicationSetStateCommit, Primary: "a", Secondary: "b"}, } - tasks = b.Schedule(0, currentTables, captures, replications) + tasks = b.Schedule(2, currentTables, captures, replications) require.Len(t, tasks, 2) if tasks[0].burstBalance.AddTables != nil { require.Contains(t, tasks[0].burstBalance.AddTables, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.CheckpointTs, model.Ts(2)) require.Contains(t, tasks[1].burstBalance.RemoveTables, model.TableID(5)) + require.Equal(t, tasks[1].burstBalance.CheckpointTs, model.Ts(2)) } else { require.Contains(t, tasks[1].burstBalance.AddTables, model.TableID(4)) + require.Equal(t, tasks[0].burstBalance.CheckpointTs, model.Ts(2)) require.Contains(t, tasks[0].burstBalance.RemoveTables, model.TableID(5)) + require.Equal(t, tasks[1].burstBalance.CheckpointTs, model.Ts(2)) } // RemoveTable only. @@ -73,9 +78,10 @@ func TestSchedulerBalance(t *testing.T) { 4: {State: ReplicationSetStatePrepare, Primary: "a", Secondary: "b"}, 5: {State: ReplicationSetStatePrepare, Secondary: "b"}, } - tasks = b.Schedule(0, currentTables, captures, replications) + tasks = b.Schedule(3, currentTables, captures, replications) require.Len(t, tasks, 1) require.Contains(t, tasks[0].burstBalance.RemoveTables, model.TableID(5)) + require.Equal(t, tasks[0].burstBalance.CheckpointTs, model.Ts(3)) } func benchmarkSchedulerBalance( diff --git a/cdc/scheduler/internal/tp/transport.go b/cdc/scheduler/internal/tp/transport.go index 24d9d2c625e..da8ba4f0bb5 100644 --- a/cdc/scheduler/internal/tp/transport.go +++ b/cdc/scheduler/internal/tp/transport.go @@ -34,40 +34,62 @@ type transport interface { Close() error } -func p2pTopic(changefeed model.ChangeFeedID) p2p.Topic { - return fmt.Sprintf("changefeed/%s/%s", changefeed.Namespace, changefeed.ID) +func p2pTopic(changefeed model.ChangeFeedID, role Role) (selfTopic, peerTopic p2p.Topic) { + if role == agentRole { + selfTopic = fmt.Sprintf( + "changefeed/%s/%s/%s", changefeed.Namespace, changefeed.ID, agentRole) + peerTopic = fmt.Sprintf( + "changefeed/%s/%s/%s", changefeed.Namespace, changefeed.ID, schedulerRole) + } else { + selfTopic = fmt.Sprintf( + "changefeed/%s/%s/%s", changefeed.Namespace, changefeed.ID, schedulerRole) + peerTopic = fmt.Sprintf( + "changefeed/%s/%s/%s", changefeed.Namespace, changefeed.ID, agentRole) + } + return } var _ transport = (*p2pTransport)(nil) type p2pTransport struct { changefeed model.ChangeFeedID - topic p2p.Topic + selfTopic p2p.Topic + peerTopic p2p.Topic messageServer *p2p.MessageServer messageRouter p2p.MessageRouter errCh <-chan error mu struct { sync.Mutex - // FIXME it's an unbounded buffer, and may cuase OOM! + // FIXME it's an unbounded buffer, and may cause OOM! msgBuf []*schedulepb.Message } } +// Role of the transport user. +type Role string + +const ( + agentRole Role = "agent" + schedulerRole Role = "scheduler" +) + func newTransport( - ctx context.Context, changefeed model.ChangeFeedID, + ctx context.Context, changefeed model.ChangeFeedID, role Role, server *p2p.MessageServer, router p2p.MessageRouter, ) (*p2pTransport, error) { + selfTopic, peerTopic := p2pTopic(changefeed, role) trans := &p2pTransport{ changefeed: changefeed, - topic: p2pTopic(changefeed), + selfTopic: selfTopic, + peerTopic: peerTopic, messageServer: server, messageRouter: router, } var err error trans.errCh, err = trans.messageServer.SyncAddHandler( ctx, - trans.topic, + trans.selfTopic, &schedulepb.Message{}, func(sender string, messageI interface{}) error { message := messageI.(*schedulepb.Message) @@ -98,7 +120,7 @@ func (t *p2pTransport) Send( continue } - _, err := client.TrySendMessage(ctx, t.topic, value) + _, err := client.TrySendMessage(ctx, t.peerTopic, value) if err != nil { if cerror.ErrPeerMessageSendTryAgain.Equal(err) { return nil @@ -135,7 +157,7 @@ func (t *p2pTransport) Close() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - err := t.messageServer.SyncRemoveHandler(ctx, t.topic) + err := t.messageServer.SyncRemoveHandler(ctx, t.selfTopic) if err != nil { return errors.Trace(err) } diff --git a/cdc/scheduler/internal/tp/transport_test.go b/cdc/scheduler/internal/tp/transport_test.go index 53d554f11a6..c2c39808d11 100644 --- a/cdc/scheduler/internal/tp/transport_test.go +++ b/cdc/scheduler/internal/tp/transport_test.go @@ -35,24 +35,48 @@ func TestTransSendRecv(t *testing.T) { changefeedID := model.ChangeFeedID{Namespace: "test", ID: "test"} var err error - transMap := make(map[string]transport) + var schedulerAddr string + var schedulerTrans transport + agentTransMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) + if schedulerAddr == "" { + schedulerTrans, err = newTransport( + ctx, changefeedID, schedulerRole, node.Server, node.Router) + } else { + agentTransMap[addr], err = newTransport( + ctx, changefeedID, agentRole, node.Server, node.Router) + } require.Nil(t, err) } - // Send messages - for _, trans := range transMap { - for addr := range transMap { - err := trans.Send(ctx, []*schedulepb.Message{{To: addr}}) - require.Nil(t, err) - } + // Send messages, scheduler -> agent. + for addr := range agentTransMap { + err := schedulerTrans.Send(ctx, []*schedulepb.Message{{To: addr}}) + require.Nil(t, err) } - // Recv messages - total := len(transMap) + // Recv messages from scheduler. + total := len(agentTransMap) recvMegs := make([]*schedulepb.Message, 0, total) - for _, trans := range transMap { + for _, trans := range agentTransMap { + require.Eventually(t, func() bool { + msgs, err := trans.Recv(ctx) + require.Nil(t, err) + recvMegs = append(recvMegs, msgs...) + return len(recvMegs) == total + }, 200*time.Millisecond, 25) + recvMegs = recvMegs[:0] + } + + // Send messages, agent -> scheduler. + for _, trans := range agentTransMap { + err := trans.Send(ctx, []*schedulepb.Message{{To: schedulerAddr}}) + require.Nil(t, err) + } + // Recv messages from agent. + total = len(agentTransMap) + recvMegs = make([]*schedulepb.Message, 0, total) + for _, trans := range agentTransMap { require.Eventually(t, func() bool { msgs, err := trans.Recv(ctx) require.Nil(t, err) @@ -76,7 +100,7 @@ func TestTransUnknownAddr(t *testing.T) { var err error transMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) + transMap[addr], err = newTransport(ctx, changefeedID, agentRole, node.Server, node.Router) require.Nil(t, err) } @@ -101,7 +125,7 @@ func TestTransEmptyRecv(t *testing.T) { var err error transMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) + transMap[addr], err = newTransport(ctx, changefeedID, agentRole, node.Server, node.Router) require.Nil(t, err) } @@ -111,3 +135,22 @@ func TestTransEmptyRecv(t *testing.T) { require.Empty(t, msgs) } } + +// P2P does not allow registering the same topic more than once. +func TestTransTwoRolesInOneNode(t *testing.T) { + t.Parallel() + + cluster := p2p.NewMockCluster(t, 1) + defer cluster.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + changefeedID := model.ChangeFeedID{Namespace: "test", ID: "test"} + + for _, node := range cluster.Nodes { + _, err := newTransport(ctx, changefeedID, agentRole, node.Server, node.Router) + require.Nil(t, err) + _, err = newTransport(ctx, changefeedID, schedulerRole, node.Server, node.Router) + require.Nil(t, err) + } +} diff --git a/cdc/scheduler/rexport.go b/cdc/scheduler/rexport.go index 6d4afb3b9ac..0b4331503c4 100644 --- a/cdc/scheduler/rexport.go +++ b/cdc/scheduler/rexport.go @@ -19,6 +19,8 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/internal/base" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/p2p" ) @@ -55,6 +57,7 @@ const CheckpointCannotProceed = internal.CheckpointCannotProceed // NewAgent returns processor agent. func NewAgent( ctx context.Context, + captureID model.CaptureID, messageServer *p2p.MessageServer, messageRouter p2p.MessageRouter, etcdClient *etcd.CDCEtcdClient, @@ -68,12 +71,44 @@ func NewAgent( // NewScheduler returns owner scheduler. func NewScheduler( ctx context.Context, + captureID model.CaptureID, changeFeedID model.ChangeFeedID, checkpointTs model.Ts, messageServer *p2p.MessageServer, messageRouter p2p.MessageRouter, ownerRevision int64, + cfg *config.SchedulerConfig, ) (Scheduler, error) { return base.NewSchedulerV2( ctx, changeFeedID, checkpointTs, messageServer, messageRouter, ownerRevision) } + +// NewTpAgent returns two-phase agent. +func NewTpAgent( + ctx context.Context, + captureID model.CaptureID, + messageServer *p2p.MessageServer, + messageRouter p2p.MessageRouter, + etcdClient *etcd.CDCEtcdClient, + executor TableExecutor, + changefeedID model.ChangeFeedID, +) (Agent, error) { + return tp.NewAgent( + ctx, captureID, changefeedID, messageServer, messageRouter, etcdClient, executor) +} + +// NewTpScheduler returns two-phase scheduler. +func NewTpScheduler( + ctx context.Context, + captureID model.CaptureID, + changeFeedID model.ChangeFeedID, + checkpointTs model.Ts, + messageServer *p2p.MessageServer, + messageRouter p2p.MessageRouter, + ownerRevision int64, + cfg *config.SchedulerConfig, +) (Scheduler, error) { + return tp.NewCoordinator( + ctx, captureID, changeFeedID, checkpointTs, + messageServer, messageRouter, ownerRevision, cfg) +}