Skip to content

Commit

Permalink
roachtest: cdc/mixed-versions support for shared-process deployments
Browse files Browse the repository at this point in the history
This commit updates the `cdc/mixed-versions` test so that it is able
to run on a shared-process deployment. Specifically, it updates the
changefeed creator to take two connections: one to the tenant being
tested (which could be the system tenant as well), and one to the
system tenant. The latter is used to set cluster settings that are
only visible to the system tenant and control how changefeeds work.

Informs: #127378

Release note: None
  • Loading branch information
renatolabs committed Jul 16, 2024
1 parent 84bec72 commit 0bd9e6d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
34 changes: 20 additions & 14 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
args.sinkType, args.targets, feedOptions,
))
db := ct.DB()
jobID, err := newChangefeedCreator(db, ct.logger, globalRand, targetsStr, sinkURI, makeDefaultFeatureFlags()).
jobID, err := newChangefeedCreator(db, db, ct.logger, globalRand, targetsStr, sinkURI, makeDefaultFeatureFlags()).
With(feedOptions).Create()
if err != nil {
ct.t.Fatalf("failed to create changefeed: %s", err.Error())
Expand Down Expand Up @@ -762,7 +762,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
"min_checkpoint_frequency": "'2s'",
"diff": "",
}
_, err := newChangefeedCreator(db, t.L(), globalRand, "bank.bank", kafka.sinkURL(ctx), makeDefaultFeatureFlags()).
_, err := newChangefeedCreator(db, db, t.L(), globalRand, "bank.bank", kafka.sinkURL(ctx), makeDefaultFeatureFlags()).
With(options).
Create()
if err != nil {
Expand Down Expand Up @@ -927,7 +927,7 @@ func runCDCSchemaRegistry(ctx context.Context, t test.Test, c cluster.Cluster) {
"diff": "",
}

_, err := newChangefeedCreator(db, t.L(), globalRand, "foo", kafka.sinkURL(ctx), makeDefaultFeatureFlags()).
_, err := newChangefeedCreator(db, db, t.L(), globalRand, "foo", kafka.sinkURL(ctx), makeDefaultFeatureFlags()).
With(options).
Args(kafka.schemaRegistryURL(ctx)).
Create()
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func runCDCKafkaAuth(ctx context.Context, t test.Test, c cluster.Cluster) {

for _, f := range feeds {
t.Status(fmt.Sprintf("running:%s, query:%s", f.desc, f.queryArg))
_, err := newChangefeedCreator(db, t.L(), globalRand, "auth_test_table", f.queryArg, makeDefaultFeatureFlags()).Create()
_, err := newChangefeedCreator(db, db, t.L(), globalRand, "auth_test_table", f.queryArg, makeDefaultFeatureFlags()).Create()
if err != nil {
t.Fatalf("%s: %s", f.desc, err.Error())
}
Expand Down Expand Up @@ -2843,6 +2843,7 @@ func (lw *ledgerWorkload) run(ctx context.Context, c cluster.Cluster, workloadDu
// different options and sinks
type changefeedCreator struct {
db *gosql.DB
systemDB *gosql.DB
logger *logger.Logger
targets string
sinkURL string
Expand All @@ -2854,16 +2855,21 @@ type changefeedCreator struct {
}

func newChangefeedCreator(
db *gosql.DB, logger *logger.Logger, r *rand.Rand, targets, sinkURL string, flags cdcFeatureFlags,
db, systemDB *gosql.DB,
logger *logger.Logger,
r *rand.Rand,
targets, sinkURL string,
flags cdcFeatureFlags,
) *changefeedCreator {
return &changefeedCreator{
db: db,
logger: logger,
targets: targets,
sinkURL: sinkURL,
options: make(map[string]string),
flags: flags,
rng: enthropy{Rand: r},
db: db,
systemDB: systemDB,
logger: logger,
targets: targets,
sinkURL: sinkURL,
options: make(map[string]string),
flags: flags,
rng: enthropy{Rand: r},
}
}

Expand Down Expand Up @@ -2898,14 +2904,14 @@ func (cfc *changefeedCreator) applySettings() error {
return nil
}
// kv.rangefeed.enabled is required for changefeeds to run
if _, err := cfc.db.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
if _, err := cfc.systemDB.Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
return err
}

schedEnabled := cfc.flags.RangeFeedScheduler.enabled(cfc.rng)
if schedEnabled != featureUnset {
cfc.logger.Printf("Setting kv.rangefeed.scheduler.enabled to %t", schedEnabled == featureEnabled)
if _, err := cfc.db.Exec(
if _, err := cfc.systemDB.Exec(
"SET CLUSTER SETTING kv.rangefeed.scheduler.enabled = $1", schedEnabled == featureEnabled,
); err != nil {
return err
Expand Down
23 changes: 17 additions & 6 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,8 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
case <-cmvt.workloadInit:
}
node, db := h.RandomDB(r)
l.Printf("starting changefeed on node %d", node)
systemNode, systemDB := h.System.RandomDB(r)
l.Printf("starting changefeed on node %d (updating system settings via node %d)", node, systemNode)

options := map[string]string{
"updated": "",
Expand All @@ -380,7 +381,7 @@ func (cmvt *cdcMixedVersionTester) createChangeFeed(
ff.DistributionStrategy.v = &featureUnset
}

jobID, err := newChangefeedCreator(db, l, r, fmt.Sprintf("%s.%s", targetDB, targetTable),
jobID, err := newChangefeedCreator(db, systemDB, l, r, fmt.Sprintf("%s.%s", targetDB, targetTable),
cmvt.kafka.manager.sinkURL(ctx), ff).
With(options).
Create()
Expand All @@ -406,6 +407,10 @@ func (cmvt *cdcMixedVersionTester) runWorkloadCmd(r *rand.Rand) *roachtestutil.C
func (cmvt *cdcMixedVersionTester) initWorkload(
ctx context.Context, l *logger.Logger, r *rand.Rand, h *mixedversion.Helper,
) error {
if err := enableTenantSplitScatter(l, r, h); err != nil {
return err
}

bankInit := roachtestutil.NewCommand("%s workload init bank", test.DefaultCockroachPath).
Flag("seed", r.Int63()).
Arg("{pgurl%s}", cmvt.crdbNodes)
Expand Down Expand Up @@ -503,8 +508,12 @@ func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {

mvt := mixedversion.NewTest(
ctx, t, t.L(), c, tester.crdbNodes,
// Multi-tenant deployments are currently unsupported. See #127378.
mixedversion.EnabledDeploymentModes(mixedversion.SystemOnlyDeployment),
// We set the minimum supported version to 23.2 in this test as it
// relies on the `kv.rangefeed.enabled` cluster setting. This
// setting is, confusingly, labeled as `TenantWritable` in
// 23.1. That mistake was then fixed (#110676) but, to simplify
// this test, we only create changefeeds in more recent versions.
mixedversion.MinimumSupportedVersion("v23.2.0"),
)

cleanupKafka := tester.StartKafka(t, c)
Expand All @@ -519,7 +528,9 @@ func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
if supported {
coin := r.Int()%2 == 0
l.PrintfCtx(ctx, "Setting changefeed.mux_rangefeed.enabled=%t ", coin)
return h.ExecWithGateway(r, gatewayNodes, "SET CLUSTER SETTING changefeed.mux_rangefeed.enabled=$1", coin)
return h.System.ExecWithGateway(
r, gatewayNodes, "SET CLUSTER SETTING changefeed.mux_rangefeed.enabled=$1", coin,
)
}
return nil
}
Expand All @@ -534,7 +545,7 @@ func runCDCMixedVersions(ctx context.Context, t test.Test, c cluster.Cluster) {
if supported {
coin := r.Int()%2 == 0
l.PrintfCtx(ctx, "Setting kv.rangefeed.scheduler.enabled=%t", coin)
return h.Exec(r, "SET CLUSTER SETTING kv.rangefeed.scheduler.enabled=$1", coin)
return h.System.Exec(r, "SET CLUSTER SETTING kv.rangefeed.scheduler.enabled=$1", coin)
}
return nil
}
Expand Down

0 comments on commit 0bd9e6d

Please sign in to comment.