Skip to content

Commit

Permalink
Merge pull request #34362 from danhhz/backport2.1-34317
Browse files Browse the repository at this point in the history
release-2.1: changefeedccl: fix bug with changefeeds on previously backfilled tables
  • Loading branch information
danhhz authored Jan 30, 2019
2 parents d364a23 + 228d776 commit f122145
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
22 changes: 22 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,28 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

// Regression test for #34314
func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE after_backfill (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO after_backfill VALUES (0)`)
sqlDB.Exec(t, `ALTER TABLE after_backfill ADD COLUMN b INT DEFAULT 1`)
sqlDB.Exec(t, `INSERT INTO after_backfill VALUES (2, 3)`)
afterBackfill := f.Feed(t, `CREATE CHANGEFEED FOR after_backfill`)
defer afterBackfill.Close(t)
assertPayloads(t, afterBackfill, []string{
`after_backfill: [0]->{"a": 0, "b": 1}`,
`after_backfill: [2]->{"a": 2, "b": 3}`,
})
}

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedInterleaved(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func assertPayloads(t testing.TB, f testfeed, expected []string) {
for len(actual) < len(expected) {
topic, _, key, value, _, ok := f.Next(t)
if !ok {
break
t.Fatalf(`expected another row: %s`, f.Err())
} else if key != nil {
actual = append(actual, fmt.Sprintf(`%s: %s->%s`, topic, key, value))
}
Expand Down
47 changes: 26 additions & 21 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,27 +615,32 @@ func (p *poller) validateTable(ctx context.Context, desc *sqlbase.TableDescripto
}
if lastVersion.HasColumnBackfillMutation() && !desc.HasColumnBackfillMutation() {
boundaryTime := desc.GetModificationTime()
if boundaryTime.Less(p.mu.highWater) {
return fmt.Errorf(
"error: detected table ID %d backfill completed at %s earlier than highwater timestamp %s",
desc.ID,
boundaryTime,
p.mu.highWater,
)
}
p.mu.scanBoundaries = append(p.mu.scanBoundaries, boundaryTime)
sort.Slice(p.mu.scanBoundaries, func(i, j int) bool {
return p.mu.scanBoundaries[i].Less(p.mu.scanBoundaries[j])
})
// To avoid race conditions with the lease manager, at this point we force
// the manager to acquire the freshest descriptor of this table from the
// store. In normal operation, the lease manager returns the newest
// descriptor it knows about for the timestamp, assuming it's still
// allowed; without this explicit load, the lease manager might therefore
// return the previous version of the table, which is still technically
// allowed by the schema change system.
if err := p.leaseMgr.AcquireFreshestFromStore(ctx, desc.ID); err != nil {
return err
// Only mutations that happened after the changefeed started are
// interesting here.
if p.details.StatementTime.Less(boundaryTime) {
if boundaryTime.Less(p.mu.highWater) {
return fmt.Errorf(
"error: detected table ID %d backfill completed at %s "+
"earlier than highwater timestamp %s",
desc.ID,
boundaryTime,
p.mu.highWater,
)
}
p.mu.scanBoundaries = append(p.mu.scanBoundaries, boundaryTime)
sort.Slice(p.mu.scanBoundaries, func(i, j int) bool {
return p.mu.scanBoundaries[i].Less(p.mu.scanBoundaries[j])
})
// To avoid race conditions with the lease manager, at this point we force
// the manager to acquire the freshest descriptor of this table from the
// store. In normal operation, the lease manager returns the newest
// descriptor it knows about for the timestamp, assuming it's still
// allowed; without this explicit load, the lease manager might therefore
// return the previous version of the table, which is still technically
// allowed by the schema change system.
if err := p.leaseMgr.AcquireFreshestFromStore(ctx, desc.ID); err != nil {
return err
}
}
}
}
Expand Down

0 comments on commit f122145

Please sign in to comment.