diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 375c1685f9b..368a82f64a1 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -272,6 +272,11 @@ type RowChangedEvent struct { SplitTxn bool `json:"-" msg:"-"` } +// GetCommitTs returns the commit timestamp of this event. +func (r *RowChangedEvent) GetCommitTs() uint64 { + return r.CommitTs +} + // IsDelete returns true if the row is a delete event func (r *RowChangedEvent) IsDelete() bool { return len(r.PreColumns) != 0 && len(r.Columns) == 0 @@ -611,6 +616,11 @@ type SingleTableTxn struct { FinishWg *sync.WaitGroup } +// GetCommitTs returns the commit timestamp of the transaction. +func (t *SingleTableTxn) GetCommitTs() uint64 { + return t.CommitTs +} + // Append adds a row changed event into SingleTableTxn func (t *SingleTableTxn) Append(row *RowChangedEvent) { if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID { diff --git a/cdc/sinkv2/ddlsink/ddl_sink.go b/cdc/sinkv2/ddlsink/ddl_sink.go new file mode 100644 index 00000000000..2c724540ecc --- /dev/null +++ b/cdc/sinkv2/ddlsink/ddl_sink.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddlsink + +import ( + "context" + + "github.com/pingcap/tiflow/cdc/model" +) + +// DDLEventSink is the interface for sink of DDL events. +type DDLEventSink interface { + // WriteDDLEvent writes a DDL event to the sink. + // Note: This is a synchronous and thread-safe method. + WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error + // WriteCheckpointTs writes a checkpoint timestamp to the sink. + // Note: This is a synchronous and thread-safe method. + // This only for MQSink for now. + WriteCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error + // Close closes the sink. + Close() error +} diff --git a/cdc/sinkv2/ddlsink/mq/.keep b/cdc/sinkv2/ddlsink/mq/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/ddlsink/mysql/.keep b/cdc/sinkv2/ddlsink/mysql/.keep new file mode 100644 index 00000000000..e69de29bb2d diff --git a/cdc/sinkv2/eventsink/event.go b/cdc/sinkv2/eventsink/event.go new file mode 100644 index 00000000000..5d1f9b70281 --- /dev/null +++ b/cdc/sinkv2/eventsink/event.go @@ -0,0 +1,42 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsink + +import ( + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" +) + +// TableEvent is the interface for events which can be written to sink by TableSink. +type TableEvent interface { + // GetCommitTs returns the commit timestamp of the event. + GetCommitTs() uint64 +} + +// CallbackFunc is the callback function for callbackable event. +type CallbackFunc func() + +// CallbackableEvent means the event can be callbacked. +// It also contains the table status. +type CallbackableEvent[E TableEvent] struct { + Event E + Callback CallbackFunc + TableStatus *pipeline.TableState +} + +// RowChangeCallbackableEvent is the row change event which can be callbacked. +type RowChangeCallbackableEvent = CallbackableEvent[*model.RowChangedEvent] + +// TxnCallbackableEvent is the txn event which can be callbacked. +type TxnCallbackableEvent = CallbackableEvent[*model.SingleTableTxn] diff --git a/cdc/sinkv2/eventsink/event_appender.go b/cdc/sinkv2/eventsink/event_appender.go new file mode 100644 index 00000000000..20dfbd1c6e1 --- /dev/null +++ b/cdc/sinkv2/eventsink/event_appender.go @@ -0,0 +1,51 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License + +package eventsink + +import "github.com/pingcap/tiflow/cdc/model" + +// Appender is the interface for appending events to buffer. +type Appender[E TableEvent] interface { + // Append appends the event to buffer. + Append(buffer []E, rows ...*model.RowChangedEvent) []E +} + +// Assert Appender[E TableEvent] implementation +var _ Appender[*model.RowChangedEvent] = (*RowChangeEventAppender)(nil) + +// RowChangeEventAppender is the builder for RowChangedEvent. +type RowChangeEventAppender struct{} + +// Append appends the given rows to the given buffer. +func (r *RowChangeEventAppender) Append( + buffer []*model.RowChangedEvent, + rows ...*model.RowChangedEvent, +) []*model.RowChangedEvent { + return append(buffer, rows...) +} + +// Assert Appender[E TableEvent] implementation +var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil) + +// TxnEventAppender is the appender for SingleTableTxn. +type TxnEventAppender struct{} + +// Append appends the given rows to the given txn buffer. +func (t *TxnEventAppender) Append( + buffer []*model.SingleTableTxn, + rows ...*model.RowChangedEvent, +) []*model.SingleTableTxn { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/eventsink/event_sink.go b/cdc/sinkv2/eventsink/event_sink.go new file mode 100644 index 00000000000..227df967eae --- /dev/null +++ b/cdc/sinkv2/eventsink/event_sink.go @@ -0,0 +1,23 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventsink + +// EventSink is the interface for event sink. +type EventSink[E TableEvent] interface { + // WriteEvents writes events to the sink. + // This is an asynchronously and thread-safe method. + WriteEvents(rows ...*CallbackableEvent[E]) + // Close closes the sink. + Close() error +} diff --git a/cdc/sinkv2/eventsink/mq/mq_sink.go b/cdc/sinkv2/eventsink/mq/mq_sink.go new file mode 100644 index 00000000000..034d7533fa8 --- /dev/null +++ b/cdc/sinkv2/eventsink/mq/mq_sink.go @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mq + +import ( + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" +) + +// Assert EventSink[E event.TableEvent] implementation +var _ eventsink.EventSink[*model.RowChangedEvent] = (*Sink)(nil) + +// Sink is the mq sink. +type Sink struct{} + +// WriteEvents writes events to the sink. +func (s *Sink) WriteEvents(rows ...*eventsink.RowChangeCallbackableEvent) { + // TODO implement me + panic("implement me") +} + +// Close closes the sink. +func (s *Sink) Close() error { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go new file mode 100644 index 00000000000..dcb81c90ed2 --- /dev/null +++ b/cdc/sinkv2/eventsink/txn/txn_sink.go @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txn + +import ( + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" +) + +// Assert EventSink[E event.TableEvent] implementation +var _ eventsink.EventSink[*model.SingleTableTxn] = (*Sink)(nil) + +// Sink is the sink for SingleTableTxn. +type Sink struct{} + +// WriteEvents writes events to the sink. +func (s *Sink) WriteEvents(rows ...*eventsink.TxnCallbackableEvent) { + // TODO implement me + panic("implement me") +} + +// Close closes the sink. +func (s *Sink) Close() error { + // TODO implement me + panic("implement me") +} diff --git a/cdc/sinkv2/tablesink/event_table_sink.go b/cdc/sinkv2/tablesink/event_table_sink.go new file mode 100644 index 00000000000..ffcabe19894 --- /dev/null +++ b/cdc/sinkv2/tablesink/event_table_sink.go @@ -0,0 +1,87 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sort" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/pipeline" + "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" +) + +// Assert TableSink implementation +var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil) +var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil) + +type eventTableSink[E eventsink.TableEvent] struct { + eventID uint64 + maxResolvedTs model.ResolvedTs + backendSink eventsink.EventSink[E] + progressTracker *progressTracker + eventAppender eventsink.Appender[E] + // NOTICE: It is ordered by commitTs. + eventBuffer []E + state *pipeline.TableState +} + +func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) { + e.eventBuffer = e.eventAppender.Append(e.eventBuffer, rows...) +} + +func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) { + // If resolvedTs is not greater than maxResolvedTs, + // the flush is unnecessary. + if !e.maxResolvedTs.Less(resolvedTs) { + return + } + e.maxResolvedTs = resolvedTs + + i := sort.Search(len(e.eventBuffer), func(i int) bool { + return e.eventBuffer[i].GetCommitTs() > resolvedTs.Ts + }) + // Despite the lack of data, we have to move forward with progress. + if i == 0 { + e.progressTracker.addResolvedTs(e.eventID, resolvedTs) + e.eventID++ + return + } + resolvedEvents := e.eventBuffer[:i] + + resolvedCallbackableEvents := make([]*eventsink.CallbackableEvent[E], 0, len(resolvedEvents)) + for _, ev := range resolvedEvents { + ce := &eventsink.CallbackableEvent[E]{ + Event: ev, + Callback: func() { + e.progressTracker.remove(e.eventID) + }, + TableStatus: e.state, + } + resolvedCallbackableEvents = append(resolvedCallbackableEvents, ce) + e.progressTracker.addEvent(e.eventID) + e.eventID++ + } + // Do not forget to add the resolvedTs to progressTracker. + e.progressTracker.addResolvedTs(e.eventID, resolvedTs) + e.eventID++ + e.backendSink.WriteEvents(resolvedCallbackableEvents...) +} + +func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs { + return e.progressTracker.minTs() +} + +func (e *eventTableSink[E]) Close() { + e.state.Store(pipeline.TableStateStopped) +} diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go new file mode 100644 index 00000000000..8e4f60947b9 --- /dev/null +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -0,0 +1,110 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import ( + "sync" + + "github.com/emirpasic/gods/maps/linkedhashmap" + "github.com/pingcap/tiflow/cdc/model" +) + +// progressTracker is used to track the progress of the table sink. +// For example, +// We have txn1, txn2, resolvedTs2, txn3-1, txn3-2, resolvedTs3, resolvedTs4, resolvedTs5. +// txn3-1 and txn3-2 are in the same big txn. +// First txn1 and txn2 are written, then the progress can be updated to resolvedTs2. +// Then txn3-1 and txn3-2 are written, then the progress can be updated to resolvedTs3. +// Next, since no data is being written, we can update to resolvedTs5 in order. +type progressTracker struct { + // This lock for both pendingEventAndResolvedTs and lastMinResolvedTs. + lock sync.Mutex + // pendingEventAndResolvedTs is used to store the pending event keys and resolved tss. + // The key is the key of the event or the resolved ts. + // The value is nil or the resolved ts. **nil for event**. + // Since the data in TableSink is sequential, + // we only need to maintain an insertion order. + pendingEventAndResolvedTs *linkedhashmap.Map + // lastMinResolvedTs is used to store the last min resolved ts. + // It is used to indicate the progress of the table sink. + lastMinResolvedTs model.ResolvedTs +} + +// newProgressTracker is used to create a new progress tracker. +// The last min resolved ts is set to 0. +// It means that the table sink has not started yet. +// nolint:deadcode +func newProgressTracker() *progressTracker { + return &progressTracker{ + pendingEventAndResolvedTs: linkedhashmap.New(), + // It means the start of the table. + // It's Ok to use 0 here. + // Because sink node only update the checkpoint when it's growing. + lastMinResolvedTs: model.NewResolvedTs(0), + } +} + +// addEvent is used to add the pending event key. +func (r *progressTracker) addEvent(key uint64) { + r.lock.Lock() + defer r.lock.Unlock() + r.pendingEventAndResolvedTs.Put(key, nil) +} + +// addResolvedTs is used to add the pending resolved ts. +func (r *progressTracker) addResolvedTs(key uint64, resolvedTs model.ResolvedTs) { + r.lock.Lock() + defer r.lock.Unlock() + // If no pending event and resolved ts, + // we can directly advance the progress. + if r.pendingEventAndResolvedTs.Empty() { + r.lastMinResolvedTs = resolvedTs + return + } + r.pendingEventAndResolvedTs.Put(key, resolvedTs) +} + +// remove is used to remove the pending resolved ts. +// If we are deleting the last value before resolved ts, +// that means we can advance the progress, +// and we will update lastMinResolvedTs. +func (r *progressTracker) remove(key uint64) { + r.lock.Lock() + defer r.lock.Unlock() + r.pendingEventAndResolvedTs.Remove(key) + iterator := r.pendingEventAndResolvedTs.Iterator() + for iterator.Next() { + // If the element is resolved ts, + // it means we can advance the progress. + if iterator.Value() != nil { + r.lastMinResolvedTs = iterator.Value().(model.ResolvedTs) + // Do not forget to remove the resolved ts. + r.pendingEventAndResolvedTs.Remove(iterator.Key()) + } else { + // When we met the first event, + // we couldn't advance anymore. + return + } + } +} + +// minTs returns the last min resolved ts. +// This means that all data prior to this point has already been processed. +// It can be considered as CheckpointTs of the table. +func (r *progressTracker) minTs() model.ResolvedTs { + r.lock.Lock() + defer r.lock.Unlock() + + return r.lastMinResolvedTs +} diff --git a/cdc/sinkv2/tablesink/table_sink.go b/cdc/sinkv2/tablesink/table_sink.go new file mode 100644 index 00000000000..502632a2697 --- /dev/null +++ b/cdc/sinkv2/tablesink/table_sink.go @@ -0,0 +1,35 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tablesink + +import "github.com/pingcap/tiflow/cdc/model" + +// TableSink is the interface for table sink. +// It is used to sink data in table units. +type TableSink interface { + // AppendRowChangedEvents appends row changed events to the table sink. + // Usually, it is used to cache the row changed events into table sink. + // This is a not thread-safe method. Please do not call it concurrently. + AppendRowChangedEvents(rows ...*model.RowChangedEvent) + // UpdateResolvedTs writes the buffered row changed events to the eventTableSink. + // Note: This is an asynchronous and not thread-safe method. + // Please do not call it concurrently. + UpdateResolvedTs(resolvedTs model.ResolvedTs) + // GetCheckpointTs returns the current checkpoint ts of table sink. + // For example, calculating the current progress from the statistics of the table sink. + // This is a thread-safe method. + GetCheckpointTs() model.ResolvedTs + // Close closes the table sink. + Close() +} diff --git a/go.mod b/go.mod index e7a8dca0f3e..785de7672ef 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/docker/go-units v0.4.0 github.com/dustin/go-humanize v1.0.0 github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 + github.com/emirpasic/gods v1.18.1 github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 github.com/fatih/color v1.13.0 github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 diff --git a/go.sum b/go.sum index c5e1952d0b1..1cbe5f50c4d 100644 --- a/go.sum +++ b/go.sum @@ -296,6 +296,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17 h1:8i9x3Q4hW1kLE4ScsOtUlwVHT76LKhkmOw9zbDxnyUc= github.com/edwingeng/deque v0.0.0-20191220032131-8596380dee17/go.mod h1:3Ys1pJhyVaB6iWigv4o2r6Ug1GZmfDWqvqmO6bjojg0= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=