From ad3b452de26b2d079931a2f8a25235324f7e416a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 20 Jun 2022 15:11:31 +0800 Subject: [PATCH 1/9] sinkv2(ticdc): add interfaces --- cdc/sinkv2/ddlsink/ddl_sink.go | 33 +++++++ cdc/sinkv2/ddlsink/mq/.keep | 0 cdc/sinkv2/ddlsink/mysql/.keep | 0 cdc/sinkv2/roweventsink/mq/.keep | 0 cdc/sinkv2/roweventsink/row_event_sink.go | 39 ++++++++ cdc/sinkv2/tablesink/progress_tracker.go | 99 ++++++++++++++++++++ cdc/sinkv2/tablesink/row_event_table_sink.go | 83 ++++++++++++++++ cdc/sinkv2/tablesink/table_sink.go | 33 +++++++ cdc/sinkv2/tablesink/txn_event_table_sink.go | 84 +++++++++++++++++ cdc/sinkv2/txneventsink/mysql/.keep | 0 cdc/sinkv2/txneventsink/txn_event_sink.go | 39 ++++++++ go.mod | 1 + go.sum | 2 + 13 files changed, 413 insertions(+) create mode 100644 cdc/sinkv2/ddlsink/ddl_sink.go create mode 100644 cdc/sinkv2/ddlsink/mq/.keep create mode 100644 cdc/sinkv2/ddlsink/mysql/.keep create mode 100644 cdc/sinkv2/roweventsink/mq/.keep create mode 100644 cdc/sinkv2/roweventsink/row_event_sink.go create mode 100644 cdc/sinkv2/tablesink/progress_tracker.go create mode 100644 cdc/sinkv2/tablesink/row_event_table_sink.go create mode 100644 cdc/sinkv2/tablesink/table_sink.go create mode 100644 cdc/sinkv2/tablesink/txn_event_table_sink.go create mode 100644 cdc/sinkv2/txneventsink/mysql/.keep create mode 100644 cdc/sinkv2/txneventsink/txn_event_sink.go diff --git a/cdc/sinkv2/ddlsink/ddl_sink.go b/cdc/sinkv2/ddlsink/ddl_sink.go new file mode 100644 index 00000000000..2c724540ecc --- /dev/null +++ b/cdc/sinkv2/ddlsink/ddl_sink.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlsink + +import ( + "context" + + "github.com/pingcap/tiflow/cdc/model" +) + +// DDLEventSink is the interface for sink of DDL events. +type DDLEventSink interface { + // WriteDDLEvent writes a DDL event to the sink. + // Note: This is a synchronous and thread-safe method. + WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error + // WriteCheckpointTs writes a checkpoint timestamp to the sink. + // Note: This is a synchronous and thread-safe method. + // This only for MQSink for now. + WriteCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error + // Close closes the sink. + Close() error +} diff --git a/cdc/sinkv2/ddlsink/mq/.keep b/cdc/sinkv2/ddlsink/mq/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/ddlsink/mysql/.keep b/cdc/sinkv2/ddlsink/mysql/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/roweventsink/mq/.keep b/cdc/sinkv2/roweventsink/mq/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/roweventsink/row_event_sink.go b/cdc/sinkv2/roweventsink/row_event_sink.go new file mode 100644 index 00000000000..5e4d1aa901b --- /dev/null +++ b/cdc/sinkv2/roweventsink/row_event_sink.go @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package roweventsink + +import ( + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/atomic" +) + +// RowEvent represents a row event with callbacks. +// In addition, it contains the state of the table. +// When we process row events, TableStopped is used to +// determine if we really need to process the event. +type RowEvent struct { + Row *model.RowChangedEvent + Callback func() + TableStatus *atomic.Uint32 +} + +// RowEventSink is a sink that processes row events. +// Usually, it is a MQ sink or S3 sink(not implemented). +type RowEventSink interface { + // WriteRowChangedEvents writes row changed events to the sink. + // Note: This is an asynchronous and thread-safe method. + WriteRowChangedEvents(rows ...*RowEvent) + // Close closes the sink. + Close() error +} diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go new file mode 100644 index 00000000000..c4ccc337fb9 --- /dev/null +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -0,0 +1,99 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sync" + + "github.com/emirpasic/gods/maps/linkedhashmap" + "github.com/pingcap/tiflow/cdc/model" +) + +// progressTracker is used to track the progress of the table sink. +type progressTracker struct { + // This lock for both pendingEventAndResolvedTs and lastMinCommitTs. + lock sync.Mutex + // pendingEventAndResolvedTs is used to store the pending events and resolved tss. + // The key is the key of the event or the resolved ts. + // The value is nil or the resolved ts. + // Since the data in TableSink is sequential, + // we only need to maintain an insertion order. + pendingEventAndResolvedTs *linkedhashmap.Map + // lastMinCommitTs is used to store the last min commits ts. + // It is used to indicate the progress of the table sink. + lastMinCommitTs model.ResolvedTs +} + +// newProgressTracker is used to create a new progress tracker. +// The last min commit ts is set to 0. +// It means that the table sink has not started yet. +// nolint:deadcode +func newProgressTracker() *progressTracker { + return &progressTracker{ + pendingEventAndResolvedTs: linkedhashmap.New(), + // It means the start of the table. + lastMinCommitTs: model.NewResolvedTs(0), + } +} + +// addEvent is used to add the pending event. +func (r *progressTracker) addEvent(key uint64) { + r.lock.Lock() + defer r.lock.Unlock() + r.pendingEventAndResolvedTs.Put(key, nil) +} + +// addResolvedTs is used to add the pending resolved ts. +func (r *progressTracker) addResolvedTs(key uint64, resolvedTs model.ResolvedTs) { + r.lock.Lock() + defer r.lock.Unlock() + // If no pending event and resolved ts, + // we can directly advance the progress. + if r.pendingEventAndResolvedTs.Empty() { + r.lastMinCommitTs = resolvedTs + } + r.pendingEventAndResolvedTs.Put(key, resolvedTs) +} + +// remove is used to remove the pending resolved ts. +// If we are deleting the smallest row or txn, +// that means we can advance the progress, +// and we will update lastMinCommitTs. +func (r *progressTracker) remove(key uint64) { + r.lock.Lock() + defer r.lock.Unlock() + r.pendingEventAndResolvedTs.Remove(key) + iterator := r.pendingEventAndResolvedTs.Iterator() + // No need to update lastMinCommitTs + // if there is no pending event and resolved ts. + if !iterator.First() { + return + } + + // If the first element is resolved ts, + // it means we can advance the progress. + if iterator.Value() != nil { + r.lastMinCommitTs = iterator.Value().(model.ResolvedTs) + } +} + +// minTs returns the last min resolved ts. +// This means that all data prior to this point has already been processed. +// It can be considered as CheckpointTs of the table. +func (r *progressTracker) minTs() model.ResolvedTs { + r.lock.Lock() + defer r.lock.Unlock() + + return r.lastMinCommitTs +} diff --git a/cdc/sinkv2/tablesink/row_event_table_sink.go b/cdc/sinkv2/tablesink/row_event_table_sink.go new file mode 100644 index 00000000000..bc9b8b04501 --- /dev/null +++ b/cdc/sinkv2/tablesink/row_event_table_sink.go @@ -0,0 +1,83 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sort" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/roweventsink" + "go.uber.org/atomic" +) + +// Assert TableSink implementation +var _ TableSink = (*rowEventTableSink)(nil) + +type rowEventTableSink struct { + rowID uint64 + maxResolvedTs model.ResolvedTs + backendSink roweventsink.RowEventSink + rowEventProgressTracker *progressTracker + // NOTICE: It is ordered by commitTs. + rowBuffer []*model.RowChangedEvent + TableStatus *atomic.Uint32 +} + +func (r *rowEventTableSink) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { + r.rowBuffer = append(r.rowBuffer, rows...) +} + +func (r *rowEventTableSink) UpdateResolvedTs(resolvedTs model.ResolvedTs) { + // If resolvedTs is not greater than maxResolvedTs, + // the flush is unnecessary. + if !r.maxResolvedTs.Less(resolvedTs) { + return + } + r.maxResolvedTs = resolvedTs + + i := sort.Search(len(r.rowBuffer), func(i int) bool { + return r.rowBuffer[i].CommitTs > resolvedTs.Ts + }) + if i == 0 { + return + } + resolvedRows := r.rowBuffer[:i] + + resolvedRowEvents := make([]*roweventsink.RowEvent, 0, len(resolvedRows)) + for _, row := range resolvedRows { + rowEvent := &roweventsink.RowEvent{ + Row: row, + Callback: func() { + r.rowEventProgressTracker.remove(r.rowID) + }, + TableStatus: r.TableStatus, + } + resolvedRowEvents = append(resolvedRowEvents, rowEvent) + r.rowEventProgressTracker.addEvent(r.rowID) + r.rowID++ + } + r.rowEventProgressTracker.addResolvedTs(r.rowID, resolvedTs) + r.rowID++ + + r.backendSink.WriteRowChangedEvents(resolvedRowEvents...) +} + +func (r *rowEventTableSink) GetCheckpointTs() model.ResolvedTs { + return r.rowEventProgressTracker.minTs() +} + +func (r *rowEventTableSink) Close() { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/tablesink/table_sink.go b/cdc/sinkv2/tablesink/table_sink.go new file mode 100644 index 00000000000..53d3397d8d6 --- /dev/null +++ b/cdc/sinkv2/tablesink/table_sink.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import "github.com/pingcap/tiflow/cdc/model" + +// TableSink is the interface for table sink. +// It is used to sink data in table units. +type TableSink interface { + // AppendRowChangedEvents appends row changed events to the table sink. + // Usually, it is used to cache the row changed events into table sink. + AppendRowChangedEvents(rows ...*model.RowChangedEvent) + // UpdateResolvedTs writes the buffered row changed events to the TxnEventSink/RowEventSink. + // Note: This is an asynchronous method. + UpdateResolvedTs(resolvedTs model.ResolvedTs) + // GetCheckpointTs returns the current checkpoint ts of table sink. + // Usually, it requires some computational work. + // For example, calculating the current progress from the statistics of the table sink. + GetCheckpointTs() model.ResolvedTs + // Close closes the table sink. + Close() +} diff --git a/cdc/sinkv2/tablesink/txn_event_table_sink.go b/cdc/sinkv2/tablesink/txn_event_table_sink.go new file mode 100644 index 00000000000..fb66521718e --- /dev/null +++ b/cdc/sinkv2/tablesink/txn_event_table_sink.go @@ -0,0 +1,84 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sort" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/txneventsink" + "go.uber.org/atomic" +) + +// Assert TableSink implementation +var _ TableSink = (*txnEventTableSink)(nil) + +type txnEventTableSink struct { + txnID uint64 + maxResolvedTs model.ResolvedTs + backendSink txneventsink.TxnEventSink + txnEventProgressTracker *progressTracker + // NOTICE: It is ordered by commitTs. + txnBuffer []*model.SingleTableTxn + TableStatus *atomic.Uint32 +} + +func (t *txnEventTableSink) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { + // TODO implement me + // Assemble each txn with the same startTs and the same commitTs in the order of commitTs. + panic("implement me") +} + +func (t *txnEventTableSink) UpdateResolvedTs(resolvedTs model.ResolvedTs) { + // If resolvedTs is not greater than maxResolvedTs, + // the flush is unnecessary. + if !t.maxResolvedTs.Less(resolvedTs) { + return + } + t.maxResolvedTs = resolvedTs + + i := sort.Search(len(t.txnBuffer), func(i int) bool { + return t.txnBuffer[i].CommitTs > resolvedTs.Ts + }) + if i == 0 { + return + } + resolvedTxns := t.txnBuffer[:i] + + resolvedTxnEvents := make([]*txneventsink.TxnEvent, 0, len(resolvedTxns)) + for _, txn := range resolvedTxns { + txnEvent := &txneventsink.TxnEvent{ + Txn: txn, + Callback: func() { + t.txnEventProgressTracker.remove(t.txnID) + }, + TableStatus: t.TableStatus, + } + resolvedTxnEvents = append(resolvedTxnEvents, txnEvent) + t.txnEventProgressTracker.addEvent(t.txnID) + t.txnID++ + } + t.txnEventProgressTracker.addResolvedTs(t.txnID, resolvedTs) + t.txnID++ + t.backendSink.WriteTxnEvents(resolvedTxnEvents...) +} + +func (t *txnEventTableSink) GetCheckpointTs() model.ResolvedTs { + return t.txnEventProgressTracker.minTs() +} + +func (t *txnEventTableSink) Close() { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/txneventsink/mysql/.keep b/cdc/sinkv2/txneventsink/mysql/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/txneventsink/txn_event_sink.go b/cdc/sinkv2/txneventsink/txn_event_sink.go new file mode 100644 index 00000000000..55a983ec38f --- /dev/null +++ b/cdc/sinkv2/txneventsink/txn_event_sink.go @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txneventsink + +import ( + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/atomic" +) + +// TxnEvent represents a transaction event with callbacks. +// In addition, it contains the state of the table. +// When we process row events, TableStopped is used to +// determine if we really need to process the event. +type TxnEvent struct { + Txn *model.SingleTableTxn + Callback func() + TableStatus *atomic.Uint32 +} + +// TxnEventSink is a sink that processes transaction events. +// Usually, it is a MySQL sink. +type TxnEventSink interface { + // WriteTxnEvents writes transaction events to the sink. + // Note: This is an asynchronous and thread-safe method. + WriteTxnEvents(txns ...*TxnEvent) + // Close closes the sink. + Close() error +} diff --git a/go.mod b/go.mod index e7a8dca0f3e..785de7672ef 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/dustin/go-humanize v1.0.0 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 + github.com/emirpasic/gods v1.18.1 github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 github.com/fatih/color v1.13.0 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 diff --git a/go.sum b/go.sum index c5e1952d0b1..1cbe5f50c4d 100644 --- a/go.sum +++ b/go.sum @@ -296,6 +296,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 h1:8i9x3Q4hW1kLE4ScsOtUlwVHT76LKhkmOw9zbDxnyUc= github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17/go.mod h1:3Ys1pJhyVaB6iWigv4o2r6Ug1GZmfDWqvqmO6bjojg0= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= From b4bca8ffba254c2770c1dd712d82609c684f5675 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 23 Jun 2022 23:06:29 +0800 Subject: [PATCH 2/9] sinkv2(ticdc): add interfaces [generic version] --- cdc/model/sink.go | 10 +++ cdc/sinkv2/eventsink/event_sink.go | 25 ++++++ cdc/sinkv2/eventsink/mq/mq_sink.go | 38 ++++++++ cdc/sinkv2/eventsink/txn/txn_sink.go | 38 ++++++++ cdc/sinkv2/roweventsink/mq/.keep | 0 cdc/sinkv2/roweventsink/row_event_sink.go | 39 -------- cdc/sinkv2/tableevent/event.go | 48 ++++++++++ .../tableevent/row_change_event_appender.go | 30 +++++++ cdc/sinkv2/tableevent/txn_event_appender.go | 31 +++++++ cdc/sinkv2/tablesink/event_table_sink.go | 89 +++++++++++++++++++ cdc/sinkv2/tablesink/progress_tracker.go | 39 +++++--- cdc/sinkv2/tablesink/row_event_table_sink.go | 83 ----------------- cdc/sinkv2/tablesink/table_sink.go | 8 +- cdc/sinkv2/tablesink/txn_event_table_sink.go | 84 ----------------- cdc/sinkv2/txneventsink/mysql/.keep | 0 cdc/sinkv2/txneventsink/txn_event_sink.go | 39 -------- 16 files changed, 339 insertions(+), 262 deletions(-) create mode 100644 cdc/sinkv2/eventsink/event_sink.go create mode 100644 cdc/sinkv2/eventsink/mq/mq_sink.go create mode 100644 cdc/sinkv2/eventsink/txn/txn_sink.go delete mode 100644 cdc/sinkv2/roweventsink/mq/.keep delete mode 100644 cdc/sinkv2/roweventsink/row_event_sink.go create mode 100644 cdc/sinkv2/tableevent/event.go create mode 100644 cdc/sinkv2/tableevent/row_change_event_appender.go create mode 100644 cdc/sinkv2/tableevent/txn_event_appender.go create mode 100644 cdc/sinkv2/tablesink/event_table_sink.go delete mode 100644 cdc/sinkv2/tablesink/row_event_table_sink.go delete mode 100644 cdc/sinkv2/tablesink/txn_event_table_sink.go delete mode 100644 cdc/sinkv2/txneventsink/mysql/.keep delete mode 100644 cdc/sinkv2/txneventsink/txn_event_sink.go diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 375c1685f9b..368a82f64a1 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -272,6 +272,11 @@ type RowChangedEvent struct { SplitTxn bool `json:"-" msg:"-"` } +// GetCommitTs returns the commit timestamp of this event. +func (r *RowChangedEvent) GetCommitTs() uint64 { + return r.CommitTs +} + // IsDelete returns true if the row is a delete event func (r *RowChangedEvent) IsDelete() bool { return len(r.PreColumns) != 0 && len(r.Columns) == 0 @@ -611,6 +616,11 @@ type SingleTableTxn struct { FinishWg *sync.WaitGroup } +// GetCommitTs returns the commit timestamp of the transaction. +func (t *SingleTableTxn) GetCommitTs() uint64 { + return t.CommitTs +} + // Append adds a row changed event into SingleTableTxn func (t *SingleTableTxn) Append(row *RowChangedEvent) { if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID { diff --git a/cdc/sinkv2/eventsink/event_sink.go b/cdc/sinkv2/eventsink/event_sink.go new file mode 100644 index 00000000000..80a8dee993b --- /dev/null +++ b/cdc/sinkv2/eventsink/event_sink.go @@ -0,0 +1,25 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsink + +import "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" + +// EventSink is the interface for event sink. +type EventSink[E tableevent.TableEvent] interface { + // WriteEvents writes events to the sink. + // This is an asynchronously and thread-safe method. + WriteEvents(rows ...*tableevent.CallbackableEvent[E]) + // Close closes the sink. + Close() error +} diff --git a/cdc/sinkv2/eventsink/mq/mq_sink.go b/cdc/sinkv2/eventsink/mq/mq_sink.go new file mode 100644 index 00000000000..2633d7b699b --- /dev/null +++ b/cdc/sinkv2/eventsink/mq/mq_sink.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" +) + +// Assert EventSink[E event.TableEvent] implementation +var _ eventsink.EventSink[*model.RowChangedEvent] = (*Sink)(nil) + +// Sink is the mq sink. +type Sink struct{} + +// WriteEvents writes events to the sink. +func (s *Sink) WriteEvents(rows ...*tableevent.RowChangeCallbackableEvent) { + // TODO implement me + panic("implement me") +} + +// Close closes the sink. +func (s *Sink) Close() error { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go new file mode 100644 index 00000000000..e76c4604477 --- /dev/null +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" +) + +// Assert EventSink[E event.TableEvent] implementation +var _ eventsink.EventSink[*model.SingleTableTxn] = (*Sink)(nil) + +// Sink is the sink for SingleTableTxn. +type Sink struct{} + +// WriteEvents writes events to the sink. +func (s *Sink) WriteEvents(rows ...*tableevent.TxnCallbackableEvent) { + // TODO implement me + panic("implement me") +} + +// Close closes the sink. +func (s *Sink) Close() error { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/roweventsink/mq/.keep b/cdc/sinkv2/roweventsink/mq/.keep deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/cdc/sinkv2/roweventsink/row_event_sink.go b/cdc/sinkv2/roweventsink/row_event_sink.go deleted file mode 100644 index 5e4d1aa901b..00000000000 --- a/cdc/sinkv2/roweventsink/row_event_sink.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package roweventsink - -import ( - "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/atomic" -) - -// RowEvent represents a row event with callbacks. -// In addition, it contains the state of the table. -// When we process row events, TableStopped is used to -// determine if we really need to process the event. -type RowEvent struct { - Row *model.RowChangedEvent - Callback func() - TableStatus *atomic.Uint32 -} - -// RowEventSink is a sink that processes row events. -// Usually, it is a MQ sink or S3 sink(not implemented). -type RowEventSink interface { - // WriteRowChangedEvents writes row changed events to the sink. - // Note: This is an asynchronous and thread-safe method. - WriteRowChangedEvents(rows ...*RowEvent) - // Close closes the sink. - Close() error -} diff --git a/cdc/sinkv2/tableevent/event.go b/cdc/sinkv2/tableevent/event.go new file mode 100644 index 00000000000..e87f53c418e --- /dev/null +++ b/cdc/sinkv2/tableevent/event.go @@ -0,0 +1,48 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tableevent + +import ( + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/atomic" +) + +// TableEvent is the interface for events which can be written to sink by TableSink. +type TableEvent interface { + // GetCommitTs returns the commit timestamp of the event. + GetCommitTs() uint64 +} + +// CallbackFunc is the callback function for callbackable event. +type CallbackFunc func() + +// CallbackableEvent means the event can be callbacked. +// It also contains the table status. +type CallbackableEvent[E TableEvent] struct { + Event E + Callback CallbackFunc + TableStatus *atomic.Uint32 +} + +// RowChangeCallbackableEvent is the row change event which can be callbacked. +type RowChangeCallbackableEvent = CallbackableEvent[*model.RowChangedEvent] + +// TxnCallbackableEvent is the txn event which can be callbacked. +type TxnCallbackableEvent = CallbackableEvent[*model.SingleTableTxn] + +// Appender is the interface for appending events to buffer. +type Appender[E TableEvent] interface { + // Append appends the event to buffer. + Append(buffer []E, rows ...*model.RowChangedEvent) []E +} diff --git a/cdc/sinkv2/tableevent/row_change_event_appender.go b/cdc/sinkv2/tableevent/row_change_event_appender.go new file mode 100644 index 00000000000..df36ddcd2f1 --- /dev/null +++ b/cdc/sinkv2/tableevent/row_change_event_appender.go @@ -0,0 +1,30 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tableevent + +import "github.com/pingcap/tiflow/cdc/model" + +// Assert Appender[E TableEvent] implementation +var _ Appender[*model.RowChangedEvent] = (*RowChangeEventAppender)(nil) + +// RowChangeEventAppender is the builder for RowChangedEvent. +type RowChangeEventAppender struct{} + +// Append appends the given rows to the given buffer. +func (r *RowChangeEventAppender) Append( + buffer []*model.RowChangedEvent, + rows ...*model.RowChangedEvent, +) []*model.RowChangedEvent { + return append(buffer, rows...) +} diff --git a/cdc/sinkv2/tableevent/txn_event_appender.go b/cdc/sinkv2/tableevent/txn_event_appender.go new file mode 100644 index 00000000000..0042473f761 --- /dev/null +++ b/cdc/sinkv2/tableevent/txn_event_appender.go @@ -0,0 +1,31 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tableevent + +import "github.com/pingcap/tiflow/cdc/model" + +// Assert Appender[E TableEvent] implementation +var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil) + +// TxnEventAppender is the appender for SingleTableTxn. +type TxnEventAppender struct{} + +// Append appends the given rows to the given txn buffer. +func (t *TxnEventAppender) Append( + buffer []*model.SingleTableTxn, + rows ...*model.RowChangedEvent, +) []*model.SingleTableTxn { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go new file mode 100644 index 00000000000..64302c9d1f3 --- /dev/null +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -0,0 +1,89 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sort" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" + "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" + "go.uber.org/atomic" +) + +// Assert TableSink implementation +var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil) +var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil) + +type eventTableSink[E tableevent.TableEvent] struct { + eventID uint64 + maxResolvedTs model.ResolvedTs + backendSink eventsink.EventSink[E] + progressTracker *progressTracker + eventAppender tableevent.Appender[E] + // NOTICE: It is ordered by commitTs. + eventBuffer []E + TableStatus *atomic.Uint32 +} + +func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { + e.eventAppender.Append(e.eventBuffer, rows...) +} + +func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { + // If resolvedTs is not greater than maxResolvedTs, + // the flush is unnecessary. + if !e.maxResolvedTs.Less(resolvedTs) { + return + } + e.maxResolvedTs = resolvedTs + + i := sort.Search(len(e.eventBuffer), func(i int) bool { + return e.eventBuffer[i].GetCommitTs() > resolvedTs.Ts + }) + // Despite the lack of data, we have to move forward with progress. + if i == 0 { + e.progressTracker.addResolvedTs(e.eventID, resolvedTs) + e.eventID++ + return + } + resolvedEvents := e.eventBuffer[:i] + + resolvedTxnEvents := make([]*tableevent.CallbackableEvent[E], 0, len(resolvedEvents)) + for _, ev := range resolvedEvents { + txnEvent := &tableevent.CallbackableEvent[E]{ + Event: ev, + Callback: func() { + e.progressTracker.remove(e.eventID) + }, + TableStatus: e.TableStatus, + } + resolvedTxnEvents = append(resolvedTxnEvents, txnEvent) + e.progressTracker.addEvent(e.eventID) + e.eventID++ + } + // Do not forget to add the resolvedTs to progressTracker. + e.progressTracker.addResolvedTs(e.eventID, resolvedTs) + e.eventID++ + e.backendSink.WriteEvents(resolvedTxnEvents...) +} + +func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs { + return e.progressTracker.minTs() +} + +func (e *eventTableSink[E]) Close() { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go index c4ccc337fb9..fe85e602e21 100644 --- a/cdc/sinkv2/tablesink/progress_tracker.go +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -21,33 +21,41 @@ import ( ) // progressTracker is used to track the progress of the table sink. +// For example, +// We have txn1, txn2, resolvedTs2, txn3-1, txn3-2, resolvedTs3, resolvedTs4, resolvedTs5. +// txn3-1 and txn3-2 are in the same big txn. +// First txn1 and txn2 are written, then the progress can be updated to resolvedTs2. +// Then txn3-1 and txn3-2 are written, then the progress can be updated to resolvedTs3. +// Next, since no data is being written, we can update to resolvedTs5 in order. type progressTracker struct { - // This lock for both pendingEventAndResolvedTs and lastMinCommitTs. + // This lock for both pendingEventAndResolvedTs and lastMinResolvedTs. lock sync.Mutex - // pendingEventAndResolvedTs is used to store the pending events and resolved tss. + // pendingEventAndResolvedTs is used to store the pending event keys and resolved tss. // The key is the key of the event or the resolved ts. - // The value is nil or the resolved ts. + // The value is nil or the resolved ts. **nil for event**. // Since the data in TableSink is sequential, // we only need to maintain an insertion order. pendingEventAndResolvedTs *linkedhashmap.Map - // lastMinCommitTs is used to store the last min commits ts. + // lastMinResolvedTs is used to store the last min resolved ts. // It is used to indicate the progress of the table sink. - lastMinCommitTs model.ResolvedTs + lastMinResolvedTs model.ResolvedTs } // newProgressTracker is used to create a new progress tracker. -// The last min commit ts is set to 0. +// The last min resolved ts is set to 0. // It means that the table sink has not started yet. // nolint:deadcode func newProgressTracker() *progressTracker { return &progressTracker{ pendingEventAndResolvedTs: linkedhashmap.New(), // It means the start of the table. - lastMinCommitTs: model.NewResolvedTs(0), + // It's Ok to use 0 here. + // Because sink node only update the checkpoint when it's growing. + lastMinResolvedTs: model.NewResolvedTs(0), } } -// addEvent is used to add the pending event. +// addEvent is used to add the pending event key. func (r *progressTracker) addEvent(key uint64) { r.lock.Lock() defer r.lock.Unlock() @@ -61,21 +69,22 @@ func (r *progressTracker) addResolvedTs(key uint64, resolvedTs model.ResolvedTs) // If no pending event and resolved ts, // we can directly advance the progress. if r.pendingEventAndResolvedTs.Empty() { - r.lastMinCommitTs = resolvedTs + r.lastMinResolvedTs = resolvedTs + return } r.pendingEventAndResolvedTs.Put(key, resolvedTs) } // remove is used to remove the pending resolved ts. -// If we are deleting the smallest row or txn, +// If we are deleting the last value before resolved ts, // that means we can advance the progress, -// and we will update lastMinCommitTs. +// and we will update lastMinResolvedTs. func (r *progressTracker) remove(key uint64) { r.lock.Lock() defer r.lock.Unlock() r.pendingEventAndResolvedTs.Remove(key) iterator := r.pendingEventAndResolvedTs.Iterator() - // No need to update lastMinCommitTs + // No need to update lastMinResolvedTs // if there is no pending event and resolved ts. if !iterator.First() { return @@ -84,7 +93,9 @@ func (r *progressTracker) remove(key uint64) { // If the first element is resolved ts, // it means we can advance the progress. if iterator.Value() != nil { - r.lastMinCommitTs = iterator.Value().(model.ResolvedTs) + r.lastMinResolvedTs = iterator.Value().(model.ResolvedTs) + // Do not forget to remove the resolved ts. + r.pendingEventAndResolvedTs.Remove(iterator.Key()) } } @@ -95,5 +106,5 @@ func (r *progressTracker) minTs() model.ResolvedTs { r.lock.Lock() defer r.lock.Unlock() - return r.lastMinCommitTs + return r.lastMinResolvedTs } diff --git a/cdc/sinkv2/tablesink/row_event_table_sink.go b/cdc/sinkv2/tablesink/row_event_table_sink.go deleted file mode 100644 index bc9b8b04501..00000000000 --- a/cdc/sinkv2/tablesink/row_event_table_sink.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tablesink - -import ( - "sort" - - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sinkv2/roweventsink" - "go.uber.org/atomic" -) - -// Assert TableSink implementation -var _ TableSink = (*rowEventTableSink)(nil) - -type rowEventTableSink struct { - rowID uint64 - maxResolvedTs model.ResolvedTs - backendSink roweventsink.RowEventSink - rowEventProgressTracker *progressTracker - // NOTICE: It is ordered by commitTs. - rowBuffer []*model.RowChangedEvent - TableStatus *atomic.Uint32 -} - -func (r *rowEventTableSink) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { - r.rowBuffer = append(r.rowBuffer, rows...) -} - -func (r *rowEventTableSink) UpdateResolvedTs(resolvedTs model.ResolvedTs) { - // If resolvedTs is not greater than maxResolvedTs, - // the flush is unnecessary. - if !r.maxResolvedTs.Less(resolvedTs) { - return - } - r.maxResolvedTs = resolvedTs - - i := sort.Search(len(r.rowBuffer), func(i int) bool { - return r.rowBuffer[i].CommitTs > resolvedTs.Ts - }) - if i == 0 { - return - } - resolvedRows := r.rowBuffer[:i] - - resolvedRowEvents := make([]*roweventsink.RowEvent, 0, len(resolvedRows)) - for _, row := range resolvedRows { - rowEvent := &roweventsink.RowEvent{ - Row: row, - Callback: func() { - r.rowEventProgressTracker.remove(r.rowID) - }, - TableStatus: r.TableStatus, - } - resolvedRowEvents = append(resolvedRowEvents, rowEvent) - r.rowEventProgressTracker.addEvent(r.rowID) - r.rowID++ - } - r.rowEventProgressTracker.addResolvedTs(r.rowID, resolvedTs) - r.rowID++ - - r.backendSink.WriteRowChangedEvents(resolvedRowEvents...) -} - -func (r *rowEventTableSink) GetCheckpointTs() model.ResolvedTs { - return r.rowEventProgressTracker.minTs() -} - -func (r *rowEventTableSink) Close() { - // TODO implement me - panic("implement me") -} diff --git a/cdc/sinkv2/tablesink/table_sink.go b/cdc/sinkv2/tablesink/table_sink.go index 53d3397d8d6..502632a2697 100644 --- a/cdc/sinkv2/tablesink/table_sink.go +++ b/cdc/sinkv2/tablesink/table_sink.go @@ -20,13 +20,15 @@ import "github.com/pingcap/tiflow/cdc/model" type TableSink interface { // AppendRowChangedEvents appends row changed events to the table sink. // Usually, it is used to cache the row changed events into table sink. + // This is a not thread-safe method. Please do not call it concurrently. AppendRowChangedEvents(rows ...*model.RowChangedEvent) - // UpdateResolvedTs writes the buffered row changed events to the TxnEventSink/RowEventSink. - // Note: This is an asynchronous method. + // UpdateResolvedTs writes the buffered row changed events to the eventTableSink. + // Note: This is an asynchronous and not thread-safe method. + // Please do not call it concurrently. UpdateResolvedTs(resolvedTs model.ResolvedTs) // GetCheckpointTs returns the current checkpoint ts of table sink. - // Usually, it requires some computational work. // For example, calculating the current progress from the statistics of the table sink. + // This is a thread-safe method. GetCheckpointTs() model.ResolvedTs // Close closes the table sink. Close() diff --git a/cdc/sinkv2/tablesink/txn_event_table_sink.go b/cdc/sinkv2/tablesink/txn_event_table_sink.go deleted file mode 100644 index fb66521718e..00000000000 --- a/cdc/sinkv2/tablesink/txn_event_table_sink.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package tablesink - -import ( - "sort" - - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sinkv2/txneventsink" - "go.uber.org/atomic" -) - -// Assert TableSink implementation -var _ TableSink = (*txnEventTableSink)(nil) - -type txnEventTableSink struct { - txnID uint64 - maxResolvedTs model.ResolvedTs - backendSink txneventsink.TxnEventSink - txnEventProgressTracker *progressTracker - // NOTICE: It is ordered by commitTs. - txnBuffer []*model.SingleTableTxn - TableStatus *atomic.Uint32 -} - -func (t *txnEventTableSink) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { - // TODO implement me - // Assemble each txn with the same startTs and the same commitTs in the order of commitTs. - panic("implement me") -} - -func (t *txnEventTableSink) UpdateResolvedTs(resolvedTs model.ResolvedTs) { - // If resolvedTs is not greater than maxResolvedTs, - // the flush is unnecessary. - if !t.maxResolvedTs.Less(resolvedTs) { - return - } - t.maxResolvedTs = resolvedTs - - i := sort.Search(len(t.txnBuffer), func(i int) bool { - return t.txnBuffer[i].CommitTs > resolvedTs.Ts - }) - if i == 0 { - return - } - resolvedTxns := t.txnBuffer[:i] - - resolvedTxnEvents := make([]*txneventsink.TxnEvent, 0, len(resolvedTxns)) - for _, txn := range resolvedTxns { - txnEvent := &txneventsink.TxnEvent{ - Txn: txn, - Callback: func() { - t.txnEventProgressTracker.remove(t.txnID) - }, - TableStatus: t.TableStatus, - } - resolvedTxnEvents = append(resolvedTxnEvents, txnEvent) - t.txnEventProgressTracker.addEvent(t.txnID) - t.txnID++ - } - t.txnEventProgressTracker.addResolvedTs(t.txnID, resolvedTs) - t.txnID++ - t.backendSink.WriteTxnEvents(resolvedTxnEvents...) -} - -func (t *txnEventTableSink) GetCheckpointTs() model.ResolvedTs { - return t.txnEventProgressTracker.minTs() -} - -func (t *txnEventTableSink) Close() { - // TODO implement me - panic("implement me") -} diff --git a/cdc/sinkv2/txneventsink/mysql/.keep b/cdc/sinkv2/txneventsink/mysql/.keep deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/cdc/sinkv2/txneventsink/txn_event_sink.go b/cdc/sinkv2/txneventsink/txn_event_sink.go deleted file mode 100644 index 55a983ec38f..00000000000 --- a/cdc/sinkv2/txneventsink/txn_event_sink.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package txneventsink - -import ( - "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/atomic" -) - -// TxnEvent represents a transaction event with callbacks. -// In addition, it contains the state of the table. -// When we process row events, TableStopped is used to -// determine if we really need to process the event. -type TxnEvent struct { - Txn *model.SingleTableTxn - Callback func() - TableStatus *atomic.Uint32 -} - -// TxnEventSink is a sink that processes transaction events. -// Usually, it is a MySQL sink. -type TxnEventSink interface { - // WriteTxnEvents writes transaction events to the sink. - // Note: This is an asynchronous and thread-safe method. - WriteTxnEvents(txns ...*TxnEvent) - // Close closes the sink. - Close() error -} From 914eda99b2105a8b859342410c1f28782e2be471 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Fri, 24 Jun 2022 12:09:49 +0800 Subject: [PATCH 3/9] sinkv2(ticdc): fix append --- cdc/sinkv2/tablesink/event_table_sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index 64302c9d1f3..81138460b97 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -38,7 +38,7 @@ type eventTableSink[E tableevent.TableEvent] struct { } func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { - e.eventAppender.Append(e.eventBuffer, rows...) + e.eventBuffer = e.eventAppender.Append(e.eventBuffer, rows...) } func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { From f10944946a0498c50ca60fd436f074e27488ad05 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Sun, 26 Jun 2022 16:34:55 +0800 Subject: [PATCH 4/9] sinkv2(ticdc): add status and close it --- cdc/sinkv2/tableevent/event.go | 4 ++-- cdc/sinkv2/tablesink/event_table_sink.go | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cdc/sinkv2/tableevent/event.go b/cdc/sinkv2/tableevent/event.go index e87f53c418e..168ca6d4b4e 100644 --- a/cdc/sinkv2/tableevent/event.go +++ b/cdc/sinkv2/tableevent/event.go @@ -15,7 +15,7 @@ package tableevent import ( "github.com/pingcap/tiflow/cdc/model" - "go.uber.org/atomic" + "github.com/pingcap/tiflow/cdc/processor/pipeline" ) // TableEvent is the interface for events which can be written to sink by TableSink. @@ -32,7 +32,7 @@ type CallbackFunc func() type CallbackableEvent[E TableEvent] struct { Event E Callback CallbackFunc - TableStatus *atomic.Uint32 + TableStatus *pipeline.TableState } // RowChangeCallbackableEvent is the row change event which can be callbacked. diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index 81138460b97..599884eb0cb 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -17,9 +17,9 @@ import ( "sort" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" - "go.uber.org/atomic" ) // Assert TableSink implementation @@ -34,7 +34,7 @@ type eventTableSink[E tableevent.TableEvent] struct { eventAppender tableevent.Appender[E] // NOTICE: It is ordered by commitTs. eventBuffer []E - TableStatus *atomic.Uint32 + state *pipeline.TableState } func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { @@ -67,7 +67,7 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { Callback: func() { e.progressTracker.remove(e.eventID) }, - TableStatus: e.TableStatus, + TableStatus: e.state, } resolvedTxnEvents = append(resolvedTxnEvents, txnEvent) e.progressTracker.addEvent(e.eventID) @@ -84,6 +84,5 @@ func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs { } func (e *eventTableSink[E]) Close() { - // TODO implement me - panic("implement me") + e.state.Store(pipeline.TableStateStopped) } From 721f2d1b7cacfa891a2e605c37ecd6d34382bea2 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 27 Jun 2022 14:10:24 +0800 Subject: [PATCH 5/9] sinkv2(ticdc): fix advance bug --- cdc/sinkv2/tablesink/progress_tracker.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go index fe85e602e21..f6c5b2806ee 100644 --- a/cdc/sinkv2/tablesink/progress_tracker.go +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -84,18 +84,18 @@ func (r *progressTracker) remove(key uint64) { defer r.lock.Unlock() r.pendingEventAndResolvedTs.Remove(key) iterator := r.pendingEventAndResolvedTs.Iterator() - // No need to update lastMinResolvedTs - // if there is no pending event and resolved ts. - if !iterator.First() { - return - } - - // If the first element is resolved ts, - // it means we can advance the progress. - if iterator.Value() != nil { - r.lastMinResolvedTs = iterator.Value().(model.ResolvedTs) - // Do not forget to remove the resolved ts. - r.pendingEventAndResolvedTs.Remove(iterator.Key()) + if iterator.Next() { + // If the element is resolved ts, + // it means we can advance the progress. + if iterator.Value() != nil { + r.lastMinResolvedTs = iterator.Value().(model.ResolvedTs) + // Do not forget to remove the resolved ts. + r.pendingEventAndResolvedTs.Remove(iterator.Key()) + } else { + // When we met the first event, + // we couldn't advance anymore. + return + } } } From f58d7522005020434975d7420b27ce8c47d8caa6 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 27 Jun 2022 14:12:04 +0800 Subject: [PATCH 6/9] sinkv2(ticdc): fix advance bug --- cdc/sinkv2/tablesink/progress_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go index f6c5b2806ee..8e4f60947b9 100644 --- a/cdc/sinkv2/tablesink/progress_tracker.go +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -84,7 +84,7 @@ func (r *progressTracker) remove(key uint64) { defer r.lock.Unlock() r.pendingEventAndResolvedTs.Remove(key) iterator := r.pendingEventAndResolvedTs.Iterator() - if iterator.Next() { + for iterator.Next() { // If the element is resolved ts, // it means we can advance the progress. if iterator.Value() != nil { From 4dffbb78f9ea966275bd704f58b464c41a4ce5e7 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 27 Jun 2022 17:10:24 +0800 Subject: [PATCH 7/9] sinkv2(ticdc): rename --- cdc/sinkv2/tablesink/event_table_sink.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index 599884eb0cb..865c469e887 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -60,23 +60,23 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { } resolvedEvents := e.eventBuffer[:i] - resolvedTxnEvents := make([]*tableevent.CallbackableEvent[E], 0, len(resolvedEvents)) + resolvedCallbackableEvents := make([]*tableevent.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - txnEvent := &tableevent.CallbackableEvent[E]{ + ce := &tableevent.CallbackableEvent[E]{ Event: ev, Callback: func() { e.progressTracker.remove(e.eventID) }, TableStatus: e.state, } - resolvedTxnEvents = append(resolvedTxnEvents, txnEvent) + resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) e.progressTracker.addEvent(e.eventID) e.eventID++ } // Do not forget to add the resolvedTs to progressTracker. e.progressTracker.addResolvedTs(e.eventID, resolvedTs) e.eventID++ - e.backendSink.WriteEvents(resolvedTxnEvents...) + e.backendSink.WriteEvents(resolvedCallbackableEvents...) } func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs { From 5f91acc5b5ef78e3ed3a4bca6b50b6491b1ed072 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 27 Jun 2022 17:20:05 +0800 Subject: [PATCH 8/9] sinkv2(ticdc): move tableevent into eventsink --- cdc/sinkv2/{tableevent => eventsink}/event.go | 2 +- cdc/sinkv2/eventsink/event_sink.go | 6 ++---- cdc/sinkv2/eventsink/mq/mq_sink.go | 3 +-- .../row_change_event_appender.go | 2 +- cdc/sinkv2/eventsink/txn/txn_sink.go | 3 +-- .../{tableevent => eventsink}/txn_event_appender.go | 2 +- cdc/sinkv2/tablesink/event_table_sink.go | 9 ++++----- 7 files changed, 11 insertions(+), 16 deletions(-) rename cdc/sinkv2/{tableevent => eventsink}/event.go (98%) rename cdc/sinkv2/{tableevent => eventsink}/row_change_event_appender.go (98%) rename cdc/sinkv2/{tableevent => eventsink}/txn_event_appender.go (98%) diff --git a/cdc/sinkv2/tableevent/event.go b/cdc/sinkv2/eventsink/event.go similarity index 98% rename from cdc/sinkv2/tableevent/event.go rename to cdc/sinkv2/eventsink/event.go index 168ca6d4b4e..331ea1b8c60 100644 --- a/cdc/sinkv2/tableevent/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tableevent +package eventsink import ( "github.com/pingcap/tiflow/cdc/model" diff --git a/cdc/sinkv2/eventsink/event_sink.go b/cdc/sinkv2/eventsink/event_sink.go index 80a8dee993b..227df967eae 100644 --- a/cdc/sinkv2/eventsink/event_sink.go +++ b/cdc/sinkv2/eventsink/event_sink.go @@ -13,13 +13,11 @@ package eventsink -import "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" - // EventSink is the interface for event sink. -type EventSink[E tableevent.TableEvent] interface { +type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. - WriteEvents(rows ...*tableevent.CallbackableEvent[E]) + WriteEvents(rows ...*CallbackableEvent[E]) // Close closes the sink. Close() error } diff --git a/cdc/sinkv2/eventsink/mq/mq_sink.go b/cdc/sinkv2/eventsink/mq/mq_sink.go index 2633d7b699b..034d7533fa8 100644 --- a/cdc/sinkv2/eventsink/mq/mq_sink.go +++ b/cdc/sinkv2/eventsink/mq/mq_sink.go @@ -16,7 +16,6 @@ package mq import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" - "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" ) // Assert EventSink[E event.TableEvent] implementation @@ -26,7 +25,7 @@ var _ eventsink.EventSink[*model.RowChangedEvent] = (*Sink)(nil) type Sink struct{} // WriteEvents writes events to the sink. -func (s *Sink) WriteEvents(rows ...*tableevent.RowChangeCallbackableEvent) { +func (s *Sink) WriteEvents(rows ...*eventsink.RowChangeCallbackableEvent) { // TODO implement me panic("implement me") } diff --git a/cdc/sinkv2/tableevent/row_change_event_appender.go b/cdc/sinkv2/eventsink/row_change_event_appender.go similarity index 98% rename from cdc/sinkv2/tableevent/row_change_event_appender.go rename to cdc/sinkv2/eventsink/row_change_event_appender.go index df36ddcd2f1..8f6d383fe5a 100644 --- a/cdc/sinkv2/tableevent/row_change_event_appender.go +++ b/cdc/sinkv2/eventsink/row_change_event_appender.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tableevent +package eventsink import "github.com/pingcap/tiflow/cdc/model" diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go index e76c4604477..dcb81c90ed2 100644 --- a/cdc/sinkv2/eventsink/txn/txn_sink.go +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -16,7 +16,6 @@ package txn import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" - "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" ) // Assert EventSink[E event.TableEvent] implementation @@ -26,7 +25,7 @@ var _ eventsink.EventSink[*model.SingleTableTxn] = (*Sink)(nil) type Sink struct{} // WriteEvents writes events to the sink. -func (s *Sink) WriteEvents(rows ...*tableevent.TxnCallbackableEvent) { +func (s *Sink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) { // TODO implement me panic("implement me") } diff --git a/cdc/sinkv2/tableevent/txn_event_appender.go b/cdc/sinkv2/eventsink/txn_event_appender.go similarity index 98% rename from cdc/sinkv2/tableevent/txn_event_appender.go rename to cdc/sinkv2/eventsink/txn_event_appender.go index 0042473f761..9b2d4fa08be 100644 --- a/cdc/sinkv2/tableevent/txn_event_appender.go +++ b/cdc/sinkv2/eventsink/txn_event_appender.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tableevent +package eventsink import "github.com/pingcap/tiflow/cdc/model" diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go index 865c469e887..ffcabe19894 100644 --- a/cdc/sinkv2/tablesink/event_table_sink.go +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -19,19 +19,18 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" - "github.com/pingcap/tiflow/cdc/sinkv2/tableevent" ) // Assert TableSink implementation var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil) var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil) -type eventTableSink[E tableevent.TableEvent] struct { +type eventTableSink[E eventsink.TableEvent] struct { eventID uint64 maxResolvedTs model.ResolvedTs backendSink eventsink.EventSink[E] progressTracker *progressTracker - eventAppender tableevent.Appender[E] + eventAppender eventsink.Appender[E] // NOTICE: It is ordered by commitTs. eventBuffer []E state *pipeline.TableState @@ -60,9 +59,9 @@ func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { } resolvedEvents := e.eventBuffer[:i] - resolvedCallbackableEvents := make([]*tableevent.CallbackableEvent[E], 0, len(resolvedEvents)) + resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - ce := &tableevent.CallbackableEvent[E]{ + ce := &eventsink.CallbackableEvent[E]{ Event: ev, Callback: func() { e.progressTracker.remove(e.eventID) From 7695a55d0600afd4eca3d999f0bc5f8fc5f145e3 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Mon, 27 Jun 2022 17:42:27 +0800 Subject: [PATCH 9/9] sinkv2(ticdc): split event appender --- cdc/sinkv2/eventsink/event.go | 6 ---- ...ge_event_appender.go => event_appender.go} | 23 +++++++++++++- cdc/sinkv2/eventsink/txn_event_appender.go | 31 ------------------- 3 files changed, 22 insertions(+), 38 deletions(-) rename cdc/sinkv2/eventsink/{row_change_event_appender.go => event_appender.go} (59%) delete mode 100644 cdc/sinkv2/eventsink/txn_event_appender.go diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go index 331ea1b8c60..5d1f9b70281 100644 --- a/cdc/sinkv2/eventsink/event.go +++ b/cdc/sinkv2/eventsink/event.go @@ -40,9 +40,3 @@ type RowChangeCallbackableEvent = CallbackableEvent[*model.RowChangedEvent] // TxnCallbackableEvent is the txn event which can be callbacked. type TxnCallbackableEvent = CallbackableEvent[*model.SingleTableTxn] - -// Appender is the interface for appending events to buffer. -type Appender[E TableEvent] interface { - // Append appends the event to buffer. - Append(buffer []E, rows ...*model.RowChangedEvent) []E -} diff --git a/cdc/sinkv2/eventsink/row_change_event_appender.go b/cdc/sinkv2/eventsink/event_appender.go similarity index 59% rename from cdc/sinkv2/eventsink/row_change_event_appender.go rename to cdc/sinkv2/eventsink/event_appender.go index 8f6d383fe5a..20dfbd1c6e1 100644 --- a/cdc/sinkv2/eventsink/row_change_event_appender.go +++ b/cdc/sinkv2/eventsink/event_appender.go @@ -9,12 +9,18 @@ // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and -// limitations under the License. +// limitations under the License package eventsink import "github.com/pingcap/tiflow/cdc/model" +// Appender is the interface for appending events to buffer. +type Appender[E TableEvent] interface { + // Append appends the event to buffer. + Append(buffer []E, rows ...*model.RowChangedEvent) []E +} + // Assert Appender[E TableEvent] implementation var _ Appender[*model.RowChangedEvent] = (*RowChangeEventAppender)(nil) @@ -28,3 +34,18 @@ func (r *RowChangeEventAppender) Append( ) []*model.RowChangedEvent { return append(buffer, rows...) } + +// Assert Appender[E TableEvent] implementation +var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil) + +// TxnEventAppender is the appender for SingleTableTxn. +type TxnEventAppender struct{} + +// Append appends the given rows to the given txn buffer. +func (t *TxnEventAppender) Append( + buffer []*model.SingleTableTxn, + rows ...*model.RowChangedEvent, +) []*model.SingleTableTxn { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/eventsink/txn_event_appender.go b/cdc/sinkv2/eventsink/txn_event_appender.go deleted file mode 100644 index 9b2d4fa08be..00000000000 --- a/cdc/sinkv2/eventsink/txn_event_appender.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package eventsink - -import "github.com/pingcap/tiflow/cdc/model" - -// Assert Appender[E TableEvent] implementation -var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil) - -// TxnEventAppender is the appender for SingleTableTxn. -type TxnEventAppender struct{} - -// Append appends the given rows to the given txn buffer. -func (t *TxnEventAppender) Append( - buffer []*model.SingleTableTxn, - rows ...*model.RowChangedEvent, -) []*model.SingleTableTxn { - // TODO implement me - panic("implement me") -}