Skip to content

Commit

Permalink
Merge pull request #107293 from jayshrivastava/release-22.2-skip-mixe…
Browse files Browse the repository at this point in the history
…d-versions

roachtest/cdc/mixed-versions: allow for changefeed failures
  • Loading branch information
jayshrivastava authored Jul 25, 2023
2 parents 24f5ce7 + 09de229 commit bd2fcf2
Showing 1 changed file with 56 additions and 3 deletions.
59 changes: 56 additions & 3 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -101,6 +103,8 @@ type cdcMixedVersionTester struct {
testFinished bool
validatorFinished chan struct{}
cleanup func()

terminateWatcherCh chan struct{}
}

func newCDCMixedVersionTester(
Expand Down Expand Up @@ -199,6 +203,7 @@ func (cmvt *cdcMixedVersionTester) waitForResolvedTimestamps() versionStep {
func (cmvt *cdcMixedVersionTester) finishTest() versionStep {
return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
t.L().Printf("waiting for background tasks to finish")
close(cmvt.terminateWatcherCh)
cmvt.testFinished = true
<-cmvt.validatorFinished
}
Expand Down Expand Up @@ -343,12 +348,60 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
{"updated", ""},
{"resolved", fmt.Sprintf("'%s'", resolvedInterval)},
}
_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...).
Create()
creator := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...)

jobID, err := creator.Create()
if err != nil {
t.Fatal(err)
}
// In 22.1, a changefeed may fail during a rolling upgrade due to a bug
// which is fixed in 22.2 onwards. To accommodate this bug,
// poll the changefeed job and restart it if it fails.
// See #106878 for more details.
if cmvt.terminateWatcherCh == nil {
cmvt.terminateWatcherCh = make(chan struct{})
}
cmvt.monitor.Go(func(ctx2 context.Context) error {
watchChangefeed(ctx2, t, cmvt.terminateWatcherCh, jobID, db, creator)
return nil
})
}
}

// watchChangefeed polls the status of a changefeed job once per second and
// re-creates the changefeed through creator whenever the job is found in the
// failed state. After a successful re-creation the watcher follows the newly
// returned job ID.
//
// The watcher returns when ctx is done or when testFinished is closed (or
// receives a value). Failures to query the jobs table or to re-create the
// changefeed are only logged; the watcher tries again on the next tick.
func watchChangefeed(
	ctx context.Context,
	t test.Test,
	testFinished chan struct{},
	jobID int,
	db *gosql.DB,
	creator *changefeedCreator,
) {
	// Reuse one ticker for the lifetime of the watcher rather than allocating
	// a fresh timer via time.After on every pass through the loop.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-testFinished:
			return
		case <-ticker.C:
		}

		var feedStatus string
		// Bind the query to ctx so that cancellation interrupts an in-flight
		// query instead of leaving the watcher blocked on the database.
		row := db.QueryRowContext(ctx, `SELECT status FROM system.jobs WHERE id = $1`, jobID)
		if err := row.Scan(&feedStatus); err != nil {
			if ctx.Err() != nil {
				// The query failed because the watcher is shutting down; exit
				// quietly rather than logging a spurious failure.
				return
			}
			t.L().Printf("%s", errors.Wrap(err, "changefeed watcher failed to query jobs table"))
			continue
		}
		if feedStatus != string(jobs.StatusFailed) {
			continue
		}

		t.L().Printf("changefeed %d failed. recreating changefeed", jobID)
		newJobID, err := creator.Create()
		if err != nil {
			t.L().Printf("%s", errors.Wrap(err, "changefeed watcher failed to recreate changefeed job"))
			continue
		}
		// Track the replacement job from now on.
		jobID = newJobID
		t.L().Printf("successfully recreated changefeed as new job %d", jobID)
	}
}

Expand Down

0 comments on commit bd2fcf2

Please sign in to comment.