From 0b7969deea495ff6462b9a980a5717cca2fbcec5 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Fri, 1 Jul 2022 14:38:39 +0800 Subject: [PATCH] sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038) ref pingcap/tiflow#5231 --- cdc/api/validator/validator_test.go | 1 + cdc/processor/pipeline/sink.go | 30 +++++++ cdc/processor/pipeline/sink_test.go | 95 +++++++++++++++++---- cdc/processor/pipeline/table_actor.go | 7 +- cdc/processor/pipeline/table_actor_test.go | 3 +- cdc/sink/flowcontrol/flow_control.go | 51 +++++++---- cdc/sink/flowcontrol/flow_control_test.go | 20 ++--- cdc/sink/mq/mq.go | 8 -- cdc/sink/mq/mq_test.go | 3 +- cdc/sink/sink.go | 3 + pkg/cmd/cli/cli_changefeed_create.go | 6 +- pkg/cmd/util/helper_test.go | 18 ++-- pkg/config/config_test_data.go | 3 +- pkg/config/replica_config.go | 15 +--- pkg/config/replica_config_test.go | 9 +- pkg/config/sink.go | 98 +++++++++++++++++++++- pkg/config/sink_test.go | 78 ++++++++++++++++- tests/integration_tests/big_txn/run.sh | 4 +- 18 files changed, 361 insertions(+), 91 deletions(-) diff --git a/cdc/api/validator/validator_test.go b/cdc/api/validator/validator_test.go index 8e863265569..d8c811121be 100644 --- a/cdc/api/validator/validator_test.go +++ b/cdc/api/validator/validator_test.go @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" + oldInfo.Config.Sink.TxnAtomicity = "table" newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 804118aa346..dbb97d68323 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -15,6 +15,7 @@ package pipeline import ( "context" + "fmt" 
"sync/atomic" "time" @@ -47,6 +48,7 @@ type sinkNode struct { redoManager redo.LogManager replicaConfig *config.ReplicaConfig + splitTxn bool } func newSinkNode( @@ -56,6 +58,7 @@ func newSinkNode( redoManager redo.LogManager, state *TableState, changefeed model.ChangeFeedID, + splitTxn bool, ) *sinkNode { sn := &sinkNode{ tableID: tableID, @@ -66,6 +69,7 @@ func newSinkNode( changefeed: changefeed, flowController: flowController, redoManager: redoManager, + splitTxn: splitTxn, } sn.resolvedTs.Store(model.NewResolvedTs(startTs)) sn.checkpointTs.Store(model.NewResolvedTs(startTs)) @@ -307,6 +311,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent + if err := n.verifySplitTxn(event); err != nil { + return false, errors.Trace(err) + } + if event.IsResolved() { if n.state.Load() == TableStatePrepared { n.state.Store(TableStateReplicating) @@ -360,3 +368,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { n.flowController.Abort() return n.sink.Close(ctx) } + +// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent +// with `SplitTxn==true`. +func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { + if n.splitTxn { + return nil + } + + // Fail-fast check, this situation should never happen normally when split transactions + // are not supported. 
+ if e.Resolved != nil && e.Resolved.IsBatchMode() { + msg := fmt.Sprintf("batch mode resolved ts is not supported "+ + "when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + + if e.Row != nil && e.Row.SplitTxn { + msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn) + return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg) + } + return nil +} diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index cf27ed57f06..05fb1c2de98 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -127,18 +127,19 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error { func TestState(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePrepared // test stop at targetTs node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil). 
ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -189,7 +190,7 @@ func TestState(t *testing.T) { // test the stop at ts command state = TableStatePrepared node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -229,7 +230,7 @@ func TestState(t *testing.T) { // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts state = TableStatePrepared node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -270,11 +271,12 @@ func TestState(t *testing.T) { // until the underlying sink is closed func TestStopStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-state"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) @@ -283,7 +285,7 @@ func TestStopStatus(t *testing.T) { node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) 
@@ -317,17 +319,18 @@ func TestStopStatus(t *testing.T) { func TestManyTs(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePrepared sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) require.Equal(t, TableStatePrepared, node.State()) @@ -492,17 +495,18 @@ func TestManyTs(t *testing.T) { func TestIgnoreEmptyRowChangeEvent(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -519,17 +523,18 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: 
model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -581,19 +586,19 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) - cfg := config.GetDefaultReplicaConfig() - cfg.EnableOldValue = false + config := config.GetDefaultReplicaConfig() + config.EnableOldValue = false ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: cfg, + Config: config, }, }) state := TableStatePreparing sink := &mockSink{} node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) @@ -745,7 +750,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { sink := &flushSink{} // sNode is a sinkNode sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), - &state, ctx.ChangefeedVars().ID) + &state, ctx.ChangefeedVars().ID, false) sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).ChangefeedVars().Info.Config) sNode.barrierTs = 10 @@ -760,3 +765,59 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { require.Equal(t, uint64(8), sNode.CheckpointTs()) 
require.Equal(t, 2, flowController.releaseCounter) } + +func TestSplitTxn(t *testing.T) { + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: model.DefaultChangeFeedID("changefeed-id-test"), + Info: &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config, + }, + }) + state := TableStatePrepared + flowController := &flushFlowController{} + sink := &flushSink{} + // sNode is a sinkNode + sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager(), + &state, ctx.ChangefeedVars().ID, false) + sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, + pmessage.Message{}, nil).ChangefeedVars().Info.Config) + msg := pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err := sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2, SplitTxn: true}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*should not split txn when sink.splitTxn is.*", err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 1, + RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, + Row: &model.RowChangedEvent{CommitTs: 2}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Nil(t, err) + + msg = pmessage.PolymorphicEventMessage(&model.PolymorphicEvent{ + CRTs: 7, + Resolved: &model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: 7, + BatchID: 1, + }, + RawKV: &model.RawKVEntry{OpType: model.OpTypeResolved}, + Row: &model.RowChangedEvent{}, + }) + _, err = sNode.HandleMessage(ctx, msg) + require.Regexp(t, ".*batch mode resolved ts is not supported.*", err) +} diff --git 
a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 8890b9693fd..57d16653a57 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -282,7 +282,10 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) - flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled()) + splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn() + + flowController := flowcontrol.NewTableFlowController(t.memoryQuota, + t.redoManager.Enabled(), splitTxn) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, t.mounter, t.replicaConfig, &t.state, t.changefeedID, @@ -318,7 +321,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs, - t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID) + t.targetTs, flowController, t.redoManager, &t.state, t.changefeedID, splitTxn) actorSinkNode.initWithReplicaConfig(t.replicaConfig) t.sinkNode = actorSinkNode diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index aa3b7f844da..c4be1425c1b 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -53,7 +53,7 @@ func TestAsyncStopFailed(t *testing.T) { state: TableStatePreparing, } tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager, - &tbl.state, model.DefaultChangeFeedID("changefeed-test")) + &tbl.state, model.DefaultChangeFeedID("changefeed-test"), false) require.True(t, tbl.AsyncStop(1)) mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0) @@ -447,6 +447,7 @@ func TestTableActorStart(t *testing.T) { StartTs: 0, MarkTableID: 1, }, + replicaConfig: config.GetDefaultReplicaConfig(), } require.Nil(t, tbl.start(ctx)) 
require.Equal(t, 1, len(tbl.nodes)) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 597c2b17cc0..0f1e0ce5050 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -32,14 +32,17 @@ const ( // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { - memoryQuota *tableMemoryQuota - redoLogEnabled bool - lastCommitTs uint64 + memoryQuota *tableMemoryQuota + lastCommitTs uint64 queueMu struct { sync.Mutex queue deque.Deque } + + redoLogEnabled bool + splitTxn bool + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: // 1. Different txns with same commitTs but different startTs // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size @@ -62,24 +65,29 @@ type txnSizeEntry struct { } // NewTableFlowController creates a new TableFlowController -func NewTableFlowController(quota uint64, redoLogEnabled bool) *TableFlowController { +func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController { + log.Info("create table flow controller", + zap.Uint64("quota", quota), + zap.Bool("redoLogEnabled", redoLogEnabled), + zap.Bool("splitTxn", splitTxn)) maxSizePerTxn := uint64(defaultSizePerTxn) - if maxSizePerTxn > quota && !redoLogEnabled { + if maxSizePerTxn > quota { maxSizePerTxn = quota } return &TableFlowController{ - memoryQuota: newTableMemoryQuota(quota), - redoLogEnabled: redoLogEnabled, + memoryQuota: newTableMemoryQuota(quota), queueMu: struct { sync.Mutex queue deque.Deque }{ queue: deque.NewDeque(), }, - batchSize: defaultBatchSize, - maxRowsPerTxn: defaultRowsPerTxn, - maxSizePerTxn: maxSizePerTxn, + redoLogEnabled: redoLogEnabled, + splitTxn: splitTxn, + batchSize: defaultBatchSize, + maxRowsPerTxn: defaultRowsPerTxn, + maxSizePerTxn: maxSizePerTxn, } } @@ -92,8 +100,14 @@ func (c 
*TableFlowController) Consume( ) error { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) - blockingCallBack := func() error { - err := callBack(c.batchID) + blockingCallBack := func() (err error) { + if commitTs > lastCommitTs || c.splitTxn { + // Call `callBack` in two cases: + // 1. commitTs > lastCommitTs, handle new txn and send a normal resolved ts + // 2. commitTs == lastCommitTs && splitTxn == true, split the same txn and + // send a batch resolved ts + err = callBack(c.batchID) + } if commitTs == lastCommitTs { c.batchID++ @@ -108,11 +122,10 @@ func (c *TableFlowController) Consume( zap.Uint64("lastCommitTs", c.lastCommitTs)) } - if commitTs == lastCommitTs && c.redoLogEnabled { - // Here commitTs == lastCommitTs, which means we are not crossing transaction - // boundaries, and redo log currently does not support split transactions, hence - // we use `forceConsume` to avoid deadlock. - // TODO: fix this after we figure out how to make redo log support split txn. + if commitTs == lastCommitTs && (c.redoLogEnabled || !c.splitTxn) { + // Here `commitTs == lastCommitTs` means we are not crossing transaction + // boundaries, `c.redoLogEnabled || !c.splitTxn` means batch resolved mode + // is not supported, hence we should use `forceConsume` to avoid deadlock.
if err := c.memoryQuota.forceConsume(size); err != nil { return errors.Trace(err) } @@ -198,7 +211,9 @@ func (c *TableFlowController) addEntry(msg *model.PolymorphicEvent, size uint64) rowCount: 1, batchID: c.batchID, }) - msg.Row.SplitTxn = true + if c.splitTxn { + msg.Row.SplitTxn = true + } } // resetBatch reset batchID and batchGroupCount if handling a new txn, Otherwise, diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 7c2cefac86a..aedd9d92445 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -181,7 +181,7 @@ func TestFlowControlWithForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(2048, true) + flowController := NewTableFlowController(2048, true, true) errg.Go(func() error { lastCommitTs := uint64(1) @@ -297,7 +297,7 @@ func TestFlowControlWithBatchAndForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, true) + flowController := NewTableFlowController(512, true, true) maxBatch := uint64(3) // simulate a big txn @@ -366,7 +366,7 @@ func TestFlowControlWithBatchAndForceConsume(t *testing.T) { } } require.Less(t, uint64(0), flowController.GetConsumption()) - require.Equal(t, maxBatch, maxBatchID) + require.LessOrEqual(t, maxBatch, maxBatchID) select { case <-ctx.Done(): return ctx.Err() @@ -418,7 +418,7 @@ func TestFlowControlWithoutForceConsume(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, false) + flowController := NewTableFlowController(512, false, true) maxBatch := uint64(3) // simulate a big txn @@ -535,7 +535,7 @@ func TestFlowControlAbort(t *testing.T) { t.Parallel() callBacker := 
&mockCallBacker{} - controller := NewTableFlowController(1024, false) + controller := NewTableFlowController(1024, false, false) var wg sync.WaitGroup wg.Add(1) go func() { @@ -566,7 +566,7 @@ func TestFlowControlCallBack(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512, false) + flowController := NewTableFlowController(512, false, false) errg.Go(func() error { lastCommitTs := uint64(1) @@ -670,7 +670,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512, false) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -712,7 +712,7 @@ func TestFlowControlCallBackError(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512, false) + controller := NewTableFlowController(512, false, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -741,7 +741,7 @@ func TestFlowControlCallBackError(t *testing.T) { func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() - controller := NewTableFlowController(1024, false) + controller := NewTableFlowController(1024, false, false) err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(uint64) error { t.Error("unreachable") return nil @@ -754,7 +754,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 102400) - flowController := NewTableFlowController(20*1024*1024, false) // 20M + flowController := NewTableFlowController(20*1024*1024, false, false) // 20M errg.Go(func() error { lastCommitTs := uint64(1) diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 51422093c3f..72106acdd8f 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -386,10 +386,6 @@ func NewKafkaSaramaSink(ctx 
context.Context, sinkURI *url.URL, return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } - if err := replicaConfig.ApplyProtocol(sinkURI).Validate(); err != nil { - return nil, errors.Trace(err) - } - saramaConfig, err := kafka.NewSaramaConfig(ctx, baseConfig) if err != nil { return nil, errors.Trace(err) @@ -482,10 +478,6 @@ func NewPulsarSink(ctx context.Context, sinkURI *url.URL, if s != "" { replicaConfig.Sink.Protocol = s } - err := replicaConfig.Validate() - if err != nil { - return nil, err - } var protocol config.Protocol if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil { diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index adcff8e6ed8..53f14a5e267 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -86,8 +86,8 @@ func TestKafkaSink(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, replicaConfig, errCh) - require.Nil(t, err) encoder := sink.encoderBuilder.Build() @@ -211,6 +211,7 @@ func TestFlushRowChangedEvents(t *testing.T) { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient }() + require.Nil(t, replicaConfig.ValidateAndAdjust(sinkURI)) sink, err := NewKafkaSaramaSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index b058da75ec7..b9a5d1c7e71 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -162,6 +162,9 @@ func New( if err != nil { return nil, cerror.WrapError(cerror.ErrSinkURIInvalid, err) } + if err := config.ValidateAndAdjust(sinkURI); err != nil { + return nil, err + } if newSink, ok := sinkIniterMap[strings.ToLower(sinkURI.Scheme)]; ok { return newSink(ctx, changefeedID, sinkURI, config, errCh) } diff --git a/pkg/cmd/cli/cli_changefeed_create.go b/pkg/cmd/cli/cli_changefeed_create.go index aa13f6d29d4..268030e6f52 100644 --- a/pkg/cmd/cli/cli_changefeed_create.go +++ 
b/pkg/cmd/cli/cli_changefeed_create.go @@ -258,11 +258,15 @@ func (o *createChangefeedOptions) validate(ctx context.Context, cmd *cobra.Comma return errors.New("Creating changefeed without a sink-uri") } - err := o.cfg.Validate() + paredSinkURI, err := url.Parse(o.commonChangefeedOptions.sinkURI) if err != nil { return err } + if err = o.cfg.ValidateAndAdjust(paredSinkURI); err != nil { + return err + } + if err := o.validateStartTs(ctx); err != nil { return err } diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index e52becf0a64..cca1875fd92 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -153,14 +153,14 @@ func TestStrictDecodeInvalidFile(t *testing.T) { configPath := filepath.Join(tmpDir, "ticdc.toml") configContent := fmt.Sprintf(` -unknown = "128.0.0.1:1234" -data-dir = "%+v" - -[log.unkown] -max-size = 200 -max-days = 1 -max-backups = 1 -`, dataDir) + unknown = "128.0.0.1:1234" + data-dir = "%+v" + + [log.unkown] + max-size = 200 + max-days = 1 + max-backups = 1 + `, dataDir) err := os.WriteFile(configPath, []byte(configContent), 0o644) require.Nil(t, err) @@ -182,7 +182,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { require.Equal(t, &config.MounterConfig{ WorkerNum: 16, }, cfg.Mounter) - err = cfg.Validate() + err = cfg.ValidateAndAdjust(nil) require.Nil(t, err) require.Equal(t, &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 32518ffd5ec..53cd9ce99a6 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -166,7 +166,8 @@ const ( ] } ], - "schema-registry": "" + "schema-registry": "", + "transaction-atomicity": "" }, "consistent": { "level": "none", diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index f4c8fb44343..5d9100c90f9 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -114,10 +114,10 @@ func (c *replicaConfig) 
fillFromV1(v1 *outdated.ReplicaConfigV1) { } } -// Validate verifies that each parameter is valid. -func (c *ReplicaConfig) Validate() error { +// ValidateAndAdjust verifies and adjusts the replica configuration. +func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { if c.Sink != nil { - err := c.Sink.validate(c.EnableOldValue) + err := c.Sink.validateAndAdjust(sinkURI, c.EnableOldValue) if err != nil { return err } @@ -125,15 +125,6 @@ func (c *ReplicaConfig) Validate() error { return nil } -// ApplyProtocol sinkURI to fill the `ReplicaConfig` -func (c *ReplicaConfig) ApplyProtocol(sinkURI *url.URL) *ReplicaConfig { - params := sinkURI.Query() - if s := params.Get(ProtocolKey); s != "" { - c.Sink.Protocol = s - } - return c -} - // GetDefaultReplicaConfig returns the default replica config. func GetDefaultReplicaConfig() *ReplicaConfig { return defaultReplicaConfig.Clone() diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 64fba75187f..07ed1b36a21 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -81,27 +81,28 @@ func TestReplicaConfigOutDated(t *testing.T) { {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } + conf.Sink.TxnAtomicity = unknowTxnAtomicity require.Equal(t, conf, conf2) } func TestReplicaConfigValidate(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() - require.Nil(t, conf.Validate()) + require.Nil(t, conf.ValidateAndAdjust(nil)) // Incorrect sink configuration. 
conf = GetDefaultReplicaConfig() conf.Sink.Protocol = "canal" conf.EnableOldValue = false require.Regexp(t, ".*canal protocol requires old value to be enabled.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) conf = GetDefaultReplicaConfig() conf.Sink.DispatchRules = []*DispatchRule{ {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, } require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", - conf.Validate()) + conf.ValidateAndAdjust(nil)) // Correct sink configuration. conf = GetDefaultReplicaConfig() @@ -110,7 +111,7 @@ func TestReplicaConfigValidate(t *testing.T) { {Matcher: []string{"a.c"}, PartitionRule: "p1"}, {Matcher: []string{"a.d"}}, } - err := conf.Validate() + err := conf.ValidateAndAdjust(nil) require.Nil(t, err) rules := conf.Sink.DispatchRules require.Equal(t, "d1", rules[0].PartitionRule) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index b05c9be39e7..30964527e30 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -15,6 +15,7 @@ package config import ( "fmt" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -22,9 +23,36 @@ import ( "go.uber.org/zap" ) -// DefaultMaxMessageBytes sets the default value for max-message-bytes +// DefaultMaxMessageBytes sets the default value for max-message-bytes. const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M +// AtomicityLevel represents the atomicity level of a changefeed. +type AtomicityLevel string + +const ( + // unknowTxnAtomicity is the default atomicity level, which is invalid and will + // be set to a valid value when initializing sink in processor. + unknowTxnAtomicity AtomicityLevel = "" + + // noneTxnAtomicity means atomicity of transactions is not guaranteed + noneTxnAtomicity AtomicityLevel = "none" + + // tableTxnAtomicity means atomicity of single table transactions is guaranteed. 
+ tableTxnAtomicity AtomicityLevel = "table" + + // globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which + // is currently not supported by TiCDC. + // globalTxnAtomicity AtomicityLevel = "global" + + defaultMqTxnAtomicity AtomicityLevel = noneTxnAtomicity + defaultMysqlTxnAtomicity AtomicityLevel = tableTxnAtomicity +) + +// ShouldSplitTxn returns whether the sink should split txn. +func (l AtomicityLevel) ShouldSplitTxn() bool { + return l == noneTxnAtomicity +} + // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -38,9 +66,10 @@ type SinkConfig struct { Protocol string `toml:"protocol" json:"protocol"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors"` SchemaRegistry string `toml:"schema-registry" json:"schema-registry"` + TxnAtomicity AtomicityLevel `toml:"transaction-atomicity" json:"transaction-atomicity"` } -// DispatchRule represents partition rule for a table +// DispatchRule represents partition rule for a table. type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` // Deprecated, please use PartitionRule. @@ -57,7 +86,11 @@ type ColumnSelector struct { Columns []string `toml:"columns" json:"columns"` } -func (s *SinkConfig) validate(enableOldValue bool) error { +func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL, enableOldValue bool) error { + if err := s.applyParameter(sinkURI); err != nil { + return err + } + if !enableOldValue { for _, protocolStr := range ForceEnableOldValueProtocols { if protocolStr == s.Protocol { @@ -86,3 +119,62 @@ func (s *SinkConfig) validate(enableOldValue bool) error { return nil } + +// applyParameter fills the `Protocol` and `TxnAtomicity` of the sink config from sinkURI.
+func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { + if sinkURI == nil { + return nil + } + params := sinkURI.Query() + + txnAtomicity := params.Get("transaction-atomicity") + switch AtomicityLevel(txnAtomicity) { + case unknowTxnAtomicity: + // Set default value according to scheme. + if isMqScheme(sinkURI.Scheme) { + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = defaultMysqlTxnAtomicity + } + case noneTxnAtomicity: + s.TxnAtomicity = noneTxnAtomicity + case tableTxnAtomicity: + // MqSink only supports `noneTxnAtomicity`. + if isMqScheme(sinkURI.Scheme) { + log.Warn("The configuration of transaction-atomicity is incompatible with scheme", + zap.Any("txnAtomicity", s.TxnAtomicity), + zap.String("scheme", sinkURI.Scheme), + zap.String("protocol", s.Protocol)) + s.TxnAtomicity = defaultMqTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } + default: + errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", + txnAtomicity, sinkURI.Scheme) + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg) + } + + s.Protocol = params.Get(ProtocolKey) + // Validate that the protocol is compatible with the scheme. + if isMqScheme(sinkURI.Scheme) { + var protocol Protocol + err := protocol.FromString(s.Protocol) + if err != nil { + return err + } + } else if s.Protocol != "" { + return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol cannot "+ + "be configured when using %s scheme", sinkURI.Scheme)) + } + + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", s.Protocol), + zap.String("txnAtomicity", string(s.TxnAtomicity))) + return nil +} + +func isMqScheme(scheme string) bool { + return scheme == "kafka" || scheme == "kafka+ssl" || + scheme == "pulsar" || scheme == "pulsar+ssl" +} diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index c34fcd832bf..94378a25095 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -14,12 +14,13 @@ package config
import ( + "net/url" "testing" "github.com/stretchr/testify/require" ) -func TestValidate(t *testing.T) { +func TestValidateOldValue(t *testing.T) { t.Parallel() testCases := []struct { protocol string @@ -73,9 +74,80 @@ func TestValidate(t *testing.T) { Protocol: tc.protocol, } if tc.expectedErr == "" { - require.Nil(t, cfg.validate(tc.enableOldValue)) + require.Nil(t, cfg.validateAndAdjust(nil, tc.enableOldValue)) } else { - require.Regexp(t, tc.expectedErr, cfg.validate(tc.enableOldValue)) + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(nil, tc.enableOldValue)) + } + } +} + +func TestValidateApplyParameter(t *testing.T) { + t.Parallel() + testCases := []struct { + sinkURI string + expectedErr string + expectedLevel AtomicityLevel + }{ + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", + expectedErr: "", + expectedLevel: tableTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=global", + expectedErr: "global level atomicity is not supported by.*", + }, + { + sinkURI: "tidb://normal:123456@127.0.0.1:3306?protocol=canal", + expectedErr: ".*protocol cannot be configured when using tidb scheme.*", + }, + { + sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" + + "&protocol=open-protocol", + expectedErr: "", + expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?protocol=default", + expectedErr: "", + 
expectedLevel: noneTxnAtomicity, + }, + { + sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table", + expectedErr: ".*unknown .* protocol for Message Queue sink.*", + }, + } + + for _, tc := range testCases { + cfg := SinkConfig{} + parsedSinkURI, err := url.Parse(tc.sinkURI) + require.Nil(t, err) + if tc.expectedErr == "" { + require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) + require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity) + } else { + require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) } } } diff --git a/tests/integration_tests/big_txn/run.sh b/tests/integration_tests/big_txn/run.sh index ee93cc5a757..ca969dcb0d7 100755 --- a/tests/integration_tests/big_txn/run.sh +++ b/tests/integration_tests/big_txn/run.sh @@ -20,10 +20,12 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + TOPIC_NAME="ticdc-big-txn-test-$RANDOM" case $SINK_TYPE in kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;; - *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none" ;; esac run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"