From 409594b274c1cc5987e12b77db49e5fc4aedef43 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Tue, 14 Jun 2022 18:00:51 +0800 Subject: [PATCH] fb/latency(cdc): agent to be table state awared, to handle different p2p messages. (#5820) * add table struct to agent. * agent add table state machine. * simplify coordinator. * agent to be state awared. * call IsRemoveTableFinished to clean table resource from processor. * fix message header. * fix agent. * prepare new agent ready. * fix some test. * add basic ut. * fix agent handle message ut. * add all test. * fix agent handle stopping. * adjust pipeline table state. * refine the agent. * introduce tableManager. * fix all tests. * add some new test. * fix log. * fix by make check * fix by review comment. * fix by review comment. * fix ut. * fix check. * agent fix heartbeat does not refresh each tick. * remove scheduler log. * fix ut. * fix some test. * fix ut. * rename * fix by check. --- cdc/processor/pipeline/table.go | 18 +- cdc/processor/pipeline/table_actor.go | 4 +- cdc/processor/processor_test.go | 3 +- cdc/scheduler/internal/tp/agent.go | 331 ++--------- cdc/scheduler/internal/tp/agent_bench_test.go | 16 +- cdc/scheduler/internal/tp/agent_test.go | 559 +++++++++++------- cdc/scheduler/internal/tp/capture_manager.go | 2 + .../internal/tp/replication_manager.go | 2 +- cdc/scheduler/internal/tp/replication_set.go | 12 +- .../internal/tp/scheduler_rebalance.go | 4 - cdc/scheduler/internal/tp/table.go | 366 ++++++++++++ cdc/scheduler/internal/tp/table_test.go | 37 ++ 12 files changed, 858 insertions(+), 496 deletions(-) create mode 100644 cdc/scheduler/internal/tp/table.go create mode 100644 cdc/scheduler/internal/tp/table_test.go diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 0df6c3b87d1..883f82097cb 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -31,27 +31,31 @@ type TableState int32 // TableState for table pipeline const ( + TableStateUnknown TableState = iota + // TableStateAbsent means the table not found + TableStateAbsent // TableStatePreparing indicate that the table is preparing connecting to regions - TableStatePreparing TableState = iota + TableStatePreparing // TableStatePrepared means the first `Resolved Ts` is received. TableStatePrepared - // TableStateReplicating means that sink is consuming data from the sorter, and replicating it to downstream + // TableStateReplicating means that sink is consuming data from the sorter, + // and replicating it to downstream TableStateReplicating // TableStateStopping means the table is stopping, but not guaranteed yet. + // at the moment, this state is not used, only keep aligned with `schedulepb.TableStateStopping` TableStateStopping - // TableStateStopped means sink stop all works. + // TableStateStopped means sink stop all works, but the table resource not released yet. 
TableStateStopped - // TableStateAbsent means the table not found - TableStateAbsent ) var tableStatusStringMap = map[TableState]string{ + TableStateUnknown: "Unknown", + TableStateAbsent: "Absent", TableStatePreparing: "Preparing", TableStatePrepared: "Prepared", TableStateReplicating: "Replicating", TableStateStopping: "Stopping", TableStateStopped: "Stopped", - TableStateAbsent: "Absent", } func (s TableState) String() string { @@ -92,7 +96,7 @@ type TablePipeline interface { AsyncStop(targetTs model.Ts) bool // Start the sink consume data from the given `ts` - Start(ts model.Ts) bool + Start(ts model.Ts) // Workload returns the workload of this table Workload() model.WorkloadInfo diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 679536b1e2d..6929bcf263c 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -517,13 +517,11 @@ func (t *tableActor) Wait() { _ = t.wg.Wait() } -func (t *tableActor) Start(ts model.Ts) bool { +func (t *tableActor) Start(ts model.Ts) { if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) { t.sortNode.startTsCh <- ts close(t.sortNode.startTsCh) - return true } - return false } // MemoryConsumption return the memory consumption in bytes diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index e47bbd41072..216ca31f1c7 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -202,9 +202,8 @@ func (m *mockTablePipeline) Wait() { // do nothing } -func (m *mockTablePipeline) Start(ts model.Ts) bool { +func (m *mockTablePipeline) Start(ts model.Ts) { m.sinkStartTs = ts - return true } // MemoryConsumption return the memory consumption in bytes diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index bda8aef6dcd..2ab76468ca1 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -15,13 +15,13 @@ package tp import ( "context" + "encoding/json" "time" "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -35,11 +35,9 @@ import ( var _ internal.Agent = (*agent)(nil) type agent struct { - trans transport - tableExec internal.TableExecutor + trans transport - // runningTasks track all in progress dispatch table task - runningTasks map[model.TableID]*dispatchTableTask + tableM *tableManager // owner's information ownerInfo ownerInfo @@ -60,6 +58,11 @@ type ownerInfo struct { captureID string } +func (o ownerInfo) String() string { + bytes, _ := json.Marshal(o) + return string(bytes) +} + // NewAgent returns a new agent. 
func NewAgent(ctx context.Context, captureID model.CaptureID, @@ -73,8 +76,7 @@ func NewAgent(ctx context.Context, version: version.ReleaseSemver(), captureID: captureID, changeFeedID: changeFeedID, - tableExec: tableExecutor, - runningTasks: make(map[model.TableID]*dispatchTableTask), + tableM: newTableManager(tableExecutor), } trans, err := newTransport(ctx, changeFeedID, agentRole, messageServer, messageRouter) if err != nil { @@ -101,7 +103,7 @@ func NewAgent(ctx context.Context, return result, nil } - log.Info("tpscheduler: owner found", + log.Info("tpscheduler: agent owner found", zap.String("ownerCaptureID", ownerCaptureID), zap.String("captureID", captureID), zap.String("namespace", changeFeedID.Namespace), @@ -141,15 +143,14 @@ func (a *agent) Tick(ctx context.Context) error { outboundMessages := a.handleMessage(inboundMessages) - responses, err := a.handleDispatchTableTasks(ctx) - outboundMessages = append(outboundMessages, responses...) - - err2 := a.sendMsgs(ctx, outboundMessages) - + responses, err := a.tableM.poll(ctx, a.stopping) if err != nil { return errors.Trace(err) } - if err2 != nil { + + outboundMessages = append(outboundMessages, responses...) + + if err := a.sendMsgs(ctx, outboundMessages); err != nil { return errors.Trace(err) } @@ -170,11 +171,11 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { } switch message.GetMsgType() { - case schedulepb.MsgDispatchTableRequest: - a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch) case schedulepb.MsgHeartbeat: response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs()) result = append(result, response) + case schedulepb.MsgDispatchTableRequest: + a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch) default: log.Warn("tpscheduler: unknown message received", zap.String("capture", a.captureID), @@ -183,91 +184,33 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { zap.Any("message", message)) } } - return result } -func (a *agent) tableStatus2PB(state pipeline.TableState) schedulepb.TableState { - switch state { - case pipeline.TableStatePreparing: - return schedulepb.TableStatePreparing - case pipeline.TableStatePrepared: - return schedulepb.TableStatePrepared - case pipeline.TableStateReplicating: - return schedulepb.TableStateReplicating - case pipeline.TableStateStopping: - return schedulepb.TableStateStopping - case pipeline.TableStateStopped: - return schedulepb.TableStateStopped - case pipeline.TableStateAbsent: - return schedulepb.TableStateAbsent - default: - } - log.Warn("tpscheduler: table state unknown", - zap.Any("state", state), - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID), - ) - return schedulepb.TableStateUnknown -} - -func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus { - meta := a.tableExec.GetTableMeta(tableID) - state := a.tableStatus2PB(meta.State) - - if task, ok := a.runningTasks[tableID]; ok { - // there is task that try to remove the table, - // return `stopping` instead of the real table state, - // to indicate that the remove table request was received. 
- if task.IsRemove == true && state != schedulepb.TableStateAbsent { - state = schedulepb.TableStateStopping +func (a *agent) handleMessageHeartbeat(expected []model.TableID) *schedulepb.Message { + allTables := a.tableM.getAllTables() + result := make([]schedulepb.TableStatus, 0, len(allTables)) + for _, table := range allTables { + status := table.getTableStatus() + if table.task != nil && table.task.IsRemove { + status.State = schedulepb.TableStateStopping } + result = append(result, status) } - - return schedulepb.TableStatus{ - TableID: meta.TableID, - State: state, - Checkpoint: schedulepb.Checkpoint{ - CheckpointTs: meta.CheckpointTs, - ResolvedTs: meta.ResolvedTs, - }, - } -} - -func (a *agent) collectTableStatus(expected []model.TableID) []schedulepb.TableStatus { - allTables := make(map[model.TableID]struct{}) - - currentTables := a.tableExec.GetAllCurrentTables() - for _, tableID := range currentTables { - allTables[tableID] = struct{}{} - } - for _, tableID := range expected { - allTables[tableID] = struct{}{} - } - - result := make([]schedulepb.TableStatus, 0, len(allTables)) - for tableID := range allTables { - status := a.newTableStatus(tableID) - result = append(result, status) + if _, ok := allTables[tableID]; !ok { + status := a.tableM.getTableStatus(tableID) + result = append(result, status) + } } - return result -} - -func (a *agent) handleMessageHeartbeat(expected []model.TableID) *schedulepb.Message { - tables := a.collectTableStatus(expected) response := &schedulepb.HeartbeatResponse{ - Tables: tables, + Tables: result, IsStopping: a.stopping, } message := &schedulepb.Message{ - Header: a.newMessageHeader(), MsgType: schedulepb.MsgHeartbeatResponse, - From: a.captureID, - To: a.ownerInfo.captureID, HeartbeatResponse: response, } @@ -310,40 +253,52 @@ func (a *agent) handleMessageDispatchTableRequest( zap.String("expected", a.epoch.Epoch)) return } - task := new(dispatchTableTask) + var ( + table *table + task *dispatchTableTask + ok bool + ) + // make the assumption that all tables are tracked by the agent now. + // this should be guaranteed by the caller of this method. 
switch req := request.Request.(type) { case *schedulepb.DispatchTableRequest_AddTable: if a.stopping { log.Info("tpscheduler: agent is stopping, and decline handle add table request", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID)) + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("request", request)) return } + tableID := req.AddTable.GetTableID() task = &dispatchTableTask{ - TableID: req.AddTable.GetTableID(), + TableID: tableID, StartTs: req.AddTable.GetCheckpoint().CheckpointTs, IsRemove: false, IsPrepare: req.AddTable.GetIsSecondary(), Epoch: epoch, status: dispatchTableTaskReceived, } + table = a.tableM.addTable(tableID) case *schedulepb.DispatchTableRequest_RemoveTable: + tableID := req.RemoveTable.GetTableID() + table, ok = a.tableM.getTable(tableID) + if !ok { + log.Warn("tpscheduler: agent ignore remove table request,"+ + "since the table not found", + zap.Any("tableID", tableID), + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("request", request)) + return + } task = &dispatchTableTask{ - TableID: req.RemoveTable.GetTableID(), + TableID: tableID, IsRemove: true, Epoch: epoch, status: dispatchTableTaskReceived, } - if a.tableExec.GetTableMeta(task.TableID).State == pipeline.TableStateAbsent { - log.Warn("tpscheduler: agent ignore remove table request, "+ - "since the table is absent", - zap.Any("tableID", task.TableID), - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID)) - return - } default: log.Warn("tpscheduler: agent ignore unknown dispatch table request", zap.String("capture", a.captureID), @@ -352,161 +307,7 @@ func (a *agent) handleMessageDispatchTableRequest( zap.Any("request", request)) return } - - if task, ok := a.runningTasks[task.TableID]; ok { - log.Warn("tpscheduler: agent found duplicate task, ignore the current request", - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID), - zap.Any("task", task), - zap.Any("request", request)) - return - } - log.Debug("tpscheduler: agent found dispatch table task", - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID), - zap.Any("task", task)) - a.runningTasks[task.TableID] = task -} - -func (a *agent) handleRemoveTableTask( - ctx context.Context, - task *dispatchTableTask, -) (response *schedulepb.Message) { - if task.status == dispatchTableTaskReceived { - done := a.tableExec.RemoveTable(ctx, task.TableID) - if !done { - status := a.newTableStatus(task.TableID) - return a.newRemoveTableResponseMessage(status) - } - task.status = dispatchTableTaskProcessed - } - - checkpointTs, done := a.tableExec.IsRemoveTableFinished(ctx, task.TableID) - if !done { - return nil - } - status := schedulepb.TableStatus{ - TableID: task.TableID, - // after `IsRemoveTableFinished` return true, the table is removed, - // and `absent` will be return, we still return `stopped` here for the design purpose. - // but the `absent` is identical to `stopped`. we should only keep one. 
- State: schedulepb.TableStateStopped, - Checkpoint: schedulepb.Checkpoint{ - CheckpointTs: checkpointTs, - }, - } - message := a.newRemoveTableResponseMessage(status) - delete(a.runningTasks, task.TableID) - return message -} - -func (a *agent) newRemoveTableResponseMessage( - status schedulepb.TableStatus, -) *schedulepb.Message { - message := &schedulepb.Message{ - Header: a.newMessageHeader(), - MsgType: schedulepb.MsgDispatchTableResponse, - From: a.captureID, - To: a.ownerInfo.captureID, - DispatchTableResponse: &schedulepb.DispatchTableResponse{ - Response: &schedulepb.DispatchTableResponse_RemoveTable{ - RemoveTable: &schedulepb.RemoveTableResponse{ - Status: &status, - Checkpoint: status.Checkpoint, - }, - }, - }, - } - - return message -} - -func (a *agent) handleAddTableTask( - ctx context.Context, - task *dispatchTableTask, -) (*schedulepb.Message, error) { - if task.status == dispatchTableTaskReceived { - if a.stopping { - status := a.newTableStatus(task.TableID) - message := a.newAddTableResponseMessage(status, true) - delete(a.runningTasks, task.TableID) - return message, nil - } - done, err := a.tableExec.AddTable(ctx, task.TableID, task.StartTs, task.IsPrepare) - if err != nil || !done { - log.Info("tpscheduler: agent add table failed", - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID), - zap.Any("task", task), - zap.Error(err)) - status := a.newTableStatus(task.TableID) - message := a.newAddTableResponseMessage(status, false) - return message, errors.Trace(err) - } - task.status = dispatchTableTaskProcessed - } - - done := a.tableExec.IsAddTableFinished(ctx, task.TableID, task.IsPrepare) - if !done { - // no need send a special message, table status will be reported by the heartbeat. 
- return nil, nil - } - log.Info("tpscheduler: agent finish processing add table task", - zap.String("capture", a.captureID), - zap.String("namespace", a.changeFeedID.Namespace), - zap.String("changefeed", a.changeFeedID.ID), - zap.Any("task", task)) - - status := a.newTableStatus(task.TableID) - message := a.newAddTableResponseMessage(status, false) - delete(a.runningTasks, task.TableID) - - return message, nil -} - -func (a *agent) newAddTableResponseMessage( - status schedulepb.TableStatus, - reject bool, -) *schedulepb.Message { - return &schedulepb.Message{ - Header: a.newMessageHeader(), - MsgType: schedulepb.MsgDispatchTableResponse, - From: a.captureID, - To: a.ownerInfo.captureID, - DispatchTableResponse: &schedulepb.DispatchTableResponse{ - Response: &schedulepb.DispatchTableResponse_AddTable{ - AddTable: &schedulepb.AddTableResponse{ - Status: &status, - Checkpoint: status.Checkpoint, - Reject: reject, - }, - }, - }, - } -} - -func (a *agent) handleDispatchTableTasks( - ctx context.Context, -) (result []*schedulepb.Message, err error) { - result = make([]*schedulepb.Message, 0) - for _, task := range a.runningTasks { - var response *schedulepb.Message - if task.IsRemove { - response = a.handleRemoveTableTask(ctx, task) - } else { - response, err = a.handleAddTableTask(ctx, task) - } - if err != nil { - return result, errors.Trace(err) - } - if response != nil { - result = append(result, response) - } - } - return result, nil + table.injectDispatchTableTask(task) } // GetLastSentCheckpointTs implement agent interface @@ -524,14 +325,6 @@ func (a *agent) Close() error { return a.trans.Close() } -func (a *agent) newMessageHeader() *schedulepb.Message_Header { - return &schedulepb.Message_Header{ - Version: a.version, - OwnerRevision: a.ownerInfo.revision, - ProcessorEpoch: a.epoch, - } -} - // handleOwnerInfo return false, if the given owner's info is staled. // update owner's info to the latest otherwise. // id: the incoming owner's capture ID @@ -561,7 +354,7 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri a.resetEpoch() - log.Info("tpscheduler: new owner in power, drop pending dispatch table tasks", + log.Info("tpscheduler: new owner in power", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), zap.String("changefeed", a.changeFeedID.ID), @@ -608,14 +401,20 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { func (a *agent) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error { for i := range msgs { m := msgs[i] - // Correctness check. 
- if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown { + if m.MsgType == schedulepb.MsgUnknown { log.Panic("tpscheduler: invalid message no destination or unknown message type", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), zap.String("changefeed", a.changeFeedID.ID), zap.Any("message", m)) } + m.Header = &schedulepb.Message_Header{ + Version: a.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + } + m.From = a.captureID + m.To = a.ownerInfo.captureID } return a.trans.Send(ctx, msgs) } diff --git a/cdc/scheduler/internal/tp/agent_bench_test.go b/cdc/scheduler/internal/tp/agent_bench_test.go index 8f4bcedd6e4..c8b3b9921d6 100644 --- a/cdc/scheduler/internal/tp/agent_bench_test.go +++ b/cdc/scheduler/internal/tp/agent_bench_test.go @@ -18,18 +18,18 @@ import ( "testing" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/processor/pipeline" ) -func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent)) { +func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent)) { upperBound := 16384 for size := 1; size <= upperBound; size *= 2 { tableExec := newMockTableExecutor() a := &agent{ - tableExec: tableExec, + tableM: newTableManager(tableExec), } + for j := 0; j < size; j++ { - tableExec.tables[model.TableID(10000+j)] = pipeline.TableStateReplicating + _ = a.tableM.addTable(model.TableID(10000 + j)) } b.ResetTimer() @@ -38,12 +38,12 @@ func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent } } -func BenchmarkCollectTableStatus(b *testing.B) { - benchmarkCollectTableStatus(b, func(b *testing.B, a *agent) { - total := len(a.tableExec.GetAllCurrentTables()) +func BenchmarkRefreshAllTables(b *testing.B) { + benchmarkHeartbeatResponse(b, func(b *testing.B, a *agent) { + total := len(a.tableM.tables) b.Run(fmt.Sprintf("%d tables", total), func(b *testing.B) { for i := 0; i < b.N; i++ { - a.collectTableStatus([]model.TableID{}) + a.handleMessageHeartbeat([]model.TableID{}) } }) }) diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go index df57a3ae456..a7bc3b58b45 100644 --- a/cdc/scheduler/internal/tp/agent_test.go +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -34,20 +34,190 @@ func newBaseAgent4Test() *agent { captureID: "owner-1", revision: schedulepb.OwnerRevision{Revision: 1}, }, - version: "agent-version-1", - epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, - captureID: "agent-1", - runningTasks: make(map[model.TableID]*dispatchTableTask), + version: "agent-version-1", + epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + captureID: "agent-1", } } -func TestAgentCollectTableStatus(t *testing.T) { +func TestAgentHandleMessageDispatchTable(t *testing.T) { t.Parallel() a := newBaseAgent4Test() + mockTableExecutor := newMockTableExecutor() + a.tableM = newTableManager(mockTableExecutor) + + removeTableRequest := &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{ + TableID: 1, + }, + }, + } + processorEpoch := schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"} + + // remove table not exist + ctx := context.Background() + a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) + responses, err := a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 0) + + addTableRequest := &schedulepb.DispatchTableRequest{ + Request: 
&schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + }, + }, + } + + // stopping, addTableRequest should be ignored. + a.stopping = true + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 0) + + a.stopping = false + + mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(false, nil) + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + + addTableResponse, ok := responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, schedulepb.TableStateAbsent, addTableResponse.AddTable.Status.State) + require.NotContains(t, a.tableM.tables, model.TableID(1)) + + mockTableExecutor.ExpectedCalls = nil + mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(true, nil) + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(false, nil) + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + _, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(true, nil) + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + + addTableResponse, ok = responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, schedulepb.TableStatePrepared, addTableResponse.AddTable.Status.State) + require.Contains(t, a.tableM.tables, model.TableID(1)) + + // let the prepared table become replicating, by set `IsSecondary` to false. + addTableRequest.Request.(*schedulepb.DispatchTableRequest_AddTable). + AddTable.IsSecondary = false + + // only mock `IsAddTableFinished`, since `AddTable` by start a prepared table always success. + mockTableExecutor.ExpectedCalls = nil + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(false, nil) + + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + + addTableResponse, ok = responses[0].DispatchTableResponse. 
+ Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, schedulepb.TableStatePrepared, addTableResponse.AddTable.Status.State) + require.Contains(t, a.tableM.tables, model.TableID(1)) + + mockTableExecutor.ExpectedCalls = nil + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(true, nil) + a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + + addTableResponse, ok = responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, schedulepb.TableStateReplicating, addTableResponse.AddTable.Status.State) + require.Contains(t, a.tableM.tables, model.TableID(1)) + + mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). + Return(false) + // remove table in the replicating state failed, should still in replicating. + a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + removeTableResponse, ok := responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_RemoveTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, schedulepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) + require.Contains(t, a.tableM.tables, model.TableID(1)) + + mockTableExecutor.ExpectedCalls = nil + mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). + Return(true) + mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything). + Return(3, false) + // remove table in the replicating state failed, should still in replicating. + a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + removeTableResponse, ok = responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_RemoveTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, schedulepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) + + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] + mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything). + Return(3, true) + // remove table in the replicating state success, should in stopped + a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) + responses, err = a.tableM.poll(ctx, a.stopping) + require.NoError(t, err) + require.Len(t, responses, 1) + removeTableResponse, ok = responses[0].DispatchTableResponse. 
+ Response.(*schedulepb.DispatchTableResponse_RemoveTable) + require.True(t, ok) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, schedulepb.TableStateStopped, removeTableResponse.RemoveTable.Status.State) + require.Equal(t, model.Ts(3), removeTableResponse.RemoveTable.Checkpoint.CheckpointTs) + require.NotContains(t, a.tableM.tables, model.TableID(1)) +} + +func TestAgentHandleMessageHeartbeat(t *testing.T) { + t.Parallel() + a := newBaseAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableExec = mockTableExecutor + a.tableM = newTableManager(mockTableExecutor) + + for i := 0; i < 5; i++ { + a.tableM.addTable(model.TableID(i)) + } + + a.tableM.tables[model.TableID(0)].state = schedulepb.TableStatePreparing + a.tableM.tables[model.TableID(1)].state = schedulepb.TableStatePrepared + a.tableM.tables[model.TableID(2)].state = schedulepb.TableStateReplicating + a.tableM.tables[model.TableID(3)].state = schedulepb.TableStateStopping + a.tableM.tables[model.TableID(4)].state = schedulepb.TableStateStopped mockTableExecutor.tables[model.TableID(0)] = pipeline.TableStatePreparing mockTableExecutor.tables[model.TableID(1)] = pipeline.TableStatePrepared @@ -55,16 +225,28 @@ func TestAgentCollectTableStatus(t *testing.T) { mockTableExecutor.tables[model.TableID(3)] = pipeline.TableStateStopping mockTableExecutor.tables[model.TableID(4)] = pipeline.TableStateStopped - expected := make([]model.TableID, 0, 10) - for i := 0; i < 10; i++ { - expected = append(expected, model.TableID(i)) + heartbeat := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + }, + MsgType: schedulepb.MsgHeartbeat, + From: "owner-1", + Heartbeat: &schedulepb.Heartbeat{ + TableIDs: []model.TableID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, } - result := a.collectTableStatus(expected) + response := a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Len(t, response, 1) + require.False(t, response[0].GetHeartbeatResponse().IsStopping) + + result := response[0].GetHeartbeatResponse().Tables require.Len(t, result, 10) sort.Slice(result, func(i, j int) bool { return result[i].TableID < result[j].TableID }) + require.Equal(t, schedulepb.TableStatePreparing, result[0].State) require.Equal(t, schedulepb.TableStatePrepared, result[1].State) require.Equal(t, schedulepb.TableStateReplicating, result[2].State) @@ -74,44 +256,83 @@ func TestAgentCollectTableStatus(t *testing.T) { require.Equal(t, schedulepb.TableStateAbsent, result[i].State) } - a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true} - status := a.newTableStatus(model.TableID(0)) - require.Equal(t, schedulepb.TableStateStopping, status.State) + a.tableM.tables[model.TableID(1)].task = &dispatchTableTask{IsRemove: true} + response = a.handleMessage([]*schedulepb.Message{heartbeat}) + result = response[0].GetHeartbeatResponse().Tables + sort.Slice(result, func(i, j int) bool { + return result[i].TableID < result[j].TableID + }) + require.Equal(t, schedulepb.TableStateStopping, result[1].State) - a.runningTasks[model.TableID(10)] = &dispatchTableTask{IsRemove: true} - status = a.newTableStatus(model.TableID(10)) - require.Equal(t, schedulepb.TableStateAbsent, status.State) + a.stopping = true + response = a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Len(t, response, 1) + require.True(t, response[0].GetHeartbeatResponse().IsStopping) } -func TestAgentHandleDispatchTableTask(t *testing.T) { +func 
TestAgentPermuteMessages(t *testing.T) { t.Parallel() a := newBaseAgent4Test() - mockTableExecutor := newMockTableExecutor() - a.tableExec = mockTableExecutor + a.tableM = newTableManager(mockTableExecutor) - tableID := model.TableID(1) - epoch := schedulepb.ProcessorEpoch{} - // all possible tasks can be received, and should be correctly handled no matter table's status - var tasks []*dispatchTableTask - for _, isRemove := range []bool{true, false} { - for _, isPrepare := range []bool{true, false} { - if isPrepare && isRemove { - continue - } - task := &dispatchTableTask{ - TableID: tableID, - StartTs: 0, - IsRemove: isRemove, - IsPrepare: isPrepare, - Epoch: epoch, - status: dispatchTableTaskReceived, - } - tasks = append(tasks, task) - } + trans := newMockTrans() + a.trans = trans + + // all possible inbound Messages can be received + var inboundMessages []*schedulepb.Message + inboundMessages = append(inboundMessages, &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: a.ownerInfo.captureID, + To: a.captureID, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{ + TableID: 1, + }, + }, + }, + }) + for _, isSecondary := range []bool{true, false} { + inboundMessages = append(inboundMessages, &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: a.ownerInfo.captureID, + To: a.captureID, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: isSecondary, + }, + }, + }, + }) } + inboundMessages = append(inboundMessages, &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + ProcessorEpoch: a.epoch, + }, + MsgType: schedulepb.MsgHeartbeat, + From: "owner-1", + Heartbeat: &schedulepb.Heartbeat{ + TableIDs: []model.TableID{1}, + }, + }) + states := []schedulepb.TableState{ schedulepb.TableStateAbsent, schedulepb.TableStatePreparing, @@ -121,8 +342,9 @@ func TestAgentHandleDispatchTableTask(t *testing.T) { schedulepb.TableStateStopped, } ctx := context.Background() + tableID := model.TableID(1) for _, state := range states { - iterPermutation([]int{0, 1, 2}, func(sequence []int) { + iterPermutation([]int{0, 1, 2, 3}, func(sequence []int) { t.Logf("test %v, %v", state, sequence) switch state { case schedulepb.TableStatePreparing: @@ -138,146 +360,81 @@ func TestAgentHandleDispatchTableTask(t *testing.T) { case schedulepb.TableStateAbsent: default: } + for _, idx := range sequence { - task := tasks[idx] - task.status = dispatchTableTaskReceived - a.runningTasks[task.TableID] = task + message := inboundMessages[idx] + if message.MsgType == schedulepb.MsgHeartbeat { + trans.recvBuffer = append(trans.recvBuffer, message) + err := a.Tick(ctx) + require.NoError(t, err) + require.Len(t, trans.sendBuffer, 1) + heartbeatResponse := trans.sendBuffer[0].HeartbeatResponse + trans.sendBuffer = trans.sendBuffer[:0] + require.Equal(t, a.stopping, heartbeatResponse.IsStopping) + + continue + } - if task.IsRemove { + switch message.DispatchTableRequest.Request.(type) { + 
case *schedulepb.DispatchTableRequest_AddTable: for _, ok := range []bool{false, true} { - mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything).Return(ok) + mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(ok, nil) for _, ok1 := range []bool{false, true} { - mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything).Return(0, ok1) + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(ok1, nil) - response, err := a.handleDispatchTableTasks(ctx) + trans.recvBuffer = append(trans.recvBuffer, message) + err := a.Tick(ctx) require.NoError(t, err) - require.NotNil(t, response) - if ok && ok1 && len(response) != 0 { - resp, ok := response[0].DispatchTableResponse.Response.(*schedulepb.DispatchTableResponse_RemoveTable) - require.True(t, ok) - require.Equal(t, schedulepb.TableStateStopped, resp.RemoveTable.Status.State) - } + trans.sendBuffer = trans.sendBuffer[:0] + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] } mockTableExecutor.ExpectedCalls = nil } - } else { + case *schedulepb.DispatchTableRequest_RemoveTable: for _, ok := range []bool{false, true} { - mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ok, nil) + mockTableExecutor.On("RemoveTable", mock.Anything, + mock.Anything).Return(ok) for _, ok1 := range []bool{false, true} { - mockTableExecutor.On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything).Return(ok1, nil) - - response, err := a.handleDispatchTableTasks(ctx) + trans.recvBuffer = append(trans.recvBuffer, message) + mockTableExecutor.On("IsRemoveTableFinished", + mock.Anything, mock.Anything).Return(0, ok1) + err := a.Tick(ctx) require.NoError(t, err) - require.NotNil(t, response) - if ok && ok1 && len(response) != 0 { - resp, ok := response[0].DispatchTableResponse.Response.(*schedulepb.DispatchTableResponse_AddTable) - require.True(t, ok) - if task.IsPrepare { - require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State) - } else { - require.Equal(t, schedulepb.TableStateReplicating, resp.AddTable.Status.State) + if len(trans.sendBuffer) != 0 { + require.Len(t, trans.sendBuffer, 1) + response, yes := trans.sendBuffer[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_RemoveTable) + trans.sendBuffer = trans.sendBuffer[:0] + require.True(t, yes) + expected := schedulepb.TableStateStopping + if ok && ok1 { + expected = schedulepb.TableStateStopped } + require.Equal(t, expected, response.RemoveTable.Status.State) + mockTableExecutor.ExpectedCalls = mockTableExecutor. + ExpectedCalls[:1] } - mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] } mockTableExecutor.ExpectedCalls = nil } + default: + panic("unknown request") } } }) } } -func TestAgentHandleMessageStopping(t *testing.T) { - t.Parallel() - - a := newBaseAgent4Test() - a.tableExec = newMockTableExecutor() - a.stopping = true - - heartbeat := &schedulepb.Message{ - Header: &schedulepb.Message_Header{ - Version: "version-1", - OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, - }, - MsgType: schedulepb.MsgHeartbeat, - From: "owner-1", - Heartbeat: &schedulepb.Heartbeat{}, - } - response := a.handleMessage([]*schedulepb.Message{heartbeat}) - require.Len(t, response, 1) - require.NotNil(t, response[0].HeartbeatResponse) - // agent is stopping, let coordinator know this. 
- require.True(t, response[0].HeartbeatResponse.IsStopping) - - addTableRequest := &schedulepb.Message{ - Header: &schedulepb.Message_Header{ - Version: "version-1", - OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, - ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, - }, - MsgType: schedulepb.MsgDispatchTableRequest, - From: "owner-1", - DispatchTableRequest: &schedulepb.DispatchTableRequest{ - Request: &schedulepb.DispatchTableRequest_AddTable{ - AddTable: &schedulepb.AddTableRequest{ - TableID: 1, - IsSecondary: true, - }, - }, - }, - } - // add table request should not be handled, so the running task count is 0. - response = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Len(t, response, 0) - require.Len(t, a.runningTasks, 0) - - // mock agent have running task before stopping but processed yet. - a.runningTasks[model.TableID(1)] = &dispatchTableTask{ - TableID: model.TableID(1), - StartTs: 0, - IsRemove: false, - IsPrepare: false, - Epoch: schedulepb.ProcessorEpoch{}, - status: dispatchTableTaskReceived, - } - - result, err := a.handleDispatchTableTasks(context.Background()) - require.NoError(t, err) - require.Len(t, a.runningTasks, 0) - require.Len(t, result, 1) - - addTableResponse, ok := result[0].DispatchTableResponse. - Response.(*schedulepb.DispatchTableResponse_AddTable) - require.True(t, ok) - require.True(t, addTableResponse.AddTable.Reject) -} - -func TestAgentHandleRemoveTableRequest(t *testing.T) { - t.Parallel() - - a := newBaseAgent4Test() - a.tableExec = newMockTableExecutor() - - // remove a table not exist, should not generate the task. - removeTableRequest := &schedulepb.DispatchTableRequest{ - Request: &schedulepb.DispatchTableRequest_RemoveTable{ - RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 2, - }, - }, - } - - a.handleMessageDispatchTableRequest(removeTableRequest, a.epoch) - require.Len(t, a.runningTasks, 0) -} - func TestAgentHandleMessage(t *testing.T) { t.Parallel() + mockTableExecutor := newMockTableExecutor() + tableM := newTableManager(mockTableExecutor) a := newBaseAgent4Test() - a.tableExec = newMockTableExecutor() + a.tableM = tableM heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -288,13 +445,10 @@ func TestAgentHandleMessage(t *testing.T) { From: a.ownerInfo.captureID, Heartbeat: &schedulepb.Heartbeat{}, } + // handle the first heartbeat, from the known owner. response := a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) - require.NotNil(t, response[0].HeartbeatResponse) - require.Equal(t, response[0].Header.Version, a.version) - require.Equal(t, response[0].Header.OwnerRevision, a.ownerInfo.revision) - require.Equal(t, response[0].Header.ProcessorEpoch, a.epoch) addTableRequest := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -316,19 +470,17 @@ func TestAgentHandleMessage(t *testing.T) { }, } // wrong epoch, ignored - response = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Len(t, a.runningTasks, 0) - require.Len(t, response, 0) + responses := a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.NotContains(t, tableM.tables, model.TableID(1)) + require.Len(t, responses, 0) // correct epoch, processing. 
addTableRequest.Header.ProcessorEpoch = a.epoch - response = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Len(t, a.runningTasks, 1) - require.Len(t, response, 0) + _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.Contains(t, tableM.tables, model.TableID(1)) heartbeat.Header.OwnerRevision.Revision = 2 response = a.handleMessage([]*schedulepb.Message{heartbeat}) - require.Equal(t, len(a.runningTasks), 1) require.Len(t, response, 1) // this should never happen in real world @@ -374,7 +526,7 @@ func TestAgentTick(t *testing.T) { trans := newMockTrans() mockTableExecutor := newMockTableExecutor() a.trans = trans - a.tableExec = mockTableExecutor + a.tableM = newTableManager(mockTableExecutor) heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -389,10 +541,10 @@ func TestAgentTick(t *testing.T) { } // receive first heartbeat from the owner - messages := []*schedulepb.Message{heartbeat} - trans.recvBuffer = append(trans.recvBuffer, messages...) + trans.recvBuffer = append(trans.recvBuffer, heartbeat) - require.NoError(t, a.Tick(context.Background())) + ctx := context.Background() + require.NoError(t, a.Tick(ctx)) require.Len(t, trans.sendBuffer, 1) heartbeatResponse := trans.sendBuffer[0] trans.sendBuffer = trans.sendBuffer[:0] @@ -436,6 +588,7 @@ func TestAgentTick(t *testing.T) { }, }, } + var messages []*schedulepb.Message messages = append(messages, addTableRequest) messages = append(messages, removeTableRequest) trans.recvBuffer = append(trans.recvBuffer, messages...) @@ -444,21 +597,16 @@ func TestAgentTick(t *testing.T) { mock.Anything, mock.Anything, mock.Anything).Return(true, nil) mockTableExecutor.On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) - require.NoError(t, a.Tick(context.Background())) - responses := trans.sendBuffer[:len(trans.sendBuffer)] + require.NoError(t, a.Tick(ctx)) trans.sendBuffer = trans.sendBuffer[:0] - require.Equal(t, schedulepb.MsgHeartbeatResponse, responses[0].MsgType) - messages = messages[:0] - // this one should be ignored, since the previous one with the same tableID is not finished yet. - messages = append(messages, addTableRequest) - trans.recvBuffer = append(trans.recvBuffer, messages...) + trans.recvBuffer = append(trans.recvBuffer, addTableRequest) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] mockTableExecutor.On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - require.NoError(t, a.Tick(context.Background())) - responses = trans.sendBuffer[:len(trans.sendBuffer)] + require.NoError(t, a.Tick(ctx)) + responses := trans.sendBuffer[:len(trans.sendBuffer)] trans.sendBuffer = trans.sendBuffer[:0] require.Len(t, responses, 1) require.Equal(t, schedulepb.MsgDispatchTableResponse, responses[0].MsgType) @@ -511,15 +659,41 @@ func (e *MockTableExecutor) AddTable( delete(e.tables, tableID) } } - args := e.Called(ctx, tableID, startTs, isPrepare) if args.Bool(0) { e.tables[tableID] = pipeline.TableStatePreparing } - return args.Bool(0), args.Error(1) } +// IsAddTableFinished determines if the table has been added. 
+func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, + tableID model.TableID, isPrepare bool, +) bool { + _, ok := e.tables[tableID] + if !ok { + log.Panic("table which was added is not found", + zap.Int64("tableID", tableID), + zap.Bool("isPrepare", isPrepare)) + } + + args := e.Called(ctx, tableID, isPrepare) + if args.Bool(0) { + e.tables[tableID] = pipeline.TableStatePrepared + if !isPrepare { + e.tables[tableID] = pipeline.TableStateReplicating + } + return true + } + + e.tables[tableID] = pipeline.TableStatePreparing + if !isPrepare { + e.tables[tableID] = pipeline.TableStatePrepared + } + + return false +} + // RemoveTable removes a table from the executor. func (e *MockTableExecutor) RemoveTable(ctx context.Context, tableID model.TableID) bool { state, ok := e.tables[tableID] @@ -533,38 +707,20 @@ func (e *MockTableExecutor) RemoveTable(ctx context.Context, tableID model.Table case pipeline.TableStatePreparing, pipeline.TableStatePrepared, pipeline.TableStateReplicating: default: } - // todo: how to handle table is already in `stopping` ? should return true directly ? // the current `processor implementation, does not consider table's state log.Info("RemoveTable", zap.Int64("tableID", tableID), zap.Any("state", state)) args := e.Called(ctx, tableID) if args.Bool(0) { - e.tables[tableID] = pipeline.TableStateStopping - } - return args.Bool(0) -} - -// IsAddTableFinished determines if the table has been added. -func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, tableID model.TableID, isPrepare bool) bool { - _, ok := e.tables[tableID] - if !ok { - log.Panic("table which was added is not found", - zap.Int64("tableID", tableID), - zap.Bool("isPrepare", isPrepare)) - } - - args := e.Called(ctx, tableID, isPrepare) - if args.Bool(0) { - e.tables[tableID] = pipeline.TableStatePrepared - if !isPrepare { - e.tables[tableID] = pipeline.TableStateReplicating - } + e.tables[tableID] = pipeline.TableStateStopped } return args.Bool(0) } // IsRemoveTableFinished determines if the table has been removed. -func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) (model.Ts, bool) { +func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, + tableID model.TableID, +) (model.Ts, bool) { state, ok := e.tables[tableID] if !ok { // the real `table executor` processor, would panic in such case. @@ -577,7 +733,12 @@ func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID m log.Info("remove table finished, remove it from the executor", zap.Int64("tableID", tableID), zap.Any("state", state)) delete(e.tables, tableID) + } else { + // revert the state back to old state, assume it's `replicating`, + // but `preparing` / `prepared` can also be removed. 
+ e.tables[tableID] = pipeline.TableStateReplicating } + return model.Ts(args.Int(0)), args.Bool(1) } diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index 75a2c0ad8e1..f4caf6d840c 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -173,6 +173,8 @@ func (c *captureManager) HandleMessage( if msg.MsgType == schedulepb.MsgHeartbeatResponse { captureStatus, ok := c.Captures[msg.From] if !ok { + log.Warn("tpscheduler: heartbeat response from unknown capture", + zap.String("capture", msg.From)) continue } captureStatus.handleHeartbeatResponse( diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go index a389f703209..099b548373a 100644 --- a/cdc/scheduler/internal/tp/replication_manager.go +++ b/cdc/scheduler/internal/tp/replication_manager.go @@ -111,7 +111,7 @@ func (r *replicationManager) HandleCaptureChanges( for i := range tables { table := tables[i] if _, ok := tableStatus[table.TableID]; !ok { - tableStatus[table.TableID] = map[string]*schedulepb.TableStatus{} + tableStatus[table.TableID] = map[model.CaptureID]*schedulepb.TableStatus{} } tableStatus[table.TableID][captureID] = &table } diff --git a/cdc/scheduler/internal/tp/replication_set.go b/cdc/scheduler/internal/tp/replication_set.go index 97bc6f09e0d..980ec43c130 100644 --- a/cdc/scheduler/internal/tp/replication_set.go +++ b/cdc/scheduler/internal/tp/replication_set.go @@ -107,23 +107,23 @@ func newReplicationSet( } committed := false for captureID, table := range tableStatus { - r.updateCheckpoint(table.Checkpoint) if r.TableID != table.TableID { return nil, r.inconsistentError(table, captureID, "tpscheduler: table id inconsistent") } + r.updateCheckpoint(table.Checkpoint) switch table.State { case schedulepb.TableStateReplicating: - // Recognize primary if it's table is in replicating state. - if len(r.Primary) == 0 { - r.Primary = captureID - r.Captures[captureID] = struct{}{} - } else { + if len(r.Primary) != 0 { return nil, r.multiplePrimaryError( table, captureID, "tpscheduler: multiple primary", + zap.Any("primary", r.Primary), zap.Any("status", tableStatus)) } + // Recognize primary if it's table is in replicating state. + r.Primary = captureID + r.Captures[captureID] = struct{}{} case schedulepb.TableStatePreparing: // Recognize secondary if it's table is in preparing state. r.Secondary = captureID diff --git a/cdc/scheduler/internal/tp/scheduler_rebalance.go b/cdc/scheduler/internal/tp/scheduler_rebalance.go index 6b3bdd9eb0e..7ea197581a0 100644 --- a/cdc/scheduler/internal/tp/scheduler_rebalance.go +++ b/cdc/scheduler/internal/tp/scheduler_rebalance.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/zap" ) var _ scheduler = &rebalanceScheduler{} @@ -93,9 +92,6 @@ func newBurstBalanceMoveTables( for tableID, rep := range replications { if rep.State != ReplicationSetStateReplicating { - log.Info("tpscheduler: rebalance skip tables that are not in Replicating", - zap.Int64("tableID", tableID), - zap.Any("replication", rep)) continue } tablesPerCapture[rep.Primary].add(tableID) diff --git a/cdc/scheduler/internal/tp/table.go b/cdc/scheduler/internal/tp/table.go new file mode 100644 index 00000000000..f841c5a69e2 --- /dev/null +++ b/cdc/scheduler/internal/tp/table.go @@ -0,0 +1,366 @@ +// Copyright 2022 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "go.uber.org/zap" +) + +// table is a state machine that manage the table's state, +// also tracking its progress by utilize the `TableExecutor` +type table struct { + id model.TableID + + state schedulepb.TableState + executor internal.TableExecutor + + task *dispatchTableTask +} + +func newTable(tableID model.TableID, executor internal.TableExecutor) *table { + return &table{ + id: tableID, + state: schedulepb.TableStateAbsent, // use `absent` as the default state. + executor: executor, + task: nil, + } +} + +// getAndUpdateTableState get the table' state, return true if the table state changed +func (t *table) getAndUpdateTableState() (schedulepb.TableState, bool) { + oldState := t.state + + meta := t.executor.GetTableMeta(t.id) + state := tableStatus2PB(meta.State) + t.state = state + + if oldState != state { + log.Info("tpscheduler: table state changed", + zap.Any("oldState", oldState), zap.Any("state", state)) + return t.state, true + + } + return t.state, false +} + +func (t *table) getTableStatus() schedulepb.TableStatus { + meta := t.executor.GetTableMeta(t.id) + state := tableStatus2PB(meta.State) + + return schedulepb.TableStatus{ + TableID: t.id, + State: state, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: meta.CheckpointTs, + ResolvedTs: meta.ResolvedTs, + }, + } +} + +func newAddTableResponseMessage(status schedulepb.TableStatus, reject bool) *schedulepb.Message { + return &schedulepb.Message{ + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &status, + Checkpoint: status.Checkpoint, + Reject: reject, + }, + }, + }, + } +} + +func newRemoveTableResponseMessage(status schedulepb.TableStatus) *schedulepb.Message { + message := &schedulepb.Message{ + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableResponse{ + Status: &status, + Checkpoint: status.Checkpoint, + }, + }, + }, + } + + return message +} + +func (t *table) handleRemoveTableTask(ctx context.Context) *schedulepb.Message { + state, _ := t.getAndUpdateTableState() + changed := true + for changed { + switch state { + case schedulepb.TableStateAbsent: + log.Warn("tpscheduler: remove table, but table is absent", + zap.Int64("tableID", t.id)) + t.task = nil + return newRemoveTableResponseMessage(t.getTableStatus()) + case schedulepb.TableStateStopping, // stopping now is useless + schedulepb.TableStateStopped: + // release table resource, and get the latest checkpoint + 
// this will let the table become `absent` + checkpointTs, done := t.executor.IsRemoveTableFinished(ctx, t.id) + if !done { + // actually, this should never be hit, since we know that table is stopped. + status := t.getTableStatus() + status.State = schedulepb.TableStateStopping + return newRemoveTableResponseMessage(status) + } + t.task = nil + status := t.getTableStatus() + status.State = schedulepb.TableStateStopped + status.Checkpoint.CheckpointTs = checkpointTs + return newRemoveTableResponseMessage(status) + case schedulepb.TableStatePreparing, + schedulepb.TableStatePrepared, + schedulepb.TableStateReplicating: + done := t.executor.RemoveTable(ctx, t.task.TableID) + if !done { + status := t.getTableStatus() + status.State = schedulepb.TableStateStopping + return newRemoveTableResponseMessage(status) + } + state, changed = t.getAndUpdateTableState() + default: + log.Panic("tpscheduler: unknown table state", + zap.Int64("tableID", t.id), zap.Any("state", state)) + } + } + return nil +} + +func (t *table) handleAddTableTask(ctx context.Context, + stopping bool, +) (result *schedulepb.Message, err error) { + if stopping { + log.Info("tpscheduler: reject add table, since agent stopping", + zap.Int64("tableID", t.id), zap.Any("task", t.task)) + t.task = nil + return newAddTableResponseMessage(t.getTableStatus(), true), nil + } + + state, _ := t.getAndUpdateTableState() + changed := true + for changed { + switch state { + case schedulepb.TableStateAbsent: + done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare) + if err != nil || !done { + log.Info("tpscheduler: agent add table failed", + zap.Int64("tableID", t.id), + zap.Any("task", t.task), + zap.Error(err)) + status := t.getTableStatus() + return newAddTableResponseMessage(status, false), errors.Trace(err) + } + state, changed = t.getAndUpdateTableState() + case schedulepb.TableStateReplicating: + log.Info("tpscheduler: table is replicating", + zap.Int64("tableID", t.id), zap.Any("state", state)) + t.task = nil + status := t.getTableStatus() + return newAddTableResponseMessage(status, false), nil + case schedulepb.TableStatePrepared: + if t.task.IsPrepare { + // `prepared` is a stable state, if the task was to prepare the table. + log.Info("tpscheduler: table is prepared", + zap.Int64("tableID", t.id), zap.Any("state", state)) + t.task = nil + return newAddTableResponseMessage(t.getTableStatus(), false), nil + } + + if t.task.status == dispatchTableTaskReceived { + done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false) + if err != nil || !done { + log.Info("tpscheduler: agent add table failed", + zap.Int64("tableID", t.id), zap.Any("state", state), + zap.Error(err)) + status := t.getTableStatus() + return newAddTableResponseMessage(status, false), errors.Trace(err) + } + t.task.status = dispatchTableTaskProcessed + } + + done := t.executor.IsAddTableFinished(ctx, t.task.TableID, false) + if !done { + return newAddTableResponseMessage(t.getTableStatus(), false), nil + } + state, changed = t.getAndUpdateTableState() + case schedulepb.TableStatePreparing: + // `preparing` is not stable state and would last a long time, + // it's no need to return such a state, to make the coordinator become burdensome. 
+			done := t.executor.IsAddTableFinished(ctx, t.task.TableID, t.task.IsPrepare)
+			if !done {
+				return nil, nil
+			}
+			state, changed = t.getAndUpdateTableState()
+			log.Info("tpscheduler: add table finished",
+				zap.Int64("tableID", t.id), zap.Any("state", state))
+		case schedulepb.TableStateStopping,
+			schedulepb.TableStateStopped:
+			log.Warn("tpscheduler: ignore add table", zap.Int64("tableID", t.id))
+			t.task = nil
+			return newAddTableResponseMessage(t.getTableStatus(), false), nil
+		default:
+			log.Panic("tpscheduler: unknown table state", zap.Int64("tableID", t.id))
+		}
+	}
+
+	return nil, nil
+}
+
+func (t *table) injectDispatchTableTask(task *dispatchTableTask) {
+	if t.id != task.TableID {
+		log.Panic("tpscheduler: tableID not match",
+			zap.Int64("tableID", t.id),
+			zap.Int64("task.TableID", task.TableID))
+	}
+	if t.task == nil {
+		log.Info("tpscheduler: table found new task",
+			zap.Int64("tableID", t.id),
+			zap.Any("task", task))
+		t.task = task
+		return
+	}
+	log.Debug("tpscheduler: table inject dispatch table task ignored, "+
+		"since there is one not finished yet",
+		zap.Int64("tableID", t.id),
+		zap.Any("nowTask", t.task),
+		zap.Any("ignoredTask", task))
+}
+
+func (t *table) poll(ctx context.Context, stopping bool) (*schedulepb.Message, error) {
+	if t.task == nil {
+		return nil, nil
+	}
+	if t.task.IsRemove {
+		return t.handleRemoveTableTask(ctx), nil
+	}
+	return t.handleAddTableTask(ctx, stopping)
+}
+
+type tableManager struct {
+	tables   map[model.TableID]*table
+	executor internal.TableExecutor
+}
+
+func newTableManager(executor internal.TableExecutor) *tableManager {
+	return &tableManager{
+		tables:   make(map[model.TableID]*table),
+		executor: executor,
+	}
+}
+
+func (tm *tableManager) poll(ctx context.Context, stopping bool) ([]*schedulepb.Message, error) {
+	result := make([]*schedulepb.Message, 0)
+	for tableID, table := range tm.tables {
+		message, err := table.poll(ctx, stopping)
+		if err != nil {
+			return result, errors.Trace(err)
+		}
+
+		state, _ := table.getAndUpdateTableState()
+		if state == schedulepb.TableStateAbsent {
+			tm.dropTable(tableID)
+		}
+
+		if message == nil {
+			continue
+		}
+		result = append(result, message)
+	}
+	return result, nil
+}
+
+func (tm *tableManager) getAllTables() map[model.TableID]*table {
+	return tm.tables
+}
+
+// addTable adds the target table, and returns it.
+func (tm *tableManager) addTable(tableID model.TableID) *table { + table, ok := tm.tables[tableID] + if !ok { + table = newTable(tableID, tm.executor) + tm.tables[tableID] = table + } + return table +} + +func (tm *tableManager) getTable(tableID model.TableID) (*table, bool) { + table, ok := tm.tables[tableID] + if ok { + return table, true + } + return nil, false +} + +func (tm *tableManager) dropTable(tableID model.TableID) { + table, ok := tm.tables[tableID] + if !ok { + log.Warn("tpscheduler: tableManager drop table not found", + zap.Int64("tableID", tableID)) + return + } + state, _ := table.getAndUpdateTableState() + if state != schedulepb.TableStateAbsent { + log.Panic("tpscheduler: tableManager drop table undesired", + zap.Int64("tableID", tableID), + zap.Any("state", table.state)) + } + + log.Debug("tpscheduler: tableManager drop table", zap.Any("tableID", tableID)) + delete(tm.tables, tableID) +} + +func (tm *tableManager) getTableStatus(tableID model.TableID) schedulepb.TableStatus { + table, ok := tm.getTable(tableID) + if ok { + return table.getTableStatus() + } + + return schedulepb.TableStatus{ + TableID: tableID, + State: schedulepb.TableStateAbsent, + } +} + +func tableStatus2PB(state pipeline.TableState) schedulepb.TableState { + switch state { + case pipeline.TableStatePreparing: + return schedulepb.TableStatePreparing + case pipeline.TableStatePrepared: + return schedulepb.TableStatePrepared + case pipeline.TableStateReplicating: + return schedulepb.TableStateReplicating + case pipeline.TableStateStopping: + return schedulepb.TableStateStopping + case pipeline.TableStateStopped: + return schedulepb.TableStateStopped + case pipeline.TableStateAbsent: + return schedulepb.TableStateAbsent + default: + } + return schedulepb.TableStateUnknown +} diff --git a/cdc/scheduler/internal/tp/table_test.go b/cdc/scheduler/internal/tp/table_test.go new file mode 100644 index 00000000000..53cb7784dff --- /dev/null +++ b/cdc/scheduler/internal/tp/table_test.go @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/stretchr/testify/require" +) + +func TestTableManager(t *testing.T) { + t.Parallel() + + // pretend there are 4 tables + mockTableExecutor := newMockTableExecutor() + + tableM := newTableManager(mockTableExecutor) + + tableM.addTable(model.TableID(1)) + require.Equal(t, schedulepb.TableStateAbsent, tableM.tables[model.TableID(1)].state) + + tableM.dropTable(model.TableID(1)) + require.NotContains(t, tableM.tables, model.TableID(1)) +}
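
Usage note for reviewers (not part of the patch): a minimal sketch of how the new per-table state machine is expected to be driven. The agent translates a coordinator request into a dispatchTableTask, injects it into the owning table, and then polls the tableManager each tick until the task resolves into a DispatchTableResponse. The helper name below is hypothetical; the dispatchTableTask fields shown are only those referenced in table.go, and the task may carry additional fields omitted here. The `stopping` flag is hard-coded to false for brevity.

package tp

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
)

// pollAddTable is a hypothetical helper showing the call sequence for the
// happy path of adding a table; it is illustrative only.
func pollAddTable(
	ctx context.Context, tm *tableManager, tableID model.TableID, startTs model.Ts,
) ([]*schedulepb.Message, error) {
	// addTable creates the table in the `absent` state if it is not tracked yet.
	tbl := tm.addTable(tableID)

	// Injecting the task is ignored if the table is still busy with a previous one.
	tbl.injectDispatchTableTask(&dispatchTableTask{
		TableID:   tableID,
		StartTs:   startTs,
		IsRemove:  false,
		IsPrepare: false,
		status:    dispatchTableTaskReceived,
	})

	// Each tick, poll advances every table one step through its state machine and
	// collects the responses to send back to the coordinator; tables that have
	// become `absent` are dropped from the manager as a side effect.
	return tm.poll(ctx, false)
}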