Skip to content

Commit

Permalink
add checkpoint log
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Nov 25, 2021

Verified

This commit was signed with the committer’s verified signature.
myii Imran Iqbal
1 parent 5889bb4 commit ab8fc8c
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
@@ -177,12 +177,27 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}

func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
// Log abnormal checkpoint that is large than resolved ts.
logAbnormalCheckpoint := func(ckpt uint64) {
if ckpt > resolvedTs {
log.L().WithOptions(zap.AddCallerSkip(1)).
Warn("checkpoint ts > resolved ts, flushed more than emitted",
zap.Int64("tableID", t.tableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("checkpointTs", ckpt))
}
}
i := sort.Search(len(t.buffer), func(i int) bool {
return t.buffer[i].CommitTs > resolvedTs
})
if i == 0 {
atomic.StoreUint64(&t.emittedTs, resolvedTs)
return t.manager.flushBackendSink(ctx)
ckpt, err := t.manager.flushBackendSink(ctx)
if err != nil {
return ckpt, err
}
logAbnormalCheckpoint(ckpt)
return ckpt, err
}
resolvedRows := t.buffer[:i]
t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
@@ -192,7 +207,12 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
return t.manager.getCheckpointTs(), errors.Trace(err)
}
atomic.StoreUint64(&t.emittedTs, resolvedTs)
return t.manager.flushBackendSink(ctx)
ckpt, err := t.manager.flushBackendSink(ctx)
if err != nil {
return ckpt, err
}
logAbnormalCheckpoint(ckpt)
return ckpt, err
}

func (t *tableSink) getEmittedTs() uint64 {

0 comments on commit ab8fc8c

Please sign in to comment.