Skip to content

Commit

Permalink
sinkv2(ticdc): address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jul 1, 2022
1 parent d78e217 commit a94d4c3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type CallbackFunc func()
type CallbackableEvent[E TableEvent] struct {
Event E
Callback CallbackFunc
TableStatus pipeline.TableState
TableStatus *pipeline.TableState
}

// RowChangeCallbackableEvent is the row change event which can be callbacked.
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/tablesink/event_table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) {
Callback: func() {
e.progressTracker.remove(eventID)
},
TableStatus: e.state,
TableStatus: &e.state,
}
resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce)
e.progressTracker.addEvent(eventID)
Expand Down
22 changes: 15 additions & 7 deletions cdc/sinkv2/tablesink/event_table_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,23 @@ func (m *mockEventSink) acknowledge(commitTs uint64) {
return
}
ackedEvents := m.events[:i]
m.events = append(
make([]*eventsink.TxnCallbackableEvent,
0,
len(m.events[i:])),
m.events[i:]...,
)

for _, event := range ackedEvents {
if event.TableStatus.Load() != pipeline.TableStateStopped {
event.Callback()
} else {
// If the table is stopped, the event should be ignored.
return
}
}

// Remove the acked events from the event buffer.
m.events = append(
make([]*eventsink.TxnCallbackableEvent,
0,
len(m.events[i:])),
m.events[i:]...,
)
}

func getTestRows() []*model.RowChangedEvent {
Expand Down Expand Up @@ -246,7 +251,10 @@ func TestClose(t *testing.T) {
tb := New[*model.SingleTableTxn](sink, &eventsink.TxnEventAppender{})

tb.AppendRowChangedEvents(getTestRows()...)
tb.UpdateResolvedTs(model.NewResolvedTs(100))
tb.UpdateResolvedTs(model.NewResolvedTs(105))
require.Len(t, sink.events, 7, "all events should be flushed")
tb.Close()
require.Equal(t, pipeline.TableStateStopped, tb.state, "tableState should be closed")
sink.acknowledge(105)
require.Len(t, sink.events, 7, "no event should be acked")
}

0 comments on commit a94d4c3

Please sign in to comment.