Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request cockroachdb#116526 from herkolategan/backport23.1-…
Browse files Browse the repository at this point in the history
…115599

release-23.1: roachtest: update multitenant/distsql to use new roachprod service APIs
herkolategan authored Jan 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents fb921d2 + e1d7050 commit cac67a7
Showing 8 changed files with 142 additions and 125 deletions.
114 changes: 87 additions & 27 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
@@ -640,8 +640,11 @@ type clusterImpl struct {
expiration time.Time
encAtRest bool // use encryption at rest

// clusterSettings are additional cluster settings set on cluster startup.
// clusterSettings are additional cluster settings set on the storage cluster startup.
clusterSettings map[string]string
// virtualClusterSettings are additional cluster settings to set on the
// virtual cluster startup.
virtualClusterSettings map[string]string
// goCoverDir is the directory for Go coverage data (if coverage is enabled).
// BAZEL_COVER_DIR will be set to this value when starting a node.
goCoverDir string
@@ -1873,6 +1876,38 @@ func (c *clusterImpl) clearStatusForClusterOpt(worker bool) {
}
}

// configureClusterSettingOptions assembles the roachprod cluster-setting
// options used when starting nodes: it merges the provided default cluster
// settings with the per-test install.ClusterSettings and injects
// roachtest-specific environment variables into settings.Env.
func (c *clusterImpl) configureClusterSettingOptions(
	defaultClusterSettings install.ClusterSettingsOption, settings install.ClusterSettings,
) []install.ClusterSettingOption {
	ensureEnv := func(name string, value interface{}) {
		if envExists(settings.Env, name) {
			return
		}
		settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
	}

	// Every node gets the same seed, to be used by builds with runtime
	// assertions enabled.
	ensureEnv("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

	// Panic on span use-after-Finish, so we catch such bugs.
	ensureEnv("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)

	// When Go coverage is enabled, point the process at the coverage output
	// directory.
	if dir := c.goCoverDir; dir != "" {
		settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", dir))
	}

	opts := []install.ClusterSettingOption{
		install.TagOption(settings.Tag),
		install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
		install.SecureOption(settings.Secure),
		install.UseTreeDistOption(settings.UseTreeDist),
		install.EnvOption(settings.Env),
		install.NumRacksOption(settings.NumRacks),
		install.BinaryOption(settings.Binary),
		defaultClusterSettings,
		install.ClusterSettingsOption(settings.ClusterSettings),
	}
	return opts
}

// StartE starts cockroach nodes on a subset of the cluster. The nodes parameter
// can either be a specific node, empty (to indicate all nodes), or a pair of
// nodes indicating a range.
@@ -1891,17 +1926,6 @@ func (c *clusterImpl) StartE(

startOpts.RoachprodOpts.EncryptedStores = c.encAtRest

setUnlessExists := func(name string, value interface{}) {
if !envExists(settings.Env, name) {
settings.Env = append(settings.Env, fmt.Sprintf("%s=%s", name, fmt.Sprint(value)))
}
}
// Panic on span use-after-Finish, so we catch such bugs.
setUnlessExists("COCKROACH_CRASH_ON_SPAN_USE_AFTER_FINISH", true)
// Set the same seed on every node, to be used by builds with
// runtime assertions enabled.
setUnlessExists("COCKROACH_RANDOM_SEED", c.cockroachRandomSeed())

// Needed for backward-compat on crdb_internal.ranges{_no_leases}.
// Remove in v23.2.
if !envExists(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR") {
@@ -1910,21 +1934,7 @@ func (c *clusterImpl) StartE(
settings.Env = append(settings.Env, "COCKROACH_FORCE_DEPRECATED_SHOW_RANGE_BEHAVIOR=false")
}

if c.goCoverDir != "" {
settings.Env = append(settings.Env, fmt.Sprintf("BAZEL_COVER_DIR=%s", c.goCoverDir))
}

clusterSettingsOpts := []install.ClusterSettingOption{
install.TagOption(settings.Tag),
install.PGUrlCertsDirOption(settings.PGUrlCertsDir),
install.SecureOption(settings.Secure),
install.UseTreeDistOption(settings.UseTreeDist),
install.EnvOption(settings.Env),
install.NumRacksOption(settings.NumRacks),
install.BinaryOption(settings.Binary),
install.ClusterSettingsOption(c.clusterSettings),
install.ClusterSettingsOption(settings.ClusterSettings),
}
clusterSettingsOpts := c.configureClusterSettingOptions(c.clusterSettings, settings)

if err := roachprod.Start(ctx, l, c.MakeNodes(opts...), startOpts.RoachprodOpts, clusterSettingsOpts...); err != nil {
return err
@@ -1940,6 +1950,56 @@ func (c *clusterImpl) StartE(
return nil
}

// StartServiceForVirtualClusterE starts either an external process or a
// shared process virtual cluster; the type is selected by setting `Target`
// in startOpts.RoachprodOpts. Refer to the virtual cluster section of that
// struct for the fields available to virtual clusters.
//
// For external process virtual clusters, an external process is started on
// each node given in externalNodes. For shared process virtual clusters,
// the required queries are run on a storage node of the cluster given in
// opts.
func (c *clusterImpl) StartServiceForVirtualClusterE(
	ctx context.Context,
	l *logger.Logger,
	externalNodes option.NodeListOption,
	startOpts option.StartOpts,
	settings install.ClusterSettings,
	opts ...option.Option,
) error {
	c.setStatusForClusterOpt("starting virtual cluster", startOpts.RoachtestOpts.Worker, opts...)
	defer c.clearStatusForClusterOpt(startOpts.RoachtestOpts.Worker)

	settingOpts := c.configureClusterSettingOptions(c.virtualClusterSettings, settings)

	err := roachprod.StartServiceForVirtualCluster(
		ctx, l, c.MakeNodes(externalNodes), c.MakeNodes(opts...),
		startOpts.RoachprodOpts, settingOpts...,
	)
	if err != nil {
		return err
	}

	if !settings.Secure {
		return nil
	}
	// Secure clusters may have minted fresh certs during startup; refresh
	// the local copy from node 1.
	return c.RefetchCertsFromNode(ctx, 1)
}

// StartServiceForVirtualCluster is like StartServiceForVirtualClusterE but
// fails the test on error instead of returning it.
func (c *clusterImpl) StartServiceForVirtualCluster(
	ctx context.Context,
	l *logger.Logger,
	externalNodes option.NodeListOption,
	startOpts option.StartOpts,
	settings install.ClusterSettings,
	opts ...option.Option,
) {
	err := c.StartServiceForVirtualClusterE(ctx, l, externalNodes, startOpts, settings, opts...)
	if err != nil {
		c.t.Fatal(err)
	}
}

func (c *clusterImpl) RefetchCertsFromNode(ctx context.Context, node int) error {
var err error
c.localCertsDir, err = os.MkdirTemp("", "roachtest-certs")
5 changes: 5 additions & 0 deletions pkg/cmd/roachtest/cluster/cluster_interface.go
Original file line number Diff line number Diff line change
@@ -64,6 +64,11 @@ type Cluster interface {
StopCockroachGracefullyOnNode(ctx context.Context, l *logger.Logger, node int) error
NewMonitor(context.Context, ...option.Option) Monitor

// Starting virtual clusters.

StartServiceForVirtualClusterE(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option) error
StartServiceForVirtualCluster(ctx context.Context, l *logger.Logger, externalNodes option.NodeListOption, startOpts option.StartOpts, settings install.ClusterSettings, opts ...option.Option)

// Hostnames and IP addresses of the nodes.

InternalAddr(ctx context.Context, l *logger.Logger, node option.NodeListOption) ([]string, error)
6 changes: 6 additions & 0 deletions pkg/cmd/roachtest/option/connection_options.go
Original file line number Diff line number Diff line change
@@ -35,6 +35,12 @@ func TenantName(tenantName string) func(*ConnOption) {
}
}

// SQLInstance returns a connection option that selects the SQL instance of
// a virtual cluster to connect to.
func SQLInstance(sqlInstance int) func(*ConnOption) {
	return func(o *ConnOption) {
		o.SQLInstance = sqlInstance
	}
}

func ConnectionOption(key, value string) func(*ConnOption) {
return func(option *ConnOption) {
if len(option.Options) == 0 {
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/option/options.go
Original file line number Diff line number Diff line change
@@ -46,6 +46,16 @@ func DefaultStartSingleNodeOpts() StartOpts {
return startOpts
}

// DefaultStartVirtualClusterOpts returns StartOpts for starting an external
// process virtual cluster with the given tenant name and SQL instance.
func DefaultStartVirtualClusterOpts(tenantName string, sqlInstance int) StartOpts {
	opts := StartOpts{RoachprodOpts: roachprod.DefaultStartOpts()}
	opts.RoachprodOpts.Target = install.StartServiceForVirtualCluster
	opts.RoachprodOpts.VirtualClusterName = tenantName
	opts.RoachprodOpts.SQLInstance = sqlInstance
	return opts
}

// StopOpts is a type that combines the stop options needed by roachprod and roachtest.
type StopOpts struct {
RoachprodOpts roachprod.StopOpts
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
@@ -783,6 +783,7 @@ func (r *testRunner) runWorker(

// Set initial cluster settings for this test.
c.clusterSettings = map[string]string{}
c.virtualClusterSettings = map[string]string{}

switch testSpec.Leases {
case registry.DefaultLeases:
86 changes: 31 additions & 55 deletions pkg/cmd/roachtest/tests/multitenant_distsql.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ package tests
import (
"archive/zip"
"context"
gosql "database/sql"
"fmt"
"io"
"strconv"
@@ -59,78 +58,54 @@ func runMultiTenantDistSQL(
bundle bool,
timeoutMillis int,
) {
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(install.SecureOption(true)), c.Node(3))

const (
tenantID = 11
tenantBaseHTTPPort = 8081
tenantBaseSQLPort = 26259
)

tenantHTTPPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseHTTPPort + offset
}
return tenantBaseHTTPPort
}
tenantSQLPort := func(offset int) int {
if c.IsLocal() || numInstances > c.Spec().NodeCount {
return tenantBaseSQLPort + offset
}
return tenantBaseSQLPort
// This test sets a smaller default range size than the default due to
// performance and resource limitations. We set the minimum range max bytes to
// 1 byte to bypass the guardrails.
settings := install.MakeClusterSettings(install.SecureOption(true))
settings.Env = append(settings.Env, "COCKROACH_MIN_RANGE_MAX_BYTES=1")
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(2))
c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Node(3))
storageNodes := c.Range(1, 3)

tenantName := "test-tenant"
var nodes intsets.Fast
for i := 0; i < numInstances; i++ {
node := (i % c.Spec().NodeCount) + 1
sqlInstance := i / c.Spec().NodeCount
instStartOps := option.DefaultStartVirtualClusterOpts(tenantName, sqlInstance)
t.L().Printf("Starting instance %d on node %d", i, node)
c.StartServiceForVirtualCluster(ctx, t.L(), c.Node(node), instStartOps, settings, storageNodes)
nodes.Add(i + 1)
}

storConn := c.Conn(ctx, t.L(), 1)
_, err := storConn.Exec(`SELECT crdb_internal.create_tenant($1::INT)`, tenantID)
require.NoError(t, err)

instances := make([]*tenantNode, 0, numInstances)
instance1 := createTenantNode(ctx, t, c, c.Node(1), tenantID, 2 /* node */, tenantHTTPPort(0), tenantSQLPort(0),
createTenantCertNodes(c.All()))
instances = append(instances, instance1)
defer instance1.stop(ctx, t, c)
instance1.start(ctx, t, c, "./cockroach")

// Open things up so we can configure range sizes below.
_, err = storConn.Exec(`ALTER TENANT [$1] SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantID)
// Open things up, so we can configure range sizes below.
_, err := storConn.Exec(`ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true`, tenantName)
require.NoError(t, err)

// Create numInstances sql pods and spread them evenly across the machines.
var nodes intsets.Fast
nodes.Add(1)
for i := 1; i < numInstances; i++ {
node := ((i + 1) % c.Spec().NodeCount) + 1
inst, err := newTenantInstance(ctx, instance1, t, c, node, tenantHTTPPort(i), tenantSQLPort(i))
instances = append(instances, inst)
require.NoError(t, err)
defer inst.stop(ctx, t, c)
inst.start(ctx, t, c, "./cockroach")
nodes.Add(i + 1)
}

m := c.NewMonitor(ctx, c.Nodes(1, 2, 3))

inst1Conn, err := gosql.Open("postgres", instance1.pgURL)
inst1Conn, err := c.ConnE(ctx, t.L(), 1, option.TenantName(tenantName))
require.NoError(t, err)
_, err = inst1Conn.Exec("CREATE TABLE t(n INT, i INT,s STRING, PRIMARY KEY(n,i))")
require.NoError(t, err)

// DistSQL needs at least a range per node to distribute query everywhere
// and test takes too long and too much resources with default range sizes
// and test takes too long and too many resources with default range sizes
// so make them much smaller.
_, err = inst1Conn.Exec(`ALTER TABLE t CONFIGURE ZONE USING range_min_bytes = 1000,range_max_bytes = 100000`)
require.NoError(t, err)

insertCtx, cancel := context.WithCancel(ctx)
defer cancel()

for i, inst := range instances {
url := inst.pgURL
for i := 0; i < numInstances; i++ {
li := i
m.Go(func(ctx context.Context) error {
dbi, err := gosql.Open("postgres", url)
node := (li % c.Spec().NodeCount) + 1
sqlInstance := li / c.Spec().NodeCount
dbi, err := c.ConnE(ctx, t.L(), node, option.TenantName(tenantName), option.SQLInstance(sqlInstance))
require.NoError(t, err)
iter := 0
for {
@@ -140,7 +115,7 @@ func runMultiTenantDistSQL(
t.L().Printf("worker %d done:%v", li, insertCtx.Err())
return nil
default:
// procede to report error
// proceed to report error
}
require.NoError(t, err, "instance idx = %d, iter = %d", li, iter)
iter++
@@ -180,7 +155,6 @@ func runMultiTenantDistSQL(
} else {
t.L().Printf("Only %d nodes present: %v", nodesInPlan.Len(), nodesInPlan)
}

}
m.Wait()

@@ -224,7 +198,9 @@ func runMultiTenantDistSQL(
if bundle {
// Open bundle and verify its contents
sqlConnCtx := clisqlclient.Context{}
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, instance1.pgURL)
pgURL, err := c.ExternalPGUrl(ctx, t.L(), c.Node(1), tenantName, 0)
require.NoError(t, err)
conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, pgURL[0])
bundles, err := clisqlclient.StmtDiagListBundles(ctx, conn)
require.NoError(t, err)

43 changes: 0 additions & 43 deletions pkg/cmd/roachtest/tests/multitenant_utils.go
Original file line number Diff line number Diff line change
@@ -16,8 +16,6 @@ import (
"fmt"
"math/rand"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@@ -70,10 +68,6 @@ func createTenantOtherTenantIDs(ids []int) createTenantOpt {
return func(c *createTenantOptions) { c.otherTenantIDs = ids }
}

// createTenantCertNodes returns an option that records the set of nodes
// for which tenant client certificates should be created.
func createTenantCertNodes(nodes option.NodeListOption) createTenantOpt {
	return func(o *createTenantOptions) { o.certNodes = nodes }
}

func createTenantNodeInternal(
ctx context.Context,
t test.Test,
@@ -285,43 +279,6 @@ func startTenantServer(
return errCh
}

// newTenantInstance creates an additional SQL instance for the tenant
// represented by tn, to run on the given roachprod node with the given
// HTTP and SQL ports. It copies the tenant client cert and key from the
// original instance's node to the new node so the instance can
// authenticate.
// NOTE(review): assumes tenantIds already has an entry for tn.tenantID —
// presumably populated when the first instance was created; confirm.
func newTenantInstance(
ctx context.Context, tn *tenantNode, t test.Test, c cluster.Cluster, node, http, sql int,
) (*tenantNode, error) {
// Allocate the next instance ID for this tenant.
instID := tenantIds[tn.tenantID] + 1
tenantIds[tn.tenantID] = instID
inst := tenantNode{
tenantID: tn.tenantID,
instanceID: instID,
kvAddrs: tn.kvAddrs,
node: node,
httpPort: http,
sqlPort: sql,
}
// Stage the tenant client certs in a local temp dir: fetch them from the
// node hosting the original instance, then push them to the new node.
tenantCertsDir, err := os.MkdirTemp("", "tenant-certs")
if err != nil {
return nil, err
}
key, crt := fmt.Sprintf("client-tenant.%d.key", tn.tenantID), fmt.Sprintf("client-tenant.%d.crt", tn.tenantID)
err = c.Get(ctx, t.L(), filepath.Join("certs", key), filepath.Join(tenantCertsDir, key), c.Node(tn.node))
if err != nil {
return nil, err
}
err = c.Get(ctx, t.L(), filepath.Join("certs", crt), filepath.Join(tenantCertsDir, crt), c.Node(tn.node))
if err != nil {
return nil, err
}
c.Put(ctx, filepath.Join(tenantCertsDir, key), filepath.Join("certs", key), c.Node(node))
c.Put(ctx, filepath.Join(tenantCertsDir, crt), filepath.Join("certs", crt), c.Node(node))
// sigh: locally these are symlinked, which breaks our crypto cert checks,
// so replace the symlink with a real copy of the key.
if c.IsLocal() {
c.Run(ctx, c.Node(node), "rm", filepath.Join("certs", key))
c.Run(ctx, c.Node(node), "cp", filepath.Join(tenantCertsDir, key), filepath.Join("certs", key))
}
// Restrict the key to owner-only permissions, as expected for private keys.
c.Run(ctx, c.Node(node), "chmod", "0600", filepath.Join("certs", key))
return &inst, nil
}

// createTenantAdminRole creates a role that can be used to log into a secure cluster's db console.
func createTenantAdminRole(t test.Test, tenantName string, tenantSQL *sqlutils.SQLRunner) {
username := "secure"
2 changes: 2 additions & 0 deletions pkg/roachprod/install/cockroach.go
Original file line number Diff line number Diff line change
@@ -160,7 +160,9 @@ const (
// StartRoutingProxy starts the SQL proxy process to route
// connections to multiple virtual clusters.
StartRoutingProxy
)

const (
// startSQLTimeout identifies the COCKROACH_CONNECT_TIMEOUT to use (in seconds)
// for sql cmds within syncedCluster.Start().
startSQLTimeout = 1200

0 comments on commit cac67a7

Please sign in to comment.