diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index e5f291e99f39..3bef4e310b0f 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -25,11 +25,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/version" + "github.com/cockroachdb/errors" ) const ( @@ -343,12 +345,47 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep { {"updated", ""}, {"resolved", fmt.Sprintf("'%s'", resolvedInterval)}, } - _, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). - With(options...). - Create() + creator := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)). + With(options...) + + jobID, err := creator.Create() if err != nil { t.Fatal(err) } + // In 22.1, a changefeed may fail during a rolling upgrade due to a bug + // which is fixed in 22.2 onwards. To accommodate this bug, + // poll the changefeed job and restart it if it fails. + // See #106878 for more details. + go watchChangefeed(ctx, t, jobID, db, creator) + } +} + +// watchChangefeed polls a changefeed job every 1 second and re-creates it if it +// failed. 
+func watchChangefeed( + ctx context.Context, t test.Test, jobID int, db *gosql.DB, creator *changefeedCreator, +) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + var feedStatus string + if err := db.QueryRow(`SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&feedStatus); err != nil { + t.Fatal(errors.Wrap(err, "changefeed watcher failed to query jobs table")) + return + } + if feedStatus == string(jobs.StatusFailed) { + t.L().Printf("changefeed %d failed. recreating changefeed", jobID) + newJobID, err := creator.Create() + if err != nil { + t.Fatal(errors.Wrap(err, "changefeed watcher failed to recreate changefeed job")) + return + } + jobID = newJobID + t.L().Printf("successfully recreated changefeed as new job %d", jobID) + } + } } }