Skip to content

Commit

Permalink
roachtest/cdc/mixed-versions: allow for changefeed failures
Browse files Browse the repository at this point in the history
This test may flake due to the upgrade from 22.1->22.2. The
test asserts a changefeed remains running by checking for
resolved timestamps being emitted on a regular basis. The
problem with this is that, during the rolling upgrade,
the changefeed may fail with a "draining" error.
This issue is fixed in 22.2 onwards by treating all errors
as retryable.

Rather than skipping this test because 22.1 is EOLed, it is
preferable to still run this test regularly because it tests
22.2 functionality. This change adds a fix where the test
will poll the changefeed every 1s and recreate it if it fails.

Closes: #106878
Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Jul 21, 2023
1 parent 57c027d commit 3ab305d
Showing 1 changed file with 40 additions and 3 deletions.
43 changes: 40 additions & 3 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -343,12 +345,47 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
{"updated", ""},
{"resolved", fmt.Sprintf("'%s'", resolvedInterval)},
}
_, err := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...).
Create()
creator := newChangefeedCreator(db, fmt.Sprintf("%s.%s", targetDB, targetTable), cmvt.kafka.sinkURL(ctx)).
With(options...)

jobID, err := creator.Create()
if err != nil {
t.Fatal(err)
}
// In 22.1, a changefeed may fail during a rolling upgrade due to a bug
// which is fixed in 22.2 onwards. To accomodate for this bug,
// poll the changefeed job and restart it if it fails.
// See #106878 for more details.
go watchChangefeed(ctx, t, jobID, db, creator)
}
}

// watchChangefeed polls a changefeed job every 1 second and re-creates if it
// failed.
func watchChangefeed(
ctx context.Context, t test.Test, jobID int, db *gosql.DB, creator *changefeedCreator,
) {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
var feedStatus string
if err := db.QueryRow(`SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&feedStatus); err != nil {
t.Fatal(errors.Wrap(err, "changefeed watcher failed to query jobs table"))
return
}
if feedStatus == string(jobs.StatusFailed) {
t.L().Printf("changefeed %d failed. recreating changefeed", jobID)
newJobID, err := creator.Create()
if err != nil {
t.Fatal(errors.Wrap(err, "changefeed watcher failed to recreate changefeed job"))
return
}
jobID = newJobID
t.L().Printf("successfully recreated changefeed as new job %d", jobID)
}
}
}
}

Expand Down

0 comments on commit 3ab305d

Please sign in to comment.