From a94d4c3f7ce89500446af060669d8d0eb39db31a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 1 Jul 2022 11:51:26 +0800 Subject: [PATCH] sinkv2(ticdc): address comments --- cdc/sinkv2/eventsink/event.go | 2 +- cdc/sinkv2/tablesink/event_table_sink.go | 2 +- cdc/sinkv2/tablesink/event_table_sink_test.go | 22 +++++++++++++------ 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 4cb9e096193..5d1f9b70281 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -32,7 +32,7 @@ type CallbackFunc func() type CallbackableEvent[E TableEvent] struct { Event E Callback CallbackFunc - TableStatus pipeline.TableState + TableStatus *pipeline.TableState } // RowChangeCallbackableEvent is the row change event which can be callbacked. diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index c735b78b4c7..cf56fe36de2 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -87,7 +87,7 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { Callback: func() { e.progressTracker.remove(eventID) }, - TableStatus: e.state, + TableStatus: &e.state, } resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) e.progressTracker.addEvent(eventID) diff --git a/cdc/sinkv2/tablesink/event_table_sink_test.go b/cdc/sinkv2/tablesink/event_table_sink_test.go index 8650524729d..f2ad57401d9 100644 --- a/cdc/sinkv2/tablesink/event_table_sink_test.go +++ b/cdc/sinkv2/tablesink/event_table_sink_test.go @@ -48,18 +48,23 @@ func (m *mockEventSink) acknowledge(commitTs uint64) { return } ackedEvents := m.events[:i] - m.events = append( - make([]*eventsink.TxnCallbackableEvent, - 0, - len(m.events[i:])), - m.events[i:]..., - ) for _, event := range ackedEvents { if event.TableStatus.Load() != pipeline.TableStateStopped { event.Callback() + } else { + // If the table is stopped, the event should be ignored. + return } } + + // Remove the acked events from the event buffer. + m.events = append( + make([]*eventsink.TxnCallbackableEvent, + 0, + len(m.events[i:])), + m.events[i:]..., + ) } func getTestRows() []*model.RowChangedEvent { @@ -246,7 +251,10 @@ func TestClose(t *testing.T) { tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{}) tb.AppendRowChangedEvents(getTestRows()...) - tb.UpdateResolvedTs(model.NewResolvedTs(100)) + tb.UpdateResolvedTs(model.NewResolvedTs(105)) + require.Len(t, sink.events, 7, "all events should be flushed") tb.Close() require.Equal(t, pipeline.TableStateStopped, tb.state, "tableState should be closed") + sink.acknowledge(105) + require.Len(t, sink.events, 7, "no event should be acked") }