Skip to content

Commit

Permalink
roachtest/cdc-mixed-versions: add multi-step upgrade test
Browse files Browse the repository at this point in the history
This change adds a new roachtest "cdc/mixed-versions-multiple-upgrades"
which starts a changefeed and ensures that it keeps running through
multiple cluster upgrades and rollbacks. This test is an extension
of a previous test "cdc/mixed-versions" which only tests upgrading
from one previous version to the present version. "cdc/mixed-versions"
is now renamed to "cdc/mixed-versions-single-upgrade".

"cdc/mixed-versions-multiple-upgrades" performs 4 upgrades starting
from 4 versions in the past. As seen in cockroachdb#107293,
changefeeds running on versions prior to 22.2 can be very brittle,
mainly due to them not retrying transient errors. To address this
problem, this new roachtest will ensure that 22.2 is the minimum
binary version. So if this test is running on 23.1, it will only test
the upgrade path from 22.2-23.1. Once 23.2 releases, it will test the
upgrade path from 22.2 -> 23.1 -> 23.2 and so on, keeping a running
window of size 4.

More notes:

Can we remove the restriction of starting at version 22.2?
In [cockroachdb#107293](cockroachdb#107293), we added some code
to restart the changefeed in case it fails to reduce flakes when running 22.1. It does
defeat the purpose of the test to do so, so it would not make sense to do something similar
in this change. It also does not make sense to change 22.1 because it is EOL'd.

Why perform the upgrade through multiple versions?
As seen in cockroachdb#107451, there are cases where,
for example, a 22.1 changefeed survives an upgrade to 22.2 and a 22.2 changefeed survives
and upgrade to 23.1, but a 22.1 does NOT survive an upgrade from 22.1 -> 22.2 -> 23.1.
This non-transitive relationship is the reason that I added this new test. We (and our
users) expect that arbitrarily old jobs can be resumed in new versions without error (unless we
explicitly add code to fail these jobs in certain cases). Because of this expectation,
we should have a test which asserts a changefeed stays running through multiple version
upgrades.

Why limit the test at 4 versions?
We do not want the test to run for an unbounded number of upgrades as we have
many more releases in the future.

Closes: cockroachdb#107451
Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Aug 2, 2023
1 parent 05ad3c4 commit 65a10e5
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 52 deletions.
135 changes: 98 additions & 37 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ var (
targetDB = "bank"
targetTable = "bank"

timeout = 30 * time.Minute

// teamcityAgentZone is the zone used in this test. Since this test
// runs a lot of queries from the TeamCity agent to CRDB nodes, we
// make sure to create roachprod nodes that are in the same region
Expand All @@ -67,14 +65,25 @@ func registerCDCMixedVersions(r registry.Registry) {
zones = teamcityAgentZone
}
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions",
Name: "cdc/mixed-versions-single-upgrade",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: 30 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), true)
},
})
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions-multiple-upgrades",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: timeout,
Timeout: 90 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion())
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), false)
},
})
}
Expand Down Expand Up @@ -352,12 +361,34 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
}
}

// runCDCMixedVersions creates a cluster with a changefeed and upgrades it
// to the current version. While doing so, it asserts that the changefeed is
// running the entire time.
//
// If singlePredecessor is true, this will perform one upgrade starting from the
// preceding major version before buildVersion. Otherwise, it will start from
// 4 versions in the past (or v22.2 - taking whichever starting point is more recent)
// and perform as many upgrades as needed to reach the current version.
func runCDCMixedVersions(
ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version,
ctx context.Context,
t test.Test,
c cluster.Cluster,
buildVersion *version.Version,
singlePredecessor bool,
) {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
var predecessorVersions []string
var err error
if singlePredecessor {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
}
predecessorVersions = append(predecessorVersions, predecessorVersion)
} else {
predecessorVersions, err = release.LatestPredecessorHistory(buildVersion, 4, version.MustParse("v22.2.0"))
if err != nil {
t.Fatal(err)
}
}

tester := newCDCMixedVersionTester(ctx, t, c)
Expand All @@ -374,41 +405,71 @@ func runCDCMixedVersions(
return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
}

newVersionUpgradeTest(c,
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
// This returns an array of steps which do the following:
// - start a rolling upgrade to nextVersion
// - rollback the upgrade to curVersion
// - restart the rolling upgrade to nextVersion
// - finalize the upgrade to nextVersion
// NB: This does not check if the two versions are valid for an upgrade
// to take place.
simpleUpgradeStep := func(curVersion, nextVersion string) []versionStep {
return []versionStep{
preventAutoUpgradeStep(sqlNode()),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, curVersion)),

tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Allow cluster version to update completely.
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),
}
}

// Initialize the cluster, workload, and changefeed.
steps := []versionStep{
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersions[0]),
tester.setupVerifier(sqlNode()),
tester.installAndStartWorkload(),
waitForUpgradeStep(tester.crdbNodes),

// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
preventAutoUpgradeStep(sqlNode()),
tester.createChangeFeed(sqlNode()),

tester.waitForResolvedTimestamps(),
// Roll the nodes into the new version one by one in random order
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
// let the workload run in the new version for a while
tester.waitForResolvedTimestamps(),

tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
// Create the changefeed and wait for it to start.
tester.createChangeFeed(sqlNode()),
tester.waitForResolvedTimestamps(),
}
// Perform rolling upgrades.
for i := 1; i < len(predecessorVersions); i++ {
steps = append(steps, simpleUpgradeStep(predecessorVersions[i-1], predecessorVersions[i])...)
}
// Upgrade to the current version.
steps = append(steps, simpleUpgradeStep(predecessorVersions[len(predecessorVersions)-1], clusterupgrade.MainVersion)...)
// Finish the test.
steps = append(steps, tester.finishTest())
steps = append(steps, tester.assertValid())

tester.assertValid(),

// Roll nodes forward and finalize upgrade.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),

// allow cluster version to update
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

tester.waitForResolvedTimestamps(),
tester.finishTest(),
tester.assertValid(),
).run(ctx, t)
newVersionUpgradeTest(c, steps...).run(ctx, t)
}
48 changes: 34 additions & 14 deletions pkg/testutils/release/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,30 @@ func parseReleases() (map[string]Series, error) {
// the version passed. For example, if the version is "v19.2.0", this
// will return the latest 19.1 patch release.
func LatestPredecessor(v *version.Version) (string, error) {
history, err := LatestPredecessorHistory(v, 1)
history, err := LatestPredecessorHistory(v, 1, nil)
if err != nil {
return "", err
}

return history[0], nil
}

// LatestPredecessorHistory returns the last consecutive `k` releases
// LatestPredecessorHistory returns the consecutive releases
// that precede the given version in the upgrade order (as dictated by
// cockroach_releases.yaml). E.g., if v=22.2.3 and k=2, then this
// function will return, for example, ["21.2.7", "22.1.6"].
func LatestPredecessorHistory(v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
// cockroach_releases.yaml). Only releases at least as new as `minVersion`
// will be returned. `k` releases will be checked.
//
// E.g., if v=22.2.3, k=2, and minVersion=nil then this
// function will return ["21.2.7", "22.1.6"].
//
// E.g., if v=22.2.3, k=3, and minVersion=22.1,
// then the function will return ["22.1.6"].
func LatestPredecessorHistory(
v *version.Version,
k int,
minVersion *version.Version, /* optional */
) ([]string, error) {
return predecessorHistory(v, k, minVersion, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[len(activeReleases)-1]
})
Expand All @@ -93,28 +103,38 @@ func RandomPredecessor(rng *rand.Rand, v *version.Version) (string, error) {
// instead of returning a list of the latest patch releases, it will
// return a random non-withdrawn patch release for each release series.
func RandomPredecessorHistory(rng *rand.Rand, v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
return predecessorHistory(v, k, nil, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[rng.Intn(len(activeReleases))]
})
}

// predecessorHistory computes the history of size `k` for a given
// version (from least to most recent, using the order an actual
// upgrade would have to follow). The `releasePicker` function can be
// used to select which patch release is used at each step.
// predecessorHistory computes the history for a given version (from least to
// most recent, using the order an actual upgrade would have to follow). `k` is
// the number of predecessors to check and `minVersion` is the oldest version to
// return. The `releasePicker` function can be used to select which patch
// release is used at each step.
func predecessorHistory(
v *version.Version, k int, releasePicker func(Series) string,
v *version.Version,
k int,
minVersion *version.Version, /* optional */
releasePicker func(Series) string,
) ([]string, error) {
history := make([]string, k)
var history []string
currentV := v
for i := k - 1; i >= 0; i-- {
predecessor, err := predecessorSeries(currentV)
if err != nil {
return nil, err
}
history[i] = releasePicker(predecessor)
currentV = mustParseVersion(predecessor.Latest)
if minVersion != nil && !currentV.AtLeast(minVersion) {
break
}
history = append(history, releasePicker(predecessor))
}
for i := 0; i < len(history)/2; i++ {
history[i], history[len(history)-1-i] = history[len(history)-1-i], history[i]
}

return history, nil
Expand Down
15 changes: 14 additions & 1 deletion pkg/testutils/release/releases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestLatestPredecessorHistory(t *testing.T) {
name string
v string
k int
minVersion string
expectedErr string
expectedLatest []string
expectedRandom []string
Expand All @@ -129,6 +130,14 @@ func TestLatestPredecessorHistory(t *testing.T) {
expectedLatest: []string{"19.2.0", "22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "valid history with min version",
v: "v23.1.1",
k: 3,
minVersion: "v22.1.0",
expectedLatest: []string{"22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "with pre-release",
v: "v23.1.1-beta.1",
Expand All @@ -145,7 +154,11 @@ func TestLatestPredecessorHistory(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resetRNG() // deterministic results
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k)
var minV *version.Version
if tc.minVersion != "" {
minV = version.MustParse(tc.minVersion)
}
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k, minV)
randomHistory, randomErr := RandomPredecessorHistory(rng, version.MustParse(tc.v), tc.k)
if tc.expectedErr == "" {
require.NoError(t, latestErr)
Expand Down

0 comments on commit 65a10e5

Please sign in to comment.