Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(ticdc): fix update resolved ts not taking effect issue #6118

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CallbackFunc func()
type CallbackableEvent[E TableEvent] struct {
Event E
Callback CallbackFunc
TableStatus *pipeline.TableState
TableStatus pipeline.TableState
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
}

// RowChangeCallbackableEvent is the row change event which can be callbacked.
Expand Down
4 changes: 4 additions & 0 deletions cdc/sinkv2/eventsink/event_appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
)

func TestRowChangeEventAppender(t *testing.T) {
t.Parallel()

tableInfo := &model.TableName{
Schema: "test",
Table: "t1",
Expand Down Expand Up @@ -53,6 +55,8 @@ func TestRowChangeEventAppender(t *testing.T) {
}

func TestTxnEventAppender(t *testing.T) {
t.Parallel()

tableInfo := &model.TableName{
Schema: "test",
Table: "t1",
Expand Down
24 changes: 24 additions & 0 deletions cdc/sinkv2/eventsink/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package eventsink

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
26 changes: 23 additions & 3 deletions cdc/sinkv2/tablesink/event_table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,23 @@ type eventTableSink[E eventsink.TableEvent] struct {
eventAppender eventsink.Appender[E]
// NOTICE: It is ordered by commitTs.
eventBuffer []E
state *pipeline.TableState
state pipeline.TableState
}

// New an eventTableSink with given backendSink and event appender.
func New[E eventsink.TableEvent](
backendSink eventsink.EventSink[E],
appender eventsink.Appender[E],
) *eventTableSink[E] {
return &eventTableSink[E]{
eventID: 0,
maxResolvedTs: model.NewResolvedTs(0),
backendSink: backendSink,
progressTracker: newProgressTracker(),
eventAppender: appender,
eventBuffer: make([]E, 0, 1024),
state: pipeline.TableStatePreparing,
}
}

func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) {
Expand All @@ -58,18 +74,22 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) {
return
}
resolvedEvents := e.eventBuffer[:i]
e.eventBuffer = append(make([]E, 0, len(e.eventBuffer[i:])), e.eventBuffer[i:]...)
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved

resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents))

for _, ev := range resolvedEvents {
// We have to record the event ID for the callback.
eventID := e.eventID
ce := &eventsink.CallbackableEvent[E]{
Event: ev,
Callback: func() {
e.progressTracker.remove(e.eventID)
e.progressTracker.remove(eventID)
},
TableStatus: e.state,
}
resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce)
e.progressTracker.addEvent(e.eventID)
e.progressTracker.addEvent(eventID)
e.eventID++
}
// Do not forget to add the resolvedTs to progressTracker.
Expand Down
252 changes: 252 additions & 0 deletions cdc/sinkv2/tablesink/event_table_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tablesink

import (
"sort"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/stretchr/testify/require"
)

// Assert TableSink implementation
var _ eventsink.EventSink[*model.SingleTableTxn] = (*mockEventSink)(nil)

type mockEventSink struct {
events []*eventsink.TxnCallbackableEvent
}

func (m *mockEventSink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) {
m.events = append(m.events, rows...)
}

func (m *mockEventSink) Close() error {
// Do nothing.
return nil
}

// acknowledge the txn events by call the callback function.
func (m *mockEventSink) acknowledge(commitTs uint64) {
i := sort.Search(len(m.events), func(i int) bool {
return m.events[i].Event.GetCommitTs() > commitTs
})
if i == 0 {
return
}
ackedEvents := m.events[:i]
m.events = append(
make([]*eventsink.TxnCallbackableEvent,
0,
len(m.events[i:])),
m.events[i:]...,
)

for _, event := range ackedEvents {
if event.TableStatus.Load() != pipeline.TableStateStopped {
event.Callback()
}
}
}

func getTestRows() []*model.RowChangedEvent {
tableInfo := &model.TableName{
Schema: "test",
Table: "t1",
TableID: 1,
IsPartition: false,
}

return []*model.RowChangedEvent{
{
Table: tableInfo,
CommitTs: 101,
StartTs: 98,
},
{
Table: tableInfo,
CommitTs: 102,
StartTs: 99,
},
{
Table: tableInfo,
CommitTs: 102,
StartTs: 100,
},
{
Table: tableInfo,
CommitTs: 102,
StartTs: 100,
},
{
Table: tableInfo,
CommitTs: 103,
StartTs: 101,
},
{
Table: tableInfo,
CommitTs: 103,
StartTs: 101,
},
{
Table: tableInfo,
CommitTs: 104,
StartTs: 102,
},
{
Table: tableInfo,
CommitTs: 105,
StartTs: 103,
// Batch1
SplitTxn: true,
},
{
Table: tableInfo,
CommitTs: 105,
StartTs: 103,
},
{
Table: tableInfo,
CommitTs: 105,
StartTs: 103,
},
{
Table: tableInfo,
CommitTs: 105,
StartTs: 103,
// Batch2
SplitTxn: true,
},
{
Table: tableInfo,
CommitTs: 105,
StartTs: 103,
},
}
}

func TestNewEventTableSink(t *testing.T) {
t.Parallel()

sink := &mockEventSink{}
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

require.Equal(t, uint64(0), tb.eventID, "eventID should start from 0")
require.Equal(t, model.NewResolvedTs(0), tb.maxResolvedTs, "maxResolvedTs should start from 0")
require.NotNil(t, sink, tb.backendSink, "backendSink should be set")
require.NotNil(t, tb.progressTracker, "progressTracker should be set")
require.NotNil(t, tb.eventAppender, "eventAppender should be set")
require.Equal(t, 0, len(tb.eventBuffer), "eventBuffer should be empty")
require.Equal(t, pipeline.TableStatePreparing, tb.state, "tableState should be unknown")
}

func TestAppendRowChangedEvents(t *testing.T) {
t.Parallel()

sink := &mockEventSink{}
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

tb.AppendRowChangedEvents(getTestRows()...)
require.Len(t, tb.eventBuffer, 7, "txn event buffer should have 7 txns")
}

func TestUpdateResolvedTs(t *testing.T) {
t.Parallel()

sink := &mockEventSink{}
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

tb.AppendRowChangedEvents(getTestRows()...)
// No event will be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(100))
require.Equal(t, model.NewResolvedTs(100), tb.maxResolvedTs, "maxResolvedTs should be updated")
require.Len(t, tb.eventBuffer, 7, "txn event buffer should have 7 txns")
require.Len(t, sink.events, 0, "no event should not be flushed")

// One event will be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(101))
require.Equal(t, model.NewResolvedTs(101), tb.maxResolvedTs, "maxResolvedTs should be updated")
require.Len(t, tb.eventBuffer, 6, "txn event buffer should have 6 txns")
require.Len(t, sink.events, 1, "one event should be flushed")

// Two events will be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(102))
require.Equal(t, model.NewResolvedTs(102), tb.maxResolvedTs, "maxResolvedTs should be updated")
require.Len(t, tb.eventBuffer, 4, "txn event buffer should have 4 txns")
require.Len(t, sink.events, 3, "two events should be flushed")

// Same resolved ts will not be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(102))
require.Equal(
t,
model.NewResolvedTs(102),
tb.maxResolvedTs,
"maxResolvedTs should not be updated",
)
require.Len(t, tb.eventBuffer, 4, "txn event buffer should still have 4 txns")
require.Len(t, sink.events, 3, "no event should be flushed")

// All events will be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(105))
require.Equal(t, model.NewResolvedTs(105), tb.maxResolvedTs, "maxResolvedTs should be updated")
require.Len(t, tb.eventBuffer, 0, "txn event buffer should be empty")
require.Len(t, sink.events, 7, "all events should be flushed")
}

func TestGetCheckpointTs(t *testing.T) {
t.Parallel()

sink := &mockEventSink{}
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

tb.AppendRowChangedEvents(getTestRows()...)
require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0")

// One event will be flushed.
tb.UpdateResolvedTs(model.NewResolvedTs(101))
require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0")
sink.acknowledge(101)
require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101")

// Flush all events.
tb.UpdateResolvedTs(model.NewResolvedTs(105))
require.Equal(t, model.NewResolvedTs(101), tb.GetCheckpointTs(), "checkpointTs should be 101")

// Only acknowledge some events.
sink.acknowledge(102)
require.Equal(
t,
model.NewResolvedTs(101),
tb.GetCheckpointTs(),
"checkpointTs should still be 101",
)

// Ack all events.
sink.acknowledge(105)
require.Equal(t, model.NewResolvedTs(105), tb.GetCheckpointTs(), "checkpointTs should be 105")
}

func TestClose(t *testing.T) {
t.Parallel()

sink := &mockEventSink{}
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

tb.AppendRowChangedEvents(getTestRows()...)
tb.UpdateResolvedTs(model.NewResolvedTs(100))
tb.Close()
require.Equal(t, pipeline.TableStateStopped, tb.state, "tableState should be closed")
}
8 changes: 8 additions & 0 deletions cdc/sinkv2/tablesink/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
)

func TestNewProgressTracker(t *testing.T) {
t.Parallel()

tracker := newProgressTracker()
require.NotNil(
t,
Expand All @@ -36,6 +38,8 @@ func TestNewProgressTracker(t *testing.T) {
}

func TestAddEvent(t *testing.T) {
t.Parallel()

tracker := newProgressTracker()
tracker.addEvent(1)
tracker.addEvent(2)
Expand All @@ -44,6 +48,8 @@ func TestAddEvent(t *testing.T) {
}

func TestAddResolvedTs(t *testing.T) {
t.Parallel()

// There is no event in the tracker.
tracker := newProgressTracker()
tracker.addResolvedTs(1, model.NewResolvedTs(1))
Expand All @@ -62,6 +68,8 @@ func TestAddResolvedTs(t *testing.T) {
}

func TestRemove(t *testing.T) {
t.Parallel()

// Only event.
tracker := newProgressTracker()
tracker.addEvent(1)
Expand Down