From 48c0e087309a47853fa67b4f715e26019a449a16 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 23 Feb 2023 10:37:53 -0500 Subject: [PATCH] c2c: refactor tenant ru limit removal in roachtests This patch streamlines how we remove ru limiting for roachtests that use tenants. For the c2c tests specifically, we now remove the limits on the dst cluster tenant as soon as the replication stream begins. Release note: None --- pkg/cmd/roachtest/tests/cluster_to_cluster.go | 59 ++++++++----------- pkg/cmd/roachtest/tests/multitenant_utils.go | 24 +++++--- 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 52392d16ab9f..2e2beb87fa8e 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -56,8 +56,8 @@ type clusterInfo struct { // db provides a connection to the system tenant db *gosql.DB - // sql provides a sql connection to the host cluster - sql *sqlutils.SQLRunner + // sysSQL provides a sql connection to the system tenant + sysSQL *sqlutils.SQLRunner // nodes indicates the roachprod nodes running the cluster's nodes nodes option.NodeListOption @@ -300,18 +300,18 @@ func setupC2C( require.NoError(t, err) srcTenantInfo := clusterInfo{ - name: srcTenantName, - ID: srcTenantID, - pgURL: pgURL, - sql: srcSQL, - db: srcDB, - nodes: srcCluster} + name: srcTenantName, + ID: srcTenantID, + pgURL: pgURL, + sysSQL: srcSQL, + db: srcDB, + nodes: srcCluster} destTenantInfo := clusterInfo{ - name: destTenantName, - ID: destTenantID, - sql: destSQL, - db: destDB, - nodes: dstCluster} + name: destTenantName, + ID: destTenantID, + sysSQL: destSQL, + db: destDB, + nodes: dstCluster} setup := &c2cSetup{ src: srcTenantInfo, @@ -463,10 +463,13 @@ func registerClusterToCluster(r registry.Registry) { t.Status("starting replication stream") setup.metrics.initalScanStart = newMetricSnapshot(metricSnapper, timeutil.Now()) +
+ // There's no need to remove the tenant limiters for this new app tenant, as + // all replication traffic flows through the system tenant. streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'", setup.dst.name, setup.src.name, setup.src.pgURL) - setup.dst.sql.Exec(t, streamReplStmt) - ingestionJobID := getIngestionJobID(t, setup.dst.sql, setup.dst.name) + setup.dst.sysSQL.Exec(t, streamReplStmt) + ingestionJobID := getIngestionJobID(t, setup.dst.sysSQL, setup.dst.name) // TODO(ssd): The job doesn't record the initial // statement time, so we can't correctly measure the @@ -485,7 +488,7 @@ func registerClusterToCluster(r registry.Registry) { sp.additionalDuration)) var currentTime time.Time - setup.dst.sql.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime) + setup.dst.sysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&currentTime) cutoverTime := currentTime.Add(sp.additionalDuration - sp.cutover) t.Status("cutover time chosen: ", cutoverTime.String()) @@ -498,10 +501,10 @@ func registerClusterToCluster(r registry.Registry) { return } t.Status(fmt.Sprintf("waiting for replication stream to cutover to %s", cutoverTime.String())) - retainedTime := getReplicationRetainedTime(t, setup.dst.sql, roachpb.TenantName(setup.dst.name)) + retainedTime := getReplicationRetainedTime(t, setup.dst.sysSQL, roachpb.TenantName(setup.dst.name)) setup.metrics.cutoverTo = newMetricSnapshot(metricSnapper, cutoverTime) setup.metrics.cutoverStart = newMetricSnapshot(metricSnapper, timeutil.Now()) - stopReplicationStream(t, setup.dst.sql, ingestionJobID, cutoverTime) + stopReplicationStream(t, setup.dst.sysSQL, ingestionJobID, cutoverTime) setup.metrics.cutoverEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) setup.metrics.export(t, len(setup.src.nodes)) @@ -548,14 +551,14 @@ AS OF SYSTEM TIME '%s'`, startTimeStr, aost) var srcFingerprint int64 m.Go(func(ctx context.Context) error { - setup.src.sql.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint)
+ setup.src.sysSQL.QueryRow(t, fingerprintQuery, setup.src.ID).Scan(&srcFingerprint) return nil }) var destFingerprint int64 m.Go(func(ctx context.Context) error { // TODO(adityamaru): Measure and record fingerprinting throughput. setup.metrics.fingerprintingStart = timeutil.Now() - setup.dst.sql.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) + setup.dst.sysSQL.QueryRow(t, fingerprintQuery, setup.dst.ID).Scan(&destFingerprint) setup.metrics.fingerprintingEnd = timeutil.Now() fingerprintingDuration := setup.metrics.fingerprintingEnd.Sub(setup.metrics.fingerprintingStart).String() t.L().Printf("fingerprinting the destination tenant took %s", fingerprintingDuration) @@ -660,24 +663,12 @@ func stopReplicationStream( } func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) { - db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) + db.ExecMultiple(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func destClusterSettings(t test.Test, db *sqlutils.SQLRunner) { db.ExecMultiple(t, `SET CLUSTER SETTING cross_cluster_replication.enabled = true;`, - `SET CLUSTER SETTING kv.rangefeed.enabled = true;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, - `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, - `SET CLUSTER SETTING 
kv.tenant_rate_limiter.write_request_cost = 0;`) + `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) } func copyPGCertsAndMakeURL( diff --git a/pkg/cmd/roachtest/tests/multitenant_utils.go b/pkg/cmd/roachtest/tests/multitenant_utils.go index 686feb9d78a7..990add690cb9 100644 --- a/pkg/cmd/roachtest/tests/multitenant_utils.go +++ b/pkg/cmd/roachtest/tests/multitenant_utils.go @@ -347,6 +347,8 @@ func createInMemoryTenant( sysSQL.Exec(t, "CREATE TENANT $1", tenantName) sysSQL.Exec(t, "ALTER TENANT $1 START SERVICE SHARED", tenantName) + removeTenantRateLimiters(t, sysSQL, tenantName) + // Opening a SQL session to a newly created in-process tenant may require a // few retries. Unfortunately, the c.ConnE and MakeSQLRunner APIs do not make // it clear if they eagerly open a session with the tenant or wait until the @@ -365,14 +367,22 @@ func createInMemoryTenant( return nil }) - // Currently, a tenant has by default a 10m RU burst limit, which can be - // reached during these tests. To prevent RU limit throttling, add 10B RUs to - // the tenant. - var tenantID int - sysSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) - sysSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, -10000000000, now(), 0);`, tenantID) if secure { createTenantAdminRole(t, tenantName, tenantSQL) } } + +// removeTenantRateLimiters ensures the tenant is not throttled by limiters. 
+func removeTenantRateLimiters(t test.Test, systemSQL *sqlutils.SQLRunner, tenantName string) { + var tenantID int + systemSQL.QueryRow(t, `SELECT id FROM [SHOW TENANT $1]`, tenantName).Scan(&tenantID) + systemSQL.Exec(t, `SELECT crdb_internal.update_tenant_resource_limits($1, 10000000000, 0, +10000000000, now(), 0);`, tenantID) + systemSQL.ExecMultiple(t, + `SET CLUSTER SETTING kv.tenant_rate_limiter.burst_limit_seconds = 10000;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.rate_limit = -1000; `, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_batch_cost = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.read_cost_per_mebibyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_cost_per_megabyte = 0;`, + `SET CLUSTER SETTING kv.tenant_rate_limiter.write_request_cost = 0;`) +}