diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 1fdbd908d9b..63055a3d7e2 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) { return &mockTablePipeline{ tableID: tableID, name: fmt.Sprintf("`test`.`table%d`", tableID), - status: tablepipeline.TableStatusReplicating, + status: tablepipeline.TableStateReplicating, resolvedTs: replicaInfo.StartTs, checkpointTs: replicaInfo.StartTs, }, nil diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index cabc39fadda..36ee6280370 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -32,7 +32,7 @@ import ( type sinkNode struct { sink sink.Sink - status TableStatus + status TableState tableID model.TableID // atomic oprations for model.ResolvedTs @@ -56,7 +56,7 @@ func newSinkNode( sn := &sinkNode{ tableID: tableID, sink: sink, - status: TableStatusPrepared, + status: TableStatePrepared, targetTs: targetTs, barrierTs: startTs, flowController: flowController, @@ -70,7 +70,7 @@ func newSinkNode( func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() } func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() } func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableStatus { return n.status.Load() } +func (n *sinkNode) Status() TableState { return n.status.Load() } func (n *sinkNode) getResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) @@ -89,7 +89,7 @@ func (n *sinkNode) initWithReplicaConfig(replicaConfig *config.ReplicaConfig) { // no more events can be sent to this sink node afterwards. func (n *sinkNode) stop(ctx context.Context) (err error) { // table stopped status must be set after underlying sink is closed - defer n.status.Store(TableStatusStopped) + defer n.status.Store(TableStateStopped) err = n.sink.Close(ctx) if err != nil { return @@ -104,7 +104,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) { func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) { defer func() { if err != nil { - n.status.Store(TableStatusStopped) + n.status.Store(TableStateStopped) return } if n.CheckpointTs() >= n.targetTs { @@ -285,15 +285,15 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv } func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) { - if n.status.Load() == TableStatusStopped { + if n.status.Load() == TableStateStopped { return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs() } switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent if event.IsResolved() { - if n.status.Load() == TableStatusPrepared { - n.status.Store(TableStatusReplicating) + if n.status.Load() == TableStatePrepared { + n.status.Store(TableStateReplicating) } failpoint.Inject("ProcessorSyncResolvedError", func() { failpoint.Return(false, errors.New("processor sync resolved injected error")) @@ -340,7 +340,7 @@ func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { } func (n *sinkNode) releaseResource(ctx context.Context) error { - n.status.Store(TableStatusStopped) + n.status.Store(TableStateStopped) n.flowController.Abort() return n.sink.Close(ctx) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index ccecd767d49..a142afa9759 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -139,12 +139,12 @@ func TestStatus(t *testing.T) { node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). ChangefeedVars().Info.Config) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) ok, err := node.HandleMessage(ctx, pmessage.BarrierMessage(20)) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) require.Equal(t, model.Ts(20), node.BarrierTs()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -154,7 +154,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, @@ -163,7 +163,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -172,7 +172,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -181,20 +181,20 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) require.Equal(t, model.Ts(10), node.CheckpointTs()) // test the stop at ts command node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg = pmessage.BarrierMessage(20) ok, err = node.HandleMessage(ctx, msg) require.True(t, ok) require.Nil(t, err) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) require.Equal(t, model.Ts(20), node.BarrierTs()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -204,13 +204,13 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}) ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -219,20 +219,20 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) require.Equal(t, uint64(2), node.CheckpointTs()) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg = pmessage.BarrierMessage(20) ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -241,13 +241,13 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}) ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -256,7 +256,7 @@ func TestStatus(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) require.Equal(t, uint64(7), node.CheckpointTs()) } @@ -278,7 +278,7 @@ func TestStopStatus(t *testing.T) { &mockFlowController{}, redo.NewDisabledManager()) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, @@ -287,7 +287,7 @@ func TestStopStatus(t *testing.T) { ok, err := node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) var wg sync.WaitGroup wg.Add(1) @@ -298,11 +298,11 @@ func TestStopStatus(t *testing.T) { ok, err := node.HandleMessage(ctx, msg) require.False(t, ok) require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err)) - require.Equal(t, TableStatusStopped, node.Status()) + require.Equal(t, TableStateStopped, node.Status()) }() // wait to ensure stop message is sent to the sink node time.Sleep(time.Millisecond * 50) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) closeCh <- struct{}{} wg.Wait() } @@ -320,7 +320,7 @@ func TestManyTs(t *testing.T) { node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager()) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{ @@ -339,7 +339,7 @@ func TestManyTs(t *testing.T) { }, }, }) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) ok, err := node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) @@ -361,7 +361,7 @@ func TestManyTs(t *testing.T) { }, }, }) - require.Equal(t, TableStatusPrepared, node.Status()) + require.Equal(t, TableStatePrepared, node.Status()) ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) @@ -373,7 +373,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) sink.Check(t, []struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -418,7 +418,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) sink.Check(t, []struct { resolvedTs model.Ts @@ -468,7 +468,7 @@ func TestManyTs(t *testing.T) { ok, err = node.HandleMessage(ctx, msg) require.Nil(t, err) require.True(t, ok) - require.Equal(t, TableStatusReplicating, node.Status()) + require.Equal(t, TableStateReplicating, node.Status()) sink.Check(t, []struct { resolvedTs model.Ts row *model.RowChangedEvent diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index e5bd80db155..dda9745145d 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -58,7 +58,7 @@ type sorterNode struct { // The latest barrier ts that sorter has received. barrierTs model.Ts - status TableStatus + status TableState preparedCh chan struct{} // started indicate that the sink is really replicating, not idle. @@ -81,7 +81,7 @@ func newSorterNode( mounter: mounter, resolvedTs: startTs, barrierTs: startTs, - status: TableStatusPreparing, + status: TableStatePreparing, preparedCh: make(chan struct{}, 1), startTsCh: make(chan model.Ts, 1), replConfig: replConfig, @@ -182,7 +182,7 @@ func (n *sorterNode) start( case <-n.preparedCh: } - n.status.Store(TableStatusReplicating) + n.status.Store(TableStateReplicating) eventSorter.EmitStartTs(stdCtx, startTs) for { @@ -308,9 +308,9 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi } // sorterNode is preparing, this is must the first `Resolved event` received // the indicator that all regions connected. - if n.status.Load() == TableStatusPreparing { + if n.status.Load() == TableStatePreparing { log.Info("sorterNode, first resolved event received", zap.Any("event", event)) - n.status.Store(TableStatusPrepared) + n.status.Store(TableStatePrepared) close(n.preparedCh) } } @@ -355,4 +355,4 @@ func (n *sorterNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sorterNode) Status() TableStatus { return n.status.Load() } +func (n *sorterNode) Status() TableState { return n.status.Load() } diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 18b24b8edc6..7da6174214c 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -26,42 +26,55 @@ const ( resolvedTsInterpolateInterval = 200 * time.Millisecond ) -// TableStatus is status of the table pipeline -type TableStatus int32 +// TableState is status of the table pipeline +type TableState int32 -// TableStatus for table pipeline +// TableState for table pipeline const ( - // TableStatusPreparing indicate that the table is preparing connecting to regions - TableStatusPreparing TableStatus = iota - // TableStatusPrepared means the first `Resolved Ts` is received. - TableStatusPrepared - // TableStatusReplicating means that sink is consuming data from the sorter, and replicating it to downstream - TableStatusReplicating - // TableStatusStopped means sink stop all works. - TableStatusStopped + // TableStatePreparing indicate that the table is preparing connecting to regions + TableStatePreparing TableState = iota + // TableStatePrepared means the first `Resolved Ts` is received. + TableStatePrepared + // TableStateReplicating means that sink is consuming data from the sorter, and replicating it to downstream + TableStateReplicating + // TableStateStopping means the table is stopping, but not guaranteed yet. + TableStateStopping + // TableStateStopped means sink stop all works. + TableStateStopped + // TableStateAbsent means the table not found + TableStateAbsent ) -var tableStatusStringMap = map[TableStatus]string{ - TableStatusPreparing: "Preparing", - TableStatusPrepared: "Prepared", - TableStatusReplicating: "Replicating", - TableStatusStopped: "Stopped", +var tableStatusStringMap = map[TableState]string{ + TableStatePreparing: "Preparing", + TableStatePrepared: "Prepared", + TableStateReplicating: "Replicating", + TableStateStopping: "Stopping", + TableStateStopped: "Stopped", + TableStateAbsent: "Absent", } -func (s TableStatus) String() string { +func (s TableState) String() string { return tableStatusStringMap[s] } -// Load TableStatus with THREAD-SAFE -func (s *TableStatus) Load() TableStatus { - return TableStatus(atomic.LoadInt32((*int32)(s))) +// Load TableState with THREAD-SAFE +func (s *TableState) Load() TableState { + return TableState(atomic.LoadInt32((*int32)(s))) } -// Store TableStatus with THREAD-SAFE -func (s *TableStatus) Store(new TableStatus) { +// Store TableState with THREAD-SAFE +func (s *TableState) Store(new TableState) { atomic.StoreInt32((*int32)(s), int32(new)) } +type TableMeta struct { + TableID model.TableID + CheckpointTs model.Ts + ResolvedTs model.Ts + Status TableState +} + // TablePipeline is a pipeline which capture the change log from tikv in a table type TablePipeline interface { // ID returns the ID of source table and mark table @@ -83,7 +96,7 @@ type TablePipeline interface { // Workload returns the workload of this table Workload() model.WorkloadInfo // Status returns the status of this table pipeline - Status() TableStatus + Status() TableState // Cancel stops this table pipeline immediately and destroy all resources created by this table pipeline Cancel() // Wait waits for table pipeline destroyed diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 8ef2822990f..bca2f6fa3ec 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -442,11 +442,11 @@ func (t *tableActor) Workload() model.WorkloadInfo { } // Status returns the status of this table pipeline -func (t *tableActor) Status() TableStatus { +func (t *tableActor) Status() TableState { sortStatus := t.sortNode.Status() // first resolved ts not received yet, still preparing... - if sortStatus == TableStatusPreparing { - return TableStatusPreparing + if sortStatus == TableStatePreparing { + return TableStatePreparing } // sinkNode is status indicator now. diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 9ce3aca85d6..fb80150fd1c 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -63,7 +63,7 @@ func TestAsyncStopFailed(t *testing.T) { } func TestTableActorInterface(t *testing.T) { - sink := &sinkNode{status: TableStatusPrepared} + sink := &sinkNode{status: TableStatePrepared} sorter := &sorterNode{resolvedTs: 5} tbl := &tableActor{ markTableID: 2, @@ -82,11 +82,11 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, int64(1), tableID) require.Equal(t, int64(2), markID) require.Equal(t, "t1", tbl.Name()) - require.Equal(t, TableStatusPreparing, tbl.Status()) + require.Equal(t, TableStatePreparing, tbl.Status()) - sorter.status.Store(TableStatusPrepared) - sink.status.Store(TableStatusStopped) - require.Equal(t, TableStatusStopped, tbl.Status()) + sorter.status.Store(TableStatePrepared) + sink.status.Store(TableStateStopped) + require.Equal(t, TableStateStopped, tbl.Status()) require.Equal(t, uint64(1), tbl.Workload().Workload) sink.checkpointTs.Store(model.NewResolvedTs(3)) @@ -157,7 +157,7 @@ func TestHandleError(t *testing.T) { }, sinkNode: &sinkNode{ sink: &errorCloseSink{}, - status: TableStatusPreparing, + status: TableStatePreparing, flowController: &mockFlowController{}, }, sortNode: &sorterNode{ @@ -167,7 +167,7 @@ func TestHandleError(t *testing.T) { } // table is already stopped tbl.handleError(nil) - require.Equal(t, TableStatusPreparing, tbl.sinkNode.status) + require.Equal(t, TableStatePreparing, tbl.sinkNode.status) require.False(t, canceled) require.True(t, reporterErr) @@ -177,7 +177,7 @@ func TestHandleError(t *testing.T) { require.True(t, canceled) require.True(t, reporterErr) require.Equal(t, stopped, tbl.stopped) - require.Equal(t, TableStatusStopped, tbl.sinkNode.status) + require.Equal(t, TableStateStopped, tbl.sinkNode.status) } func TestPollStoppedActor(t *testing.T) { @@ -194,7 +194,7 @@ func TestPollTickMessage(t *testing.T) { startTime := time.Now().Add(-sinkFlushInterval) sn := &sinkNode{ - status: TableStatusPreparing, + status: TableStatePreparing, sink: &mockSink{}, flowController: &mockFlowController{}, targetTs: 11, @@ -218,7 +218,7 @@ func TestPollTickMessage(t *testing.T) { })) require.True(t, tbl.lastFlushSinkTime.Equal(startTime)) tbl.lastFlushSinkTime = time.Now().Add(-2 * sinkFlushInterval) - tbl.sinkNode.status = TableStatusStopped + tbl.sinkNode.status = TableStateStopped require.False(t, tbl.Poll(context.TODO(), []message.Message[pmessage.Message]{ message.ValueMessage[pmessage.Message](pmessage.TickMessage()), })) @@ -229,7 +229,7 @@ func TestPollStopMessage(t *testing.T) { wg.Add(1) tbl := tableActor{ sinkNode: &sinkNode{ - status: TableStatusStopped, + status: TableStateStopped, sink: &mockSink{}, flowController: &mockFlowController{}, }, diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 9cae0c6efab..33fa3ec3a7f 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -134,7 +134,7 @@ func (p *processor) AddTable( switch table.Status() { // table is still `preparing`, which means the table is `replicating` on other captures. // no matter `isPrepare` or not, just ignore it should be ok. - case pipeline.TableStatusPreparing: + case pipeline.TableStatePreparing: log.Warn("table is still preparing, ignore the request", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), @@ -143,14 +143,14 @@ func (p *processor) AddTable( zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil - case pipeline.TableStatusPrepared: + case pipeline.TableStatePrepared: // table is `prepared`, and a `isPrepare = false` request indicate that old table should // be stopped on original capture already, it's safe to start replicating data now. if !isPrepare { table.Start(startTs) } return true, nil - case pipeline.TableStatusReplicating: + case pipeline.TableStateReplicating: log.Warn("Ignore existing table", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), @@ -159,7 +159,7 @@ func (p *processor) AddTable( zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil - case pipeline.TableStatusStopped: + case pipeline.TableStateStopped: log.Warn("The same table exists but is stopped. Cancel it and continue.", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), @@ -212,18 +212,19 @@ func (p *processor) AddTable( } // RemoveTable implements TableExecutor interface. -func (p *processor) RemoveTable(ctx context.Context, tableID model.TableID) (bool, error) { +func (p *processor) RemoveTable(ctx context.Context, tableID model.TableID) bool { if !p.checkReadyForMessages() { - return false, nil + return false } table, ok := p.tables[tableID] if !ok { log.Warn("table which will be deleted is not found", + zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Int64("tableID", tableID)) - return true, nil + return true } boundaryTs := p.changefeed.Status.CheckpointTs @@ -231,13 +232,14 @@ func (p *processor) RemoveTable(ctx context.Context, tableID model.TableID) (boo // We use a Debug log because it is conceivable for the pipeline to block for a legitimate reason, // and we do not want to alarm the user. log.Debug("AsyncStop has failed, possible due to a full pipeline", + zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Uint64("checkpointTs", table.CheckpointTs()), zap.Int64("tableID", tableID)) - return false, nil + return false } - return true, nil + return true } // IsAddTableFinished implements TableExecutor interface. @@ -264,11 +266,11 @@ func (p *processor) IsAddTableFinished(ctx context.Context, tableID model.TableI done := func() bool { if isPrepare { // todo: add ut to cover this, after 2ps supported. - return table.Status() == pipeline.TableStatusPrepared + return table.Status() == pipeline.TableStatePrepared } // todo: revise these 2 conditions, after 2ps supported. - // how about just check status is `TableStatusReplicating`. + // how about just check status is `TableStateReplicating`. if table.CheckpointTs() < localCheckpointTs || localCheckpointTs < globalCheckpointTs { return false } @@ -325,7 +327,7 @@ func (p *processor) IsRemoveTableFinished(ctx context.Context, tableID model.Tab return 0, true } status := table.Status() - if status != pipeline.TableStatusStopped { + if status != pipeline.TableStateStopped { log.Debug("table is still not stopped", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), @@ -364,6 +366,23 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { return p.checkpointTs, p.resolvedTs } +// GetTableMeta implements TableExecutor interface +func (p *processor) GetTableMeta(tableID model.TableID) pipeline.TableMeta { + table, ok := p.tables[tableID] + if !ok { + return pipeline.TableMeta{ + CheckpointTs: 0, + ResolvedTs: 0, + Status: pipeline.TableStateAbsent, + } + } + return pipeline.TableMeta{ + CheckpointTs: table.CheckpointTs(), + ResolvedTs: table.ResolvedTs(), + Status: table.Status(), + } +} + // newProcessor creates a new processor func newProcessor(ctx cdcContext.Context, up *upstream.Upstream) *processor { changefeedID := ctx.ChangefeedVars().ID @@ -763,8 +782,8 @@ func (p *processor) handlePosition(currentTs int64) { } for _, table := range p.tables { status := table.Status() - if status == pipeline.TableStatusPreparing || - status == pipeline.TableStatusPrepared { + if status == pipeline.TableStatePreparing || + status == pipeline.TableStatePrepared { continue } ts := table.ResolvedTs() @@ -778,8 +797,8 @@ func (p *processor) handlePosition(currentTs int64) { minCheckpointTableID := int64(0) for _, table := range p.tables { status := table.Status() - if status == pipeline.TableStatusPreparing || - status == pipeline.TableStatusPrepared { + if status == pipeline.TableStatePreparing || + status == pipeline.TableStatePrepared { continue } ts := table.CheckpointTs() @@ -815,7 +834,7 @@ func (p *processor) pushResolvedTs2Table() { resolvedTs = schemaResolvedTs } for _, table := range p.tables { - if table.Status() == pipeline.TableStatusReplicating { + if table.Status() == pipeline.TableStateReplicating { table.UpdateBarrierTs(resolvedTs) } } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index a3c0f1c3a2d..c725f10afd2 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -113,7 +113,7 @@ func newMockTablePipeline(ctx cdcContext.Context, tableID model.TableID, replica return &mockTablePipeline{ tableID: tableID, name: fmt.Sprintf("`test`.`table%d`", tableID), - status: pipeline.TableStatusPreparing, + status: pipeline.TableStatePreparing, resolvedTs: replicaInfo.StartTs, checkpointTs: replicaInfo.StartTs, }, nil @@ -126,7 +126,7 @@ type mockTablePipeline struct { checkpointTs model.Ts barrierTs model.Ts stopTs model.Ts - status pipeline.TableStatus + status pipeline.TableState canceled bool sinkStartTs model.Ts @@ -161,7 +161,7 @@ func (m *mockTablePipeline) Workload() model.WorkloadInfo { return model.WorkloadInfo{Workload: 1} } -func (m *mockTablePipeline) Status() pipeline.TableStatus { +func (m *mockTablePipeline) Status() pipeline.TableState { return m.status } @@ -183,7 +183,7 @@ func (m *mockTablePipeline) MemoryConsumption() uint64 { func (m *mockTablePipeline) Start(ts model.Ts) bool { m.sinkStartTs = ts - m.status = pipeline.TableStatusReplicating + m.status = pipeline.TableStateReplicating return true } @@ -259,28 +259,28 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.True(t, ok) table1 := p.tables[1].(*mockTablePipeline) require.Equal(t, model.Ts(20), table1.sinkStartTs) - require.Equal(t, pipeline.TableStatusReplicating, table1.status) + require.Equal(t, pipeline.TableStateReplicating, table1.status) ok, err = p.AddTable(ctx, 2, 20, false) require.Nil(t, err) require.True(t, ok) table2 := p.tables[2].(*mockTablePipeline) require.Equal(t, model.Ts(20), table2.sinkStartTs) - require.Equal(t, pipeline.TableStatusReplicating, table2.status) + require.Equal(t, pipeline.TableStateReplicating, table2.status) ok, err = p.AddTable(ctx, 3, 20, false) require.Nil(t, err) require.True(t, ok) table3 := p.tables[3].(*mockTablePipeline) require.Equal(t, model.Ts(20), table3.sinkStartTs) - require.Equal(t, pipeline.TableStatusReplicating, table3.status) + require.Equal(t, pipeline.TableStateReplicating, table3.status) ok, err = p.AddTable(ctx, 4, 20, false) require.Nil(t, err) require.True(t, ok) table4 := p.tables[4].(*mockTablePipeline) require.Equal(t, model.Ts(20), table4.sinkStartTs) - require.Equal(t, pipeline.TableStatusReplicating, table4.status) + require.Equal(t, pipeline.TableStateReplicating, table4.status) require.Len(t, p.tables, 4) checkpointTs := p.agent.GetLastSentCheckpointTs() @@ -342,8 +342,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - ok, err = p.RemoveTable(ctx, 3) - require.Nil(t, err) + ok = p.RemoveTable(ctx, 3) require.True(t, ok) _, err = p.Tick(ctx, p.changefeed) @@ -367,7 +366,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Equal(t, model.Ts(60), checkpointTs) // finish remove operations - table3.status = pipeline.TableStatusStopped + table3.status = pipeline.TableStateStopped table3.checkpointTs = 65 _, err = p.Tick(ctx, p.changefeed) diff --git a/cdc/scheduler/internal/base/agent.go b/cdc/scheduler/internal/base/agent.go index d760056c225..1a26d13812c 100644 --- a/cdc/scheduler/internal/base/agent.go +++ b/cdc/scheduler/internal/base/agent.go @@ -286,10 +286,7 @@ func (a *Agent) processOperations(ctx context.Context) (err error) { } } else { // delete table - done, err = a.executor.RemoveTable(ctx, op.TableID) - if err != nil { - return errors.Trace(err) - } + done = a.executor.RemoveTable(ctx, op.TableID) if !done { break } diff --git a/cdc/scheduler/internal/base/agent_mock.go b/cdc/scheduler/internal/base/agent_mock.go index 7569722e589..1f844b1b73e 100644 --- a/cdc/scheduler/internal/base/agent_mock.go +++ b/cdc/scheduler/internal/base/agent_mock.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/scheduler/internal/base/protocol" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -135,14 +136,14 @@ func (e *MockTableExecutor) AddTable( } // RemoveTable removes a table from the executor. -func (e *MockTableExecutor) RemoveTable(ctx context.Context, tableID model.TableID) (bool, error) { +func (e *MockTableExecutor) RemoveTable(ctx context.Context, tableID model.TableID) bool { log.Info("RemoveTable", zap.Int64("tableID", tableID)) args := e.Called(ctx, tableID) require.Contains(e.t, e.Running, tableID) require.NotContains(e.t, e.Removing, tableID) delete(e.Running, tableID) e.Removing[tableID] = struct{}{} - return args.Bool(0), args.Error(1) + return args.Bool(0) } // IsAddTableFinished determines if the table has been added. @@ -181,3 +182,8 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) args := e.Called() return args.Get(0).(model.Ts), args.Get(1).(model.Ts) } + +// GetTableMeta implements TableExecutor interface +func (e *MockTableExecutor) GetTableMeta(tableID model.TableID) pipeline.TableMeta { + return pipeline.TableMeta{} +} diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 07c046495d1..de2da36e2d8 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" ) // TableExecutor is an abstraction for "Processor". @@ -35,9 +36,8 @@ type TableExecutor interface { // IsAddTableFinished make sure the requested table is in the proper status IsAddTableFinished(ctx context.Context, tableID model.TableID, isPrepare bool) (done bool) - // RemoveTable remove the table, return true if the table if already removed - // todo: revise the logic behind `remove table`, make sure comment is correct. - RemoveTable(ctx context.Context, tableID model.TableID) (done bool, err error) + // RemoveTable remove the table, return true if the table is already removed + RemoveTable(ctx context.Context, tableID model.TableID) (done bool) // IsRemoveTableFinished convince the table is fully stopped. // return false if table is not stopped // return true and corresponding checkpoint otherwise. @@ -56,4 +56,7 @@ type TableExecutor interface { // tables that would have been returned if GetAllCurrentTables had been // called immediately before. GetCheckpoint() (checkpointTs, resolvedTs model.Ts) + + // GetTableMeta return the checkpoint and resolved ts for the given table + GetTableMeta(tableID model.TableID) pipeline.TableMeta } diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 47f4e7b24d1..2fb84c1ba61 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -15,10 +15,22 @@ package tp import ( "context" + "time" + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/version" + "go.etcd.io/etcd/client/v3/concurrency" + "go.uber.org/zap" ) var _ internal.Agent = (*agent)(nil) @@ -27,31 +39,564 @@ type agent struct { trans transport tableExec internal.TableExecutor - tables map[model.TableID]*schedulepb.TableStatus - runningTasks map[model.TableID]*schedulepb.Message + // runningTasks track all in progress dispatch table task + runningTasks map[model.TableID]*dispatchTableTask + + // owner's information + ownerInfo ownerInfo + + // maintain the capture's information + version string + captureID model.CaptureID + changeFeedID model.ChangeFeedID + epoch schedulepb.ProcessorEpoch + + // the capture is stopping, should reject all add table request + stopping bool +} + +type ownerInfo struct { + revision schedulepb.OwnerRevision + version string + captureID string +} + +func NewAgent(ctx context.Context, + captureID model.CaptureID, + changeFeedID model.ChangeFeedID, + messageServer *p2p.MessageServer, + messageRouter p2p.MessageRouter, + etcdClient *etcd.CDCEtcdClient, + tableExecutor internal.TableExecutor) (internal.Agent, error) { + result := &agent{ + version: version.ReleaseSemver(), + captureID: captureID, + changeFeedID: changeFeedID, + tableExec: tableExecutor, + runningTasks: make(map[model.TableID]*dispatchTableTask), + } + trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter) + if err != nil { + return nil, errors.Trace(err) + } + result.trans = trans + + conf := config.GetGlobalServerConfig() + flushInterval := time.Duration(conf.ProcessorFlushInterval) + + log.Debug("tpscheduler: creating processor agent", + zap.String("capture", captureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Duration("sendCheckpointTsInterval", flushInterval)) + + etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + ownerCaptureID, err := etcdClient.GetOwnerID(etcdCliCtx, etcd.CaptureOwnerKey) + if err != nil { + if err != concurrency.ErrElectionNoLeader { + return nil, errors.Trace(err) + } + // We tolerate the situation where there is no owner. + // If we are registered in Etcd, an elected Owner will have to + // contact us before it can schedule any table. + log.Info("tpscheduler: no owner found. We will wait for an owner to contact us.", + zap.String("capture", captureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Error(err)) + return nil, nil + } + + log.Debug("tpscheduler: owner found", + zap.String("capture", captureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.String("ownerID", captureID)) + + revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, captureID) + if err != nil { + if cerror.ErrOwnerNotFound.Equal(err) || cerror.ErrNotOwner.Equal(err) { + // These are expected errors when no owner has been elected + log.Info("tpscheduler: no owner found when querying for the owner revision", + zap.String("capture", captureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Error(err)) + return nil, nil + } + return nil, err + } + + result.resetEpoch() + result.ownerInfo = ownerInfo{ + // owner's version can only be got by receiving heartbeat + version: "", + captureID: ownerCaptureID, + revision: schedulepb.OwnerRevision{Revision: revision}, + } + return result, nil } +// Tick implement agent interface func (a *agent) Tick(ctx context.Context) error { + inboundMessages, err := a.recvMsgs(ctx) + if err != nil { + return errors.Trace(err) + } + + outboundMessages := a.handleMessage(inboundMessages) + + responses, err := a.handleDispatchTableTasks(ctx) + outboundMessages = append(outboundMessages, responses...) + + err2 := a.sendMsgs(ctx, outboundMessages) + + if err != nil { + return errors.Trace(err) + } + if err2 != nil { + return errors.Trace(err) + } + return nil } +func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { + result := make([]*schedulepb.Message, 0) + for _, message := range msg { + ownerCaptureID := message.GetFrom() + header := message.GetHeader() + ownerVersion := header.GetVersion() + ownerRevision := header.GetOwnerRevision().Revision + processorEpoch := header.GetProcessorEpoch() + + if !a.handleOwnerInfo(ownerCaptureID, ownerRevision, ownerVersion) { + continue + } + + switch message.GetMsgType() { + case schedulepb.MsgDispatchTableRequest: + a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch) + case schedulepb.MsgHeartbeat: + response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs()) + result = append(result, response) + case schedulepb.MsgUnknown: + default: + log.Warn("tpscheduler: unknown message received", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("message", message)) + } + } + + return result +} + +func (a *agent) tableStatus2PB(status pipeline.TableState) schedulepb.TableState { + switch status { + case pipeline.TableStatePreparing: + return schedulepb.TableStatePreparing + case pipeline.TableStatePrepared: + return schedulepb.TableStatePrepared + case pipeline.TableStateReplicating: + return schedulepb.TableStateReplicating + case pipeline.TableStateStopping: + return schedulepb.TableStateStopping + case pipeline.TableStateStopped: + return schedulepb.TableStateStopped + case pipeline.TableStateAbsent: + return schedulepb.TableStateAbsent + default: + } + log.Warn("tpscheduler: table state unknown", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + ) + return schedulepb.TableStateUnknown +} + +func (a *agent) newTableStatus(tableID model.TableID) schedulepb.TableStatus { + meta := a.tableExec.GetTableMeta(tableID) + state := a.tableStatus2PB(meta.Status) + + if task, ok := a.runningTasks[tableID]; ok { + // remove table task is not processed, or failed, + // return `stopping` instead of the real table state, + // to indicate that the remove table request was received. + if task.IsRemove == true { + state = schedulepb.TableStateStopping + } + } + + return schedulepb.TableStatus{ + TableID: meta.TableID, + State: state, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: meta.CheckpointTs, + ResolvedTs: meta.ResolvedTs, + }, + } +} + +func (a *agent) collectTableStatus(expected []model.TableID) []schedulepb.TableStatus { + allTables := make(map[model.TableID]struct{}) + + currentTables := a.tableExec.GetAllCurrentTables() + for _, tableID := range currentTables { + allTables[tableID] = struct{}{} + } + + for _, tableID := range expected { + allTables[tableID] = struct{}{} + } + + result := make([]schedulepb.TableStatus, 0, len(allTables)) + for tableID := range allTables { + status := a.newTableStatus(tableID) + result = append(result, status) + } + + return result +} + +func (a *agent) handleMessageHeartbeat(expected []model.TableID) *schedulepb.Message { + tables := a.collectTableStatus(expected) + response := &schedulepb.HeartbeatResponse{ + Tables: tables, + IsStopping: a.stopping, + } + return &schedulepb.Message{ + Header: a.newMessageHeader(), + MsgType: schedulepb.MsgHeartbeatResponse, + From: a.captureID, + To: a.ownerInfo.captureID, + HeartbeatResponse: response, + } +} + +type dispatchTableTaskStatus int32 + +const ( + dispatchTableTaskReceived = dispatchTableTaskStatus(iota + 1) + dispatchTableTaskProcessed +) + +type dispatchTableTask struct { + TableID model.TableID + StartTs model.Ts + IsRemove bool + IsPrepare bool + Epoch schedulepb.ProcessorEpoch + status dispatchTableTaskStatus +} + +func (a *agent) handleMessageDispatchTableRequest( + request *schedulepb.DispatchTableRequest, + epoch schedulepb.ProcessorEpoch, +) { + if a.epoch != epoch { + log.Info("tpscheduler: agent receive dispatch table request "+ + "epoch does not match, ignore it", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("epoch", epoch), + zap.Any("expected", a.epoch)) + return + } + task := new(dispatchTableTask) + switch req := request.Request.(type) { + case *schedulepb.DispatchTableRequest_AddTable: + if a.stopping { + log.Info("tpscheduler: agent decline handle add table request", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID)) + return + } + task = &dispatchTableTask{ + TableID: req.AddTable.GetTableID(), + StartTs: req.AddTable.GetCheckpoint().GetCheckpointTs(), + IsRemove: false, + IsPrepare: req.AddTable.GetIsSecondary(), + Epoch: epoch, + status: dispatchTableTaskReceived, + } + case *schedulepb.DispatchTableRequest_RemoveTable: + task = &dispatchTableTask{ + TableID: req.RemoveTable.GetTableID(), + IsRemove: true, + Epoch: epoch, + status: dispatchTableTaskReceived, + } + default: + log.Warn("tpscheduler: agent ignore unknown dispatch table request", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("request", request)) + return + } + + if _, ok := a.runningTasks[task.TableID]; ok { + log.Warn("tpscheduler: agent found duplicate dispatch table request", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("request", request)) + return + } + + a.runningTasks[task.TableID] = task +} + +func (a *agent) handleRemoveTableTask( + ctx context.Context, + task *dispatchTableTask, +) (response *schedulepb.Message) { + if task.status == dispatchTableTaskReceived { + done := a.tableExec.RemoveTable(ctx, task.TableID) + if !done { + status := a.newTableStatus(task.TableID) + return a.newRemoveTableResponseMessage(status) + } + task.status = dispatchTableTaskProcessed + } + + checkpointTs, done := a.tableExec.IsRemoveTableFinished(ctx, task.TableID) + if !done { + return nil + } + status := schedulepb.TableStatus{ + TableID: task.TableID, + // todo: if the table is `absent`, also return a stopped here, `stopped` is identical to `absent`. + State: schedulepb.TableStateStopped, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: checkpointTs, + }, + } + message := a.newRemoveTableResponseMessage(status) + delete(a.runningTasks, task.TableID) + return message +} + +func (a *agent) newRemoveTableResponseMessage( + status schedulepb.TableStatus, +) *schedulepb.Message { + message := &schedulepb.Message{ + Header: a.newMessageHeader(), + MsgType: schedulepb.MsgDispatchTableResponse, + From: a.captureID, + To: a.ownerInfo.captureID, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableResponse{ + Status: &status, + Checkpoint: &status.Checkpoint, + }, + }, + }, + } + + return message +} + +func (a *agent) handleAddTableTask( + ctx context.Context, + task *dispatchTableTask, +) (*schedulepb.Message, error) { + if task.status == dispatchTableTaskReceived { + if a.stopping { + status := a.newTableStatus(task.TableID) + message := a.newAddTableResponseMessage(status, true) + delete(a.runningTasks, task.TableID) + return message, nil + } + done, err := a.tableExec.AddTable(ctx, task.TableID, task.StartTs, task.IsPrepare) + if err != nil || !done { + log.Info("tpscheduler: agent add table failed", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("task", task), + zap.Error(err)) + status := a.newTableStatus(task.TableID) + message := a.newAddTableResponseMessage(status, false) + return message, errors.Trace(err) + } + task.status = dispatchTableTaskProcessed + } + + done := a.tableExec.IsAddTableFinished(ctx, task.TableID, task.IsPrepare) + if !done { + // no need send a special message, table status will be reported by the heartbeat. + return nil, nil + } + log.Info("tpscheduler: agent finish processing add table task", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("task", task)) + + status := a.newTableStatus(task.TableID) + message := a.newAddTableResponseMessage(status, false) + delete(a.runningTasks, task.TableID) + + return message, nil +} + +func (a *agent) newAddTableResponseMessage( + status schedulepb.TableStatus, + reject bool, +) *schedulepb.Message { + return &schedulepb.Message{ + Header: a.newMessageHeader(), + MsgType: schedulepb.MsgDispatchTableResponse, + From: a.captureID, + To: a.ownerInfo.captureID, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &status, + Checkpoint: &status.Checkpoint, + Reject: reject, + }, + }, + }, + } +} + +func (a *agent) handleDispatchTableTasks( + ctx context.Context, +) (result []*schedulepb.Message, err error) { + result = make([]*schedulepb.Message, 0) + for _, task := range a.runningTasks { + var response *schedulepb.Message + if task.IsRemove { + response = a.handleRemoveTableTask(ctx, task) + } else { + response, err = a.handleAddTableTask(ctx, task) + } + if err != nil { + return result, errors.Trace(err) + } + if response != nil { + result = append(result, response) + } + } + return result, nil +} + +// GetLastSentCheckpointTs implement agent interface func (a *agent) GetLastSentCheckpointTs() (checkpointTs model.Ts) { + // no need to implement this. return internal.CheckpointCannotProceed } +// Close implement agent interface func (a *agent) Close() error { - return nil + log.Debug("tpscheduler: agent closed", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID)) + return a.trans.Close() } -func (a *agent) handleMessage(msg []*schedulepb.Message) { - // s.handleMessageHeartbeat() - // s.handleMessageDispatchTableRequest() +func (a *agent) newMessageHeader() *schedulepb.Message_Header { + return &schedulepb.Message_Header{ + Version: a.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + } } -func (a *agent) handleMessageHeartbeat(msg *schedulepb.Heartbeat) { - // TODO: build s.tables from Heartbeat message. +// handleOwnerInfo return false, if the given owner's info is staled. +// update owner's info to the latest otherwise. +// id: the incoming owner's capture ID +// revision: the incoming owner's revision as generated by Etcd election. +// version: the incoming owner's semantic version string +func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version string) bool { + if a.ownerInfo.revision.Revision == revision { + if a.ownerInfo.captureID != id { + // This panic will happen only if two messages have been received + // with the same ownerRev but with different ownerIDs. + // This should never happen unless the election via Etcd is buggy. + log.Panic("tpscheduler: owner IDs do not match", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.String("expected", a.ownerInfo.captureID), + zap.String("actual", id)) + } + return true + } + + // the current owner is staled + if a.ownerInfo.revision.Revision < revision { + a.ownerInfo.captureID = id + a.ownerInfo.revision.Revision = revision + a.ownerInfo.version = version + + a.resetEpoch() + + log.Info("tpscheduler: new owner in power, drop pending dispatch table tasks", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("owner", a.ownerInfo)) + return true + } + + // staled owner heartbeat, just ignore it. + log.Info("tpscheduler: message from staled owner", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("staledOwner", ownerInfo{ + captureID: id, + revision: schedulepb.OwnerRevision{Revision: revision}, + version: version, + }), + zap.Any("owner", a.ownerInfo)) + return false +} + +func (a *agent) resetEpoch() { + a.epoch = schedulepb.ProcessorEpoch{Epoch: uuid.New().String()} +} + +func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { + messages, err := a.trans.Recv(ctx) + if err != nil { + return nil, errors.Trace(err) + } + + n := 0 + for _, val := range messages { + // Filter stale messages. + if val.Header.OwnerRevision == a.ownerInfo.revision { + messages[n] = val + n++ + } + } + return messages[:n], nil } -func (a *agent) handleMessageDispatchTableRequest(msg *schedulepb.DispatchTableResponse) { - // TODO: update s.tables from DispatchTableResponse message. +func (a *agent) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error { + for i := range msgs { + m := msgs[i] + // Correctness check. + if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown { + log.Panic("tpscheduler: invalid message no destination or unknown message type", + zap.String("capture", a.captureID), + zap.String("namespace", a.changeFeedID.Namespace), + zap.String("changefeed", a.changeFeedID.ID), + zap.Any("message", m)) + } + } + return a.trans.Send(ctx, msgs) } diff --git a/cdc/scheduler/internal/tp/agent_bench_test.go b/cdc/scheduler/internal/tp/agent_bench_test.go new file mode 100644 index 00000000000..4d1b4b508ff --- /dev/null +++ b/cdc/scheduler/internal/tp/agent_bench_test.go @@ -0,0 +1,50 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "fmt" + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" +) + +func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent)) { + upperBound := 16384 + for size := 1; size <= upperBound; size *= 2 { + tableExec := NewMockTableExecutor() + a := &agent{ + tableExec: tableExec, + } + for j := 0; j < size; j++ { + tableExec.tables[model.TableID(10000+j)] = pipeline.TableStateReplicating + } + + b.ResetTimer() + bench(b, a) + b.StopTimer() + } +} + +func BenchmarkCollectTableStatus(b *testing.B) { + benchmarkCollectTableStatus(b, func(b *testing.B, a *agent) { + total := len(a.tableExec.GetAllCurrentTables()) + b.Run(fmt.Sprintf("%d tables", total), func(b *testing.B) { + for i := 0; i < b.N; i++ { + a.collectTableStatus([]model.TableID{}) + } + }) + }) +} diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go new file mode 100644 index 00000000000..4013e57139e --- /dev/null +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -0,0 +1,354 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "context" + "testing" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/scheduler/internal/base" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func newBaseAgent4Test() *agent { + return &agent{ + ownerInfo: ownerInfo{ + version: "owner-version-1", + captureID: "owner-1", + revision: schedulepb.OwnerRevision{Revision: 1}, + }, + version: "agent-version-1", + epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + runningTasks: make(map[model.TableID]*dispatchTableTask), + } +} + +func TestAgentHandleDispatchTableTask(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + + mockTableExecutor := NewMockTableExecutor() + a.tableExec = mockTableExecutor + + tableID := model.TableID(1) + epoch := schedulepb.ProcessorEpoch{} + // all possible tasks can be received, and should be correctly handled no matter table's status + var tasks []*dispatchTableTask + for _, isRemove := range []bool{true, false} { + for _, isPrepare := range []bool{true, false} { + if isPrepare && isRemove { + continue + } + task := &dispatchTableTask{ + TableID: tableID, + StartTs: 0, + IsRemove: isRemove, + IsPrepare: isPrepare, + Epoch: epoch, + status: dispatchTableTaskReceived, + } + tasks = append(tasks, task) + } + } + + states := []schedulepb.TableState{ + schedulepb.TableStateAbsent, + schedulepb.TableStatePreparing, + schedulepb.TableStatePrepared, + schedulepb.TableStateReplicating, + schedulepb.TableStateStopping, + schedulepb.TableStateStopped, + } + ctx := context.Background() + for _, state := range states { + iterPermutation([]int{0, 1, 2}, func(sequence []int) { + t.Logf("test %v, %v", state, sequence) + switch state { + case schedulepb.TableStatePreparing: + mockTableExecutor.tables[tableID] = pipeline.TableStatePreparing + case schedulepb.TableStatePrepared: + mockTableExecutor.tables[tableID] = pipeline.TableStatePrepared + case schedulepb.TableStateReplicating: + mockTableExecutor.tables[tableID] = pipeline.TableStateReplicating + case schedulepb.TableStateStopping: + mockTableExecutor.tables[tableID] = pipeline.TableStateStopping + case schedulepb.TableStateStopped: + mockTableExecutor.tables[tableID] = pipeline.TableStateStopped + case schedulepb.TableStateAbsent: + default: + } + for _, idx := range sequence { + task := tasks[idx] + task.status = dispatchTableTaskReceived + a.runningTasks[task.TableID] = task + + if task.IsRemove { + for _, ok := range []bool{false, true} { + mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything).Return(ok) + for _, ok1 := range []bool{false, true} { + mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything).Return(0, ok1) + + response, err := a.handleDispatchTableTasks(ctx) + require.NoError(t, err) + require.NotNil(t, response) + if ok && ok1 && len(response) != 0 { + resp, ok := response[0].DispatchTableResponse.Response.(*schedulepb.DispatchTableResponse_RemoveTable) + require.True(t, ok) + require.Equal(t, schedulepb.TableStateStopped, resp.RemoveTable.Status.State) + } + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] + } + mockTableExecutor.ExpectedCalls = nil + } + } else { + for _, ok := range []bool{false, true} { + mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ok, nil) + for _, ok1 := range []bool{false, true} { + mockTableExecutor.On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything).Return(ok1, nil) + + response, err := a.handleDispatchTableTasks(ctx) + require.NoError(t, err) + require.NotNil(t, response) + if ok && ok1 && len(response) != 0 { + resp, ok := response[0].DispatchTableResponse.Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + if task.IsPrepare { + require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State) + } else { + require.Equal(t, schedulepb.TableStateReplicating, resp.AddTable.Status.State) + } + } + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] + } + mockTableExecutor.ExpectedCalls = nil + } + } + } + }) + } +} + +func TestAgentHandleMessage(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + a.tableExec = base.NewMockTableExecutor(t) + + heartbeat := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + }, + MsgType: schedulepb.MsgHeartbeat, + From: "owner-1", + Heartbeat: &schedulepb.Heartbeat{}, + } + // handle the first heartbeat, from the known owner. + response := a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Len(t, response, 1) + require.NotNil(t, response[0].HeartbeatResponse) + require.Equal(t, response[0].Header.Version, a.version) + require.Equal(t, response[0].Header.OwnerRevision, a.ownerInfo.revision) + require.Equal(t, response[0].Header.ProcessorEpoch, a.epoch) + + addTableRequest := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: "owner-1", + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{}, + }, + }, + }, + } + // add table request in pending + response = a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.Equal(t, len(a.runningTasks), 1) + + heartbeat.Header.OwnerRevision.Revision = 2 + response = a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Equal(t, len(a.runningTasks), 1) + require.Len(t, response, 1) +} + +func TestAgentUpdateOwnerInfo(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + ok := a.handleOwnerInfo("owner-1", 1, "version-1") + require.True(t, ok) + + // staled owner + ok = a.handleOwnerInfo("owner-2", 0, "version-1") + require.False(t, ok) + + // new owner with higher revision + ok = a.handleOwnerInfo("owner-2", 2, "version-1") + require.True(t, ok) +} + +// MockTableExecutor is a mock implementation of TableExecutor. +type MockTableExecutor struct { + mock.Mock + + t *testing.T + // it's preferred to use `pipeline.MockPipeline` here to make the test more vivid. + tables map[model.TableID]pipeline.TableState +} + +// NewMockTableExecutor creates a new mock table executor. +func NewMockTableExecutor() *MockTableExecutor { + return &MockTableExecutor{ + tables: map[model.TableID]pipeline.TableState{}, + } +} + +// AddTable adds a table to the executor. +func (e *MockTableExecutor) AddTable( + ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, +) (bool, error) { + log.Info("AddTable", + zap.Int64("tableID", tableID), + zap.Any("startTs", startTs), + zap.Bool("isPrepare", isPrepare)) + + state, ok := e.tables[tableID] + if ok { + switch state { + case pipeline.TableStatePreparing: + return true, nil + case pipeline.TableStatePrepared: + if !isPrepare { + e.tables[tableID] = pipeline.TableStateReplicating + } + return true, nil + case pipeline.TableStateReplicating: + return true, nil + case pipeline.TableStateStopped: + delete(e.tables, tableID) + } + + } + + args := e.Called(ctx, tableID, startTs, isPrepare) + if args.Bool(0) { + e.tables[tableID] = pipeline.TableStatePreparing + } + + return args.Bool(0), args.Error(1) +} + +// RemoveTable removes a table from the executor. +func (e *MockTableExecutor) RemoveTable(ctx context.Context, tableID model.TableID) bool { + state, ok := e.tables[tableID] + if !ok { + log.Warn("table to be remove is not found", zap.Int64("tableID", tableID)) + return true + } + switch state { + case pipeline.TableStateStopping, pipeline.TableStateStopped: + return true + case pipeline.TableStatePreparing, pipeline.TableStatePrepared, pipeline.TableStateReplicating: + default: + } + // todo: how to handle table is already in `stopping` ? should return true directly ? + // the current `processor implementation, does not consider table's state + log.Info("RemoveTable", zap.Int64("tableID", tableID), zap.Any("state", state)) + + args := e.Called(ctx, tableID) + if args.Bool(0) { + e.tables[tableID] = pipeline.TableStateStopping + } + return args.Bool(0) +} + +// IsAddTableFinished determines if the table has been added. +func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, tableID model.TableID, isPrepare bool) bool { + _, ok := e.tables[tableID] + if !ok { + log.Panic("table which was added is not found", + zap.Int64("tableID", tableID), + zap.Bool("isPrepare", isPrepare)) + } + + args := e.Called(ctx, tableID, isPrepare) + if args.Bool(0) { + e.tables[tableID] = pipeline.TableStatePrepared + if !isPrepare { + e.tables[tableID] = pipeline.TableStateReplicating + } + } + return args.Bool(0) +} + +// IsRemoveTableFinished determines if the table has been removed. +func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) (model.Ts, bool) { + state, ok := e.tables[tableID] + if !ok { + log.Warn("table to be removed is not found", + zap.Int64("tableID", tableID)) + return 0, true + } + args := e.Called(ctx, tableID) + if args.Bool(1) { + log.Info("remove table finished, remove it from the executor", + zap.Int64("tableID", tableID), zap.Any("state", state)) + delete(e.tables, tableID) + } + return model.Ts(args.Int(0)), args.Bool(1) +} + +// GetAllCurrentTables returns all tables that are currently being adding, running, or removing. +func (e *MockTableExecutor) GetAllCurrentTables() []model.TableID { + var result []model.TableID + for tableID := range e.tables { + result = append(result, tableID) + } + return result +} + +// GetCheckpoint returns the last checkpoint. +func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { + args := e.Called() + return args.Get(0).(model.Ts), args.Get(1).(model.Ts) +} + +// GetTableMeta implements TableExecutor interface +func (e *MockTableExecutor) GetTableMeta(tableID model.TableID) pipeline.TableMeta { + state, ok := e.tables[tableID] + if !ok { + state = pipeline.TableStateAbsent + } + return pipeline.TableMeta{ + TableID: tableID, + CheckpointTs: 0, + ResolvedTs: 0, + Status: state, + } +} diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index 143fd0c298d..de1216da82f 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -41,7 +41,7 @@ const ( CaptureStateStopping CaptureState = 3 ) -// CaptureStatus represent captrue's status. +// CaptureStatus represent capture's status. type CaptureStatus struct { OwnerRev schedulepb.OwnerRevision Epoch schedulepb.ProcessorEpoch @@ -95,8 +95,8 @@ func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus { } func (c *captureManager) CheckAllCaptureInitialized() bool { - for _, captrueStatus := range c.Captures { - if captrueStatus.State == CaptureStateUninitialize { + for _, captureStatus := range c.Captures { + if captureStatus.State == CaptureStateUninitialize { return false } } diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go index 6e592a53f72..968019d8a31 100644 --- a/cdc/scheduler/internal/tp/coordinator.go +++ b/cdc/scheduler/internal/tp/coordinator.go @@ -58,7 +58,7 @@ func NewCoordinator( ownerRevision int64, cfg *config.SchedulerConfig, ) (internal.Scheduler, error) { - trans, err := newTranport(ctx, changeFeedID, messageServer, messageRouter) + trans, err := newTransport(ctx, changeFeedID, messageServer, messageRouter) if err != nil { return nil, errors.Trace(err) } @@ -110,7 +110,7 @@ func (c *coordinator) poll( sentMsgs = append(sentMsgs, msgs...) if c.captureM.CheckAllCaptureInitialized() { // Skip polling replication manager as not all capture are initialized. - err := c.trans.Send(ctx, sentMsgs) + err := c.sendMsgs(ctx, sentMsgs) return errors.Trace(err) } @@ -142,7 +142,7 @@ func (c *coordinator) poll( return errors.Trace(err) } - // checkpoint calcuation + // checkpoint calculation return nil } diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index 921727ce3a8..d6558480f07 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -26,6 +26,10 @@ type mockTrans struct { recv func(ctx context.Context) ([]*schedulepb.Message, error) } +func (m *mockTrans) Close() error { + return nil +} + func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error { return m.send(ctx, msgs) } diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go index b03287bd068..8cb406f1b8b 100644 --- a/cdc/scheduler/internal/tp/replication_manager.go +++ b/cdc/scheduler/internal/tp/replication_manager.go @@ -23,8 +23,8 @@ import ( type callback func() -// burstBalance for changefeed set up or unplaned TiCDC node failure. -// TiCDC needs to balance intrrupted tables as soon as possible. +// burstBalance for changefeed set up or unplanned TiCDC node failure. +// TiCDC needs to balance interrupted tables as soon as possible. type burstBalance struct { // Add tables to captures Tables map[model.TableID]model.CaptureID @@ -76,12 +76,6 @@ func (r *replicationManager) HandleMessage( for i := range msgs { msg := msgs[i] switch msg.MsgType { - case schedulepb.MsgCheckpoint: - msgs, err := r.handleMessageCheckpoint(msg.Checkpoints) - if err != nil { - return nil, errors.Trace(err) - } - sentMegs = append(sentMegs, msgs...) case schedulepb.MsgDispatchTableResponse: msgs, err := r.handleMessageDispatchTableResponse(msg.From, msg.DispatchTableResponse) if err != nil { @@ -156,12 +150,6 @@ func (r *replicationManager) handleMessageDispatchTableResponse( return msgs, nil } -func (r *replicationManager) handleMessageCheckpoint( - checkpoints map[model.TableID]schedulepb.Checkpoint, -) ([]*schedulepb.Message, error) { - return nil, nil -} - func (r *replicationManager) HandleTasks( tasks []*scheduleTask, ) ([]*schedulepb.Message, error) { @@ -194,7 +182,7 @@ func (r *replicationManager) HandleTasks( continue } - // Check if accpeting one more task exceeds maxTaskConcurrency. + // Check if accepting one more task exceeds maxTaskConcurrency. if len(r.runningTasks)+1 > r.maxTaskConcurrency { log.Debug("tpcheduler: too many running task") // Does not use break, in case there is burst balance task diff --git a/cdc/scheduler/internal/tp/replication_set.go b/cdc/scheduler/internal/tp/replication_set.go index e75e730eb59..61b188b142a 100644 --- a/cdc/scheduler/internal/tp/replication_set.go +++ b/cdc/scheduler/internal/tp/replication_set.go @@ -287,7 +287,7 @@ func (r *ReplicationSet) pollOnAbsent( schedulepb.TableStateStopping, schedulepb.TableStateStopped: } - log.Warn("tpscheduler: ingore input, unexpected replication set state", + log.Warn("tpscheduler: ignore input, unexpected replication set state", zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -332,7 +332,7 @@ func (r *ReplicationSet) pollOnPrepare( return nil, false, nil } } - log.Warn("tpscheduler: ingore input, unexpected replication set state", + log.Warn("tpscheduler: ignore input, unexpected replication set state", zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -364,7 +364,7 @@ func (r *ReplicationSet) pollOnCommit( r.Secondary = "" log.Info("tpscheduler: replication state promote secondary", zap.Stringer("tableState", input), - zap.String("orignial", original), + zap.String("original", original), zap.String("captureID", captureID)) return &schedulepb.Message{ To: captureID, @@ -388,7 +388,7 @@ func (r *ReplicationSet) pollOnCommit( r.Secondary = "" log.Info("tpscheduler: replication state promote secondary", zap.Stringer("tableState", input), - zap.String("orignial", original), + zap.String("original", original), zap.String("captureID", captureID)) return &schedulepb.Message{ To: r.Primary, @@ -434,7 +434,7 @@ func (r *ReplicationSet) pollOnCommit( } case schedulepb.TableStatePreparing: } - log.Warn("tpscheduler: ingore input, unexpected replication set state", + log.Warn("tpscheduler: ignore input, unexpected replication set state", zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -458,7 +458,7 @@ func (r *ReplicationSet) pollOnReplicating( case schedulepb.TableStateStopping: case schedulepb.TableStateStopped: } - log.Warn("tpscheduler: ingore input, unexpected replication set state", + log.Warn("tpscheduler: ignore input, unexpected replication set state", zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) @@ -496,7 +496,7 @@ func (r *ReplicationSet) pollOnRemoving( case schedulepb.TableStateStopping: return nil, false, nil } - log.Warn("tpscheduler: ingore input, unexpected replication set state", + log.Warn("tpscheduler: ignore input, unexpected replication set state", zap.Stringer("tableState", input), zap.String("captureID", captureID), zap.Any("replicationSet", r)) diff --git a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go index c498d4e5648..1a5cfaef53f 100644 --- a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go +++ b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go @@ -24,7 +24,7 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -// TableState is the state of table repliction in processor. +// TableState is the state of table replication in processor. // // ┌────────┐ ┌───────────┐ ┌──────────┐ // │ Absent ├─> │ Preparing ├─> │ Prepared │ @@ -77,29 +77,26 @@ type MessageType int32 const ( MsgUnknown MessageType = 0 - MsgCheckpoint MessageType = 1 - MsgDispatchTableRequest MessageType = 2 - MsgDispatchTableResponse MessageType = 3 - MsgHeartbeat MessageType = 4 - MsgHeartbeatResponse MessageType = 5 + MsgDispatchTableRequest MessageType = 1 + MsgDispatchTableResponse MessageType = 2 + MsgHeartbeat MessageType = 3 + MsgHeartbeatResponse MessageType = 4 ) var MessageType_name = map[int32]string{ 0: "MsgUnknown", - 1: "MsgCheckpoint", - 2: "MsgDispatchTableRequest", - 3: "MsgDispatchTableResponse", - 4: "MsgHeartbeat", - 5: "MsgHeartbeatResponse", + 1: "MsgDispatchTableRequest", + 2: "MsgDispatchTableResponse", + 3: "MsgHeartbeat", + 4: "MsgHeartbeatResponse", } var MessageType_value = map[string]int32{ "MsgUnknown": 0, - "MsgCheckpoint": 1, - "MsgDispatchTableRequest": 2, - "MsgDispatchTableResponse": 3, - "MsgHeartbeat": 4, - "MsgHeartbeatResponse": 5, + "MsgDispatchTableRequest": 1, + "MsgDispatchTableResponse": 2, + "MsgHeartbeat": 3, + "MsgHeartbeatResponse": 4, } func (x MessageType) String() string { @@ -549,6 +546,7 @@ func (*DispatchTableResponse) XXX_OneofWrappers() []interface{} { } type Heartbeat struct { + TableIDs []github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,rep,packed,name=table_ids,json=tableIds,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_ids,omitempty"` } func (m *Heartbeat) Reset() { *m = Heartbeat{} } @@ -584,6 +582,13 @@ func (m *Heartbeat) XXX_DiscardUnknown() { var xxx_messageInfo_Heartbeat proto.InternalMessageInfo +func (m *Heartbeat) GetTableIDs() []github_com_pingcap_tiflow_cdc_model.TableID { + if m != nil { + return m.TableIDs + } + return nil +} + type TableStatus struct { TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` State TableState `protobuf:"varint,2,opt,name=state,proto3,enum=pingcap.tiflow.cdc.schedulepb.TableState" json:"state,omitempty"` @@ -785,15 +790,14 @@ func (m *ProcessorEpoch) GetEpoch() string { } type Message struct { - Header *Message_Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - MsgType MessageType `protobuf:"varint,2,opt,name=msg_type,json=msgType,proto3,enum=pingcap.tiflow.cdc.schedulepb.MessageType" json:"msg_type,omitempty"` - From github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,3,opt,name=from,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"from,omitempty"` - To github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,4,opt,name=to,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"to,omitempty"` - DispatchTableRequest *DispatchTableRequest `protobuf:"bytes,5,opt,name=dispatch_table_request,json=dispatchTableRequest,proto3" json:"dispatch_table_request,omitempty"` - DispatchTableResponse *DispatchTableResponse `protobuf:"bytes,6,opt,name=dispatch_table_response,json=dispatchTableResponse,proto3" json:"dispatch_table_response,omitempty"` - Heartbeat *Heartbeat `protobuf:"bytes,7,opt,name=heartbeat,proto3" json:"heartbeat,omitempty"` - HeartbeatResponse *HeartbeatResponse `protobuf:"bytes,8,opt,name=heartbeat_response,json=heartbeatResponse,proto3" json:"heartbeat_response,omitempty"` - Checkpoints map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint `protobuf:"bytes,9,rep,name=checkpoints,proto3,castkey=github.com/pingcap/tiflow/cdc/model.TableID" json:"checkpoints" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Header *Message_Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + MsgType MessageType `protobuf:"varint,2,opt,name=msg_type,json=msgType,proto3,enum=pingcap.tiflow.cdc.schedulepb.MessageType" json:"msg_type,omitempty"` + From github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,3,opt,name=from,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"from,omitempty"` + To github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,4,opt,name=to,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"to,omitempty"` + DispatchTableRequest *DispatchTableRequest `protobuf:"bytes,5,opt,name=dispatch_table_request,json=dispatchTableRequest,proto3" json:"dispatch_table_request,omitempty"` + DispatchTableResponse *DispatchTableResponse `protobuf:"bytes,6,opt,name=dispatch_table_response,json=dispatchTableResponse,proto3" json:"dispatch_table_response,omitempty"` + Heartbeat *Heartbeat `protobuf:"bytes,7,opt,name=heartbeat,proto3" json:"heartbeat,omitempty"` + HeartbeatResponse *HeartbeatResponse `protobuf:"bytes,8,opt,name=heartbeat_response,json=heartbeatResponse,proto3" json:"heartbeat_response,omitempty"` } func (m *Message) Reset() { *m = Message{} } @@ -885,13 +889,6 @@ func (m *Message) GetHeartbeatResponse() *HeartbeatResponse { return nil } -func (m *Message) GetCheckpoints() map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint { - if m != nil { - return m.Checkpoints - } - return nil -} - type Message_Header struct { // The semantic version of the node that sent this message. Version string `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` @@ -969,89 +966,84 @@ func init() { proto.RegisterType((*OwnerRevision)(nil), "pingcap.tiflow.cdc.schedulepb.OwnerRevision") proto.RegisterType((*ProcessorEpoch)(nil), "pingcap.tiflow.cdc.schedulepb.ProcessorEpoch") proto.RegisterType((*Message)(nil), "pingcap.tiflow.cdc.schedulepb.Message") - proto.RegisterMapType((map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint)(nil), "pingcap.tiflow.cdc.schedulepb.Message.CheckpointsEntry") proto.RegisterType((*Message_Header)(nil), "pingcap.tiflow.cdc.schedulepb.Message.Header") } func init() { proto.RegisterFile("table_schedule.proto", fileDescriptor_ab4bb9c6b16cfa4d) } var fileDescriptor_ab4bb9c6b16cfa4d = []byte{ - // 1186 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcf, 0x6f, 0xe3, 0xc4, - 0x17, 0x8f, 0x93, 0x34, 0x3f, 0x5e, 0xda, 0xae, 0x3b, 0x9b, 0xee, 0xe6, 0xeb, 0x2f, 0x24, 0xc6, - 0x5a, 0x55, 0x25, 0xcb, 0x26, 0x6c, 0x8b, 0xc4, 0xaa, 0x17, 0xd4, 0xec, 0x16, 0xb5, 0x42, 0xa1, - 0x2b, 0xb7, 0x0b, 0x02, 0x21, 0x45, 0x8e, 0x3d, 0xeb, 0x98, 0x26, 0x1e, 0xe3, 0x71, 0x5a, 0xf5, - 0xc2, 0x81, 0x63, 0x0e, 0x88, 0x13, 0xb7, 0x1c, 0x90, 0x38, 0xf1, 0x1f, 0xf0, 0x1f, 0xf4, 0xc0, - 0xa1, 0x27, 0x84, 0x84, 0x54, 0xa0, 0xfd, 0x1f, 0x38, 0x54, 0x42, 0x42, 0x1e, 0x4f, 0xec, 0x24, - 0xcd, 0x92, 0x84, 0x15, 0x88, 0xdb, 0xcc, 0xbc, 0xf7, 0x3e, 0xef, 0xe7, 0xe7, 0x59, 0x86, 0xbc, - 0xa7, 0x35, 0xdb, 0xb8, 0x41, 0xf5, 0x16, 0x36, 0xba, 0x6d, 0x5c, 0x71, 0x5c, 0xe2, 0x11, 0xf4, - 0xaa, 0x63, 0xd9, 0xa6, 0xae, 0x39, 0x15, 0xcf, 0x7a, 0xde, 0x26, 0x27, 0x15, 0xdd, 0xd0, 0x2b, - 0x03, 0x15, 0xa7, 0x29, 0xe5, 0x4d, 0x62, 0x12, 0xa6, 0x59, 0xf5, 0x4f, 0x81, 0x91, 0xf2, 0x9d, - 0x00, 0xf0, 0xb8, 0x85, 0xf5, 0x23, 0x87, 0x58, 0xb6, 0x87, 0xf6, 0x61, 0x49, 0x0f, 0x6f, 0x0d, - 0x8f, 0x16, 0x04, 0x59, 0x58, 0x4f, 0xd6, 0xca, 0xd7, 0x17, 0xa5, 0x35, 0xd3, 0xf2, 0x5a, 0xdd, - 0x66, 0x45, 0x27, 0x9d, 0x2a, 0xf7, 0x54, 0x0d, 0x3c, 0x55, 0x75, 0x43, 0xaf, 0x76, 0x88, 0x81, - 0xdb, 0x95, 0x43, 0xaa, 0x2e, 0x46, 0x00, 0x87, 0x14, 0xbd, 0x07, 0x39, 0x17, 0x53, 0xd2, 0x3e, - 0xc6, 0x86, 0x0f, 0x17, 0x9f, 0x1b, 0x0e, 0x06, 0xe6, 0x87, 0x54, 0xf9, 0x51, 0x80, 0x5b, 0xdb, - 0x86, 0x71, 0xe8, 0x67, 0xaf, 0xe2, 0xcf, 0xba, 0x98, 0x7a, 0xe8, 0x19, 0x64, 0x82, 0x6a, 0x58, - 0x06, 0x0b, 0x36, 0x51, 0xdb, 0xba, 0xbc, 0x28, 0xa5, 0x99, 0xce, 0xde, 0x93, 0xeb, 0x8b, 0xd2, - 0xfd, 0x99, 0x1c, 0x05, 0xea, 0x6a, 0x9a, 0x61, 0xed, 0x19, 0xe8, 0x35, 0x58, 0xb4, 0x68, 0x83, - 0x62, 0x9d, 0xd8, 0x86, 0xe6, 0x9e, 0xb2, 0xc0, 0x33, 0x6a, 0xce, 0xa2, 0x07, 0x83, 0x27, 0xb4, - 0x07, 0x10, 0xa5, 0x5a, 0x48, 0xc8, 0xc2, 0x7a, 0x6e, 0xe3, 0xf5, 0xca, 0x5f, 0x36, 0xa1, 0x12, - 0x95, 0x5a, 0x1d, 0x32, 0x56, 0x8e, 0x00, 0xa9, 0xb8, 0x43, 0x8e, 0xf1, 0xbf, 0x90, 0x9a, 0x72, - 0x26, 0x40, 0xfe, 0x89, 0x45, 0x1d, 0xcd, 0xd3, 0x5b, 0x23, 0xfe, 0xea, 0x90, 0xd5, 0x0c, 0xa3, - 0xc1, 0xf4, 0x98, 0xc3, 0xdc, 0x46, 0x65, 0x4a, 0x3e, 0x63, 0xdd, 0xd8, 0x8d, 0xa9, 0x19, 0x8d, - 0x3f, 0xa1, 0x0f, 0x60, 0xd1, 0x65, 0x49, 0x71, 0xc4, 0x38, 0x43, 0x7c, 0x38, 0x05, 0xf1, 0x66, - 0x1d, 0x76, 0x63, 0x6a, 0xce, 0x8d, 0x5e, 0x6b, 0x59, 0x48, 0xbb, 0x81, 0x44, 0xf9, 0x5e, 0x00, - 0x31, 0x0a, 0x81, 0x3a, 0xc4, 0xa6, 0x18, 0xd5, 0x20, 0x45, 0x3d, 0xcd, 0xeb, 0x52, 0x9e, 0x43, - 0x79, 0x8a, 0x47, 0x66, 0x7d, 0xc0, 0x2c, 0x54, 0x6e, 0x39, 0xd6, 0xdb, 0xf8, 0x4b, 0xf4, 0x16, - 0xdd, 0x81, 0x94, 0x8b, 0x3f, 0xc5, 0x7a, 0x30, 0x22, 0x19, 0x95, 0xdf, 0x94, 0x6f, 0x05, 0xb8, - 0x3d, 0x92, 0xec, 0x7f, 0x32, 0x7c, 0xe5, 0x07, 0x01, 0x56, 0xc7, 0xa6, 0x85, 0x07, 0xfa, 0xfe, - 0xcd, 0x71, 0xa9, 0xce, 0x3c, 0x2e, 0x01, 0xc6, 0xc8, 0xbc, 0x7c, 0x38, 0x71, 0x5e, 0x36, 0xe6, - 0x99, 0x97, 0x10, 0x75, 0x64, 0x60, 0x00, 0x32, 0x2e, 0x17, 0x29, 0x39, 0xc8, 0xee, 0x62, 0xcd, - 0xf5, 0x9a, 0x58, 0xf3, 0x94, 0xdf, 0x05, 0xc8, 0x0d, 0x95, 0xef, 0x9f, 0xda, 0x25, 0xef, 0xc0, - 0x82, 0xdf, 0x97, 0x20, 0xa3, 0xe5, 0xa9, 0x8d, 0x08, 0x23, 0xc2, 0x6a, 0x60, 0x87, 0xf6, 0x5f, - 0x6a, 0xd3, 0xd4, 0x92, 0x67, 0x17, 0xa5, 0xd8, 0x48, 0x53, 0x3f, 0x87, 0x95, 0xb0, 0x0a, 0x61, - 0x3f, 0x77, 0x21, 0xc5, 0x22, 0xf6, 0x07, 0x2f, 0x31, 0xdf, 0xe0, 0x71, 0x17, 0xdc, 0x1e, 0x95, - 0x20, 0xe7, 0x2f, 0x4f, 0x8f, 0x38, 0x3e, 0x02, 0xdf, 0x9d, 0x60, 0xd1, 0x03, 0xfe, 0xa2, 0xdc, - 0x87, 0xa5, 0xfd, 0x13, 0x1b, 0xbb, 0x2a, 0x3e, 0xb6, 0xa8, 0x45, 0x6c, 0x24, 0xf9, 0x2d, 0x0a, - 0xce, 0x41, 0xe5, 0xd5, 0xf0, 0xae, 0xac, 0xc1, 0xf2, 0x53, 0x97, 0xe8, 0x98, 0x52, 0xe2, 0xee, - 0x38, 0x44, 0x6f, 0xa1, 0x3c, 0x2c, 0x60, 0xff, 0xc0, 0x54, 0xb3, 0x6a, 0x70, 0x51, 0xfe, 0xc8, - 0x40, 0xba, 0x8e, 0x29, 0xd5, 0x4c, 0x8c, 0x76, 0x20, 0xd5, 0xc2, 0x9a, 0x81, 0x5d, 0x3e, 0x98, - 0x0f, 0xa6, 0xe4, 0xc2, 0xed, 0x2a, 0xbb, 0xcc, 0x48, 0xe5, 0xc6, 0x68, 0x07, 0x32, 0x1d, 0x6a, - 0x36, 0xbc, 0x53, 0x67, 0xd0, 0xbc, 0xf2, 0x6c, 0x40, 0x87, 0xa7, 0x0e, 0x56, 0xd3, 0x1d, 0x6a, - 0xfa, 0x07, 0xb4, 0x03, 0xc9, 0xe7, 0x2e, 0xe9, 0xb0, 0xce, 0x65, 0x6b, 0x0f, 0xaf, 0x2f, 0x4a, - 0x0f, 0x66, 0x19, 0xa4, 0xc7, 0x9a, 0xe3, 0x75, 0x5d, 0x7f, 0x94, 0x98, 0x39, 0xda, 0x86, 0xb8, - 0x47, 0x0a, 0xc9, 0xbf, 0x0b, 0x12, 0xf7, 0x08, 0xb2, 0xe0, 0x8e, 0xc1, 0xc9, 0x1c, 0xb0, 0xac, - 0xc1, 0x57, 0x69, 0x61, 0x81, 0xd5, 0x69, 0x73, 0x4a, 0x7a, 0x93, 0xbe, 0x1b, 0x6a, 0xde, 0x98, - 0xf4, 0x35, 0x69, 0xc3, 0xdd, 0x1b, 0xae, 0x82, 0x49, 0x2b, 0xa4, 0x98, 0xaf, 0xb7, 0xe6, 0xf3, - 0x15, 0xd8, 0xaa, 0xab, 0xc6, 0xc4, 0x65, 0xf4, 0x2e, 0x64, 0x5b, 0x83, 0x89, 0x2e, 0xa4, 0x19, - 0xfe, 0xfa, 0x14, 0xfc, 0x88, 0x01, 0x91, 0x29, 0x6a, 0x00, 0x0a, 0x2f, 0x51, 0xc0, 0x19, 0x06, - 0xf8, 0xe6, 0xcc, 0x80, 0x83, 0x60, 0x57, 0x5a, 0x37, 0x58, 0xf6, 0xa5, 0x00, 0xb9, 0x88, 0x89, - 0xb4, 0x90, 0x65, 0x5c, 0x7b, 0x7b, 0xc6, 0xf9, 0x8c, 0x58, 0x4d, 0x77, 0x6c, 0xcf, 0x3d, 0xad, - 0x6d, 0xfa, 0xc4, 0xfb, 0xe2, 0x97, 0xf9, 0x36, 0xd3, 0x70, 0x00, 0xd2, 0xcf, 0x02, 0xa4, 0x82, - 0xb1, 0x47, 0x05, 0x48, 0x1f, 0x63, 0x37, 0x24, 0x61, 0x56, 0x1d, 0x5c, 0xd1, 0x47, 0xb0, 0x4c, - 0x7c, 0xc2, 0x36, 0x42, 0x96, 0x06, 0xdb, 0xf9, 0x8d, 0x29, 0x71, 0x8f, 0xb0, 0x9c, 0x6f, 0x89, - 0x25, 0x32, 0x42, 0xfd, 0x4f, 0xe0, 0x96, 0x33, 0xa0, 0x77, 0x23, 0xa0, 0x75, 0x62, 0x26, 0xce, - 0x8e, 0x2e, 0x05, 0x0e, 0xbe, 0xec, 0x8c, 0xbc, 0x4a, 0x16, 0x88, 0xe3, 0x35, 0x43, 0x22, 0x24, - 0x8e, 0xf0, 0x29, 0xdf, 0x33, 0xfe, 0xd1, 0xdf, 0xd0, 0xc7, 0x5a, 0xbb, 0x8b, 0xe7, 0xff, 0x54, - 0x06, 0x76, 0x5b, 0xf1, 0x47, 0x42, 0xf9, 0xeb, 0x38, 0x40, 0xb4, 0xbb, 0x91, 0x02, 0xe9, 0x67, - 0xf6, 0x91, 0x4d, 0x4e, 0x6c, 0x31, 0x26, 0xad, 0xf6, 0xfa, 0xf2, 0x4a, 0x24, 0xe4, 0x02, 0x24, - 0x43, 0x6a, 0xbb, 0x49, 0xb1, 0xed, 0x89, 0x82, 0x94, 0xef, 0xf5, 0x65, 0x31, 0x52, 0x09, 0xde, - 0xd1, 0x1a, 0x64, 0x9f, 0xba, 0xd8, 0xd1, 0x5c, 0xcb, 0x36, 0xc5, 0xb8, 0x74, 0xb7, 0xd7, 0x97, - 0x6f, 0x47, 0x4a, 0xa1, 0x08, 0xdd, 0x83, 0x4c, 0x70, 0xc1, 0x86, 0x98, 0x90, 0xee, 0xf4, 0xfa, - 0x32, 0x1a, 0x57, 0xc3, 0x06, 0x2a, 0x43, 0x4e, 0xc5, 0x4e, 0xdb, 0xd2, 0x35, 0xcf, 0xc7, 0x4b, - 0x4a, 0xff, 0xeb, 0xf5, 0xe5, 0xd5, 0xa1, 0x0f, 0x4e, 0x24, 0xf4, 0x11, 0x07, 0xfb, 0x5a, 0x5c, - 0x18, 0x47, 0x1c, 0x48, 0xfc, 0x2c, 0xd9, 0x19, 0x1b, 0x62, 0x6a, 0x3c, 0x4b, 0x2e, 0x28, 0x7f, - 0x13, 0x87, 0xdc, 0xd0, 0x5e, 0x44, 0x45, 0x80, 0x3a, 0x35, 0xa3, 0xe2, 0x2c, 0xf7, 0xfa, 0xf2, - 0xd0, 0x0b, 0xba, 0x07, 0x4b, 0x75, 0x6a, 0x46, 0x45, 0x16, 0x05, 0x69, 0xa5, 0xd7, 0x97, 0x47, - 0x1f, 0xd1, 0x23, 0xb8, 0x5b, 0xa7, 0xe6, 0xa4, 0x85, 0x24, 0xc6, 0xa5, 0xff, 0xf7, 0xfa, 0xf2, - 0x8b, 0xc4, 0x68, 0x0b, 0x0a, 0x37, 0x45, 0x01, 0x3d, 0xc5, 0x84, 0xf4, 0x4a, 0xaf, 0x2f, 0xbf, - 0x50, 0x8e, 0x14, 0x58, 0xac, 0x53, 0x33, 0x64, 0xba, 0x98, 0x94, 0xc4, 0x5e, 0x5f, 0x1e, 0x79, - 0x43, 0x1b, 0x90, 0x1f, 0xbe, 0x87, 0xd8, 0x0b, 0x52, 0xa1, 0xd7, 0x97, 0x27, 0xca, 0x6a, 0xeb, - 0xe7, 0xbf, 0x15, 0x63, 0x67, 0x97, 0x45, 0xe1, 0xfc, 0xb2, 0x28, 0xfc, 0x7a, 0x59, 0x14, 0xbe, - 0xba, 0x2a, 0xc6, 0xce, 0xaf, 0x8a, 0xb1, 0x9f, 0xae, 0x8a, 0xb1, 0x8f, 0x21, 0x1a, 0xbe, 0x66, - 0x8a, 0xfd, 0xb8, 0x6d, 0xfe, 0x19, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x2c, 0x20, 0x3a, 0x05, 0x0e, - 0x00, 0x00, + // 1127 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4b, 0x6f, 0xe3, 0xd4, + 0x17, 0x8f, 0x93, 0x34, 0x8f, 0x93, 0x4e, 0xc7, 0x73, 0x27, 0x6d, 0xf3, 0xf7, 0x1f, 0x12, 0x63, + 0xa1, 0xaa, 0x64, 0x98, 0x84, 0xe9, 0xb0, 0x40, 0xc3, 0x02, 0x35, 0x33, 0x45, 0xad, 0x50, 0xe9, + 0xc8, 0xed, 0xf0, 0x12, 0x52, 0xe4, 0xd8, 0x77, 0x12, 0xd3, 0xc4, 0xd7, 0xf8, 0xba, 0xad, 0xba, + 0x41, 0x62, 0x9b, 0x15, 0x2b, 0x76, 0xd9, 0xb1, 0xe2, 0x1b, 0xf0, 0x0d, 0xba, 0x60, 0xd1, 0x15, + 0x42, 0x42, 0x8a, 0xa0, 0xfd, 0x0e, 0x2c, 0xca, 0x06, 0xf9, 0xde, 0x6b, 0x3b, 0x49, 0x33, 0x24, + 0x65, 0x04, 0x62, 0xe7, 0x7b, 0x1e, 0xbf, 0xf3, 0xfa, 0x9d, 0x13, 0x05, 0x8a, 0xbe, 0xd1, 0xea, + 0xe2, 0x26, 0x35, 0x3b, 0xd8, 0x3a, 0xea, 0xe2, 0x9a, 0xeb, 0x11, 0x9f, 0xa0, 0x57, 0x5d, 0xdb, + 0x69, 0x9b, 0x86, 0x5b, 0xf3, 0xed, 0xe7, 0x5d, 0x72, 0x52, 0x33, 0x2d, 0xb3, 0x16, 0x9a, 0xb8, + 0x2d, 0xa5, 0xd8, 0x26, 0x6d, 0xc2, 0x2c, 0xeb, 0xc1, 0x17, 0x77, 0xd2, 0xbe, 0x97, 0x00, 0x1e, + 0x77, 0xb0, 0x79, 0xe8, 0x12, 0xdb, 0xf1, 0xd1, 0x1e, 0xdc, 0x32, 0xa3, 0x57, 0xd3, 0xa7, 0x25, + 0x49, 0x95, 0xd6, 0xd3, 0x8d, 0xea, 0xd5, 0xb0, 0xb2, 0xd6, 0xb6, 0xfd, 0xce, 0x51, 0xab, 0x66, + 0x92, 0x5e, 0x5d, 0x44, 0xaa, 0xf3, 0x48, 0x75, 0xd3, 0x32, 0xeb, 0x3d, 0x62, 0xe1, 0x6e, 0xed, + 0x80, 0xea, 0x8b, 0x31, 0xc0, 0x01, 0x45, 0x1f, 0x40, 0xc1, 0xc3, 0x94, 0x74, 0x8f, 0xb1, 0x15, + 0xc0, 0x25, 0x6f, 0x0c, 0x07, 0xa1, 0xfb, 0x01, 0xd5, 0x7e, 0x92, 0xe0, 0xf6, 0xa6, 0x65, 0x1d, + 0x04, 0xd5, 0xeb, 0xf8, 0xcb, 0x23, 0x4c, 0x7d, 0xf4, 0x0c, 0x72, 0xbc, 0x1b, 0xb6, 0xc5, 0x92, + 0x4d, 0x35, 0x1e, 0x5d, 0x0c, 0x2b, 0x59, 0x66, 0xb3, 0xf3, 0xe4, 0x6a, 0x58, 0xb9, 0x37, 0x57, + 0x20, 0x6e, 0xae, 0x67, 0x19, 0xd6, 0x8e, 0x85, 0x5e, 0x83, 0x45, 0x9b, 0x36, 0x29, 0x36, 0x89, + 0x63, 0x19, 0xde, 0x29, 0x4b, 0x3c, 0xa7, 0x17, 0x6c, 0xba, 0x1f, 0x8a, 0xd0, 0x0e, 0x40, 0x5c, + 0x6a, 0x29, 0xa5, 0x4a, 0xeb, 0x85, 0x8d, 0x37, 0x6a, 0x7f, 0x39, 0x84, 0x5a, 0xdc, 0x6a, 0x7d, + 0xc4, 0x59, 0x3b, 0x04, 0xa4, 0xe3, 0x1e, 0x39, 0xc6, 0xff, 0x42, 0x69, 0xda, 0x99, 0x04, 0xc5, + 0x27, 0x36, 0x75, 0x0d, 0xdf, 0xec, 0x8c, 0xc5, 0xdb, 0x85, 0xbc, 0x61, 0x59, 0x4d, 0x66, 0xc7, + 0x02, 0x16, 0x36, 0x6a, 0x33, 0xea, 0x99, 0x98, 0xc6, 0x76, 0x42, 0xcf, 0x19, 0x42, 0x84, 0x3e, + 0x82, 0x45, 0x8f, 0x15, 0x25, 0x10, 0x93, 0x0c, 0xf1, 0xc1, 0x0c, 0xc4, 0xeb, 0x7d, 0xd8, 0x4e, + 0xe8, 0x05, 0x2f, 0x96, 0x36, 0xf2, 0x90, 0xf5, 0xb8, 0x46, 0xfb, 0x41, 0x02, 0x39, 0x4e, 0x81, + 0xba, 0xc4, 0xa1, 0x18, 0x35, 0x20, 0x43, 0x7d, 0xc3, 0x3f, 0xa2, 0xa2, 0x86, 0xea, 0x8c, 0x88, + 0xcc, 0x7b, 0x9f, 0x79, 0xe8, 0xc2, 0x73, 0x62, 0xb6, 0xc9, 0x97, 0x98, 0x2d, 0x5a, 0x81, 0x8c, + 0x87, 0xbf, 0xc0, 0x26, 0xa7, 0x48, 0x4e, 0x17, 0x2f, 0xed, 0x3b, 0x09, 0xee, 0x8e, 0x15, 0xfb, + 0x9f, 0x4c, 0x5f, 0xfb, 0x51, 0x82, 0xe5, 0x09, 0xb6, 0x88, 0x44, 0x3f, 0xbc, 0x4e, 0x97, 0xfa, + 0xdc, 0x74, 0xe1, 0x18, 0x63, 0x7c, 0xf9, 0x78, 0x2a, 0x5f, 0x36, 0x6e, 0xc2, 0x97, 0x08, 0x75, + 0x8c, 0x30, 0x00, 0x39, 0x4f, 0xa8, 0x34, 0x0c, 0xf9, 0x6d, 0x6c, 0x78, 0x7e, 0x0b, 0x1b, 0x3e, + 0xfa, 0x04, 0xf2, 0xe1, 0x82, 0x05, 0xdd, 0x4e, 0xad, 0xa7, 0x1a, 0xef, 0x5e, 0x0c, 0x2b, 0x39, + 0xb1, 0x32, 0xf4, 0xa6, 0x2b, 0x96, 0x13, 0x2b, 0x46, 0xb5, 0xdf, 0x25, 0x28, 0x8c, 0x0c, 0xe6, + 0x9f, 0xba, 0x52, 0xef, 0xc1, 0x42, 0x30, 0x71, 0xde, 0xab, 0xa5, 0x99, 0x23, 0x8e, 0x32, 0xc2, + 0x3a, 0xf7, 0x43, 0x7b, 0x2f, 0x75, 0xc3, 0x1a, 0xe9, 0xb3, 0x61, 0x25, 0x31, 0x46, 0x97, 0xaf, + 0xe0, 0x4e, 0xd4, 0xdf, 0x88, 0x29, 0xdb, 0x90, 0x61, 0x19, 0xf3, 0x26, 0xdf, 0x88, 0xd2, 0x22, + 0x84, 0xf0, 0x47, 0x15, 0x28, 0x04, 0x67, 0xd9, 0x27, 0x6e, 0x80, 0x20, 0xae, 0x32, 0xd8, 0x74, + 0x5f, 0x48, 0xb4, 0x7b, 0x70, 0x6b, 0xef, 0xc4, 0xc1, 0x9e, 0x8e, 0x8f, 0x6d, 0x6a, 0x13, 0x07, + 0x29, 0xc1, 0xf0, 0xf9, 0x37, 0xef, 0xbc, 0x1e, 0xbd, 0xb5, 0x35, 0x58, 0x7a, 0xea, 0x11, 0x13, + 0x53, 0x4a, 0xbc, 0x2d, 0x97, 0x98, 0x1d, 0x54, 0x84, 0x05, 0x1c, 0x7c, 0x30, 0xd3, 0xbc, 0xce, + 0x1f, 0xda, 0xd7, 0x59, 0xc8, 0xee, 0x62, 0x4a, 0x8d, 0x36, 0x46, 0x5b, 0x90, 0xe9, 0x60, 0xc3, + 0xc2, 0x9e, 0xa0, 0xfc, 0xfd, 0x19, 0xb5, 0x08, 0xbf, 0xda, 0x36, 0x73, 0xd2, 0x85, 0x33, 0xda, + 0x82, 0x5c, 0x8f, 0xb6, 0x9b, 0xfe, 0xa9, 0x1b, 0x0e, 0xaf, 0x3a, 0x1f, 0xd0, 0xc1, 0xa9, 0x8b, + 0xf5, 0x6c, 0x8f, 0xb6, 0x83, 0x0f, 0xb4, 0x05, 0xe9, 0xe7, 0x1e, 0xe9, 0xb1, 0xc9, 0xe5, 0x1b, + 0x0f, 0xae, 0x86, 0x95, 0xfb, 0xf3, 0x10, 0xe9, 0xb1, 0xe1, 0xfa, 0x47, 0x5e, 0x40, 0x25, 0xe6, + 0x8e, 0x36, 0x21, 0xe9, 0x93, 0x52, 0xfa, 0xef, 0x82, 0x24, 0x7d, 0x82, 0x6c, 0x58, 0xb1, 0xc4, + 0x99, 0xe0, 0xfb, 0xdb, 0x14, 0x47, 0xba, 0xb4, 0xc0, 0xfa, 0xf4, 0x70, 0x46, 0x79, 0xd3, 0x7e, + 0x91, 0xf4, 0xa2, 0x35, 0xed, 0x77, 0xaa, 0x0b, 0xab, 0xd7, 0x42, 0x71, 0xa6, 0x95, 0x32, 0x2c, + 0xd6, 0xdb, 0x37, 0x8b, 0xc5, 0x7d, 0xf5, 0x65, 0x6b, 0xea, 0x99, 0x7b, 0x1f, 0xf2, 0x9d, 0x90, + 0xd1, 0xa5, 0x2c, 0xc3, 0x5f, 0x9f, 0x81, 0x1f, 0x6f, 0x40, 0xec, 0x8a, 0x9a, 0x80, 0xa2, 0x47, + 0x9c, 0x70, 0x8e, 0x01, 0xbe, 0x35, 0x37, 0x60, 0x98, 0xec, 0x9d, 0xce, 0xa4, 0x48, 0xf9, 0x45, + 0x82, 0x0c, 0x67, 0x19, 0x2a, 0x41, 0xf6, 0x18, 0x7b, 0x11, 0xe7, 0xf3, 0x7a, 0xf8, 0x44, 0x9f, + 0xc2, 0x12, 0x09, 0xf6, 0xa3, 0x19, 0x2d, 0x05, 0x3f, 0xb3, 0x6f, 0xce, 0xc8, 0x60, 0x6c, 0xa9, + 0xc4, 0x52, 0xde, 0x22, 0x63, 0x9b, 0xf6, 0x39, 0xdc, 0x76, 0xc3, 0x6d, 0x6a, 0xf2, 0x2d, 0x4a, + 0xcd, 0xb5, 0x22, 0xe3, 0x3b, 0x28, 0xc0, 0x97, 0xdc, 0x31, 0x69, 0xf5, 0xdb, 0x24, 0x40, 0x7c, + 0xbf, 0x90, 0x06, 0xd9, 0x67, 0xce, 0xa1, 0x43, 0x4e, 0x1c, 0x39, 0xa1, 0x2c, 0xf7, 0x07, 0xea, + 0x9d, 0x58, 0x29, 0x14, 0x48, 0x85, 0xcc, 0x66, 0x8b, 0x62, 0xc7, 0x97, 0x25, 0xa5, 0xd8, 0x1f, + 0xa8, 0x72, 0x6c, 0xc2, 0xe5, 0x68, 0x0d, 0xf2, 0x4f, 0x3d, 0xec, 0x1a, 0x9e, 0xed, 0xb4, 0xe5, + 0xa4, 0xb2, 0xda, 0x1f, 0xa8, 0x77, 0x63, 0xa3, 0x48, 0x85, 0x5e, 0x87, 0x1c, 0x7f, 0x60, 0x4b, + 0x4e, 0x29, 0x2b, 0xfd, 0x81, 0x8a, 0x26, 0xcd, 0xb0, 0x85, 0xaa, 0x50, 0xd0, 0xb1, 0xdb, 0xb5, + 0x4d, 0xc3, 0x0f, 0xf0, 0xd2, 0xca, 0xff, 0xfa, 0x03, 0x75, 0x79, 0xe4, 0xe8, 0xc6, 0xca, 0x00, + 0x31, 0xbc, 0x59, 0xf2, 0xc2, 0x24, 0x62, 0xa8, 0x09, 0xaa, 0x64, 0xdf, 0xd8, 0x92, 0x33, 0x93, + 0x55, 0x0a, 0x45, 0xf5, 0x0f, 0x09, 0x0a, 0x23, 0xb7, 0x01, 0x95, 0x01, 0x76, 0x69, 0x3b, 0x6e, + 0xce, 0x52, 0x7f, 0xa0, 0x8e, 0x48, 0xd0, 0x3b, 0xb0, 0xba, 0x4b, 0xdb, 0xd3, 0xd6, 0x4d, 0x96, + 0x94, 0xff, 0xf7, 0x07, 0xea, 0x8b, 0xd4, 0xe8, 0x11, 0x94, 0xae, 0xab, 0x38, 0xf9, 0xe4, 0xa4, + 0xf2, 0x4a, 0x7f, 0xa0, 0xbe, 0x50, 0x8f, 0x34, 0x58, 0xdc, 0xa5, 0xed, 0x88, 0xc7, 0x72, 0x4a, + 0x91, 0xfb, 0x03, 0x75, 0x4c, 0x86, 0x36, 0xa0, 0x38, 0xfa, 0x8e, 0xb0, 0xd3, 0x4a, 0xa9, 0x3f, + 0x50, 0xa7, 0xea, 0x1a, 0xeb, 0xe7, 0xbf, 0x95, 0x13, 0x67, 0x17, 0x65, 0xe9, 0xfc, 0xa2, 0x2c, + 0xfd, 0x7a, 0x51, 0x96, 0xbe, 0xb9, 0x2c, 0x27, 0xce, 0x2f, 0xcb, 0x89, 0x9f, 0x2f, 0xcb, 0x89, + 0xcf, 0x20, 0x66, 0x59, 0x2b, 0xc3, 0xfe, 0xf0, 0x3c, 0xfc, 0x33, 0x00, 0x00, 0xff, 0xff, 0xb8, + 0xe4, 0x98, 0x15, 0x3d, 0x0d, 0x00, 0x00, } func (m *Checkpoint) Marshal() (dAtA []byte, err error) { @@ -1437,6 +1429,25 @@ func (m *Heartbeat) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.TableIDs) > 0 { + dAtA11 := make([]byte, len(m.TableIDs)*10) + var j10 int + for _, num1 := range m.TableIDs { + num := uint64(num1) + for num >= 1<<7 { + dAtA11[j10] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j10++ + } + dAtA11[j10] = uint8(num) + j10++ + } + i -= j10 + copy(dAtA[i:], dAtA11[:j10]) + i = encodeVarintTableSchedule(dAtA, i, uint64(j10)) + i-- + dAtA[i] = 0xa + } return len(dAtA) - i, nil } @@ -1608,28 +1619,6 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Checkpoints) > 0 { - for k := range m.Checkpoints { - v := m.Checkpoints[k] - baseI := i - { - size, err := (&v).MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - i = encodeVarintTableSchedule(dAtA, i, uint64(k)) - i-- - dAtA[i] = 0x8 - i = encodeVarintTableSchedule(dAtA, i, uint64(baseI-i)) - i-- - dAtA[i] = 0x4a - } - } if m.HeartbeatResponse != nil { { size, err := m.HeartbeatResponse.MarshalToSizedBuffer(dAtA[:i]) @@ -1934,6 +1923,13 @@ func (m *Heartbeat) Size() (n int) { } var l int _ = l + if len(m.TableIDs) > 0 { + l = 0 + for _, e := range m.TableIDs { + l += sovTableSchedule(uint64(e)) + } + n += 1 + sovTableSchedule(uint64(l)) + l + } return n } @@ -2034,15 +2030,6 @@ func (m *Message) Size() (n int) { l = m.HeartbeatResponse.Size() n += 1 + l + sovTableSchedule(uint64(l)) } - if len(m.Checkpoints) > 0 { - for k, v := range m.Checkpoints { - _ = k - _ = v - l = v.Size() - mapEntrySize := 1 + sovTableSchedule(uint64(k)) + 1 + l + sovTableSchedule(uint64(l)) - n += mapEntrySize + 1 + sovTableSchedule(uint64(mapEntrySize)) - } - } return n } @@ -2884,6 +2871,82 @@ func (m *Heartbeat) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: Heartbeat: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType == 0 { + var v github_com_pingcap_tiflow_cdc_model.TableID + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= github_com_pingcap_tiflow_cdc_model.TableID(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TableIDs = append(m.TableIDs, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthTableSchedule + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthTableSchedule + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.TableIDs) == 0 { + m.TableIDs = make([]github_com_pingcap_tiflow_cdc_model.TableID, 0, elementCount) + } + for iNdEx < postIndex { + var v github_com_pingcap_tiflow_cdc_model.TableID + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= github_com_pingcap_tiflow_cdc_model.TableID(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TableIDs = append(m.TableIDs, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field TableIDs", wireType) + } default: iNdEx = preIndex skippy, err := skipTableSchedule(dAtA[iNdEx:]) @@ -3573,121 +3636,6 @@ func (m *Message) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 9: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Checkpoints", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthTableSchedule - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthTableSchedule - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Checkpoints == nil { - m.Checkpoints = make(map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint) - } - var mapkey int64 - mapvalue := &Checkpoint{} - for iNdEx < postIndex { - entryPreIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - if fieldNum == 1 { - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - mapkey |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - } else if fieldNum == 2 { - var mapmsglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - mapmsglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if mapmsglen < 0 { - return ErrInvalidLengthTableSchedule - } - postmsgIndex := iNdEx + mapmsglen - if postmsgIndex < 0 { - return ErrInvalidLengthTableSchedule - } - if postmsgIndex > l { - return io.ErrUnexpectedEOF - } - mapvalue = &Checkpoint{} - if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { - return err - } - iNdEx = postmsgIndex - } else { - iNdEx = entryPreIndex - skippy, err := skipTableSchedule(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthTableSchedule - } - if (iNdEx + skippy) > postIndex { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - m.Checkpoints[github_com_pingcap_tiflow_cdc_model.TableID(mapkey)] = *mapvalue - iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTableSchedule(dAtA[iNdEx:]) diff --git a/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go b/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go index 2195526aacb..40ffa45787e 100644 --- a/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go +++ b/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go @@ -14,77 +14,87 @@ package schedulepb import ( - fmt "fmt" - testing "testing" + "fmt" + "testing" "github.com/gogo/protobuf/proto" + "github.com/pingcap/tiflow/cdc/model" ) -func benchmarkMessageCheckpoints(b *testing.B, bench func(b *testing.B, m *Message)) { +func benchmarkMessageHeartbeatResponse(b *testing.B, bench func(b *testing.B, m *Message)) { size := 16384 for total := 1; total <= size; total *= 2 { msg := Message{ - Checkpoints: map[int64]Checkpoint{}, + MsgType: MsgHeartbeatResponse, + HeartbeatResponse: &HeartbeatResponse{ + Tables: make([]TableStatus, 0, total), + IsStopping: false, + }, } + for i := 0; i < total; i++ { - msg.Checkpoints[int64(10000+i)] = Checkpoint{ - CheckpointTs: 433331421532337260, - ResolvedTs: 433331421532337261, - } + msg.HeartbeatResponse.Tables = append(msg.HeartbeatResponse.Tables, + TableStatus{ + TableID: model.TableID(10000 + i), + State: TableStateReplicating, + Checkpoint: Checkpoint{ + CheckpointTs: 433331421532337260, + ResolvedTs: 433331421532337261, + }, + }) } + b.ResetTimer() bench(b, &msg) b.StopTimer() } - } -func BenchmarkMessageCheckpointsProtoMarshal(b *testing.B) { - benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { - total := len(msg.Checkpoints) - b.Run(fmt.Sprintf("%d checkpoint(s) marshal", total), func(b *testing.B) { +func BenchmarkMessageHeartbeatResponseProtoMarshal(b *testing.B) { + benchmarkMessageHeartbeatResponse(b, func(b *testing.B, m *Message) { + total := len(m.HeartbeatResponse.Tables) + b.Run(fmt.Sprintf("%d checkpoints(s) marshal", total), func(b *testing.B) { totalLen := 0 for i := 0; i < b.N; i++ { - dAtA, err := proto.Marshal(msg) + bytes, err := proto.Marshal(m) if err != nil { panic(err) } - totalLen += len(dAtA) + totalLen += len(bytes) } b.SetBytes(int64(totalLen / b.N)) }) }) } -func BenchmarkMessageCheckpointsProtoUnmarshal(b *testing.B) { - benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { - total := len(msg.Checkpoints) - b.Run(fmt.Sprintf("%d checkpoint(s) unmarshal", total), func(b *testing.B) { - dAtA, err := proto.Marshal(msg) +func BenchmarkMessageHeartbeatResponseProtoUnmarshal(b *testing.B) { + benchmarkMessageHeartbeatResponse(b, func(b *testing.B, m *Message) { + total := len(m.HeartbeatResponse.Tables) + b.Run(fmt.Sprintf("%d checkpoints(s) marshal", total), func(b *testing.B) { + bytes, err := proto.Marshal(m) if err != nil { panic(err) } + dst := Message{} totalLen := 0 - for i := 0; i < b.N; i++ { - err := proto.Unmarshal(dAtA, &dst) - if err != nil { + if err := proto.Unmarshal(bytes, &dst); err != nil { panic(err) } - totalLen += len(dAtA) + totalLen += len(bytes) } b.SetBytes(int64(totalLen / b.N)) }) }) } -func BenchmarkMessageCheckpointsProtoSize(b *testing.B) { - benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { - total := len(msg.Checkpoints) +func BenchmarkMessageHeartbeatResponseProtoSize(b *testing.B) { + benchmarkMessageHeartbeatResponse(b, func(b *testing.B, m *Message) { + total := len(m.HeartbeatResponse.Tables) b.Run(fmt.Sprintf("%d checkpoint(s) size", total), func(b *testing.B) { for i := 0; i < b.N; i++ { - msg.Size() + m.Size() } }) }) diff --git a/cdc/scheduler/internal/tp/transport.go b/cdc/scheduler/internal/tp/transport.go index 9eff5fa8650..24d9d2c625e 100644 --- a/cdc/scheduler/internal/tp/transport.go +++ b/cdc/scheduler/internal/tp/transport.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -30,6 +31,7 @@ import ( type transport interface { Send(ctx context.Context, msgs []*schedulepb.Message) error Recv(ctx context.Context) ([]*schedulepb.Message, error) + Close() error } func p2pTopic(changefeed model.ChangeFeedID) p2p.Topic { @@ -52,7 +54,7 @@ type p2pTransport struct { } } -func newTranport( +func newTransport( ctx context.Context, changefeed model.ChangeFeedID, server *p2p.MessageServer, router p2p.MessageRouter, ) (*p2pTransport, error) { @@ -128,3 +130,14 @@ func (t *p2pTransport) Recv(ctx context.Context) ([]*schedulepb.Message, error) t.mu.msgBuf = make([]*schedulepb.Message, 0) return recvMsgs, nil } + +func (t *p2pTransport) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := t.messageServer.SyncRemoveHandler(ctx, t.topic) + if err != nil { + return errors.Trace(err) + } + return nil +} diff --git a/cdc/scheduler/internal/tp/transport_test.go b/cdc/scheduler/internal/tp/transport_test.go index 4593b2bf2aa..53d554f11a6 100644 --- a/cdc/scheduler/internal/tp/transport_test.go +++ b/cdc/scheduler/internal/tp/transport_test.go @@ -37,7 +37,7 @@ func TestTransSendRecv(t *testing.T) { var err error transMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTranport(ctx, changefeedID, node.Server, node.Router) + transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) require.Nil(t, err) } @@ -76,7 +76,7 @@ func TestTransUnknownAddr(t *testing.T) { var err error transMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTranport(ctx, changefeedID, node.Server, node.Router) + transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) require.Nil(t, err) } @@ -101,7 +101,7 @@ func TestTransEmptyRecv(t *testing.T) { var err error transMap := make(map[string]transport) for addr, node := range cluster.Nodes { - transMap[addr], err = newTranport(ctx, changefeedID, node.Server, node.Router) + transMap[addr], err = newTransport(ctx, changefeedID, node.Server, node.Router) require.Nil(t, err) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 553120cdbc6..c864cee2668 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -128,6 +128,11 @@ const ( "server-max-pending-message-count": 102400, "server-ack-interval": 100000000, "server-worker-pool-size": 4 + }, + "EnableTwoPhaseScheduler": false, + "Scheduler": { + "heartbeat-tick": 2, + "max-task-concurrency": 10 } } }` diff --git a/proto/table_schedule.proto b/proto/table_schedule.proto index e52c96bb69d..a7b2c3667a5 100644 --- a/proto/table_schedule.proto +++ b/proto/table_schedule.proto @@ -65,9 +65,14 @@ message DispatchTableResponse { } } -message Heartbeat {} +message Heartbeat { + repeated int64 table_ids = 1 [ + (gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.TableID", + (gogoproto.customname) = "TableIDs" + ]; +} -// TableState is the state of table repliction in processor. +// TableState is the state of table replication in processor. // // ┌────────┐ ┌───────────┐ ┌──────────┐ // │ Absent ├─> │ Preparing ├─> │ Prepared │ @@ -102,11 +107,10 @@ message HeartbeatResponse { enum MessageType { MsgUnknown = 0 [(gogoproto.enumvalue_customname) = "MsgUnknown"]; - MsgCheckpoint = 1 [(gogoproto.enumvalue_customname) = "MsgCheckpoint"]; - MsgDispatchTableRequest = 2 [(gogoproto.enumvalue_customname) = "MsgDispatchTableRequest"]; - MsgDispatchTableResponse = 3 [(gogoproto.enumvalue_customname) = "MsgDispatchTableResponse"]; - MsgHeartbeat = 4 [(gogoproto.enumvalue_customname) = "MsgHeartbeat"]; - MsgHeartbeatResponse = 5 [(gogoproto.enumvalue_customname) = "MsgHeartbeatResponse"]; + MsgDispatchTableRequest = 1 [(gogoproto.enumvalue_customname) = "MsgDispatchTableRequest"]; + MsgDispatchTableResponse = 2 [(gogoproto.enumvalue_customname) = "MsgDispatchTableResponse"]; + MsgHeartbeat = 3 [(gogoproto.enumvalue_customname) = "MsgHeartbeat"]; + MsgHeartbeatResponse = 4 [(gogoproto.enumvalue_customname) = "MsgHeartbeatResponse"]; } message OwnerRevision { int64 revision = 1; } @@ -128,8 +132,4 @@ message Message { DispatchTableResponse dispatch_table_response = 6; Heartbeat heartbeat = 7; HeartbeatResponse heartbeat_response = 8; - map checkpoints = 9 [ - (gogoproto.castkey) = "github.com/pingcap/tiflow/cdc/model.TableID", - (gogoproto.nullable) = false - ]; }