Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sinkv2-event-table-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 1, 2022
2 parents 96fbf6a + 0b7969d commit f4c8145
Show file tree
Hide file tree
Showing 18 changed files with 361 additions and 91 deletions.
1 change: 1 addition & 0 deletions cdc/api/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
30 changes: 30 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ type sinkNode struct {
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
splitTxn bool
}

func newSinkNode(
Expand All @@ -56,6 +58,7 @@ func newSinkNode(
redoManager redo.LogManager,
state *TableState,
changefeed model.ChangeFeedID,
splitTxn bool,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
Expand All @@ -66,6 +69,7 @@ func newSinkNode(
changefeed: changefeed,
flowController: flowController,
redoManager: redoManager,
splitTxn: splitTxn,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
Expand Down Expand Up @@ -307,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.state.Load() == TableStatePrepared {
n.state.Store(TableStateReplicating)
Expand Down Expand Up @@ -360,3 +368,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.flowController.Abort()
return n.sink.Close(ctx)
}

// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent
// with `SplitTxn==true`.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
if n.splitTxn {
return nil
}

// Fail-fast check, this situation should never happen normally when split transactions
// are not supported.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
msg := fmt.Sprintf("batch mode resolved ts is not supported "+
"when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}

if e.Row != nil && e.Row.SplitTxn {
msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
return nil
}
95 changes: 78 additions & 17 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,19 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error {

func TestState(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-status"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})

state := TableStatePrepared
// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -189,7 +190,7 @@ func TestState(t *testing.T) {
// test the stop at ts command
state = TableStatePrepared
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestState(t *testing.T) {
// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
state = TableStatePrepared
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -270,11 +271,12 @@ func TestState(t *testing.T) {
// until the underlying sink is closed
func TestStopStatus(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-state"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})

Expand All @@ -283,7 +285,7 @@ func TestStopStatus(t *testing.T) {
node := newSinkNode(1,
&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100,
&mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -317,17 +319,18 @@ func TestStopStatus(t *testing.T) {

func TestManyTs(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePrepared
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatePrepared, node.State())
Expand Down Expand Up @@ -492,17 +495,18 @@ func TestManyTs(t *testing.T) {

func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand All @@ -519,17 +523,18 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {

func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -581,19 +586,19 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {

func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
config := config.GetDefaultReplicaConfig()
config.EnableOldValue = false
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
Config: config,
},
})
state := TableStatePreparing
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

Expand Down Expand Up @@ -745,7 +750,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID)
&state, ctx.ChangefeedVars().ID, false)
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
sNode.barrierTs = 10
Expand All @@ -760,3 +765,59 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
require.Equal(t, uint64(8), sNode.CheckpointTs())
require.Equal(t, 2, flowController.releaseCounter)
}

func TestSplitTxn(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config,
},
})
state := TableStatePrepared
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(),
&state, ctx.ChangefeedVars().ID, false)
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
_, err := sNode.HandleMessage(ctx, msg)
require.Nil(t, err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{CommitTs: 2, SplitTxn: true},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Regexp(t, ".*should not split txn when sink.splitTxn is.*", err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 1,
RawKV: &model.RawKVEntry{OpType: model.OpTypePut},
Row: &model.RowChangedEvent{CommitTs: 2},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Nil(t, err)

msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{
CRTs: 7,
Resolved: &model.ResolvedTs{
Mode: model.BatchResolvedMode,
Ts: 7,
BatchID: 1,
},
RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved},
Row: &model.RowChangedEvent{},
})
_, err = sNode.HandleMessage(ctx, msg)
require.Regexp(t, ".*batch mode resolved ts is not supported.*", err)
}
7 changes: 5 additions & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,10 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled())
splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig, &t.state, t.changefeedID,
Expand Down Expand Up @@ -318,7 +321,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID)
t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID, splitTxn)
actorSinkNode.initWithReplicaConfig(t.replicaConfig)
t.sinkNode = actorSinkNode

Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAsyncStopFailed(t *testing.T) {
state: TableStatePreparing,
}
tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager,
&tbl.state, model.DefaultChangeFeedID("changefeed-test"))
&tbl.state, model.DefaultChangeFeedID("changefeed-test"), false)
require.True(t, tbl.AsyncStop(1))

mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
Expand Down Expand Up @@ -447,6 +447,7 @@ func TestTableActorStart(t *testing.T) {
StartTs: 0,
MarkTableID: 1,
},
replicaConfig: config.GetDefaultReplicaConfig(),
}
require.Nil(t, tbl.start(ctx))
require.Equal(t, 1, len(tbl.nodes))
Expand Down
Loading

0 comments on commit f4c8145

Please sign in to comment.