From 4c07175c96bfc7574e53d570e26131acc2b7d78a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 29 Jun 2022 19:17:54 +0800 Subject: [PATCH 1/4] sinkv2(ticdc): fix update resolved ts not taking effect issue --- cdc/sinkv2/eventsink/event.go | 2 +- cdc/sinkv2/eventsink/event_appender_test.go | 4 + cdc/sinkv2/eventsink/main_test.go | 24 ++ cdc/sinkv2/tablesink/event_table_sink.go | 26 +- cdc/sinkv2/tablesink/event_table_sink_test.go | 253 ++++++++++++++++++ cdc/sinkv2/tablesink/progress_tracker_test.go | 8 + 6 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 cdc/sinkv2/eventsink/main_test.go create mode 100644 cdc/sinkv2/tablesink/event_table_sink_test.go diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 5d1f9b70281..4cb9e096193 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -32,7 +32,7 @@ type CallbackFunc func() type CallbackableEvent[E TableEvent] struct { Event E Callback CallbackFunc - TableStatus *pipeline.TableState + TableStatus pipeline.TableState } // RowChangeCallbackableEvent is the row change event which can be callbacked. diff --git a/cdc/sinkv2/eventsink/event_appender_test.go b/cdc/sinkv2/eventsink/event_appender_test.go index e7cb4af0f2e..95a561256e3 100644 --- a/cdc/sinkv2/eventsink/event_appender_test.go +++ b/cdc/sinkv2/eventsink/event_appender_test.go @@ -21,6 +21,8 @@ import ( ) func TestRowChangeEventAppender(t *testing.T) { + t.Parallel() + tableInfo := &model.TableName{ Schema: "test", Table: "t1", @@ -53,6 +55,8 @@ func TestRowChangeEventAppender(t *testing.T) { } func TestTxnEventAppender(t *testing.T) { + t.Parallel() + tableInfo := &model.TableName{ Schema: "test", Table: "t1", diff --git a/cdc/sinkv2/eventsink/main_test.go b/cdc/sinkv2/eventsink/main_test.go new file mode 100644 index 00000000000..e1321fb3f07 --- /dev/null +++ b/cdc/sinkv2/eventsink/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsink + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index ffcabe19894..144cb9d657f 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -33,7 +33,23 @@ type eventTableSink[E eventsink.TableEvent] struct { eventAppender eventsink.Appender[E] // NOTICE: It is ordered by commitTs. eventBuffer []E - state *pipeline.TableState + state pipeline.TableState +} + +// New an eventTableSink with given backendSink and event appender. +func New[E eventsink.TableEvent]( + backendSink eventsink.EventSink[E], + appender eventsink.Appender[E], +) *eventTableSink[E] { + return &eventTableSink[E]{ + eventID: 0, + maxResolvedTs: model.NewResolvedTs(0), + backendSink: backendSink, + progressTracker: newProgressTracker(), + eventAppender: appender, + eventBuffer: make([]E, 0, 1024), + state: pipeline.TableStatePreparing, + } } func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { @@ -58,18 +74,22 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { return } resolvedEvents := e.eventBuffer[:i] + e.eventBuffer = append(make([]E, 0, len(e.eventBuffer[i:])), e.eventBuffer[i:]...) resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) + for _, ev := range resolvedEvents { + // We have to record the event ID for the callback. + eventID := e.eventID ce := &eventsink.CallbackableEvent[E]{ Event: ev, Callback: func() { - e.progressTracker.remove(e.eventID) + e.progressTracker.remove(eventID) }, TableStatus: e.state, } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) - e.progressTracker.addEvent(e.eventID) + e.progressTracker.addEvent(eventID) e.eventID++ } // Do not forget to add the resolvedTs to progressTracker. diff --git a/cdc/sinkv2/tablesink/event_table_sink_test.go b/cdc/sinkv2/tablesink/event_table_sink_test.go new file mode 100644 index 00000000000..cd984af5af8 --- /dev/null +++ b/cdc/sinkv2/tablesink/event_table_sink_test.go @@ -0,0 +1,253 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sort" + "testing" + + "github.com/pingcap/tiflow/cdc/processor/pipeline" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + "github.com/stretchr/testify/require" +) + +// Assert TableSink implementation +var _ eventsink.EventSink[*model.SingleTableTxn] = (*mockEventSink)(nil) + +type mockEventSink struct { + events []*eventsink.TxnCallbackableEvent +} + +func (m *mockEventSink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) { + m.events = append(m.events, rows...) +} + +func (m *mockEventSink) Close() error { + // Do nothing. + return nil +} + +// acknowledge the txn events by call the callback function. +func (m *mockEventSink) acknowledge(commitTs uint64) { + i := sort.Search(len(m.events), func(i int) bool { + return m.events[i].Event.GetCommitTs() > commitTs + }) + if i == 0 { + return + } + ackedEvents := m.events[:i] + m.events = append( + make([]*eventsink.TxnCallbackableEvent, + 0, + len(m.events[i:])), + m.events[i:]..., + ) + + for _, event := range ackedEvents { + if event.TableStatus.Load() != pipeline.TableStateStopped { + event.Callback() + } + } +} + +func getTestRows() []*model.RowChangedEvent { + tableInfo := &model.TableName{ + Schema: "test", + Table: "t1", + TableID: 1, + IsPartition: false, + } + + return []*model.RowChangedEvent{ + { + Table: tableInfo, + CommitTs: 101, + StartTs: 98, + }, + { + Table: tableInfo, + CommitTs: 102, + StartTs: 99, + }, + { + Table: tableInfo, + CommitTs: 102, + StartTs: 100, + }, + { + Table: tableInfo, + CommitTs: 102, + StartTs: 100, + }, + { + Table: tableInfo, + CommitTs: 103, + StartTs: 101, + }, + { + Table: tableInfo, + CommitTs: 103, + StartTs: 101, + }, + { + Table: tableInfo, + CommitTs: 104, + StartTs: 102, + }, + { + Table: tableInfo, + CommitTs: 105, + StartTs: 103, + // Batch1 + SplitTxn: true, + }, + { + Table: tableInfo, + CommitTs: 105, + StartTs: 103, + }, + { + Table: tableInfo, + CommitTs: 105, + StartTs: 103, + }, + { + Table: tableInfo, + CommitTs: 105, + StartTs: 103, + // Batch2 + SplitTxn: true, + }, + { + Table: tableInfo, + CommitTs: 105, + StartTs: 103, + }, + } +} + +func TestNewEventTableSink(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{} + tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) + + require.Equal(t, uint64(0), tb.eventID, "eventID should start from 0") + require.Equal(t, model.NewResolvedTs(0), tb.maxResolvedTs, "maxResolvedTs should start from 0") + require.NotNil(t, sink, tb.backendSink, "backendSink should be set") + require.NotNil(t, tb.progressTracker, "progressTracker should be set") + require.NotNil(t, tb.eventAppender, "eventAppender should be set") + require.Equal(t, 0, len(tb.eventBuffer), "eventBuffer should be empty") + require.Equal(t, pipeline.TableStatePreparing, tb.state, "tableState should be unknown") +} + +func TestAppendRowChangedEvents(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{} + tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) + + tb.AppendRowChangedEvents(getTestRows()...) + require.Len(t, tb.eventBuffer, 7, "txn event buffer should have 7 txns") +} + +func TestUpdateResolvedTs(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{} + tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) + + tb.AppendRowChangedEvents(getTestRows()...) + // No event will be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(100)) + require.Equal(t, model.NewResolvedTs(100), tb.maxResolvedTs, "maxResolvedTs should be updated") + require.Len(t, tb.eventBuffer, 7, "txn event buffer should have 7 txns") + require.Len(t, sink.events, 0, "no event should not be flushed") + + // One event will be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(101)) + require.Equal(t, model.NewResolvedTs(101), tb.maxResolvedTs, "maxResolvedTs should be updated") + require.Len(t, tb.eventBuffer, 6, "txn event buffer should have 6 txns") + require.Len(t, sink.events, 1, "one event should be flushed") + + // Two events will be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(102)) + require.Equal(t, model.NewResolvedTs(102), tb.maxResolvedTs, "maxResolvedTs should be updated") + require.Len(t, tb.eventBuffer, 4, "txn event buffer should have 4 txns") + require.Len(t, sink.events, 3, "two events should be flushed") + + // Same resolved ts will not be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(102)) + require.Equal( + t, + model.NewResolvedTs(102), + tb.maxResolvedTs, + "maxResolvedTs should not be updated", + ) + require.Len(t, tb.eventBuffer, 4, "txn event buffer should still have 4 txns") + require.Len(t, sink.events, 3, "no event should be flushed") + + // All events will be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(105)) + require.Equal(t, model.NewResolvedTs(105), tb.maxResolvedTs, "maxResolvedTs should be updated") + require.Len(t, tb.eventBuffer, 0, "txn event buffer should be empty") + require.Len(t, sink.events, 7, "all events should be flushed") +} + +func TestGetCheckpointTs(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{} + tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) + + tb.AppendRowChangedEvents(getTestRows()...) + require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") + + // One event will be flushed. + tb.UpdateResolvedTs(model.NewResolvedTs(101)) + require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") + sink.acknowledge(101) + require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + + // Flush all events. + tb.UpdateResolvedTs(model.NewResolvedTs(105)) + require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101") + + // Only acknowledge some events. + sink.acknowledge(102) + require.Equal( + t, + model.NewResolvedTs(101), + tb.GetCheckpointTs(), + "checkpointTs should still be 101", + ) + + // Ack all events. + sink.acknowledge(105) + require.Equal(t, model.NewResolvedTs(105), tb.GetCheckpointTs(), "checkpointTs should be 105") +} + +func TestClose(t *testing.T) { + t.Parallel() + + sink := &mockEventSink{} + tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) + + tb.AppendRowChangedEvents(getTestRows()...) + tb.UpdateResolvedTs(model.NewResolvedTs(100)) + tb.Close() + require.Equal(t, pipeline.TableStateStopped, tb.state, "tableState should be closed") +} diff --git a/cdc/sinkv2/tablesink/progress_tracker_test.go b/cdc/sinkv2/tablesink/progress_tracker_test.go index 575ff410783..04d3c921cf5 100644 --- a/cdc/sinkv2/tablesink/progress_tracker_test.go +++ b/cdc/sinkv2/tablesink/progress_tracker_test.go @@ -21,6 +21,8 @@ import ( ) func TestNewProgressTracker(t *testing.T) { + t.Parallel() + tracker := newProgressTracker() require.NotNil( t, @@ -36,6 +38,8 @@ func TestNewProgressTracker(t *testing.T) { } func TestAddEvent(t *testing.T) { + t.Parallel() + tracker := newProgressTracker() tracker.addEvent(1) tracker.addEvent(2) @@ -44,6 +48,8 @@ func TestAddEvent(t *testing.T) { } func TestAddResolvedTs(t *testing.T) { + t.Parallel() + // There is no event in the tracker. tracker := newProgressTracker() tracker.addResolvedTs(1, model.NewResolvedTs(1)) @@ -62,6 +68,8 @@ func TestAddResolvedTs(t *testing.T) { } func TestRemove(t *testing.T) { + t.Parallel() + // Only event. tracker := newProgressTracker() tracker.addEvent(1) From 7918eaa4c7fd8c194a0aacdc212974397ca376c4 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 29 Jun 2022 19:20:39 +0800 Subject: [PATCH 2/4] sinkv2(ticdc): refine import --- cdc/sinkv2/tablesink/event_table_sink_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cdc/sinkv2/tablesink/event_table_sink_test.go b/cdc/sinkv2/tablesink/event_table_sink_test.go index cd984af5af8..8650524729d 100644 --- a/cdc/sinkv2/tablesink/event_table_sink_test.go +++ b/cdc/sinkv2/tablesink/event_table_sink_test.go @@ -17,9 +17,8 @@ import ( "sort" "testing" - "github.com/pingcap/tiflow/cdc/processor/pipeline" - "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/stretchr/testify/require" ) From d78e21771bd4663d42b948ca413bea4b75a324a8 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 30 Jun 2022 17:25:26 +0800 Subject: [PATCH 3/4] sinkv2(ticdc): address comments --- cdc/sinkv2/tablesink/event_table_sink.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index 144cb9d657f..c735b78b4c7 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -69,18 +69,19 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { }) // Despite the lack of data, we have to move forward with progress. if i == 0 { - e.progressTracker.addResolvedTs(e.eventID, resolvedTs) - e.eventID++ + e.progressTracker.addResolvedTs(e.genEventID(), resolvedTs) return } resolvedEvents := e.eventBuffer[:i] e.eventBuffer = append(make([]E, 0, len(e.eventBuffer[i:])), e.eventBuffer[i:]...) + // We have to create a new slice for the rest of the elements, + // otherwise we cannot GC the flushed values as soon as possible. resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { // We have to record the event ID for the callback. - eventID := e.eventID + eventID := e.genEventID() ce := &eventsink.CallbackableEvent[E]{ Event: ev, Callback: func() { @@ -90,11 +91,9 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) e.progressTracker.addEvent(eventID) - e.eventID++ } // Do not forget to add the resolvedTs to progressTracker. - e.progressTracker.addResolvedTs(e.eventID, resolvedTs) - e.eventID++ + e.progressTracker.addResolvedTs(e.genEventID(), resolvedTs) e.backendSink.WriteEvents(resolvedCallbackableEvents...) } @@ -105,3 +104,10 @@ func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs { func (e *eventTableSink[E]) Close() { e.state.Store(pipeline.TableStateStopped) } + +// genEventID generates an unique ID for event. +func (e *eventTableSink[E]) genEventID() uint64 { + res := e.eventID + e.eventID++ + return res +} From a94d4c3f7ce89500446af060669d8d0eb39db31a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 1 Jul 2022 11:51:26 +0800 Subject: [PATCH 4/4] sinkv2(ticdc): address comments --- cdc/sinkv2/eventsink/event.go | 2 +- cdc/sinkv2/tablesink/event_table_sink.go | 2 +- cdc/sinkv2/tablesink/event_table_sink_test.go | 22 +++++++++++++------ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 4cb9e096193..5d1f9b70281 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -32,7 +32,7 @@ type CallbackFunc func() type CallbackableEvent[E TableEvent] struct { Event E Callback CallbackFunc - TableStatus pipeline.TableState + TableStatus *pipeline.TableState } // RowChangeCallbackableEvent is the row change event which can be callbacked. diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index c735b78b4c7..cf56fe36de2 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -87,7 +87,7 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { Callback: func() { e.progressTracker.remove(eventID) }, - TableStatus: e.state, + TableStatus: &e.state, } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) e.progressTracker.addEvent(eventID) diff --git a/cdc/sinkv2/tablesink/event_table_sink_test.go b/cdc/sinkv2/tablesink/event_table_sink_test.go index 8650524729d..f2ad57401d9 100644 --- a/cdc/sinkv2/tablesink/event_table_sink_test.go +++ b/cdc/sinkv2/tablesink/event_table_sink_test.go @@ -48,18 +48,23 @@ func (m *mockEventSink) acknowledge(commitTs uint64) { return } ackedEvents := m.events[:i] - m.events = append( - make([]*eventsink.TxnCallbackableEvent, - 0, - len(m.events[i:])), - m.events[i:]..., - ) for _, event := range ackedEvents { if event.TableStatus.Load() != pipeline.TableStateStopped { event.Callback() + } else { + // If the table is stopped, the event should be ignored. + return } } + + // Remove the acked events from the event buffer. + m.events = append( + make([]*eventsink.TxnCallbackableEvent, + 0, + len(m.events[i:])), + m.events[i:]..., + ) } func getTestRows() []*model.RowChangedEvent { @@ -246,7 +251,10 @@ func TestClose(t *testing.T) { tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) tb.AppendRowChangedEvents(getTestRows()...) - tb.UpdateResolvedTs(model.NewResolvedTs(100)) + tb.UpdateResolvedTs(model.NewResolvedTs(105)) + require.Len(t, sink.events, 7, "all events should be flushed") tb.Close() require.Equal(t, pipeline.TableStateStopped, tb.state, "tableState should be closed") + sink.acknowledge(105) + require.Len(t, sink.events, 7, "no event should be acked") }