Skip to content

Commit

Permalink
roachtest/cdc/mixed-versions: allow for changefeed failures
Browse files Browse the repository at this point in the history
This test may flake due to the upgrade from 22.1->22.2. The
test asserts a changefeed remains running by checking for
resolved timestamps being emitted on a regular basis. The
problem with this is that, during the rolling upgrade,
the changefeed may fail with a "draining" error.
This issue is fixed in 22.2 onwards by treating all errors
as retryable.

Rather than skipping this test because 22.1 is EOLed, it is
preferable to still run this test regularly because it tests
22.2 functionality. This change adds a fix where the test
will poll the changefeed every 1s and recreate it if it fails.

Closes: cockroachdb#106878
Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Jul 24, 2023
1 parent 57c027d commit 0ab05d1
Showing 1 changed file with 56 additions and 3 deletions.
59 changes: 56 additions & 3 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -101,6 +103,8 @@ type cdcMixedVersionTester struct {
testFinished bool
validatorFinished chan struct{}
cleanup func()

terminateWatcherCh chan struct{}
}

func newCDCMixedVersionTester(
Expand Down Expand Up @@ -199,6 +203,7 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
func (cmvt *cdcMixedVersionTester) finishTest() versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf("waiting for background tasks to finish")
close(cmvt.terminateWatcherCh)
cmvt.testFinished = true
<-cmvt.validatorFinished
}
Expand Down Expand Up @@ -343,12 +348,60 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
{"updated", ""},
{"resolved", fmt.Sprintf("'%s'", resolvedInterval)},
}
_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...).
Create()
creator := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...)

jobID, err := creator.Create()
if err != nil {
t.Fatal(err)
}
// In 22.1, a changefeed may fail during a rolling upgrade due to a bug
// which is fixed in 22.2 onwards. To accomodate for this bug,
// poll the changefeed job and restart it if it fails.
// See #106878 for more details.
if cmvt.terminateWatcherCh == nil {
cmvt.terminateWatcherCh = make(chan struct{})
}
cmvt.monitor.Go(func(ctx2 context.Context) error {
watchChangefeed(ctx2, t, cmvt.terminateWatcherCh, jobID, db, creator)
return nil
})
}
}

// watchChangefeed polls a changefeed job every 1 second and re-creates if it
// failed.
func watchChangefeed(
ctx context.Context,
t test.Test,
testFinished chan struct{},
jobID int,
db *gosql.DB,
creator *changefeedCreator,
) {
for {
select {
case <-ctx.Done():
return
case <-testFinished:
return
case <-time.After(1 * time.Second):
var feedStatus string
if err := db.QueryRow(`SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&feedStatus); err != nil {
t.Fatal(errors.Wrap(err, "changefeed watcher failed to query jobs table"))
return
}
if feedStatus == string(jobs.StatusFailed) {
t.L().Printf("changefeed %d failed. recreating changefeed", jobID)
newJobID, err := creator.Create()
if err != nil {
t.Fatal(errors.Wrap(err, "changefeed watcher failed to recreate changefeed job"))
return
}
jobID = newJobID
t.L().Printf("successfully recreated changefeed as new job %d", jobID)
}
}
}
}

Expand Down

0 comments on commit 0ab05d1

Please sign in to comment.