Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2(ticdc): add interfaces #6017

Merged
merged 13 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ type RowChangedEvent struct {
SplitTxn bool `json:"-" msg:"-"`
}

// GetCommitTs returns the commit timestamp of this event.
func (r *RowChangedEvent) GetCommitTs() uint64 {
return r.CommitTs
}

// IsDelete returns true if the row is a delete event
func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
Expand Down Expand Up @@ -611,6 +616,11 @@ type SingleTableTxn struct {
FinishWg *sync.WaitGroup
}

// GetCommitTs returns the commit timestamp of the transaction.
func (t *SingleTableTxn) GetCommitTs() uint64 {
return t.CommitTs
}

// Append adds a row changed event into SingleTableTxn
func (t *SingleTableTxn) Append(row *RowChangedEvent) {
if row.StartTs != t.StartTs || row.CommitTs != t.CommitTs || row.Table.TableID != t.Table.TableID {
Expand Down
33 changes: 33 additions & 0 deletions cdc/sinkv2/ddlsink/ddl_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddlsink

import (
"context"

"github.com/pingcap/tiflow/cdc/model"
)

// DDLEventSink is the interface for sink of DDL events.
type DDLEventSink interface {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
// WriteDDLEvent writes a DDL event to the sink.
// Note: This is a synchronous and thread-safe method.
WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
// WriteCheckpointTs writes a checkpoint timestamp to the sink.
// Note: This is a synchronous and thread-safe method.
// This only for MQSink for now.
WriteCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error
// Close closes the sink.
Close() error
}
Empty file added cdc/sinkv2/ddlsink/mq/.keep
Empty file.
Empty file added cdc/sinkv2/ddlsink/mysql/.keep
Empty file.
25 changes: 25 additions & 0 deletions cdc/sinkv2/eventsink/event_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package eventsink

import "github.com/pingcap/tiflow/cdc/sinkv2/tableevent"

// EventSink is the interface for event sink.
type EventSink[E tableevent.TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(rows ...*tableevent.CallbackableEvent[E])
// Close closes the sink.
Close() error
}
38 changes: 38 additions & 0 deletions cdc/sinkv2/eventsink/mq/mq_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mq

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tableevent"
)

// Assert EventSink[E event.TableEvent] implementation
var _ eventsink.EventSink[*model.RowChangedEvent] = (*Sink)(nil)

// Sink is the mq sink.
type Sink struct{}

// WriteEvents writes events to the sink.
func (s *Sink) WriteEvents(rows ...*tableevent.RowChangeCallbackableEvent) {
// TODO implement me
panic("implement me")
}

// Close closes the sink.
func (s *Sink) Close() error {
// TODO implement me
panic("implement me")
}
38 changes: 38 additions & 0 deletions cdc/sinkv2/eventsink/txn/txn_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tableevent"
)

// Assert EventSink[E event.TableEvent] implementation
var _ eventsink.EventSink[*model.SingleTableTxn] = (*Sink)(nil)

// Sink is the sink for SingleTableTxn.
type Sink struct{}

// WriteEvents writes events to the sink.
func (s *Sink) WriteEvents(rows ...*tableevent.TxnCallbackableEvent) {
// TODO implement me
panic("implement me")
}

// Close closes the sink.
func (s *Sink) Close() error {
// TODO implement me
panic("implement me")
}
48 changes: 48 additions & 0 deletions cdc/sinkv2/tableevent/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tableevent

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
)

// TableEvent is the interface for events which can be written to sink by TableSink.
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
}

// CallbackFunc is the callback function for callbackable event.
type CallbackFunc func()

// CallbackableEvent means the event can be callbacked.
// It also contains the table status.
type CallbackableEvent[E TableEvent] struct {
Event E
Callback CallbackFunc
TableStatus *pipeline.TableState
}

// RowChangeCallbackableEvent is the row change event which can be callbacked.
type RowChangeCallbackableEvent = CallbackableEvent[*model.RowChangedEvent]

// TxnCallbackableEvent is the txn event which can be callbacked.
type TxnCallbackableEvent = CallbackableEvent[*model.SingleTableTxn]

// Appender is the interface for appending events to buffer.
type Appender[E TableEvent] interface {
// Append appends the event to buffer.
Append(buffer []E, rows ...*model.RowChangedEvent) []E
}
30 changes: 30 additions & 0 deletions cdc/sinkv2/tableevent/row_change_event_appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tableevent

import "github.com/pingcap/tiflow/cdc/model"

// Assert Appender[E TableEvent] implementation
var _ Appender[*model.RowChangedEvent] = (*RowChangeEventAppender)(nil)

// RowChangeEventAppender is the builder for RowChangedEvent.
type RowChangeEventAppender struct{}

// Append appends the given rows to the given buffer.
func (r *RowChangeEventAppender) Append(
buffer []*model.RowChangedEvent,
rows ...*model.RowChangedEvent,
) []*model.RowChangedEvent {
return append(buffer, rows...)
}
31 changes: 31 additions & 0 deletions cdc/sinkv2/tableevent/txn_event_appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tableevent

import "github.com/pingcap/tiflow/cdc/model"

// Assert Appender[E TableEvent] implementation
var _ Appender[*model.SingleTableTxn] = (*TxnEventAppender)(nil)

// TxnEventAppender is the appender for SingleTableTxn.
type TxnEventAppender struct{}

// Append appends the given rows to the given txn buffer.
func (t *TxnEventAppender) Append(
buffer []*model.SingleTableTxn,
rows ...*model.RowChangedEvent,
) []*model.SingleTableTxn {
// TODO implement me
panic("implement me")
}
88 changes: 88 additions & 0 deletions cdc/sinkv2/tablesink/event_table_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tablesink

import (
"sort"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tableevent"
)

// Assert TableSink implementation
var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil)
var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil)

type eventTableSink[E tableevent.TableEvent] struct {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
eventID uint64
maxResolvedTs model.ResolvedTs
backendSink eventsink.EventSink[E]
progressTracker *progressTracker
eventAppender tableevent.Appender[E]
// NOTICE: It is ordered by commitTs.
eventBuffer []E
state *pipeline.TableState
}

func (e *eventTableSink[E]) AppendRowChangedEvents(rows ...*model.RowChangedEvent) {
e.eventBuffer = e.eventAppender.Append(e.eventBuffer, rows...)
}

func (e *eventTableSink[E]) UpdateResolvedTs(resolvedTs model.ResolvedTs) {
// If resolvedTs is not greater than maxResolvedTs,
// the flush is unnecessary.
if !e.maxResolvedTs.Less(resolvedTs) {
return
}
e.maxResolvedTs = resolvedTs

i := sort.Search(len(e.eventBuffer), func(i int) bool {
return e.eventBuffer[i].GetCommitTs() > resolvedTs.Ts
})
// Despite the lack of data, we have to move forward with progress.
if i == 0 {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
e.progressTracker.addResolvedTs(e.eventID, resolvedTs)
e.eventID++
return
}
resolvedEvents := e.eventBuffer[:i]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the relationship between the event and the txnevent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

event can be txnevent or rowchangeevent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Assert TableSink implementation
var _ TableSink = (*eventTableSink[*model.RowChangedEvent])(nil)
var _ TableSink = (*eventTableSink[*model.SingleTableTxn])(nil)

resolvedTxnEvents := make([]*tableevent.CallbackableEvent[E], 0, len(resolvedEvents))
for _, ev := range resolvedEvents {
txnEvent := &tableevent.CallbackableEvent[E]{
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
Event: ev,
Callback: func() {
e.progressTracker.remove(e.eventID)
},
TableStatus: e.state,
}
resolvedTxnEvents = append(resolvedTxnEvents, txnEvent)
e.progressTracker.addEvent(e.eventID)
e.eventID++
}
// Do not forget to add the resolvedTs to progressTracker.
e.progressTracker.addResolvedTs(e.eventID, resolvedTs)
e.eventID++
e.backendSink.WriteEvents(resolvedTxnEvents...)
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
return e.progressTracker.minTs()
}

func (e *eventTableSink[E]) Close() {
e.state.Store(pipeline.TableStateStopped)
}
Loading