Skip to content

Commit

Permalink
asdf
Browse files Browse the repository at this point in the history
  • Loading branch information
jayshrivastava committed Aug 1, 2023
1 parent 1395ba8 commit 98485c5
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 52 deletions.
135 changes: 98 additions & 37 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ var (
targetDB = "bank"
targetTable = "bank"

timeout = 30 * time.Minute

// teamcityAgentZone is the zone used in this test. Since this test
// runs a lot of queries from the TeamCity agent to CRDB nodes, we
// make sure to create roachprod nodes that are in the same region
Expand All @@ -67,14 +65,25 @@ func registerCDCMixedVersions(r registry.Registry) {
zones = teamcityAgentZone
}
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions",
Name: "cdc/mixed-versions-single-upgrade",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: 30 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), true)
},
})
r.Add(registry.TestSpec{
Name: "cdc/mixed-versions-multiple-upgrades",
Owner: registry.OwnerTestEng,
// N.B. ARM64 is not yet supported, see https://github.com/cockroachdb/cockroach/issues/103888.
Cluster: r.MakeClusterSpec(5, spec.Zones(zones), spec.Arch(vm.ArchAMD64)),
Timeout: timeout,
Timeout: 90 * time.Minute,
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCMixedVersions(ctx, t, c, t.BuildVersion())
runCDCMixedVersions(ctx, t, c, t.BuildVersion(), false)
},
})
}
Expand Down Expand Up @@ -352,12 +361,34 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(node int) versionStep {
}
}

// runCDCMixedVersions creates a cluster with a changefeed and upgrades it
// to the current version. While doing so, it asserts that the changefeed is
// running the entire time.
//
// If singlePredecessor is true, this will perform one upgrade starting from the
// preceding major version before buildVersion. Otherwise, it will start from
// 4 versions in the past (or v22.2 - taking whichever starting point is more recent)
// and perform as many upgrades as needed to reach the current version.
func runCDCMixedVersions(
ctx context.Context, t test.Test, c cluster.Cluster, buildVersion *version.Version,
ctx context.Context,
t test.Test,
c cluster.Cluster,
buildVersion *version.Version,
singlePredecessor bool,
) {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
var predecessorVersions []string
var err error
if singlePredecessor {
predecessorVersion, err := release.LatestPredecessor(buildVersion)
if err != nil {
t.Fatal(err)
}
predecessorVersions = append(predecessorVersions, predecessorVersion)
} else {
predecessorVersions, err = release.LatestPredecessorHistory(buildVersion, 4, version.MustParse("v22.2.0"))
if err != nil {
t.Fatal(err)
}
}

tester := newCDCMixedVersionTester(ctx, t, c)
Expand All @@ -374,41 +405,71 @@ func runCDCMixedVersions(
return tester.crdbNodes[rng.Intn(len(tester.crdbNodes))]
}

newVersionUpgradeTest(c,
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersion),
// This returns an array of steps which do the following:
// - start a rolling upgrade to nextVersion
// - rollback the upgrade to curVersion
// - restart the rolling upgrade to nextVersion
// - finalize the upgrade to nextVersion
// NB: This does not check if the two versions are valid for an upgrade
// to take place.
simpleUpgradeStep := func(curVersion, nextVersion string) []versionStep {
return []versionStep{
preventAutoUpgradeStep(sqlNode()),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, curVersion)),

tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Roll the nodes into the new version one by one in random order.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, nextVersion)),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),

// Allow cluster version to update completely.
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

// Let the workload run for a while.
tester.waitForResolvedTimestamps(),
tester.assertValid(),
}
}

// Initialize the cluster, workload, and changefeed.
steps := []versionStep{
uploadAndStartFromCheckpointFixture(tester.crdbNodes, predecessorVersions[0]),
tester.setupVerifier(sqlNode()),
tester.installAndStartWorkload(),
waitForUpgradeStep(tester.crdbNodes),

// NB: at this point, cluster and binary version equal predecessorVersion,
// and auto-upgrades are on.
preventAutoUpgradeStep(sqlNode()),
tester.createChangeFeed(sqlNode()),

tester.waitForResolvedTimestamps(),
// Roll the nodes into the new version one by one in random order
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),
// let the workload run in the new version for a while
tester.waitForResolvedTimestamps(),

tester.assertValid(),

// Roll back again, which ought to be fine because the cluster upgrade was
// not finalized.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, predecessorVersion)),
// Create the changefeed and wait for it to start.
tester.createChangeFeed(sqlNode()),
tester.waitForResolvedTimestamps(),
}
// Perform rolling upgrades.
for i := 1; i < len(predecessorVersions); i++ {
steps = append(steps, simpleUpgradeStep(predecessorVersions[i-1], predecessorVersions[i])...)
}
// Upgrade to the current version.
steps = append(steps, simpleUpgradeStep(predecessorVersions[len(predecessorVersions)-1], clusterupgrade.MainVersion)...)
// Finish the test.
steps = append(steps, tester.finishTest())
steps = append(steps, tester.assertValid())

tester.assertValid(),

// Roll nodes forward and finalize upgrade.
tester.crdbUpgradeStep(binaryUpgradeStep(tester.crdbNodes, clusterupgrade.MainVersion)),

// allow cluster version to update
allowAutoUpgradeStep(sqlNode()),
waitForUpgradeStep(tester.crdbNodes),

tester.waitForResolvedTimestamps(),
tester.finishTest(),
tester.assertValid(),
).run(ctx, t)
newVersionUpgradeTest(c, steps...).run(ctx, t)
}
48 changes: 34 additions & 14 deletions pkg/testutils/release/releases.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,30 @@ func parseReleases() (map[string]Series, error) {
// the version passed. For example, if the version is "v19.2.0", this
// will return the latest 19.1 patch release.
func LatestPredecessor(v *version.Version) (string, error) {
history, err := LatestPredecessorHistory(v, 1)
history, err := LatestPredecessorHistory(v, 1, nil)
if err != nil {
return "", err
}

return history[0], nil
}

// LatestPredecessorHistory returns the last consecutive `k` releases
// LatestPredecessorHistory returns the consecutive releases
// that precede the given version in the upgrade order (as dictated by
// cockroach_releases.yaml). E.g., if v=22.2.3 and k=2, then this
// function will return, for example, ["21.2.7", "22.1.6"].
func LatestPredecessorHistory(v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
// cockroach_releases.yaml). Only releases at least as new as `minVersion`
// will be returned. `k` releases will be checked.
//
// E.g., if v=22.2.3, k=2, and minVersion=nil then this
// function will return ["21.2.7", "22.1.6"].
//
// E.g., if v=22.2.3, k=3, and minVersion=22.1,
// then the function will return ["22.1.6"].
func LatestPredecessorHistory(
v *version.Version,
k int,
minVersion *version.Version, /* optional */
) ([]string, error) {
return predecessorHistory(v, k, minVersion, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[len(activeReleases)-1]
})
Expand All @@ -93,28 +103,38 @@ func RandomPredecessor(rng *rand.Rand, v *version.Version) (string, error) {
// instead of returning a list of the latest patch releases, it will
// return a random non-withdrawn patch release for each release series.
func RandomPredecessorHistory(rng *rand.Rand, v *version.Version, k int) ([]string, error) {
return predecessorHistory(v, k, func(releaseSeries Series) string {
return predecessorHistory(v, k, nil, func(releaseSeries Series) string {
activeReleases := activePatchReleases(releaseSeries)
return activeReleases[rng.Intn(len(activeReleases))]
})
}

// predecessorHistory computes the history of size `k` for a given
// version (from least to most recent, using the order an actual
// upgrade would have to follow). The `releasePicker` function can be
// used to select which patch release is used at each step.
// predecessorHistory computes the history for a given version (from least to
// most recent, using the order an actual upgrade would have to follow). `k` is
// the number of predecessors to check and `minVersion` is the oldest version to
// return. The `releasePicker` function can be used to select which patch
// release is used at each step.
func predecessorHistory(
v *version.Version, k int, releasePicker func(Series) string,
v *version.Version,
k int,
minVersion *version.Version, /* optional */
releasePicker func(Series) string,
) ([]string, error) {
history := make([]string, k)
var history []string
currentV := v
for i := k - 1; i >= 0; i-- {
predecessor, err := predecessorSeries(currentV)
if err != nil {
return nil, err
}
history[i] = releasePicker(predecessor)
currentV = mustParseVersion(predecessor.Latest)
if minVersion != nil && !currentV.AtLeast(minVersion) {
break
}
history = append(history, releasePicker(predecessor))
}
for i := 0; i < len(history)/2; i++ {
history[i], history[len(history)-1-i] = history[len(history)-1-i], history[i]
}

return history, nil
Expand Down
15 changes: 14 additions & 1 deletion pkg/testutils/release/releases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestLatestPredecessorHistory(t *testing.T) {
name string
v string
k int
minVersion string
expectedErr string
expectedLatest []string
expectedRandom []string
Expand All @@ -129,6 +130,14 @@ func TestLatestPredecessorHistory(t *testing.T) {
expectedLatest: []string{"19.2.0", "22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "valid history with min version",
v: "v23.1.1",
k: 3,
minVersion: "v22.1.0",
expectedLatest: []string{"22.1.12", "22.2.8"},
expectedRandom: []string{"19.2.0", "22.1.8", "22.2.8"},
},
{
name: "with pre-release",
v: "v23.1.1-beta.1",
Expand All @@ -145,7 +154,11 @@ func TestLatestPredecessorHistory(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resetRNG() // deterministic results
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k)
var minV *version.Version
if tc.minVersion != "" {
minV = version.MustParse(tc.minVersion)
}
latestHistory, latestErr := LatestPredecessorHistory(version.MustParse(tc.v), tc.k, minV)
randomHistory, randomErr := RandomPredecessorHistory(rng, version.MustParse(tc.v), tc.k)
if tc.expectedErr == "" {
require.NoError(t, latestErr)
Expand Down

0 comments on commit 98485c5

Please sign in to comment.