Skip to content

Commit

Permalink
changefeedccl: fix initial scan checkpointing
Browse files Browse the repository at this point in the history
Previously, we forward the most up to date state for all spans in #102717 to reduce duplicates upon changefeed resumes. This can cause unexpected behaviour during initial scans since initial scan logic relies on all resolved timestamps being empty.
  • Loading branch information
wenyihu6 committed May 9, 2024
1 parent d472092 commit c6d1392
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,12 +524,20 @@ func makeKVFeedMonitoringCfg(
// to name its output files in lexicographically monotonic fashion.
func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
var initialHighWater hlc.Timestamp
isInitialScan := false
spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
for _, watch := range ca.spec.Watches {
if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {
spans = append(spans, watch.Span)
if watch.InitialResolved.IsEmpty() {
// Keep initialHighWater as empty if there are any zero InitialResolved
// timestamp to indicate that initial scan is required.
initialHighWater = hlc.Timestamp{}
isInitialScan = true
} else if !isInitialScan && (initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater)) {
// If there are no empty InitialResolved timestamps, initial scan is not
// required. Set initialHighWater as the minimum of all InitialResolved.
initialHighWater = watch.InitialResolved
}
spans = append(spans, watch.Span)
}

ca.frontier, err = makeSchemaChangeFrontier(initialHighWater, spans...)
Expand Down

0 comments on commit c6d1392

Please sign in to comment.