Skip to content

Commit

Permalink
roachtest/cdc/mixed-versions: use mixed version framework
Browse files Browse the repository at this point in the history
This change updates the `cdc/mixed-versions` roachtest to use
the mixed version testing framework. This mixed version testing
framework is better than the previous framework because it offers
testing for multiple upgrades. It will also make it easier to
maintain and expand this cdc-specific test.

Informs: #107451
Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Aug 15, 2023
1 parent 3a393c1 commit 3b0a059
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 240 deletions.
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont
})
}

// Command runs the command on the provided nodes.
func (h *Helper) Command(cmd string, nodes option.NodeListOption) error {
h.stepLogger.Printf("running command `%s` on nodes %v", cmd, nodes)
return h.runner.cluster.RunE(h.ctx, nodes, cmd)
}

// ExpectDeath alerts the testing infrastructure that a node is
// expected to die. Regular restarts as part of the mixedversion
// testing are already taken into account. This function should only
Expand Down
20 changes: 10 additions & 10 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type (
Finalizing bool
}

// userFunc is the signature for user-provided functions that run at
// UserFunc is the signature for user-provided functions that run at
// various points in the test (synchronously or in the background).
// These functions run on the test runner node itself; i.e., any
// commands they wish to execute on the cluster need to go through
Expand All @@ -167,7 +167,7 @@ type (
// error over calling `t.Fatal` directly. The error is handled by
// the mixedversion framework and better error messages are produced
// as a result.
userFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
UserFunc func(context.Context, *logger.Logger, *rand.Rand, *Helper) error
predicateFunc func(Context) bool

// versionUpgradeHook is a hook that can be called at any time
Expand All @@ -179,7 +179,7 @@ type (
versionUpgradeHook struct {
name string
predicate predicateFunc
fn userFunc
fn UserFunc
}

// testStep is an opaque reference to one step of a mixed-version
Expand Down Expand Up @@ -359,7 +359,7 @@ func (t *Test) RNG() *rand.Rand {
// will be tested in arbitrary mixed-version states. If multiple
// InMixedVersion hooks are passed, they will be executed
// concurrently.
func (t *Test) InMixedVersion(desc string, fn userFunc) {
func (t *Test) InMixedVersion(desc string, fn UserFunc) {
lastFromVersion := "invalid-version"
var numUpgradedNodes int
predicate := func(testContext Context) bool {
Expand Down Expand Up @@ -389,7 +389,7 @@ func (t *Test) InMixedVersion(desc string, fn userFunc) {
// certain previous version, potentially from existing fixtures). If
// multiple OnStartup hooks are passed, they will be executed
// concurrently.
func (t *Test) OnStartup(desc string, fn userFunc) {
func (t *Test) OnStartup(desc string, fn UserFunc) {
// Since the callbacks here are only referenced in the setup steps
// of the planner, there is no need to have a predicate function
// gating them.
Expand All @@ -400,20 +400,20 @@ func (t *Test) OnStartup(desc string, fn userFunc) {
// mixed-version test has brought the cluster to the latest version,
// and allowed the upgrade to finalize successfully. If multiple such
// hooks are passed, they will be executed concurrently.
func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
func (t *Test) AfterUpgradeFinalized(desc string, fn UserFunc) {
t.hooks.AddAfterUpgradeFinalized(versionUpgradeHook{name: desc, fn: fn})
}

// BackgroundFunc runs the function passed as argument in the
// background during the test. Background functions are kicked off
// once the cluster has been initialized (i.e., after all startup
// steps have finished). If the `userFunc` returns an error, it will
// steps have finished). If the `UserFunc` returns an error, it will
// cause the test to fail. These functions can run indefinitely but
// should respect the context passed to them, which will be canceled
// when the test terminates (successfully or not). Returns a function
// that can be called to terminate the step, which will cancel the
// context passed to `userFunc`.
func (t *Test) BackgroundFunc(desc string, fn userFunc) StopFunc {
// context passed to `UserFunc`.
func (t *Test) BackgroundFunc(desc string, fn UserFunc) StopFunc {
t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})

ch := make(shouldStop)
Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *Test) buildVersion() *version.Version {
return t.rt.BuildVersion()
}

func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) userFunc {
func (t *Test) runCommandFunc(nodes option.NodeListOption, cmd string) UserFunc {
return func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *Helper) error {
l.Printf("running command `%s` on nodes %v", cmd, nodes)
return t.cluster.RunE(ctx, nodes, cmd)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Fatal(err)
}

tc, err := kafka.consumer(ctx, "bank")
tc, err := kafka.newConsumer(ctx, "bank")
if err != nil {
t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
}
Expand Down Expand Up @@ -2234,7 +2234,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
})
}

func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
kafkaAddrs := []string{k.consumerURL(ctx)}
config := sarama.NewConfig()
// I was seeing "error processing FetchRequest: kafka: error decoding
Expand Down
Loading

0 comments on commit 3b0a059

Please sign in to comment.