cdc: Test for falling behind schema TTL

Add a test that ensures that changefeeds properly exit if they fall far
enough behind that schema information has been lost due to the GC TTL
(that is, a historical row version can no longer be read because the
schema at its timestamp has been garbage collected).
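
For context, this is the same failure mode that any historical read hits once
its timestamp falls below the GC threshold. A minimal sketch of that behavior,
not part of this commit (the connection string is hypothetical, and the error
text is the one the data-TTL test below asserts on):

    package main

    import (
        "database/sql"
        "fmt"
        "log"

        _ "github.com/lib/pq"
    )

    func main() {
        // Hypothetical local cluster; adjust the connection string as needed.
        db, err := sql.Open("postgres",
            "postgresql://root@localhost:26257/d?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Once the row version at the requested timestamp has been garbage
        // collected, the read is rejected outright rather than silently
        // returning stale data.
        var b string
        err = db.QueryRow(
            `SELECT b FROM foo AS OF SYSTEM TIME '-10s' WHERE a = 1`,
        ).Scan(&b)
        fmt.Println(err) // e.g. "... must be after replica GC threshold"
    }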

I have also discovered why the sister test (for the table TTL, not the
schema) required a 3-second sleep: the GC queue enforces that replicas
must have an appropriately high "score" before being GCed, even when the
"shouldQueue" process is skipped. To get around this, I have changed
"ManuallyEnqueueSpan" to a more explicit "ManuallyGCSpan", which
directly calls the processing implementation of the gcQueue on the
appropriate replicas. Both that sister test and the new schema TTL
test now only require a more predictable 1-second sleep.
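
In test code, the migration looks roughly like this (a sketch assuming this
repository's context, roachpb, and storage packages; gcSpanForTest is a
hypothetical helper, not code from this commit):

    // Sketch: moving a test helper from ManuallyEnqueueSpan to ManuallyGCSpan.
    func gcSpanForTest(ctx context.Context, s *storage.Store, span roachpb.RSpan) error {
        // Before: even with skipShouldQueue=true, the gcQueue's scoring could
        // still decline to actually GC the replicas in the span:
        //
        //   return s.ManuallyEnqueueSpan(ctx, "gc", span, true /* skipShouldQueue */)
        //
        // After: call the gcQueue's processing implementation directly on each
        // replica overlapping the span.
        return s.ManuallyGCSpan(ctx, span)
    }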

Resolves #28644

Release note: None
Matt Tracy committed Oct 6, 2018
1 parent 9c8037d commit fe51cfc
Showing 3 changed files with 151 additions and 34 deletions.
109 changes: 82 additions & 27 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -24,12 +24,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
-	"github.com/cockroachdb/cockroach/pkg/keys"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
-	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -890,9 +887,6 @@ func TestChangefeedDataTTL(t *testing.T) {
 		// versions.
 		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
 
-		// Set a one second GC time; all historical rows subject to GC ASAP.
-		sqlDB.Exec(t, `ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = $1`, 1)
-
 		counter := 0
 		upsertRow := func() {
 			counter++
@@ -916,32 +910,94 @@ func TestChangefeedDataTTL(t *testing.T) {
 		upsertRow()
 		upsertRow()
 
-		// TODO(mrtracy): Even though the GC TTL on the table is set to 1 second,
-		// this does not work at 1 or even 2 seconds. Investigate why this is the
-		// case.
-		time.Sleep(3 * time.Second)
+		// Sleep for the one second GC period (one second is the minimum).
+		time.Sleep(time.Second)
 
 		// Force a GC of the table. This should cause both older versions of the
 		// table to be deleted, with the middle version being lost to the changefeed.
-		tblID, err := sqlutils.QueryTableID(sqlDB.DB, "d", "foo")
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		tablePrefix := keys.MakeTablePrefix(tblID)
-		tableStartKey := roachpb.RKey(tablePrefix)
-		tableSpan := roachpb.RSpan{
-			Key:    tableStartKey,
-			EndKey: tableStartKey.PrefixEnd(),
-		}
-
-		ts := f.Server().(*server.TestServer)
-		if err := ts.GetStores().(*storage.Stores).VisitStores(func(st *storage.Store) error {
-			return st.ManuallyEnqueueSpan(context.Background(), "gc", tableSpan, true /* skipShouldQueue */)
-		}); err != nil {
-			t.Fatal(err)
-		}
+		// Set a one second GC time; all historical rows subject to GC ASAP.
+		forceTableGC(t, f.Server(), sqlDB, "d", "foo")
 
 		// Resume our changefeed normally.
 		atomic.StoreInt32(&shouldWait, 0)
 		resume <- struct{}{}
+
+		// Verify that the third call to Next() returns an error (the first is the
+		// initial row, the second is the first change. The third should detect the
+		// GC interval mismatch).
+		_, _, _, _, _, _ = dataExpiredRows.Next(t)
+		_, _, _, _, _, _ = dataExpiredRows.Next(t)
+		_, _, _, _, _, _ = dataExpiredRows.Next(t)
+		if err := dataExpiredRows.Err(); !testutils.IsError(err, `must be after replica GC threshold`) {
+			t.Errorf(`expected "must be after replica GC threshold" error got: %+v`, err)
+		}
+	}
+
+	t.Run("sinkless", enterpriseTest(testFn))
+	t.Run("enterprise", enterpriseTest(testFn))
+}
+
+// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
+// where the feed has fallen behind the GC TTL of the table's schema.
+func TestChangefeedSchemaTTL(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
+		// Set a very simple channel-based, wait-and-resume function as the
+		// BeforeEmitRow hook.
+		var shouldWait int32
+		wait := make(chan struct{})
+		resume := make(chan struct{})
+		knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
+			DistSQL.(*distsqlrun.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+		knobs.BeforeEmitRow = func() error {
+			if atomic.LoadInt32(&shouldWait) == 0 {
+				return nil
+			}
+			wait <- struct{}{}
+			<-resume
+			return nil
+		}
+
+		sqlDB := sqlutils.MakeSQLRunner(db)
+
+		// Create the data table; it will only contain a single row with multiple
+		// versions.
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
+
+		counter := 0
+		upsertRow := func() {
+			counter++
+			sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, fmt.Sprintf("version %d", counter))
+		}
+
+		// Create the initial version of the row and the changefeed itself. The initial
+		// version is necessary to prevent CREATE CHANGEFEED itself from hanging.
+		upsertRow()
+		dataExpiredRows := f.Feed(t, "CREATE CHANGEFEED FOR TABLE foo")
+		defer dataExpiredRows.Close(t)
+
+		// Set up our emit trap and update the row, which will allow us to "pause" the
+		// changefeed in order to force a GC.
+		atomic.StoreInt32(&shouldWait, 1)
+		upsertRow()
+		<-wait
+
+		// Upsert two additional versions. One of these will be deleted by the GC
+		// process before changefeed polling is resumed.
+		waitForSchemaChange(t, sqlDB, "ALTER TABLE foo ADD COLUMN c STRING")
+		upsertRow()
+		waitForSchemaChange(t, sqlDB, "ALTER TABLE foo ADD COLUMN d STRING")
+		upsertRow()
+
+		// Sleep for the one second GC period (one second is the minimum).
+		time.Sleep(time.Second)
+
+		// Force a GC of the table. This should cause both older versions of the
+		// table to be deleted, with the middle version being lost to the changefeed.
+		forceTableGC(t, f.Server(), sqlDB, "system", "descriptor")
+
+		// Resume our changefeed normally.
+		atomic.StoreInt32(&shouldWait, 0)
+		resume <- struct{}{}
@@ -952,13 +1008,12 @@ func TestChangefeedDataTTL(t *testing.T) {
 		_, _, _, _, _, _ = dataExpiredRows.Next(t)
 		_, _, _, _, _, _ = dataExpiredRows.Next(t)
 		_, _, _, _, _, _ = dataExpiredRows.Next(t)
-		if err := dataExpiredRows.Err(); !testutils.IsError(err, `must be after replica GC threshold`) {
-			t.Errorf(`expected "must be after replica GC threshold" error got: %+v`, err)
+		if err := dataExpiredRows.Err(); !testutils.IsError(err, `must be after GC threshold`) {
+			t.Errorf(`expected "must be after GC threshold" error got: %+v`, err)
 		}
 	}
 
-	// Due to the minimum 3 second run time (due to needing to wait that long for
-	// rows to be properly GCed), only run an enterprise test.
 	t.Run("sinkless", enterpriseTest(testFn))
 	t.Run("enterprise", enterpriseTest(testFn))
 }

56 changes: 56 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -23,9 +23,13 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
 
 	"github.com/cockroachdb/apd"
 	"github.com/cockroachdb/cockroach/pkg/security"
@@ -525,6 +529,25 @@ func (c *tableFeed) Close(t testing.TB) {
 	c.urlCleanup()
 }
 
+func waitForSchemaChange(
+	t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{},
+) {
+	sqlDB.Exec(t, stmt, arguments...)
+	row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1")
+	var jobID string
+	row.Scan(&jobID)
+
+	testutils.SucceedsSoon(t, func() error {
+		row := sqlDB.QueryRow(t, "SELECT status FROM [SHOW JOBS] WHERE job_id = $1", jobID)
+		var status string
+		row.Scan(&status)
+		if status != "succeeded" {
+			return fmt.Errorf("Job %s had status %s, wanted 'succeeded'", jobID, status)
+		}
+		return nil
+	})
+}
+
 func assertPayloads(t testing.TB, f testfeed, expected []string) {
 	t.Helper()
 
@@ -674,3 +697,36 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, testfeedFactory)) func(*testing.T) {
 		testFn(t, db, f)
 	}
 }
+
+func forceTableGC(
+	t testing.TB,
+	tsi serverutils.TestServerInterface,
+	sqlDB *sqlutils.SQLRunner,
+	database, table string,
+) {
+	var stmt string
+	if database == "system" {
+		stmt = `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = $1`
+	} else {
+		stmt = fmt.Sprintf(`ALTER TABLE %s.%s CONFIGURE ZONE USING gc.ttlseconds = $1`, database, table)
+	}
+	sqlDB.Exec(t, stmt, 1)
+	tblID, err := sqlutils.QueryTableID(sqlDB.DB, database, table)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	tablePrefix := keys.MakeTablePrefix(tblID)
+	tableStartKey := roachpb.RKey(tablePrefix)
+	tableSpan := roachpb.RSpan{
+		Key:    tableStartKey,
+		EndKey: tableStartKey.PrefixEnd(),
+	}
+
+	ts := tsi.(*server.TestServer)
+	if err := ts.GetStores().(*storage.Stores).VisitStores(func(st *storage.Store) error {
+		return st.ManuallyGCSpan(context.Background(), tableSpan)
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
20 changes: 13 additions & 7 deletions pkg/storage/store.go
@@ -4600,23 +4600,29 @@ func (s *Store) ManuallyEnqueue(
 	return collect(), "", nil
 }
 
-// ManuallyEnqueueSpan runs all replicas in the supplied span through the named
-// queue. This is currently intended for use in internal tests which have access
-// to the store directly.
-func (s *Store) ManuallyEnqueueSpan(
-	ctx context.Context, queueName string, span roachpb.RSpan, skipShouldQueue bool,
-) error {
+// ManuallyGCSpan runs the garbage collection process immediately on all replicas
+// in the provided span. This completely bypasses the normal GC scoring process.
+// This is only appropriate for use in tests.
+func (s *Store) ManuallyGCSpan(ctx context.Context, span roachpb.RSpan) error {
 	var outerErr error
 	newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
 		desc := repl.Desc()
 		if bytes.Compare(span.Key, desc.EndKey) >= 0 || bytes.Compare(desc.StartKey, span.EndKey) >= 0 {
 			return true // continue
 		}
 
-		if _, _, err := s.ManuallyEnqueue(ctx, queueName, repl, skipShouldQueue); err != nil {
+		sysCfg := s.cfg.Gossip.GetSystemConfig()
+		if sysCfg == nil {
+			outerErr = errors.New("cannot run queue without a valid system config; make sure the cluster " +
+				"has been initialized and all nodes connected to it")
+			return false
+		}
+		now := s.Clock().Now()
+		if err := s.gcQueue.processImpl(ctx, repl, sysCfg, now); err != nil {
 			outerErr = err
 			return false
 		}
+
 		return true
 	})
 	return outerErr
