Skip to content

Commit

Permalink
roachtest/cdc/mixed-versions: use mixed version framework
Browse files Browse the repository at this point in the history
This change updates the `cdc/mixed-versions` roachtest to use
the mixed version testing framework. This mixed version testing
framework is better than the previous framework because it offers
testing for multiple upgrades. It will also make it easier to
maintain and expand this cdc-specific test.

Informs: #107451
Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Aug 16, 2023
1 parent 7c51462 commit b583882
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 240 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"@com_github_cockroachdb_errors//:errors",
"@com_github_pkg_errors//:errors",
],
)
Expand Down
32 changes: 32 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/errors"
)

func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int {
Expand Down Expand Up @@ -105,6 +106,37 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont
})
}

// Command runs the command on the provided nodes.
func (h *Helper) Command(cmd string, nodes option.NodeListOption) error {
h.stepLogger.Printf("running command `%s` on nodes %v", cmd, nodes)
return h.runner.cluster.RunE(h.ctx, nodes, cmd)
}

// ExpectNoRestarts is a function which you should call at the beginning of a
// hook to asser that no node restarts happen while its running. Note that this
// does not block restarts, but will cause them to fail (and eventually fail the
// test).
// The expected usage is:
//
// reset, err := h.ExpectNoRestarts()
//
// if err != nil {
// return err
// }
//
// defer reset()
// {
// ... critical section
// }
func (h *Helper) ExpectNoRestarts() (reset func(), err error) {
if !h.upgradeFlag.TryLock() {
return nil, errors.New("hook expected no restarts but a restart was detected")
}
return func() {
h.upgradeFlag.Unlock()
}, nil
}

// ExpectDeath alerts the testing infrastructure that a node is
// expected to die. Regular restarts as part of the mixedversion
// testing are already taken into account. This function should only
Expand Down
29 changes: 19 additions & 10 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type (
Finalizing bool
}

// userFunc is the signature for user-provided functions that run at
// UserFunc is the signature for user-provided functions that run at
// various points in the test (synchronously or in the background).
// These functions run on the test runner node itself; i.e., any
// commands they wish to execute on the cluster need to go through
Expand All @@ -167,7 +167,7 @@ type (
// error over calling `t.Fatal` directly. The error is handled by
// the mixedversion framework and better error messages are produced
// as a result.
userFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
UserFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
predicateFunc func(Context) bool

// versionUpgradeHook is a hook that can be called at any time
Expand All @@ -179,7 +179,7 @@ type (
versionUpgradeHook struct {
name string
predicate predicateFunc
fn userFunc
fn UserFunc
}

// testStep is an opaque reference to one step of a mixed-version
Expand Down Expand Up @@ -359,7 +359,7 @@ func (t *Test) RNG() *rand.Rand {
// will be tested in arbitrary mixed-version states. If multiple
// InMixedVersion hooks are passed, they will be executed
// concurrently.
func (t *Test) InMixedVersion(desc string, fn userFunc) {
func (t *Test) InMixedVersion(desc string, fn UserFunc) {
lastFromVersion := "invalid-version"
var numUpgradedNodes int
predicate := func(testContext Context) bool {
Expand Down Expand Up @@ -389,7 +389,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) {
// certain previous version, potentially from existing fixtures). If
// multiple OnStartup hooks are passed, they will be executed
// concurrently.
func (t *Test) OnStartup(desc string, fn userFunc) {
func (t *Test) OnStartup(desc string, fn UserFunc) {
// Since the callbacks here are only referenced in the setup steps
// of the planner, there is no need to have a predicate function
// gating them.
Expand All @@ -400,20 +400,20 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
// mixed-version test has brought the cluster to the latest version,
// and allowed the upgrade to finalize successfully. If multiple such
// hooks are passed, they will be executed concurrently.
func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
func (t *Test) AfterUpgradeFinalized(desc string, fn UserFunc) {
t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn})
}

// BackgroundFunc runs the function passed as argument in the
// background during the test. Background functions are kicked off
// once the cluster has been initialized (i.e., after all startup
// steps have finished). If the `userFunc` returns an error, it will
// steps have finished). If the `UserFunc` returns an error, it will
// cause the test to fail. These functions can run indefinitely but
// should respect the context passed to them, which will be canceled
// when the test terminates (successfully or not). Returns a function
// that can be called to terminate the step, which will cancel the
// context passed to `userFunc`.
func (t *Test) BackgroundFunc(desc string, fn userFunc) StopFunc {
// context passed to `UserFunc`.
func (t *Test) BackgroundFunc(desc string, fn UserFunc) StopFunc {
t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})

ch := make(shouldStop)
Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *Test) buildVersion() *version.Version {
return t.rt.BuildVersion()
}

func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc {
func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) UserFunc {
return func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper) error {
l.Printf("running command `%s` on nodes %v", cmd, nodes)
return t.cluster.RunE(ctx, nodes, cmd)
Expand Down Expand Up @@ -654,6 +654,15 @@ func (s restartWithNewBinaryStep) Run(
ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper,
) error {
h.ExpectDeath()

// If a hook set this flag before we did, the hook expects there to
// be no upgrades while it is running. Return an error to indicate this.
if !h.upgradeFlag.TryLock() {
return errors.New("attempted to restart when a hook expected" +
" no restarts to occur")
}
defer h.upgradeFlag.Unlock()

return clusterupgrade.RestartNodesWithNewBinary(
ctx,
s.rt,
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ type (
bgCount int64
runner *testRunner
stepLogger *logger.Logger

// A mutex held during node restart used to signal to a hook that the
// restart is taking place.
// The expected usage is:
//
// if !upgradeFlag.TryLock() {
// return errors.New()
// }
// defer upgradeFlag.Unlock()
upgradeFlag sync.Mutex
}

// backgroundEvent is the struct sent by background steps when they
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Fatal(err)
}

tc, err := kafka.consumer(ctx, "bank")
tc, err := kafka.newConsumer(ctx, "bank")
if err != nil {
t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
}
Expand Down Expand Up @@ -2234,7 +2234,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
})
}

func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
kafkaAddrs := []string{k.consumerURL(ctx)}
config := sarama.NewConfig()
// I was seeing "error processing FetchRequest: kafka: error decoding
Expand Down
Loading

0 comments on commit b583882

Please sign in to comment.