Skip to content

Commit

Permalink
sinkv2(ticdc): rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jun 29, 2022
1 parent 5e2b39f commit 51192af
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions cdc/sinkv2/eventsink/event_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ func (t *TxnEventAppender) Append(
continue
}

currentLastTxn := buffer[len(buffer)-1]
lastTxn := buffer[len(buffer)-1]
// Normally, this means the commitTs grows.
if currentLastTxn.GetCommitTs() != row.CommitTs ||
if lastTxn.GetCommitTs() != row.CommitTs ||
// Normally, this means we meet a new big txn batch.
row.SplitTxn ||
// Normally, this means we meet a new txn.
currentLastTxn.StartTs < row.StartTs {
lastTxn.StartTs < row.StartTs {
// Fail-fast check
commitTsDecreased := currentLastTxn.GetCommitTs() > row.CommitTs
commitTsDecreased := lastTxn.GetCommitTs() > row.CommitTs
if commitTsDecreased {
log.Panic("The commitTs of the emit row is less than the received row",
zap.Uint64("lastReceivedCommitTs", buffer[len(buffer)-1].GetCommitTs()),
zap.Any("row", row))
}
startTsDecreased := currentLastTxn.StartTs > row.StartTs
startTsDecreased := lastTxn.StartTs > row.StartTs
if startTsDecreased {
log.Panic("The startTs of the emit row is less than the received row",
zap.Any("lastReceivedStartTs", buffer[len(buffer)-1].GetCommitTs()),
Expand Down

0 comments on commit 51192af

Please sign in to comment.