Commit
scheduler(cdc): table executor support 2 phase scheduling (pingcap#5462)
3AceShowHand authored and overvenus committed Jun 24, 2022
1 parent 7668625 commit 912c707
Showing 23 changed files with 472 additions and 225 deletions.
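In short: the commit renames the table state `Running` to `Replicating`, removes the `TableStatus` type from `cdc/processor/pipeline/sink.go`, and splits table startup into two phases — a table is first prepared (its sorter connects to all regions and buffers events) and only starts replicating once the scheduler hands it a start-ts. The relocated `TableStatus` definition is not shown in this excerpt; the sketch below reconstructs it from the deleted code, assuming it keeps the same atomic Load/Store shape with the renamed `Replicating` state.

package pipeline

import "sync/atomic"

// TableStatus is the status of the table pipeline (reconstruction; the
// relocated definition is outside this diff).
type TableStatus int32

const (
	TableStatusPreparing   TableStatus = iota // sorter still connecting to regions
	TableStatusPrepared                       // first resolved event received
	TableStatusReplicating                    // rows flowing to the downstream sink
	TableStatusStopped
)

// Load reads the status atomically.
func (s *TableStatus) Load() TableStatus {
	return TableStatus(atomic.LoadInt32((*int32)(s)))
}

// Store writes the status atomically.
func (s *TableStatus) Store(new TableStatus) {
	atomic.StoreInt32((*int32)(s), int32(new))
}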
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
@@ -55,7 +55,7 @@ func (s *managerTester) resetSuit(ctx cdcContext.Context, t *testing.T) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
status: tablepipeline.TableStatusRunning,
status: tablepipeline.TableStatusReplicating,
resolvedTs: replicaInfo.StartTs,
checkpointTs: replicaInfo.StartTs,
}, nil
41 changes: 3 additions & 38 deletions cdc/processor/pipeline/sink.go
@@ -30,41 +30,6 @@ import (
"go.uber.org/zap"
)

// TableStatus is status of the table pipeline
type TableStatus int32

// TableStatus for table pipeline
const (
TableStatusPreparing TableStatus = iota
TableStatusPrepared
TableStatusRunning
TableStatusStopped
)

func (s TableStatus) String() string {
switch s {
case TableStatusPreparing:
return "Preparing"
case TableStatusPrepared:
return "Prepared"
case TableStatusRunning:
return "Running"
case TableStatusStopped:
return "Stopped"
}
return "Unknown"
}

// Load TableStatus with THREAD-SAFE
func (s *TableStatus) Load() TableStatus {
return TableStatus(atomic.LoadInt32((*int32)(s)))
}

// Store TableStatus with THREAD-SAFE
func (s *TableStatus) Store(new TableStatus) {
atomic.StoreInt32((*int32)(s), int32(new))
}

type sinkNode struct {
sink sink.Sink
status TableStatus
@@ -91,7 +56,7 @@ func newSinkNode(
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusPreparing,
status: TableStatusPrepared,
targetTs: targetTs,
barrierTs: startTs,
flowController: flowController,
@@ -327,8 +292,8 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if event.IsResolved() {
if n.status.Load() == TableStatusPreparing {
n.status.Store(TableStatusRunning)
if n.status.Load() == TableStatusPrepared {
n.status.Store(TableStatusReplicating)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
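With the `Preparing` phase now owned by the sorter node, the sink node is constructed directly in `TableStatusPrepared` and flips to `TableStatusReplicating` on the first resolved event. The Load-then-Store transition above is safe because `HandleMessage` runs in a single goroutine; if the transition ever had concurrent callers, a compare-and-swap variant would make it atomic. A hypothetical sketch, not part of this commit:

// CompareAndSwap is a hypothetical addition: it performs the
// Prepared -> Replicating transition atomically, so only one caller wins.
func (s *TableStatus) CompareAndSwap(old, new TableStatus) bool {
	return atomic.CompareAndSwapInt32((*int32)(s), int32(old), int32(new))
}

// Usage inside HandleMessage would then be:
//   if n.status.CompareAndSwap(TableStatusPrepared, TableStatusReplicating) {
//       // first resolved event after preparation
//   }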
42 changes: 21 additions & 21 deletions cdc/processor/pipeline/sink_test.go
@@ -139,12 +139,12 @@ func TestStatus(t *testing.T) {
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

ok, err := node.HandleMessage(ctx, pmessage.BarrierMessage(20))
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
@@ -154,7 +154,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
@@ -163,7 +163,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -172,7 +172,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 15, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -182,19 +182,19 @@ func TestStatus(t *testing.T) {
require.False(t, ok)
require.True(t, cerrors.ErrTableProcessorStoppedSafely.Equal(err))
require.Equal(t, TableStatusStopped, node.Status())
require.Equal(t, uint64(10), node.CheckpointTs())
require.Equal(t, model.Ts(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.True(t, ok)
require.Nil(t, err)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())
require.Equal(t, model.Ts(20), node.BarrierTs())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
@@ -204,7 +204,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
@@ -226,13 +226,13 @@ func TestStatus(t *testing.T) {
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg = pmessage.BarrierMessage(20)
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -241,7 +241,7 @@ func TestStatus(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())

msg = pmessage.CommandMessage(&pmessage.Command{Tp: pmessage.CommandTypeStop})
ok, err = node.HandleMessage(ctx, msg)
@@ -278,7 +278,7 @@ func TestStopStatus(t *testing.T) {
&mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 2, RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
@@ -287,7 +287,7 @@ func TestStopStatus(t *testing.T) {
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())

var wg sync.WaitGroup
wg.Add(1)
@@ -302,7 +302,7 @@ func TestStopStatus(t *testing.T) {
}()
// wait to ensure stop message is sent to the sink node
time.Sleep(time.Millisecond * 50)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())
closeCh <- struct{}{}
wg.Wait()
}
@@ -320,7 +320,7 @@ func TestManyTs(t *testing.T) {
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())

msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
@@ -339,7 +339,7 @@ func TestManyTs(t *testing.T) {
},
},
})
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())
ok, err := node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
@@ -361,7 +361,7 @@ func TestManyTs(t *testing.T) {
},
},
})
require.Equal(t, TableStatusPreparing, node.Status())
require.Equal(t, TableStatusPrepared, node.Status())
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
@@ -373,7 +373,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
@@ -418,7 +418,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())

sink.Check(t, []struct {
resolvedTs model.Ts
@@ -468,7 +468,7 @@ func TestManyTs(t *testing.T) {
ok, err = node.HandleMessage(ctx, msg)
require.Nil(t, err)
require.True(t, ok)
require.Equal(t, TableStatusRunning, node.Status())
require.Equal(t, TableStatusReplicating, node.Status())
sink.Check(t, []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
50 changes: 48 additions & 2 deletions cdc/processor/pipeline/sorter.go
@@ -58,6 +58,14 @@ type sorterNode struct {
// The latest barrier ts that sorter has received.
barrierTs model.Ts

status TableStatus
preparedCh chan struct{}

// started indicates that the sink is really replicating, not idle.
started int32
// startTsCh is used to receive the start-ts for the sink.
startTsCh chan model.Ts

replConfig *config.ReplicaConfig
}

@@ -73,6 +81,9 @@ func newSorterNode(
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
status: TableStatusPreparing,
preparedCh: make(chan struct{}, 1),
startTsCh: make(chan model.Ts, 1),
replConfig: replConfig,
}
}
@@ -85,7 +96,7 @@ func createSorter(ctx pipeline.NodeContext, tableName string, tableID model.Tabl
case model.SortUnified, model.SortInFile /* `file` becomes an alias of `unified` for backward compatibility */ :
if sortEngine == model.SortInFile {
log.Warn("File sorter is obsolete and replaced by unified sorter. Please revise your changefeed settings",
zap.String("namesapce", ctx.ChangefeedVars().ID.Namespace),
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
zap.String("tableName", tableName))
}
@@ -157,6 +168,23 @@ func (n *sorterNode) start(
ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg))
}

// Once startTs is received, the sink should start replicating data to the downstream.
var startTs model.Ts
select {
case <-stdCtx.Done():
return nil
case startTs = <-n.startTsCh:
}

select {
case <-stdCtx.Done():
return nil
case <-n.preparedCh:
}

n.status.Store(TableStatusReplicating)
eventSorter.EmitStartTs(stdCtx, startTs)

for {
// We must call `sorter.Output` before receiving resolved events.
// Skip calling `sorter.Output` and caching output channel may fail
@@ -170,9 +198,18 @@ func (n *sorterNode) start(
// sorter output channel closed
return nil
}

if msg == nil || msg.RawKV == nil {
log.Panic("unexpected empty msg", zap.Reflect("msg", msg))
log.Panic("unexpected empty msg", zap.Any("msg", msg))
}

if msg.CRTs < startTs {
// Ignore messages whose CRTs is less than the initial checkpoint ts.
log.Info("sorterNode: ignore sorter output event",
zap.Uint64("CRTs", msg.CRTs), zap.Uint64("startTs", startTs))
continue
}

if msg.RawKV.OpType != model.OpTypeResolved {
err := n.mounter.DecodeEvent(ctx, msg)
if err != nil {
@@ -269,6 +306,13 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
// resolved ts.
event = model.NewResolvedPolymorphicEvent(0, n.BarrierTs())
}
// The sorterNode is still preparing, so this must be the first resolved event
// received, the indicator that all regions are connected.
if n.status.Load() == TableStatusPreparing {
log.Info("sorterNode, first resolved event received", zap.Any("event", event))
n.status.Store(TableStatusPrepared)
close(n.preparedCh)
}
}
n.sorter.AddEntry(ctx, event)
}
@@ -310,3 +354,5 @@ func (n *sorterNode) ResolvedTs() model.Ts {
func (n *sorterNode) BarrierTs() model.Ts {
return atomic.LoadUint64(&n.barrierTs)
}

func (n *sorterNode) Status() TableStatus { return n.status.Load() }
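The two-phase start is easiest to see in `sorterNode.start` above: the goroutine first blocks until the scheduler delivers a start-ts on `startTsCh`, then blocks until the first resolved event has closed `preparedCh`, and only then marks the table `Replicating` and calls `EmitStartTs`. Below is a self-contained sketch of the same handshake pattern with illustrative names (it is not the repository's API):

package main

import (
	"context"
	"fmt"
)

// twoPhase mirrors sorterNode's handshake: preparedCh is closed by the event
// path once all regions are connected, and startTsCh carries the scheduler's
// decision to begin replication.
type twoPhase struct {
	preparedCh chan struct{}
	startTsCh  chan uint64
}

func (p *twoPhase) run(ctx context.Context) error {
	var startTs uint64
	select { // phase 2: wait for the scheduler's start-ts
	case <-ctx.Done():
		return ctx.Err()
	case startTs = <-p.startTsCh:
	}
	select { // phase 1 must also have completed (first resolved event seen)
	case <-ctx.Done():
		return ctx.Err()
	case <-p.preparedCh:
	}
	fmt.Println("replicating from", startTs) // stand-in for EmitStartTs
	return nil
}

func main() {
	p := &twoPhase{preparedCh: make(chan struct{}), startTsCh: make(chan uint64, 1)}
	close(p.preparedCh) // event path: all regions connected
	p.startTsCh <- 42   // scheduler: begin replication at ts 42
	_ = p.run(context.Background())
}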
4 changes: 4 additions & 0 deletions cdc/processor/pipeline/sorter_test.go
@@ -100,6 +100,10 @@ func (c *checkSorter) Output() <-chan *model.PolymorphicEvent {
return c.ch
}

func (c *checkSorter) EmitStartTs(ctx context.Context, ts uint64) {
panic("unimplemented")
}

func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
t.Parallel()
sch := make(chan *model.PolymorphicEvent, 1)
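Because the sorter interface now evidently includes `EmitStartTs`, the `checkSorter` test double has to implement it; it panics because these tests only exercise the sorting path and never reach the replication phase. A test that did drive that phase could record the value instead — a hypothetical alternative, assuming a `startTs uint64` field added to `checkSorter`:

// Hypothetical: record the emitted start-ts for later assertions instead of
// panicking. Not part of this commit.
func (c *checkSorter) EmitStartTs(ctx context.Context, ts uint64) {
	atomic.StoreUint64(&c.startTs, ts)
}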