Skip to content

Commit

Permalink
roachtest: ensure c2c/shutdown tests start destination tenant with on…
Browse files Browse the repository at this point in the history
…line node

An earlier patch (#110033) introduced a change that starts the destination tenant
from any destination node, but did not consider whether that node had been shut down. If
the driver attempts to connect to the shut-down node, the roachtest fails. This
patch ensures that the tenant is started on a node that will remain online.

Fixes #110317

Release note: None
  • Loading branch information
msbutler committed Sep 11, 2023
1 parent 0d073f4 commit f951f56
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type clusterInfo struct {
// sql provides a sql connection to the system tenant
sysSQL *sqlutils.SQLRunner

// gatewayNodes identify the nodes that should remain available during the whole roachtest.
gatewayNodes option.NodeListOption

// nodes indicates the roachprod nodes running the cluster's nodes
nodes option.NodeListOption
}
Expand All @@ -77,9 +80,7 @@ type c2cSetup struct {
// workloadNode identifies the node in the roachprod cluster that runs the workload.
workloadNode option.NodeListOption

// gatewayNodes identify the nodes in the source cluster to connect the main workload to.
gatewayNodes option.NodeListOption
promCfg *prometheus.Config
promCfg *prometheus.Config
}

const maxExpectedLatencyDefault = 2 * time.Minute
Expand Down Expand Up @@ -209,7 +210,7 @@ type streamingWorkload interface {
func defaultWorkloadDriver(
workloadCtx context.Context, setup *c2cSetup, c cluster.Cluster, workload streamingWorkload,
) error {
return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.gatewayNodes))
return c.RunE(workloadCtx, setup.workloadNode, workload.sourceRunCmd(setup.src.name, setup.src.gatewayNodes))
}

type replicateTPCC struct {
Expand Down Expand Up @@ -439,24 +440,26 @@ func (rd *replicationDriver) setupC2C(ctx context.Context, t test.Test, c cluste
require.NoError(t, err)

srcTenantInfo := clusterInfo{
name: srcTenantName,
ID: srcTenantID,
pgURL: pgURL,
sysSQL: srcSQL,
db: srcDB,
nodes: srcCluster}
name: srcTenantName,
ID: srcTenantID,
pgURL: pgURL,
sysSQL: srcSQL,
db: srcDB,
gatewayNodes: srcCluster,
nodes: srcCluster}
destTenantInfo := clusterInfo{
name: destTenantName,
ID: destTenantID,
sysSQL: destSQL,
db: destDB,
nodes: dstCluster}
name: destTenantName,
ID: destTenantID,
sysSQL: destSQL,
db: destDB,
gatewayNodes: dstCluster,
nodes: dstCluster}

rd.setup = &c2cSetup{
src: &srcTenantInfo,
dst: &destTenantInfo,
workloadNode: workloadNode,
gatewayNodes: srcTenantInfo.nodes}
}

rd.t = t
rd.c = c
Expand Down Expand Up @@ -847,7 +850,7 @@ func (rd *replicationDriver) main(ctx context.Context) {
rd.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now())

rd.t.L().Printf("starting the destination tenant")
startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.nodes)
startInMemoryTenant(ctx, rd.t, rd.c, rd.setup.dst.name, rd.setup.dst.gatewayNodes)

rd.metrics.export(rd.t, len(rd.setup.src.nodes))

Expand Down Expand Up @@ -1314,13 +1317,14 @@ func registerClusterReplicationResilience(r registry.Registry) {
if rsp.onSrc {
rd.setup.src.db = watcherDB
rd.setup.src.sysSQL = watcherSQL
rd.setup.gatewayNodes = c.Node(rrd.watcherNode)
rd.setup.src.gatewayNodes = c.Node(rrd.watcherNode)
} else {
rd.setup.dst.db = watcherDB
rd.setup.dst.sysSQL = watcherSQL
rd.setup.dst.gatewayNodes = c.Node(rrd.watcherNode)
}
t.L().Printf(`%s configured: Shutdown Node %d; Watcher node %d; Gateway nodes %s`,
rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.gatewayNodes)
rrd.rsp.name(), rrd.shutdownNode, rrd.watcherNode, rrd.setup.dst.gatewayNodes)
}
mainDriverCtx, cancelMain := context.WithCancel(ctx)
mainMonitor := rrd.newMonitor(mainDriverCtx)
Expand Down

0 comments on commit f951f56

Please sign in to comment.