From b17cfccabb5fbbaf28866150bbc5aecec12a150d Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Wed, 8 Jun 2022 14:54:19 +0800
Subject: [PATCH] latency(cdc): fix ddl does not execute caused by incorrect
 checkpoint calculation (#5761)

* fix tp advance checkpoint.

* fix ddl sink log.

* add a ut for replication manager advance checkpoint ts.
---
 cdc/owner/ddl_puller.go                       |  3 -
 cdc/owner/ddl_sink.go                         |  2 +-
 .../internal/tp/replication_manager.go        |  8 +-
 .../internal/tp/replication_manager_test.go   | 88 +++++++++++++++++++
 4 files changed, 91 insertions(+), 10 deletions(-)

diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go
index 6f560bc3a7e..b4dc9c4ff7a 100644
--- a/cdc/owner/ddl_puller.go
+++ b/cdc/owner/ddl_puller.go
@@ -147,9 +147,6 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
 			return errors.Trace(err)
 		}
 		if job == nil {
-			log.Info("ddl job is nil after unmarshal",
-				zap.String("namespace", h.changefeedID.Namespace),
-				zap.String("changefeed", h.changefeedID.ID))
 			return nil
 		}
 		if h.filter.ShouldDiscardDDL(job.Type) {
diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go
index f1c629e3551..466303a22b9 100644
--- a/cdc/owner/ddl_sink.go
+++ b/cdc/owner/ddl_sink.go
@@ -269,7 +269,7 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent)
 		log.Info("ddl is sent",
 			zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
 			zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
-			zap.Uint64("ddlSentTs", ddlSentTs))
+			zap.Uint64("ddlSentTs", ddl.CommitTs))
 	default:
 		log.Warn("ddl chan full, send it the next round",
 			zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go
index ddeea1e5ae7..81e6acbd283 100644
--- a/cdc/scheduler/internal/tp/replication_manager.go
+++ b/cdc/scheduler/internal/tp/replication_manager.go
@@ -387,25 +387,21 @@ func (r *replicationManager) AdvanceCheckpoint(
 	currentTables []model.TableID,
 ) (newCheckpointTs, newResolvedTs model.Ts) {
 	newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
-	checkpointChanged := false
 	for _, tableID := range currentTables {
 		table, ok := r.tables[tableID]
 		if !ok {
 			// Can not advance checkpoint there is a table missing.
+			log.Warn("tpscheduler: cannot advance checkpoint since missing table",
+				zap.Int64("tableID", tableID))
 			return checkpointCannotProceed, checkpointCannotProceed
 		}
 		// Find the minimum checkpoint ts and resolved ts.
 		if newCheckpointTs > table.Checkpoint.CheckpointTs {
 			newCheckpointTs = table.Checkpoint.CheckpointTs
-			checkpointChanged = true
 		}
 		if newResolvedTs > table.Checkpoint.ResolvedTs {
 			newResolvedTs = table.Checkpoint.ResolvedTs
-			checkpointChanged = true
 		}
 	}
-	if !checkpointChanged {
-		return checkpointCannotProceed, checkpointCannotProceed
-	}
 	return newCheckpointTs, newResolvedTs
 }
diff --git a/cdc/scheduler/internal/tp/replication_manager_test.go b/cdc/scheduler/internal/tp/replication_manager_test.go
index ae83bb2564e..dfc35dbfdef 100644
--- a/cdc/scheduler/internal/tp/replication_manager_test.go
+++ b/cdc/scheduler/internal/tp/replication_manager_test.go
@@ -542,6 +542,94 @@ func TestReplicationManagerMaxTaskConcurrency(t *testing.T) {
 	require.Len(t, msgs, 0)
 }
 
+func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
+	t.Parallel()
+
+	r := newReplicationManager(1)
+	rs, err := newReplicationSet(model.TableID(1), model.Ts(10),
+		map[model.CaptureID]*schedulepb.TableStatus{
+			"1": {
+				TableID: model.TableID(1),
+				State:   schedulepb.TableStateReplicating,
+				Checkpoint: schedulepb.Checkpoint{
+					CheckpointTs: model.Ts(10),
+					ResolvedTs:   model.Ts(20),
+				},
+			},
+		})
+	require.NoError(t, err)
+	r.tables[model.TableID(1)] = rs
+
+	rs, err = newReplicationSet(model.TableID(2), model.Ts(15),
+		map[model.CaptureID]*schedulepb.TableStatus{
+			"2": {
+				TableID: model.TableID(2),
+				State:   schedulepb.TableStateReplicating,
+				Checkpoint: schedulepb.Checkpoint{
+					CheckpointTs: model.Ts(15),
+					ResolvedTs:   model.Ts(30),
+				},
+			},
+		})
+	require.NoError(t, err)
+	r.tables[model.TableID(2)] = rs
+
+	// all table is replicating
+	currentTables := []model.TableID{1, 2}
+	checkpoint, resolved := r.AdvanceCheckpoint(currentTables)
+	require.Equal(t, model.Ts(10), checkpoint)
+	require.Equal(t, model.Ts(20), resolved)
+
+	// some table not exist yet.
+	currentTables = append(currentTables, 3)
+	checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
+	require.Equal(t, checkpointCannotProceed, checkpoint)
+	require.Equal(t, checkpointCannotProceed, resolved)
+
+	rs, err = newReplicationSet(model.TableID(3), model.Ts(5),
+		map[model.CaptureID]*schedulepb.TableStatus{
+			"1": {
+				TableID: model.TableID(3),
+				State:   schedulepb.TableStateReplicating,
+				Checkpoint: schedulepb.Checkpoint{
+					CheckpointTs: model.Ts(5),
+					ResolvedTs:   model.Ts(40),
+				},
+			},
+			"2": {
+				TableID: model.TableID(3),
+				State:   schedulepb.TableStatePreparing,
+				Checkpoint: schedulepb.Checkpoint{
+					CheckpointTs: model.Ts(5),
+					ResolvedTs:   model.Ts(40),
+				},
+			},
+		})
+	require.NoError(t, err)
+	r.tables[model.TableID(3)] = rs
+	checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
+	require.Equal(t, model.Ts(5), checkpoint)
+	require.Equal(t, model.Ts(20), resolved)
+
+	currentTables = append(currentTables, 4)
+	rs, err = newReplicationSet(model.TableID(4), model.Ts(3),
+		map[model.CaptureID]*schedulepb.TableStatus{
+			"1": {
+				TableID: model.TableID(4),
+				State:   schedulepb.TableStatePrepared,
+				Checkpoint: schedulepb.Checkpoint{
+					CheckpointTs: model.Ts(3),
+					ResolvedTs:   model.Ts(10),
+				},
+			},
+		})
+	require.NoError(t, err)
+	r.tables[model.TableID(4)] = rs
+	checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
+	require.Equal(t, model.Ts(3), checkpoint)
+	require.Equal(t, model.Ts(10), resolved)
+}
+
 func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
 	t.Parallel()
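
Note on the fix (reviewer sketch, not part of the patch): after this change, AdvanceCheckpoint always returns the minimum CheckpointTs/ResolvedTs over the tables the changefeed is expected to replicate, and the only reason it refuses to advance is a table the manager does not track yet. The removed checkpointChanged guard also returned checkpointCannotProceed whenever the loop never lowered the initial MaxUint64 values, which appears to be the "incorrect checkpoint calculation" the subject line blames for DDLs never executing. The Go program below is a self-contained illustration of the post-fix behavior; Ts, Checkpoint, checkpointCannotProceed, and advanceCheckpoint here are simplified stand-ins for model.Ts, schedulepb.Checkpoint, and the scheduler internals, and the sentinel value chosen is an assumption for the sketch, not the tiflow definition.

// checkpoint_sketch.go: a minimal, standalone sketch of the post-fix
// AdvanceCheckpoint semantics. All names and types are sketch-local
// stand-ins, not the tiflow implementation.
package main

import (
	"fmt"
	"math"
)

// Ts stands in for model.Ts; Checkpoint stands in for schedulepb.Checkpoint.
type Ts = uint64

type Checkpoint struct {
	CheckpointTs Ts
	ResolvedTs   Ts
}

// checkpointCannotProceed is a sentinel meaning "do not advance this round".
// The concrete value is a sketch-local assumption.
const checkpointCannotProceed Ts = 0

// advanceCheckpoint mirrors the fixed logic: no checkpointChanged guard, just
// the minimum over every expected table, bailing out only when a table is
// not tracked yet (some capture has not reported it).
func advanceCheckpoint(tables map[int64]Checkpoint, currentTables []int64) (Ts, Ts) {
	newCheckpointTs, newResolvedTs := Ts(math.MaxUint64), Ts(math.MaxUint64)
	for _, tableID := range currentTables {
		cp, ok := tables[tableID]
		if !ok {
			// Advancing past a table we know nothing about could move the
			// checkpoint beyond data that has not been replicated.
			return checkpointCannotProceed, checkpointCannotProceed
		}
		if newCheckpointTs > cp.CheckpointTs {
			newCheckpointTs = cp.CheckpointTs
		}
		if newResolvedTs > cp.ResolvedTs {
			newResolvedTs = cp.ResolvedTs
		}
	}
	return newCheckpointTs, newResolvedTs
}

func main() {
	tables := map[int64]Checkpoint{
		1: {CheckpointTs: 10, ResolvedTs: 20},
		2: {CheckpointTs: 15, ResolvedTs: 30},
	}

	// Both tables tracked: the minimum wins, matching the first assertions
	// in TestReplicationManagerAdvanceCheckpoint (10, 20).
	fmt.Println(advanceCheckpoint(tables, []int64{1, 2}))

	// Table 3 is expected but not tracked yet: cannot proceed, matching the
	// checkpointCannotProceed assertions in the test.
	fmt.Println(advanceCheckpoint(tables, []int64{1, 2, 3}))
}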