diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index e5f291e99f39..ee8fb5f927d1 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -25,11 +25,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" ) const ( @@ -101,6 +103,8 @@ type cdcMixedVersionTester struct { testFinished bool validatorFinished chan struct{} cleanup func() + + terminateWatcherCh chan struct{} } func newCDCMixedVersionTester( @@ -199,6 +203,7 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep { func (cmvt *cdcMixedVersionTester) finishTest() versionStep { return func(ctx context.Context, t test.Test, u *versionUpgradeTest) { t.L().Printf("waiting for background tasks to finish") + close(cmvt.terminateWatcherCh) cmvt.testFinished = true <-cmvt.validatorFinished } @@ -343,12 +348,60 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep { {"updated", ""}, {"resolved", fmt.Sprintf("'%s'", resolvedInterval)}, } - _, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). - With(options...). - Create() + creator := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). + With(options...) + + jobID, err := creator.Create() if err != nil { t.Fatal(err) } + // In 22.1, a changefeed may fail during a rolling upgrade due to a bug + // which is fixed in 22.2 onwards. 
To accommodate this bug, + // poll the changefeed job and restart it if it fails. + // See #106878 for more details. + if cmvt.terminateWatcherCh == nil { + cmvt.terminateWatcherCh = make(chan struct{}) + } + cmvt.monitor.Go(func(ctx2 context.Context) error { + watchChangefeed(ctx2, t, cmvt.terminateWatcherCh, jobID, db, creator) + return nil + }) + } +} + +// watchChangefeed polls a changefeed job every 1 second and re-creates it if it +// failed. +func watchChangefeed( + ctx context.Context, + t test.Test, + testFinished chan struct{}, + jobID int, + db *gosql.DB, + creator *changefeedCreator, +) { + for { + select { + case <-ctx.Done(): + return + case <-testFinished: + return + case <-time.After(1 * time.Second): + var feedStatus string + if err := db.QueryRow(`SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&feedStatus); err != nil { + t.Fatal(errors.Wrap(err, "changefeed watcher failed to query jobs table")) + return + } + if feedStatus == string(jobs.StatusFailed) { + t.L().Printf("changefeed %d failed. recreating changefeed", jobID) + newJobID, err := creator.Create() + if err != nil { + t.Fatal(errors.Wrap(err, "changefeed watcher failed to recreate changefeed job")) + return + } + jobID = newJobID + t.L().Printf("successfully recreated changefeed as new job %d", jobID) + } + } } }