Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest/cdc-mixed-versions: add multi-step upgrade test #107948

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 98 additions & 37 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ var (
targetDB = "bank"
targetTable = "bank"

timeout = 30 * time.Minute

// teamcityAgentZone is the zone used in this test. Since this test
// runs a lot of queries from the TeamCity agent to CRDB nodes, we
// make sure to create roachprod nodes that are in the same region
Expand All @@ -67,14 +65,25 @@ func registerCDCMixedVersions(r registry.Registry) {
zones = teamcityAgentZone
}
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions",
Name: "cdc/mixed-versions-single-upgrade",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: 30 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), true)
},
})
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions-multiple-upgrades",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: timeout,
Timeout: 90 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion())
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), false)
},
})
}
Expand Down Expand Up @@ -352,12 +361,34 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
}
}

// runCDCMixedVersions creates a cluster with a changefeed and upgrades it
// to the current version. While doing so, it asserts that the changefeed is
// running the entire time.
//
// If singlePredecessor is true, this will perform one upgrade starting from the
// preceding major version before buildVersion. Otherwise, it will start from
// 4 versions in the past (or v22.2 - taking whichever starting point is more recent)
// and perform as many upgrades as needed to reach the current version.
func runCDCMixedVersions(
ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version,
ctx context.Context,
t test.Test,
c cluster.Cluster,
buildVersion *version.Version,
singlePredecessor bool,
) {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
var predecessorVersions []string
var err error
if singlePredecessor {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
}
predecessorVersions = append(predecessorVersions, predecessorVersion)
} else {
predecessorVersions, err = release.LatestPredecessorHistory(buildVersion, 4, version.MustParse("v22.2.0"))
if err != nil {
t.Fatal(err)
}
}

tester := newCDCMixedVersionTester(ctx, t, c)
Expand All @@ -374,41 +405,71 @@ func runCDCMixedVersions(
return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
}

newVersionUpgradeTest(c,
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
// This returns an array of steps which do the following:
// - start a rolling upgrade to nextVersion
// - rollback the upgrade to curVersion
// - restart the rolling upgrade to nextVersion
// - finalize the upgrade to nextVersion
// NB: This does not check if the two versions are valid for an upgrade
// to take place.
simpleUpgradeStep := func(curVersion, nextVersion string) []versionStep {
return []versionStep{
preventAutoUpgradeStep(sqlNode()),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, curVersion)),

tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Allow cluster version to update completely.
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),
}
}

// Initialize the cluster, workload, and changefeed.
steps := []versionStep{
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersions[0]),
tester.setupVerifier(sqlNode()),
tester.installAndStartWorkload(),
waitForUpgradeStep(tester.crdbNodes),

// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
preventAutoUpgradeStep(sqlNode()),
tester.createChangeFeed(sqlNode()),

tester.waitForResolvedTimestamps(),
// Roll the nodes into the new version one by one in random order
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
// let the workload run in the new version for a while
tester.waitForResolvedTimestamps(),

tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
// Create the changefeed and wait for it to start.
tester.createChangeFeed(sqlNode()),
tester.waitForResolvedTimestamps(),
}
// Perform rolling upgrades.
for i := 1; i < len(predecessorVersions); i++ {
steps = append(steps, simpleUpgradeStep(predecessorVersions[i-1], predecessorVersions[i])...)
}
// Upgrade to the current version.
steps = append(steps, simpleUpgradeStep(predecessorVersions[len(predecessorVersions)-1], clusterupgrade.MainVersion)...)
// Finish the test.
steps = append(steps, tester.finishTest())
steps = append(steps, tester.assertValid())

tester.assertValid(),

// Roll nodes forward and finalize upgrade.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),

// allow cluster version to update
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

tester.waitForResolvedTimestamps(),
tester.finishTest(),
tester.assertValid(),
).run(ctx, t)
newVersionUpgradeTest(c, steps...).run(ctx, t)
}
48 changes: 34 additions & 14 deletions pkg/testutils/release/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,30 @@ func parseReleases() (map[string]Series, error) {
// the version passed. For example, if the version is "v19.2.0", this
// will return the latest 19.1 patch release.
func LatestPredecessor(v *version.Version) (string, error) {
history, err := LatestPredecessorHistory(v, 1)
history, err := LatestPredecessorHistory(v, 1, nil)
if err != nil {
return "", err
}

return history[0], nil
}

// LatestPredecessorHistory returns the last consecutive `k` releases
// LatestPredecessorHistory returns the consecutive releases
// that precede the given version in the upgrade order (as dictated by
// cockroach_releases.yaml). E.g., if v=22.2.3 and k=2, then this
// function will return, for example, ["21.2.7", "22.1.6"].
func LatestPredecessorHistory(v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
// cockroach_releases.yaml). Only releases at least as new as `minVersion`
// will be returned. `k` releases will be checked.
//
// E.g., if v=22.2.3, k=2, and minVersion=nil then this
// function will return ["21.2.7", "22.1.6"].
//
// E.g., if v=22.2.3, k=3, and minVersion=22.1,
// then the function will return ["22.1.6"].
func LatestPredecessorHistory(
v *version.Version,
k int,
minVersion *version.Version, /* optional */
) ([]string, error) {
return predecessorHistory(v, k, minVersion, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[len(activeReleases)-1]
})
Expand All @@ -93,28 +103,38 @@ func RandomPredecessor(rng *rand.Rand, v *version.Version) (string, error) {
// instead of returning a list of the latest patch releases, it will
// return a random non-withdrawn patch release for each release series.
func RandomPredecessorHistory(rng *rand.Rand, v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
return predecessorHistory(v, k, nil, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[rng.Intn(len(activeReleases))]
})
}

// predecessorHistory computes the history of size `k` for a given
// version (from least to most recent, using the order an actual
// upgrade would have to follow). The `releasePicker` function can be
// used to select which patch release is used at each step.
// predecessorHistory computes the history for a given version (from least to
// most recent, using the order an actual upgrade would have to follow). `k` is
// the number of predecessors to check and `minVersion` is the oldest version to
// return. The `releasePicker` function can be used to select which patch
// release is used at each step.
func predecessorHistory(
v *version.Version, k int, releasePicker func(Series) string,
v *version.Version,
k int,
minVersion *version.Version, /* optional */
releasePicker func(Series) string,
) ([]string, error) {
history := make([]string, k)
var history []string
currentV := v
for i := k - 1; i >= 0; i-- {
predecessor, err := predecessorSeries(currentV)
if err != nil {
return nil, err
}
history[i] = releasePicker(predecessor)
currentV = mustParseVersion(predecessor.Latest)
if minVersion != nil && !currentV.AtLeast(minVersion) {
break
}
history = append(history, releasePicker(predecessor))
}
for i := 0; i < len(history)/2; i++ {
history[i], history[len(history)-1-i] = history[len(history)-1-i], history[i]
}

return history, nil
Expand Down
15 changes: 14 additions & 1 deletion pkg/testutils/release/releases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestLatestPredecessorHistory(t *testing.T) {
name string
v string
k int
minVersion string
expectedErr string
expectedLatest []string
expectedRandom []string
Expand All @@ -129,6 +130,14 @@ func TestLatestPredecessorHistory(t *testing.T) {
expectedLatest: []string{"19.2.0", "22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "valid history with min version",
v: "v23.1.1",
k: 3,
minVersion: "v22.1.0",
expectedLatest: []string{"22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "with pre-release",
v: "v23.1.1-beta.1",
Expand All @@ -145,7 +154,11 @@ func TestLatestPredecessorHistory(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resetRNG() // deterministic results
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k)
var minV *version.Version
if tc.minVersion != "" {
minV = version.MustParse(tc.minVersion)
}
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k, minV)
randomHistory, randomErr := RandomPredecessorHistory(rng, version.MustParse(tc.v), tc.k)
if tc.expectedErr == "" {
require.NoError(t, latestErr)
Expand Down