Skip to content

Commit

Permalink
simple (ticdc): support send all tables bootstrap message at changefe…
Browse files Browse the repository at this point in the history
…ed start (#11239) (#11400)

close #11315
  • Loading branch information
ti-chi-bot authored Jul 24, 2024
1 parent 893d2ae commit b00c3ee
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 17 deletions.
8 changes: 8 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition)
}

if c.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -818,6 +821,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.SendBootstrapToAllPartition != nil {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition)
}

if cloned.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -994,6 +1001,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
}

Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
},
Consistent: &ConsistentConfig{
Expand Down
5 changes: 5 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,8 @@ func (s *SchemaTestHelper) Close() {
s.domain.Close()
s.storage.Close() //nolint:errcheck
}

// SchemaStorage returns the schema storage
func (s *SchemaTestHelper) SchemaStorage() SchemaStorage {
return s.schemaStorage
}
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ LOOP2:
c.redoMetaMgr,
downstreamType,
util.GetOrZero(info.Config.BDRMode),
)
info.Config.Sink.ShouldSendAllBootstrapAtStart())

// create scheduler
cfg := *c.cfg
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type mockDDLSink struct {
// whether to record the DDL history, only for rename table
recordDDLHistory bool
// a slice of DDL history, only for rename table
ddlHistory []string
ddlHistory []*model.DDLEvent
mu struct {
sync.Mutex
checkpointTs model.Ts
Expand Down Expand Up @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
}
}()
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, ddl.Query)
m.ddlHistory = append(m.ddlHistory, ddl)
} else {
m.ddlHistory = nil
}
Expand Down Expand Up @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
return nil
}

type mockScheduler struct {
currentTables []model.TableID
}
Expand Down
58 changes: 53 additions & 5 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type ddlManager struct {
BDRMode bool
sinkType model.DownstreamType
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
}

func newDDLManager(
Expand All @@ -147,6 +150,7 @@ func newDDLManager(
redoMetaManager redo.MetaManager,
sinkType model.DownstreamType,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
) *ddlManager {
log.Info("create ddl manager",
zap.String("namaspace", changefeedID.Namespace),
Expand All @@ -169,12 +173,49 @@ func newDDLManager(
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
}
}

func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
start := time.Now()
defer func() {
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}
}
m.bootstraped = true
return true, nil
}

// tick the ddlHandler, it does the following things:
// 1. get DDL jobs from ddlPuller.
// 2. uses schema to turn DDL jobs into DDLEvents.
Expand All @@ -192,6 +233,14 @@ func (m *ddlManager) tick(
m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

ok, err := m.checkAndSendBootstrapMsgs(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}
if !ok {
return nil, nil, nil
}

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -542,8 +591,7 @@ func (m *ddlManager) barrier() *schedulepb.BarrierWithMinTs {
return barrier
}

// allTables returns all tables in the schema that
// less or equal than the checkpointTs.
// allTables returns all tables in the schema in current checkpointTs.
func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) {
if m.tableInfoCache != nil {
return m.tableInfoCache, nil
Expand Down
41 changes: 38 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package owner

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand Down Expand Up @@ -50,7 +51,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
model.DB, false)
model.DB, false, false)
return res
}

Expand Down Expand Up @@ -246,9 +247,9 @@ func TestExecRenameTablesDDL(t *testing.T) {
}
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
mockDDLSink.ddlHistory[0].Query)
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])
mockDDLSink.ddlHistory[1].Query)

// mock all rename table statements have been done
mockDDLSink.resetDDLDone = false
Expand Down Expand Up @@ -457,3 +458,37 @@ func TestIsGlobalDDL(t *testing.T) {
require.Equal(t, c.ret, isGlobalDDL(c.ddl))
}
}

func TestCheckAndSendBootstrapMsgs(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")

ctx := context.Background()
dm := createDDLManagerForTest(t)
dm.schema = helper.SchemaStorage()
dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs

mockDDLSink := dm.ddlSink.(*mockDDLSink)
mockDDLSink.recordDDLHistory = true

// do not send all bootstrap messages
send, err := dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.False(t, dm.bootstraped)
require.Equal(t, 0, len(mockDDLSink.ddlHistory))

// send all bootstrap messages -> tb1 and tb2
dm.shouldSendAllBootstrapAtStart = true
send, err = dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.True(t, dm.bootstraped)
require.Equal(t, 2, len(mockDDLSink.ddlHistory))
require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
}
14 changes: 10 additions & 4 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type DDLSink interface {
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// emitBootstrap emits the table bootstrap event in a blocking way.
// It will return after the bootstrap event is sent.
emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
}
Expand Down Expand Up @@ -122,10 +125,6 @@ func ddlSinkInitializer(ctx context.Context, a *ddlSinkImpl) error {
return errors.Trace(err)
}
a.sink = s

if !util.GetOrZero(a.info.Config.EnableSyncPoint) {
return nil
}
return nil
}

Expand Down Expand Up @@ -476,3 +475,10 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {

return result, nil
}

func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if err := s.makeSinkReady(ctx); err != nil {
return errors.Trace(err)
}
return s.sink.WriteDDLEvent(ctx, bootstrap)
}
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
SendAllBootstrapAtStart: util.AddressOf(false),
}, cfg.Sink)
}

Expand Down Expand Up @@ -254,6 +255,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
}, cfg.Sink)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"open": {
"output-old-value": true
}
Expand Down Expand Up @@ -316,6 +317,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"open": {
"output-old-value": true
}
Expand Down Expand Up @@ -482,6 +484,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"open": {
"output-old-value": true
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
},
Consistent: &ConsistentConfig{
Expand Down
15 changes: 14 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ const (
// DefaultSendBootstrapToAllPartition is the default value of
// whether to send bootstrap message to all partitions.
DefaultSendBootstrapToAllPartition = true
// DefaultSendAllBootstrapAtStart is the default value of whether
// to send all tables bootstrap message at changefeed start.
DefaultSendAllBootstrapAtStart = false

// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
Expand Down Expand Up @@ -188,7 +191,7 @@ type SinkConfig struct {
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
// OpenProtocol related configurations
OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"`
}
Expand Down Expand Up @@ -220,6 +223,16 @@ func (s *SinkConfig) ShouldSendBootstrapMsg() bool {
util.GetOrZero(s.SendBootstrapInMsgCount) > 0
}

// ShouldSendAllBootstrapAtStart returns whether the should send all bootstrap message at changefeed start.
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
if s == nil {
return false
}
should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart)
log.Info("should send all bootstrap at start", zap.Bool("should", should))
return should
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
Expand Down
26 changes: 26 additions & 0 deletions pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,29 @@ func TestValidateAndAdjustStorageConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 16, util.GetOrZero(s.Sink.FileIndexWidth))
}

func TestShouldSendBootstrapMsg(t *testing.T) {
t.Parallel()
sinkConfig := GetDefaultReplicaConfig().Sink
require.False(t, sinkConfig.ShouldSendBootstrapMsg())

protocol := "simple"
sinkConfig.Protocol = &protocol
require.True(t, sinkConfig.ShouldSendBootstrapMsg())

count := int32(0)
sinkConfig.SendBootstrapInMsgCount = &count
require.False(t, sinkConfig.ShouldSendBootstrapMsg())
}

func TestShouldSendAllBootstrapAtStart(t *testing.T) {
t.Parallel()
sinkConfig := GetDefaultReplicaConfig().Sink
protocol := "simple"
sinkConfig.Protocol = &protocol
require.False(t, sinkConfig.ShouldSendAllBootstrapAtStart())

should := true
sinkConfig.SendAllBootstrapAtStart = &should
require.True(t, sinkConfig.ShouldSendAllBootstrapAtStart())
}
Loading

0 comments on commit b00c3ee

Please sign in to comment.