diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
index 66c056a96bdf..a375b19fbed4 100644
--- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go
+++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go
@@ -66,6 +66,9 @@ type clusterInfo struct {
 	// sql provides a sql connection to the system tenant
 	sysSQL *sqlutils.SQLRunner
 
+	// gatewayNodes identify the nodes that should remain available during the whole roachtest.
+	gatewayNodes option.NodeListOption
+
 	// nodes indicates the roachprod nodes running the cluster's nodes
 	nodes option.NodeListOption
 }
@@ -77,9 +80,7 @@ type c2cSetup struct {
 	// workloadNode identifies the node in the roachprod cluster that runs the workload.
 	workloadNode option.NodeListOption
 
-	// gatewayNodes identify the nodes in the source cluster to connect the main workload to.
-	gatewayNodes option.NodeListOption
-	promCfg      *prometheus.Config
+	promCfg *prometheus.Config
 }
 
 const maxExpectedLatencyDefault = 2 * time.Minute
@@ -209,7 +210,7 @@ type streamingWorkload interface {
 func defaultWorkloadDriver(
 	workloadCtx context.Context, setup *c2cSetup, c cluster.Cluster, workload streamingWorkload,
 ) error {
-	return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.gatewayNodes))
+	return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.src.gatewayNodes))
 }
 
 type replicateTPCC struct {
@@ -439,24 +440,26 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
 	require.NoError(t, err)
 
 	srcTenantInfo := clusterInfo{
-		name:   srcTenantName,
-		ID:     srcTenantID,
-		pgURL:  pgURL,
-		sysSQL: srcSQL,
-		db:     srcDB,
-		nodes:  srcCluster}
+		name:         srcTenantName,
+		ID:           srcTenantID,
+		pgURL:        pgURL,
+		sysSQL:       srcSQL,
+		db:           srcDB,
+		gatewayNodes: srcCluster,
+		nodes:        srcCluster}
 	destTenantInfo := clusterInfo{
-		name:   destTenantName,
-		ID:     destTenantID,
-		sysSQL: destSQL,
-		db:     destDB,
-		nodes:  dstCluster}
+		name:         destTenantName,
+		ID:           destTenantID,
+		sysSQL:       destSQL,
+		db:           destDB,
+		gatewayNodes: dstCluster,
+		nodes:        dstCluster}
 
 	rd.setup = &c2cSetup{
 		src:          &srcTenantInfo,
 		dst:          &destTenantInfo,
 		workloadNode: workloadNode,
-		gatewayNodes: srcTenantInfo.nodes}
+	}
 
 	rd.t = t
 	rd.c = c
@@ -847,7 +850,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
 	rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())
 
 	rd.t.L().Printf("starting the destination tenant")
-	startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.nodes)
+	startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)
 
 	rd.metrics.export(rd.t, len(rd.setup.src.nodes))
 
@@ -1314,13 +1317,14 @@ func registerClusterReplicationResilience(r registry.Registry) {
 			if rsp.onSrc {
 				rd.setup.src.db = watcherDB
 				rd.setup.src.sysSQL = watcherSQL
-				rd.setup.gatewayNodes = c.Node(rrd.watcherNode)
+				rd.setup.src.gatewayNodes = c.Node(rrd.watcherNode)
 			} else {
 				rd.setup.dst.db = watcherDB
 				rd.setup.dst.sysSQL = watcherSQL
+				rd.setup.dst.gatewayNodes = c.Node(rrd.watcherNode)
 			}
 			t.L().Printf(`%s configured: Shutdown Node %d; Watcher node %d; Gateway nodes %s`,
-				rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.gatewayNodes)
+				rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.dst.gatewayNodes)
 		}
 		mainDriverCtx, cancelMain := context.WithCancel(ctx)
 		mainMonitor := rrd.newMonitor(mainDriverCtx)