Skip to content

Commit

Permalink
scheduler(2pc): agent for 2 phase scheduling (#5593)
Browse files Browse the repository at this point in the history
* fix some typo.

* update table scheduler proto

* add some new to agent.

* track owner info.

* try to handle dispatch table request.

* add more and more to agent implementation.

* fix update owner info.

* finish handle dispatch table.

* tackle epoch

* remove checkpoint from proto

* handle heartbeat with stopping.

* add benchmark for heartbeat response.

* fix agent.

* fix agent code layout.

* refine benchmark test.

* refine coordinator / capture_manager / replication_manager.

* fix agent.

* add a lot of test.

* revise the code.

* fix by suggestion.

* fix by suggestion.

* remove pendingTask.

* fix unit test.
  • Loading branch information
3AceShowHand authored May 31, 2022
1 parent c7c6354 commit dfc2d3e
Show file tree
Hide file tree
Showing 26 changed files with 1,409 additions and 455 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
status: tablepipeline.TableStatusReplicating,
status: tablepipeline.TableStateReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
Expand Down
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (

type sinkNode struct {
sink sink.Sink
status TableStatus
status TableState
tableID model.TableID

// atomic operations for model.ResolvedTs
Expand All @@ -55,7 +55,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusPrepared,
status: TableStatePrepared,
targetTs: targetTs,
checkpointTs: startTs,
barrierTs: startTs,
Expand All @@ -69,7 +69,7 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }
func (n *sinkNode) Status() TableState { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
n.replicaConfig = ctx.ChangefeedVars().Info.Config
Expand All @@ -87,7 +87,7 @@ func (n *sinkNode) initWithReplicaConfig(isTableActorMode bool, replicaConfig *c
// no more events can be sent to this sink node afterwards.
func (n *sinkNode) stop(ctx context.Context) (err error) {
// table stopped status must be set after underlying sink is closed
defer n.status.Store(TableStatusStopped)
defer n.status.Store(TableStateStopped)
err = n.sink.Close(ctx)
if err != nil {
return
Expand All @@ -102,7 +102,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) {
func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) {
defer func() {
if err != nil {
n.status.Store(TableStatusStopped)
n.status.Store(TableStateStopped)
return
}
if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs {
Expand Down Expand Up @@ -255,15 +255,15 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (bool, error) {
if n.status.Load() == TableStatusStopped {
if n.status.Load() == TableStateStopped {
return false, cerror.ErrTableProcessorStoppedSafely.GenWithStackByArgs()
}
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
if n.status.Load() == TableStatusPrepared {
n.status.Store(TableStatusReplicating)
if n.status.Load() == TableStatePrepared {
n.status.Store(TableStateReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -310,7 +310,7 @@ func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) releaseResource(ctx context.Context) error {
n.status.Store(TableStatusStopped)
n.status.Store(TableStateStopped)
n.flowController.Abort()
return n.sink.Close(ctx)
}
52 changes: 26 additions & 26 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func TestStatus(t *testing.T) {
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

err := node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -151,95 +151,95 @@ func TestStatus(t *testing.T) {
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.NoError(t, err)
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, model.Ts(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
err = node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))
require.NoError(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
err = node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))
require.Nil(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

err = node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(20), nil))
require.Nil(t, err)
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

err = node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
err = node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
require.Equal(t, uint64(7), node.CheckpointTs())
}

Expand All @@ -258,14 +258,14 @@ func TestStopStatus(t *testing.T) {
closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -275,11 +275,11 @@ func TestStopStatus(t *testing.T) {
err := node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop}), nil))
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, TableStateStopped, node.Status())
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
closeCh <- struct{}{}
wg.Wait()
}
Expand All @@ -296,7 +296,7 @@ func TestManyTs(t *testing.T) {
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -316,7 +316,7 @@ func TestManyTs(t *testing.T) {
},
},
}), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx,
pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
Expand All @@ -336,14 +336,14 @@ func TestManyTs(t *testing.T) {
},
},
}), nil)))
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, TableStatePrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
require.Nil(t, node.Receive(pipeline.MockNodeContext4Test(ctx, msg, nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestManyTs(t *testing.T) {

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(1), nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())

sink.Check(t, []struct {
resolvedTs model.Ts
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestManyTs(t *testing.T) {

require.Nil(t, node.Receive(
pipeline.MockNodeContext4Test(ctx, pmessage.BarrierMessage(5), nil)))
require.Equal(t, TableStatusReplicating, node.Status())
require.Equal(t, TableStateReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand Down
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type sorterNode struct {
// The latest barrier ts that sorter has received.
barrierTs model.Ts

status TableStatus
status TableState
preparedCh chan struct{}

// started indicate that the sink is really replicating, not idle.
Expand All @@ -88,7 +88,7 @@ func newSorterNode(
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
status: TableStatusPreparing,
status: TableStatePreparing,
preparedCh: make(chan struct{}, 1),
startTsCh: make(chan model.Ts, 1),
replConfig: replConfig,
Expand Down Expand Up @@ -186,7 +186,7 @@ func (n *sorterNode) start(
case <-n.preparedCh:
}

n.status.Store(TableStatusReplicating)
n.status.Store(TableStateReplicating)
eventSorter.EmitStartTs(stdCtx, startTs)

for {
Expand Down Expand Up @@ -323,9 +323,9 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
}
// sorterNode is preparing; this must be the first `Resolved` event received,
// the indicator that all regions are connected.
if n.status.Load() == TableStatusPreparing {
if n.status.Load() == TableStatePreparing {
log.Info("sorterNode, first resolved event received", zap.Any("event", event))
n.status.Store(TableStatusPrepared)
n.status.Store(TableStatePrepared)
close(n.preparedCh)
}
}
Expand Down Expand Up @@ -377,4 +377,4 @@ func (n *sorterNode) BarrierTs() model.Ts {
return atomic.LoadUint64(&n.barrierTs)
}

func (n *sorterNode) Status() TableStatus { return n.status.Load() }
func (n *sorterNode) Status() TableState { return n.status.Load() }
Loading

0 comments on commit dfc2d3e

Please sign in to comment.