Skip to content

Commit

Permalink
latency(cdc): fix ddl does not execute caused by incorrect checkpoint…
Browse files Browse the repository at this point in the history
… calculation (pingcap#5761)

* fix tp advance checkpoint.
* fix ddl sink log.
* add a ut for replication manager advance checkpoint ts.
  • Loading branch information
3AceShowHand authored and overvenus committed Jun 24, 2022
1 parent 86c9445 commit edb8ef5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 10 deletions.
3 changes: 0 additions & 3 deletions cdc/owner/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,6 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
return errors.Trace(err)
}
if job == nil {
log.Info("ddl job is nil after unmarshal",
zap.String("namespace", h.changefeedID.Namespace),
zap.String("changefeed", h.changefeedID.ID))
return nil
}
if h.filter.ShouldDiscardDDL(job.Type) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent)
log.Info("ddl is sent",
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
zap.Uint64("ddlSentTs", ddlSentTs))
zap.Uint64("ddlSentTs", ddl.CommitTs))
default:
log.Warn("ddl chan full, send it the next round",
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
Expand Down
8 changes: 2 additions & 6 deletions cdc/scheduler/internal/tp/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,25 +387,21 @@ func (r *replicationManager) AdvanceCheckpoint(
currentTables []model.TableID,
) (newCheckpointTs, newResolvedTs model.Ts) {
newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
checkpointChanged := false
for _, tableID := range currentTables {
table, ok := r.tables[tableID]
if !ok {
// Can not advance checkpoint there is a table missing.
log.Warn("tpscheduler: cannot advance checkpoint since missing table",
zap.Int64("tableID", tableID))
return checkpointCannotProceed, checkpointCannotProceed
}
// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {
newCheckpointTs = table.Checkpoint.CheckpointTs
checkpointChanged = true
}
if newResolvedTs > table.Checkpoint.ResolvedTs {
newResolvedTs = table.Checkpoint.ResolvedTs
checkpointChanged = true
}
}
if !checkpointChanged {
return checkpointCannotProceed, checkpointCannotProceed
}
return newCheckpointTs, newResolvedTs
}
88 changes: 88 additions & 0 deletions cdc/scheduler/internal/tp/replication_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,94 @@ func TestReplicationManagerMaxTaskConcurrency(t *testing.T) {
require.Len(t, msgs, 0)
}

func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
t.Parallel()

r := newReplicationManager(1)
rs, err := newReplicationSet(model.TableID(1), model.Ts(10),
map[model.CaptureID]*schedulepb.TableStatus{
"1": {
TableID: model.TableID(1),
State: schedulepb.TableStateReplicating,
Checkpoint: schedulepb.Checkpoint{
CheckpointTs: model.Ts(10),
ResolvedTs: model.Ts(20),
},
},
})
require.NoError(t, err)
r.tables[model.TableID(1)] = rs

rs, err = newReplicationSet(model.TableID(2), model.Ts(15),
map[model.CaptureID]*schedulepb.TableStatus{
"2": {
TableID: model.TableID(2),
State: schedulepb.TableStateReplicating,
Checkpoint: schedulepb.Checkpoint{
CheckpointTs: model.Ts(15),
ResolvedTs: model.Ts(30),
},
},
})
require.NoError(t, err)
r.tables[model.TableID(2)] = rs

// all table is replicating
currentTables := []model.TableID{1, 2}
checkpoint, resolved := r.AdvanceCheckpoint(currentTables)
require.Equal(t, model.Ts(10), checkpoint)
require.Equal(t, model.Ts(20), resolved)

// some table not exist yet.
currentTables = append(currentTables, 3)
checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
require.Equal(t, checkpointCannotProceed, checkpoint)
require.Equal(t, checkpointCannotProceed, resolved)

rs, err = newReplicationSet(model.TableID(3), model.Ts(5),
map[model.CaptureID]*schedulepb.TableStatus{
"1": {
TableID: model.TableID(3),
State: schedulepb.TableStateReplicating,
Checkpoint: schedulepb.Checkpoint{
CheckpointTs: model.Ts(5),
ResolvedTs: model.Ts(40),
},
},
"2": {
TableID: model.TableID(3),
State: schedulepb.TableStatePreparing,
Checkpoint: schedulepb.Checkpoint{
CheckpointTs: model.Ts(5),
ResolvedTs: model.Ts(40),
},
},
})
require.NoError(t, err)
r.tables[model.TableID(3)] = rs
checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
require.Equal(t, model.Ts(5), checkpoint)
require.Equal(t, model.Ts(20), resolved)

currentTables = append(currentTables, 4)
rs, err = newReplicationSet(model.TableID(4), model.Ts(3),
map[model.CaptureID]*schedulepb.TableStatus{
"1": {
TableID: model.TableID(4),
State: schedulepb.TableStatePrepared,
Checkpoint: schedulepb.Checkpoint{
CheckpointTs: model.Ts(3),
ResolvedTs: model.Ts(10),
},
},
})
require.NoError(t, err)
r.tables[model.TableID(4)] = rs
checkpoint, resolved = r.AdvanceCheckpoint(currentTables)
require.Equal(t, model.Ts(3), checkpoint)
require.Equal(t, model.Ts(10), resolved)
}

func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit edb8ef5

Please sign in to comment.