
split dataflow in sinkNode
CharlesCheung96 committed Jun 20, 2022
1 parent 86b704b commit bde4c16
Showing 8 changed files with 144 additions and 132 deletions.
68 changes: 52 additions & 16 deletions cdc/processor/pipeline/sink.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -73,19 +74,25 @@ type sinkNode struct {
barrierTs model.Ts

flowController tableFlowController
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(
tableID model.TableID, sink sink.Sink,
startTs model.Ts, targetTs model.Ts,
flowController tableFlowController,
redoManager redo.LogManager,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,
flowController: flowController,
redoManager: redoManager,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
@@ -136,17 +143,38 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
err = n.stop(ctx)
}
}()

// flush redo log
currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

if n.redoManager != nil && n.redoManager.Enabled() {
// redo log is not supported in batch-resolved mode
if resolved.IsBatchMode() {
return nil
}
err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts)

redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Error("redoTs should not less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs))
}
}

if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}

currentCheckpointTs := n.getCheckpointTs()
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}

checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
@@ -177,6 +205,16 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
panic("ProcessorSyncResolvedPreEmit")
})

emitRows := func(rows ...*model.RowChangedEvent) error {
if n.redoManager != nil && n.redoManager.Enabled() {
err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, rows...)
if err != nil {
return err
}
}
return n.sink.EmitRowChangedEvents(ctx, rows...)
}

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
@@ -202,13 +240,13 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
return errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
return n.sink.EmitRowChangedEvents(ctx, deleteEvent.Row, insertEvent.Row)
return emitRows(deleteEvent.Row, insertEvent.Row)
}
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
}

return n.sink.EmitRowChangedEvents(ctx, event.Row)
return emitRows(event.Row)
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
@@ -290,11 +328,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

var resolved model.ResolvedTs
resolved := model.NewResolvedTs(event.CRTs)
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
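That is all of sink.go shown here. Because the flattened diff above is hard to follow, below is a minimal, self-contained sketch of how I read the new sinkNode dataflow, not the real tiflow code: rows and resolved timestamps are handed to the redo log before the table sink, and only the sink-side flush is clamped to the barrier ts. The method names Enabled, EmitRowChangedEvents, FlushLog and FlushRowChangedEvents are taken from the diff; everything else (redoWriter, rowSink, emitRow, the string-typed rows, uint64 timestamps, and the no-op and printing implementations) is a simplified stand-in, and the batch-resolved early return and the redoTs-versus-barrierTs sanity check are omitted.

package main

import (
	"context"
	"fmt"
)

// redoWriter is a stand-in for the redo.LogManager methods used in the diff.
type redoWriter interface {
	Enabled() bool
	EmitRowChangedEvents(ctx context.Context, tableID int64, rows ...string) error
	FlushLog(ctx context.Context, tableID int64, resolvedTs uint64) error
}

// rowSink is a stand-in for the sink.Sink methods used in the diff.
type rowSink interface {
	EmitRowChangedEvents(ctx context.Context, rows ...string) error
	FlushRowChangedEvents(ctx context.Context, tableID int64, resolvedTs uint64) (uint64, error)
}

type sinkNode struct {
	tableID      int64
	targetTs     uint64
	barrierTs    uint64
	checkpointTs uint64
	sink         rowSink
	redo         redoWriter
}

// emitRow mirrors the emitRows closure added to emitRowToSink: every row is
// written to the redo log first (when enabled), then to the table sink.
func (n *sinkNode) emitRow(ctx context.Context, rows ...string) error {
	if n.redo != nil && n.redo.Enabled() {
		if err := n.redo.EmitRowChangedEvents(ctx, n.tableID, rows...); err != nil {
			return err
		}
	}
	return n.sink.EmitRowChangedEvents(ctx, rows...)
}

// flushSink mirrors the new ordering: clamp by targetTs, flush the redo log up
// to that ts, then clamp by barrierTs before flushing the sink, so the redo
// log may advance past the barrier while the sink is held back.
func (n *sinkNode) flushSink(ctx context.Context, resolvedTs uint64) error {
	if resolvedTs > n.targetTs {
		resolvedTs = n.targetTs
	}
	if n.redo != nil && n.redo.Enabled() {
		if err := n.redo.FlushLog(ctx, n.tableID, resolvedTs); err != nil {
			return err
		}
	}
	if resolvedTs > n.barrierTs {
		resolvedTs = n.barrierTs
	}
	if n.checkpointTs >= resolvedTs {
		return nil // the sink is already flushed up to this point
	}
	ckpt, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
	if err != nil {
		return err
	}
	n.checkpointTs = ckpt
	return nil
}

// No-op and printing implementations so the sketch runs on its own.
type noopRedo struct{}

func (noopRedo) Enabled() bool                                                { return false }
func (noopRedo) EmitRowChangedEvents(context.Context, int64, ...string) error { return nil }
func (noopRedo) FlushLog(context.Context, int64, uint64) error                { return nil }

type printSink struct{}

func (printSink) EmitRowChangedEvents(_ context.Context, rows ...string) error {
	fmt.Println("sink emit:", rows)
	return nil
}

func (printSink) FlushRowChangedEvents(_ context.Context, _ int64, ts uint64) (uint64, error) {
	fmt.Println("sink flush up to", ts)
	return ts, nil
}

func main() {
	ctx := context.Background()
	n := &sinkNode{tableID: 1, targetTs: 100, barrierTs: 20, sink: printSink{}, redo: noopRedo{}}
	_ = n.emitRow(ctx, "row-1")
	_ = n.flushSink(ctx, 50) // the sink is flushed only up to the barrier (20)
}

Running this prints the row emit and then a flush capped at 20, the barrier, even though a resolved ts of 50 was offered.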
21 changes: 12 additions & 9 deletions cdc/processor/pipeline/sink_test.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -135,7 +136,7 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -184,7 +185,7 @@ func TestStatus(t *testing.T) {
require.Equal(t, uint64(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -222,7 +223,7 @@ func TestStatus(t *testing.T) {
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command when it is after the resolvedTs and the checkpointTs is greater than the stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -272,7 +273,9 @@ func TestStopStatus(t *testing.T) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1,
&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100,
&mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -314,7 +317,7 @@ func TestManyTs(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -487,7 +490,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -512,7 +515,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -574,7 +577,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -724,7 +727,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager())
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
sNode.barrierTs = 10
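All of the test updates above are the same mechanical change: newSinkNode gained a redo.LogManager parameter, and the tests fill it with redo.NewDisabledManager(). The sketch below shows the null-object idea this relies on, with simplified types of my own (LogManager, disabledManager, the FlushLog signature); only the names NewDisabledManager and Enabled mirror the diff. Passing an always-present but disabled manager keeps the constructor signature uniform and lets callers gate the redo path on Enabled() alone.

package main

import (
	"context"
	"fmt"
)

// LogManager is a stand-in for the subset of redo.LogManager used by sinkNode.
type LogManager interface {
	Enabled() bool
	FlushLog(ctx context.Context, tableID int64, resolvedTs uint64) error
}

// disabledManager satisfies LogManager but never does anything.
type disabledManager struct{}

func (disabledManager) Enabled() bool                                 { return false }
func (disabledManager) FlushLog(context.Context, int64, uint64) error { return nil }

// NewDisabledManager mirrors the role of redo.NewDisabledManager() in the tests.
func NewDisabledManager() LogManager { return disabledManager{} }

func main() {
	m := NewDisabledManager()
	if !m.Enabled() {
		fmt.Println("redo disabled: the sink node skips every redo-log call")
	}
}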
36 changes: 18 additions & 18 deletions cdc/processor/pipeline/table_actor.go
@@ -57,8 +57,8 @@ type tableActor struct {
// backend mounter
mounter entry.Mounter
// backend tableSink
tableSink sink.Sink
redoLogEnabled bool
tableSink sink.Sink
redoManager redo.LogManager

pullerNode *pullerNode
sortNode *sorterNode
@@ -103,7 +103,7 @@ func NewTableActor(cdcCtx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
redoLogEnabled bool,
redoManager redo.LogManager,
targetTs model.Ts,
) (TablePipeline, error) {
config := cdcCtx.ChangefeedVars().Info.Config
@@ -124,18 +124,18 @@ func NewTableActor(cdcCtx cdcContext.Context,
wg: wg,
cancel: cancel,

tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upstream: up,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoLogEnabled: redoLogEnabled,
targetTs: targetTs,
started: false,
tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upstream: up,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoManager: redoManager,
targetTs: targetTs,
started: false,

changefeedID: changefeedVars.ID,
changefeedVars: changefeedVars,
@@ -279,7 +279,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled)
flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled())
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig,
@@ -315,7 +315,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController)
t.targetTs, flowController, t.redoManager)
actorSinkNode.initWithReplicaConfig(t.replicaConfig)
t.sinkNode = actorSinkNode

@@ -389,7 +389,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// will be able to cooperate replication status directly. Then we will add
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
if t.redoManager.Enabled() {
return t.sinkNode.ResolvedTs()
}
return t.sortNode.ResolvedTs()
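On the table_actor.go side, the actor now carries the redo.LogManager itself instead of a redoLogEnabled flag: the flow controller is built with t.redoManager.Enabled(), the manager is threaded into newSinkNode, and ResolvedTs() branches on the same call. The sketch below condenses that last branch with simplified stand-in types of my own (logManager, enabledManager, plain uint64 watermarks); only redoManager.Enabled() and the shape of the branch come from the diff.

package main

import "fmt"

// logManager is a stand-in for the single redo.LogManager method used here.
type logManager interface{ Enabled() bool }

type enabledManager struct{}

func (enabledManager) Enabled() bool { return true }

type tableActor struct {
	redoManager    logManager
	sinkResolvedTs uint64 // watermark reported by the sink node (and redo log)
	sortResolvedTs uint64 // watermark reported by the sorter node
}

// ResolvedTs mirrors the new branch: with the redo manager enabled, the table
// reports the sink node's resolved ts instead of the sorter's.
func (t *tableActor) ResolvedTs() uint64 {
	if t.redoManager.Enabled() {
		return t.sinkResolvedTs
	}
	return t.sortResolvedTs
}

func main() {
	t := &tableActor{redoManager: enabledManager{}, sinkResolvedTs: 90, sortResolvedTs: 120}
	fmt.Println("reported resolved ts:", t.ResolvedTs()) // prints 90
}

Branching on the manager rather than a separate bool keeps a single source of truth for whether redo logging is active.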