Skip to content

Commit

Permalink
roachtest/cdc/mixed-versions: use mixed version framework
Browse files Browse the repository at this point in the history
This change updates the `cdc/mixed-versions` roachtest to use
the mixed version testing framework. This mixed version testing
framework is better than the previous framework because it offers
testing for multiple upgrades. It will also make it easier to
maintain and expand this cdc-specific test.

Informs: cockroachdb#107451
Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Aug 17, 2023
1 parent 7c51462 commit fc25ef0
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 257 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"@com_github_cockroachdb_errors//:errors",
"@com_github_pkg_errors//:errors",
],
)
Expand Down
23 changes: 23 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/errors"
)

func (h *Helper) RandomNode(prng *rand.Rand, nodes option.NodeListOption) int {
Expand Down Expand Up @@ -105,6 +106,28 @@ func (h *Helper) BackgroundCommand(cmd string, nodes option.NodeListOption) cont
})
}

// ExpectNoRestarts acts as an assertion that ensures no restarts happen inside
// some critical section. Note that this does not block restarts, but will cause
// them to fail if they run concurrently to the critical section (eventually
// failing the test because this assertion failed). The expected usage is:
//
// done, err := h.ExpectNoRestarts()
// if err != nil {
// return err
// }
// defer done()
// {
// ... critical section
// }
func (h *Helper) ExpectNoRestarts() (done func(), err error) {
if !h.upgradeFlag.TryLock() {
return nil, errors.New("hook expected no restarts but a restart was detected")
}
return func() {
h.upgradeFlag.Unlock()
}, nil
}

// ExpectDeath alerts the testing infrastructure that a node is
// expected to die. Regular restarts as part of the mixedversion
// testing are already taken into account. This function should only
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,16 @@ func (s restartWithNewBinaryStep) Run(
ctx context.Context, l *logger.Logger, c cluster.Cluster, h *Helper,
) error {
h.ExpectDeath()

// If a hook set this flag before we did, the hook expects there to
// be no upgrades while it is holding the lock. Return an error to indicate
// this.
if !h.upgradeFlag.TryLock() {
return errors.New("attempted to restart when a hook expected" +
" no restarts to occur")
}
defer h.upgradeFlag.Unlock()

return clusterupgrade.RestartNodesWithNewBinary(
ctx,
s.rt,
Expand Down
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ type (
bgCount int64
runner *testRunner
stepLogger *logger.Logger

// A mutex held during node restart used to signal to a hook that the
// restart is taking place.
// The expected usage is:
//
// if !upgradeFlag.TryLock() {
// return errors.New()
// }
// defer upgradeFlag.Unlock()
upgradeFlag sync.Mutex
}

// backgroundEvent is the struct sent by background steps when they
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
t.Fatal(err)
}

tc, err := kafka.consumer(ctx, "bank")
tc, err := kafka.newConsumer(ctx, "bank")
if err != nil {
t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
}
Expand Down Expand Up @@ -2234,7 +2234,7 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
})
}

func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
kafkaAddrs := []string{k.consumerURL(ctx)}
config := sarama.NewConfig()
// I was seeing "error processing FetchRequest: kafka: error decoding
Expand Down
Loading

0 comments on commit fc25ef0

Please sign in to comment.