diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 0511c92417ee..989d6af259ae 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -685,6 +685,28 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) } +// Regression test for #34314 +func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { + defer leaktest.AfterTest(t)() + + testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO after_backfill VALUES (0)`) + sqlDB.Exec(t, `ALTER TABLE after_backfill ADD COLUMN b INT DEFAULT 1`) + sqlDB.Exec(t, `INSERT INTO after_backfill VALUES (2, 3)`) + afterBackfill := f.Feed(t, `CREATE CHANGEFEED FOR after_backfill`) + defer afterBackfill.Close(t) + assertPayloads(t, afterBackfill, []string{ + `after_backfill: [0]->{"a": 0, "b": 1}`, + `after_backfill: [2]->{"a": 2, "b": 3}`, + }) + } + + t.Run(`sinkless`, sinklessTest(testFn)) + t.Run(`enterprise`, enterpriseTest(testFn)) +} + func TestChangefeedInterleaved(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index d14acab997cc..d92b089dadf4 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -560,7 +560,7 @@ func assertPayloads(t testing.TB, f testfeed, expected []string) { for len(actual) < len(expected) { topic, _, key, value, _, ok := f.Next(t) if !ok { - break + t.Fatalf(`expected another row: %s`, f.Err()) } else if key != nil { actual = append(actual, fmt.Sprintf(`%s: %s->%s`, topic, key, value)) } diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 852cc2accdae..236566b2d802 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -615,27 +615,32 @@ func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescripto } if lastVersion.HasColumnBackfillMutation() && !desc.HasColumnBackfillMutation() { boundaryTime := desc.GetModificationTime() - if boundaryTime.Less(p.mu.highWater) { - return fmt.Errorf( - "error: detected table ID %d backfill completed at %s earlier than highwater timestamp %s", - desc.ID, - boundaryTime, - p.mu.highWater, - ) - } - p.mu.scanBoundaries = append(p.mu.scanBoundaries, boundaryTime) - sort.Slice(p.mu.scanBoundaries, func(i, j int) bool { - return p.mu.scanBoundaries[i].Less(p.mu.scanBoundaries[j]) - }) - // To avoid race conditions with the lease manager, at this point we force - // the manager to acquire the freshest descriptor of this table from the - // store. In normal operation, the lease manager returns the newest - // descriptor it knows about for the timestamp, assuming it's still - // allowed; without this explicit load, the lease manager might therefore - // return the previous version of the table, which is still technically - // allowed by the schema change system. - if err := p.leaseMgr.AcquireFreshestFromStore(ctx, desc.ID); err != nil { - return err + // Only mutations that happened after the changefeed started are + // interesting here. + if p.details.StatementTime.Less(boundaryTime) { + if boundaryTime.Less(p.mu.highWater) { + return fmt.Errorf( + "error: detected table ID %d backfill completed at %s "+ + "earlier than highwater timestamp %s", + desc.ID, + boundaryTime, + p.mu.highWater, + ) + } + p.mu.scanBoundaries = append(p.mu.scanBoundaries, boundaryTime) + sort.Slice(p.mu.scanBoundaries, func(i, j int) bool { + return p.mu.scanBoundaries[i].Less(p.mu.scanBoundaries[j]) + }) + // To avoid race conditions with the lease manager, at this point we force + // the manager to acquire the freshest descriptor of this table from the + // store. In normal operation, the lease manager returns the newest + // descriptor it knows about for the timestamp, assuming it's still + // allowed; without this explicit load, the lease manager might therefore + // return the previous version of the table, which is still technically + // allowed by the schema change system. + if err := p.leaseMgr.AcquireFreshestFromStore(ctx, desc.ID); err != nil { + return err + } } } }