Skip to content

Commit

Permalink
changefeed(ticdc): refactor initialize the ddl puller and schema stor…
Browse files Browse the repository at this point in the history
…age related steps (#10199)

close #10246
  • Loading branch information
3AceShowHand authored Dec 6, 2023
1 parent e16c52d commit cf3d7bf
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 236 deletions.
15 changes: 5 additions & 10 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,7 @@ func TestCreateSnapFromMeta(t *testing.T) {
tk.MustExec("create table test2.simple_test5 (a bigint)")
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
require.Nil(t, err)
meta := kv.GetSnapshotMeta(store, ver.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
Expand Down Expand Up @@ -729,14 +728,12 @@ func TestExplicitTables(t *testing.T) {
tk.MustExec("create table test2.simple_test5 (a varchar(20))")
ver2, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
meta1, err := kv.GetSnapshotMeta(store, ver1.Ver)
require.Nil(t, err)
meta1 := kv.GetSnapshotMeta(store, ver1.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap1, err := schema.NewSnapshotFromMeta(meta1, ver1.Ver, true /* forceReplicate */, f)
require.Nil(t, err)
meta2, err := kv.GetSnapshotMeta(store, ver2.Ver)
require.Nil(t, err)
meta2 := kv.GetSnapshotMeta(store, ver2.Ver)
snap2, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, false /* forceReplicate */, f)
require.Nil(t, err)
snap3, err := schema.NewSnapshotFromMeta(meta2, ver2.Ver, true /* forceReplicate */, f)
Expand Down Expand Up @@ -892,8 +889,7 @@ func TestSchemaStorage(t *testing.T) {

for _, job := range jobs {
ts := job.BinlogInfo.FinishedTS
meta, err := kv.GetSnapshotMeta(store, ts)
require.Nil(t, err)
meta := kv.GetSnapshotMeta(store, ts)
snapFromMeta, err := schema.NewSnapshotFromMeta(meta, ts, false, f)
require.Nil(t, err)
snapFromSchemaStore, err := schemaStorage.GetSnapshot(ctx, ts)
Expand Down Expand Up @@ -973,8 +969,7 @@ func TestHandleKey(t *testing.T) {
tk.MustExec("create table test.simple_test3 (id bigint, age int)")
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
meta, err := kv.GetSnapshotMeta(store, ver.Ver)
require.Nil(t, err)
meta := kv.GetSnapshotMeta(store, ver.Ver)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig(), "")
require.Nil(t, err)
snap, err := schema.NewSnapshotFromMeta(meta, ver.Ver, false, f)
Expand Down
5 changes: 1 addition & 4 deletions cdc/entry/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ func VerifyTables(
eligibleTables []model.TableName,
err error,
) {
meta, err := kv.GetSnapshotMeta(storage, startTs)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
meta := kv.GetSnapshotMeta(storage, startTs)
snap, err := schema.NewSingleSnapshotFromMeta(meta, startTs, false /* explicitTables */, f)
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand Down
5 changes: 2 additions & 3 deletions cdc/kv/store_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import (
)

// GetSnapshotMeta returns tidb meta information
// TODO: Simplify the signature of this function
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) {
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) *meta.Meta {
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot), nil
return meta.NewSnapshotMeta(snapshot)
}

// CreateTiStore creates a tikv storage client
Expand Down
33 changes: 10 additions & 23 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,12 @@ type changefeed struct {
observerLastTick *atomic.Time

newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error)
) puller.DDLPuller

newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
Expand Down Expand Up @@ -209,13 +208,12 @@ func newChangefeed4Test(
cfStatus *model.ChangeFeedStatus,
cfstateManager FeedStateManager, up *upstream.Upstream,
newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error),
) puller.DDLPuller,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
Expand Down Expand Up @@ -331,11 +329,10 @@ func (c *changefeed) handleWarning(err error) {
})
}

func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context,
cfInfo *model.ChangeFeedInfo,
checkpointTs uint64,
func (c *changefeed) checkStaleCheckpointTs(
ctx cdcContext.Context, checkpointTs uint64,
) error {
if cfInfo.NeedBlockGC() {
if c.latestInfo.NeedBlockGC() {
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrStartTsBeforeGC.FastGen("InjectChangefeedFastFailError")
})
Expand All @@ -355,7 +352,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
preCheckpointTs := c.latestInfo.GetCheckpointTs(c.latestStatus)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, c.latestInfo, preCheckpointTs); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs); err != nil {
return 0, 0, errors.Trace(err)
}

Expand Down Expand Up @@ -385,8 +382,7 @@ func (c *changefeed) tick(ctx cdcContext.Context,
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand Down Expand Up @@ -568,7 +564,7 @@ LOOP2:
}
c.barriers.Update(finishBarrier, c.latestInfo.GetTargetTs())

filter, err := filter.NewFilter(c.latestInfo.Config, "")
f, err := filter.NewFilter(c.latestInfo.Config, "")
if err != nil {
return errors.Trace(err)
}
Expand All @@ -577,7 +573,7 @@ LOOP2:
ddlStartTs,
c.latestInfo.Config,
c.id,
filter)
f)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -604,16 +600,7 @@ LOOP2:
})
c.ddlSink.run(cancelCtx)

c.ddlPuller, err = c.newDDLPuller(cancelCtx,
c.latestInfo.Config,
c.upstream, ddlStartTs,
c.id,
c.schema,
filter)
if err != nil {
return errors.Trace(err)
}

c.ddlPuller = c.newDDLPuller(cancelCtx, c.upstream, ddlStartTs, c.id, c.schema, f)
c.wg.Add(1)
go func() {
defer c.wg.Done()
Expand Down
9 changes: 4 additions & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,13 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
state.Info, state.Status, newFeedStateManager(up, state), up,
// new ddl puller
func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
up *upstream.Upstream,
startTs uint64,
changefeed model.ChangeFeedID,
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
) puller.DDLPuller {
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
Expand Down Expand Up @@ -641,7 +640,7 @@ func TestBarrierAdvance(t *testing.T) {
if i == 1 {
cf.ddlManager.ddlResolvedTs += 10
}
_, barrier, err := cf.ddlManager.tick(ctx, state.Status.CheckpointTs, nil)
_, barrier, err := cf.ddlManager.tick(ctx, state.Status.CheckpointTs)

require.Nil(t, err)

Expand All @@ -668,7 +667,7 @@ func TestBarrierAdvance(t *testing.T) {

// Then the last tick barrier must be advanced correctly.
cf.ddlManager.ddlResolvedTs += 1000000000000
_, barrier, err = cf.ddlManager.tick(ctx, state.Status.CheckpointTs+10, nil)
_, barrier, err = cf.ddlManager.tick(ctx, state.Status.CheckpointTs+10)
require.Nil(t, err)
err = cf.handleBarrier(ctx, state.Info, state.Status, barrier)

Expand Down
Loading

0 comments on commit cf3d7bf

Please sign in to comment.