From d80e9f75aaef340012d0ce475de9f5d4ce650a6a Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 30 Nov 2022 17:10:54 -0500
Subject: [PATCH 1/4] rpc: add UnaryClientInterceptor testing knob

Release note: None
---
 pkg/rpc/context.go           | 20 +++++++++++++++-----
 pkg/rpc/context_testutils.go |  4 ++++
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go
index 74d94d52f2df..12bc8157d14e 100644
--- a/pkg/rpc/context.go
+++ b/pkg/rpc/context.go
@@ -1509,7 +1509,7 @@ func (rpcCtx *Context) GRPCDialOptions(
 func (rpcCtx *Context) grpcDialOptionsInternal(
 	ctx context.Context, target string, class ConnectionClass, transport transportType,
 ) ([]grpc.DialOption, error) {
-	dialOpts, err := rpcCtx.dialOptsCommon(class)
+	dialOpts, err := rpcCtx.dialOptsCommon(target, class)
 	if err != nil {
 		return nil, err
 	}
@@ -1678,7 +1678,9 @@ func (rpcCtx *Context) dialOptsNetwork(
 
 // dialOptsCommon computes options used for both in-memory and
 // over-the-network RPC connections.
-func (rpcCtx *Context) dialOptsCommon(class ConnectionClass) ([]grpc.DialOption, error) {
+func (rpcCtx *Context) dialOptsCommon(
+	target string, class ConnectionClass,
+) ([]grpc.DialOption, error) {
 	// The limiting factor for lowering the max message size is the fact
 	// that a single large kv can be sent over the network in one message.
 	// Our maximum kv size is unlimited, so we need this to be very large.
@@ -1703,9 +1705,17 @@ func (rpcCtx *Context) dialOptsCommon(class ConnectionClass) ([]grpc.DialOption,
 	} else {
 		dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize))
 	}
-
-	if len(rpcCtx.clientUnaryInterceptors) > 0 {
-		dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(rpcCtx.clientUnaryInterceptors...))
+	unaryInterceptors := rpcCtx.clientUnaryInterceptors
+	unaryInterceptors = unaryInterceptors[:len(unaryInterceptors):len(unaryInterceptors)]
+	if rpcCtx.Knobs.UnaryClientInterceptor != nil {
+		if interceptor := rpcCtx.Knobs.UnaryClientInterceptor(
+			target, class,
+		); interceptor != nil {
+			unaryInterceptors = append(unaryInterceptors, interceptor)
+		}
+	}
+	if len(unaryInterceptors) > 0 {
+		dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(unaryInterceptors...))
 	}
 	if len(rpcCtx.clientStreamInterceptors) > 0 {
 		dialOpts = append(dialOpts, grpc.WithChainStreamInterceptor(rpcCtx.clientStreamInterceptors...))
diff --git a/pkg/rpc/context_testutils.go b/pkg/rpc/context_testutils.go
index 36ec93e984e5..e0d2663cdb34 100644
--- a/pkg/rpc/context_testutils.go
+++ b/pkg/rpc/context_testutils.go
@@ -36,6 +36,10 @@ type ContextTestingKnobs struct {
 	// internalClientAdapter - i.e. KV RPCs done against the local server.
 	StreamClientInterceptor func(target string, class ConnectionClass) grpc.StreamClientInterceptor
 
+	// UnaryClientInterceptor, if non-nil, is called when dialing a connection
+	// to get an interceptor that is then invoked on every unary RPC.
+	UnaryClientInterceptor func(target string, class ConnectionClass) grpc.UnaryClientInterceptor
+
 	// InjectedLatencyOracle if non-nil contains a map from target address
 	// (server.RPCServingAddr() of a remote node) to artificial latency in
 	// milliseconds to inject.
Setting this will cause the server to pause for

From 8c5b3fd077d07a2e0144d4c0242962d7f78861a2 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Thu, 1 Dec 2022 01:37:13 -0500
Subject: [PATCH 2/4] roachpb: add SpanConfigConformanceReport.IsEmpty method

Release note: None
---
 pkg/roachpb/BUILD.bazel                            |  1 +
 pkg/roachpb/span_config.go                         | 13 ++++++++
 .../span_config_conformance_report_test.go         | 32 +++++++++++++++++++
 3 files changed, 46 insertions(+)
 create mode 100644 pkg/roachpb/span_config_conformance_report_test.go

diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel
index 580d251af6cc..49b89ef2af00 100644
--- a/pkg/roachpb/BUILD.bazel
+++ b/pkg/roachpb/BUILD.bazel
@@ -83,6 +83,7 @@ go_test(
         "metadata_replicas_test.go",
         "metadata_test.go",
         "replica_unavailable_error_test.go",
+        "span_config_conformance_report_test.go",
         "span_group_test.go",
         "string_test.go",
         "tenant_test.go",
diff --git a/pkg/roachpb/span_config.go b/pkg/roachpb/span_config.go
index 90da9764fbab..4bf4b5d20416 100644
--- a/pkg/roachpb/span_config.go
+++ b/pkg/roachpb/span_config.go
@@ -240,3 +240,16 @@ func NewAllTenantKeyspaceTargetsSetTargetType() *SystemSpanConfigTarget_Type {
 		},
 	}
 }
+
+const numSpanConfigConformanceReportSlices = 5
+
+// IsEmpty returns true if there are no entries in the report.
+func (m *SpanConfigConformanceReport) IsEmpty() bool {
+	return [numSpanConfigConformanceReportSlices]int{} == [...]int{
+		len(m.OverReplicated),
+		len(m.UnderReplicated),
+		len(m.ViolatingConstraints),
+		len(m.Unavailable),
+		len(m.UnavailableNodeIDs),
+	}
+}
diff --git a/pkg/roachpb/span_config_conformance_report_test.go b/pkg/roachpb/span_config_conformance_report_test.go
new file mode 100644
index 000000000000..37b0c98d1787
--- /dev/null
+++ b/pkg/roachpb/span_config_conformance_report_test.go
@@ -0,0 +1,32 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+ +package roachpb + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSpanConfigConformanceReport_IsEmpty(t *testing.T) { + var r SpanConfigConformanceReport + require.True(t, r.IsEmpty()) + r.ViolatingConstraints = make([]ConformanceReportedRange, 0) + require.True(t, r.IsEmpty()) + r.ViolatingConstraints = append(r.ViolatingConstraints, ConformanceReportedRange{}) + require.False(t, r.IsEmpty()) + + require.Equal( + t, numSpanConfigConformanceReportSlices, + reflect.TypeOf(&r).Elem().NumField(), + ) +} From 4c838d0280254d91f5df2ec0090af52434136089 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 30 Nov 2022 17:15:53 -0500 Subject: [PATCH 3/4] regionlatency: add a method to get the set of regions Release note: None --- .../serverutils/regionlatency/region_latencies.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/testutils/serverutils/regionlatency/region_latencies.go b/pkg/testutils/serverutils/regionlatency/region_latencies.go index ae4166a36188..1ba58449b29d 100644 --- a/pkg/testutils/serverutils/regionlatency/region_latencies.go +++ b/pkg/testutils/serverutils/regionlatency/region_latencies.go @@ -13,6 +13,7 @@ package regionlatency import ( + "sort" "time" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -113,6 +114,16 @@ func (m LatencyMap) getLatency(a, b Region) (OneWayLatency, bool) { return toB, ok } +// GetRegions returns the set of regions in the map. +func (m LatencyMap) GetRegions() []Region { + var regions []Region + for r := range m.m { + regions = append(regions, r) + } + sort.Strings(regions) + return regions +} + // RoundTripPairs are pairs of round-trip latency between regions. type RoundTripPairs map[Pair]RoundTripLatency From 4c59d793c86c84823ba7e160d5101f1e10a321a7 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 30 Nov 2022 17:17:21 -0500 Subject: [PATCH 4/4] multiregionccl: add a cold start latency test This commit adds a test which creates an MR serverless cluster and then boots the sql pods in each region while disallowing connectivity to other regions. It also simulates latency to make sure the routing logic works and to provide a somewhat realistic picture of what to expect. 
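To make the topology concrete: with the constants used in the test (9
nodes, 3 regions, 3 AZs per region), the placement helpers put one node
in every AZ. A worked example of the helper output (a sketch, not part
of the diff below):

    distribute(9, 3) == []int{3, 3, 3} // nodes per region
    distribute(3, 3) == []int{1, 1, 1} // nodes per AZ within a region

Each of europe-west1, us-east1, and us-west1 (the region list comes out
sorted) thus gets one node in each of AZs a, b, and c.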
Release note: None --- pkg/ccl/multiregionccl/BUILD.bazel | 9 + .../multiregionccl/cold_start_latency_test.go | 412 ++++++++++++++++++ pkg/server/server.go | 7 + 3 files changed, 428 insertions(+) create mode 100644 pkg/ccl/multiregionccl/cold_start_latency_test.go diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index bfb12a1bba53..a122a9887943 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -26,6 +26,7 @@ go_test( name = "multiregionccl_test", size = "enormous", srcs = [ + "cold_start_latency_test.go", "datadriven_test.go", "main_test.go", "multiregion_system_table_test.go", @@ -56,6 +57,7 @@ go_test( "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", "//pkg/roachpb", + "//pkg/rpc", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", @@ -81,6 +83,7 @@ go_test( "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", + "//pkg/testutils/serverutils/regionlatency", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", @@ -88,15 +91,21 @@ go_test( "//pkg/util/envutil", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/quotapool", "//pkg/util/randutil", "//pkg/util/syncutil", + "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + "@com_github_jackc_pgx_v4//:pgx", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:go_default_library", + "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/ccl/multiregionccl/cold_start_latency_test.go b/pkg/ccl/multiregionccl/cold_start_latency_test.go new file mode 100644 index 000000000000..77a544a6a7ba --- /dev/null +++ b/pkg/ccl/multiregionccl/cold_start_latency_test.go @@ -0,0 +1,412 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package multiregionccl
+
+import (
+	"context"
+	gosql "database/sql"
+	"net/url"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
+	"github.com/jackc/pgx/v4"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+)
+
+// TestColdStartLatency attempts to capture the cold start latency for
+// SQL pods given different cluster topologies.
+func TestColdStartLatency(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderRace(t, "too slow")
+	skip.UnderStress(t, "too slow")
+	defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")()
+
+	// We need per-node args to assign the different KV nodes to
+	// different regions and AZs, in a layout that looks somewhat like
+	// the real cluster topologies we have in mind.
+	//
+	// We start with 9 nodes: 3 regions, 3 nodes per region, one node
+	// per AZ. We can tune the various latencies between these regions.
+	regionLatencies := regionlatency.RoundTripPairs{
+		{A: "us-east1", B: "us-west1"}:      66 * time.Millisecond,
+		{A: "us-east1", B: "europe-west1"}:  64 * time.Millisecond,
+		{A: "us-west1", B: "europe-west1"}: 146 * time.Millisecond,
+	}.ToLatencyMap()
+	const (
+		numNodes        = 9
+		numAZsPerRegion = 3
+	)
+	localities := makeLocalities(regionLatencies, numNodes, numAZsPerRegion)
+	regions := make([]string, len(localities))
+	for i, l := range localities {
+		regions[i], _ = l.Find("region")
+	}
+	pauseAfter := make(chan struct{})
+	signalAfter := make([]chan struct{}, numNodes)
+	var latencyEnabled syncutil.AtomicBool
+	var addrsToNodeIDs sync.Map
+
+	// Set up the host cluster.
+ perServerArgs := make(map[int]base.TestServerArgs, numNodes) + for i := 0; i < numNodes; i++ { + i := i + args := base.TestServerArgs{ + DisableDefaultTestTenant: true, + Locality: localities[i], + } + signalAfter[i] = make(chan struct{}) + serverKnobs := &server.TestingKnobs{ + PauseAfterGettingRPCAddress: pauseAfter, + SignalAfterGettingRPCAddress: signalAfter[i], + ContextTestingKnobs: rpc.ContextTestingKnobs{ + InjectedLatencyOracle: regionlatency.MakeAddrMap(), + InjectedLatencyEnabled: latencyEnabled.Get, + UnaryClientInterceptor: func( + target string, class rpc.ConnectionClass, + ) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + if !log.ExpensiveLogEnabled(ctx, 2) { + return invoker(ctx, method, req, reply, cc, opts...) + } + nodeIDi, _ := addrsToNodeIDs.Load(target) + nodeID, _ := nodeIDi.(int) + start := timeutil.Now() + defer func() { + log.VEventf(ctx, 2, "%d->%d (%v->%v) %s %v %v took %v", + i, nodeID, localities[i], localities[nodeID], + method, req, reply, timeutil.Since(start), + ) + }() + return invoker(ctx, method, req, reply, cc, opts...) + } + }, + }, + } + args.Knobs.Server = serverKnobs + perServerArgs[i] = args + } + tc := testcluster.NewTestCluster(t, numNodes, base.TestClusterArgs{ + ParallelStart: true, + ServerArgsPerNode: perServerArgs, + }) + go func() { + for _, c := range signalAfter { + <-c + } + assert.NoError(t, regionLatencies.Apply(tc)) + close(pauseAfter) + }() + tc.Start(t) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + enableLatency := func() { + latencyEnabled.Set(true) + for i := 0; i < numNodes; i++ { + tc.Server(i).RPCContext().RemoteClocks.TestingResetLatencyInfos() + } + } + + for i := 0; i < numNodes; i++ { + addrsToNodeIDs.Store(tc.Server(i).RPCAddr(), i) + } + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`) + tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`) + + applyGlobalTables := func(t *testing.T, db *gosql.DB, isTenant bool) { + stmts := []string{ + `alter database system configure zone discard;`, + `alter database system set primary region "us-east1";`, + `alter database system add region "us-west1";`, + `alter database system add region "europe-west1"`, + } + + if !isTenant { + stmts = append(stmts, + "ALTER TENANT ALL SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true", + "ALTER TENANT ALL SET CLUSTER SETTING sql.multi_region.allow_abstractions_for_secondary_tenants.enabled = true", + `alter range meta configure zone using constraints = '{"+region=us-east1": 1, "+region=us-west1": 1, "+region=europe-west1": 1}';`) + } else { + stmts = append(stmts, + `SELECT crdb_internal.unsafe_optimize_system_database()`) + } + tdb := sqlutils.MakeSQLRunner(db) + for i, stmt := range stmts { + t.Log(i, stmt) + tdb.Exec(t, stmt) + } + } + applyGlobalTables(t, tc.ServerConn(0), false) + + var blockCrossRegionTenantAccess atomic.Bool + maybeWait := func(ctx context.Context, a, b int) { + if regions[a] != regions[b] && blockCrossRegionTenantAccess.Load() { + <-ctx.Done() + } + } + tenantServerKnobs := func(i int) *server.TestingKnobs { + return &server.TestingKnobs{ + ContextTestingKnobs: rpc.ContextTestingKnobs{ + InjectedLatencyOracle: tc.Server(i).TestingKnobs(). 
+ Server.(*server.TestingKnobs).ContextTestingKnobs. + InjectedLatencyOracle, + InjectedLatencyEnabled: latencyEnabled.Get, + StreamClientInterceptor: func( + target string, class rpc.ConnectionClass, + ) grpc.StreamClientInterceptor { + return func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + nodeIDi, _ := addrsToNodeIDs.Load(target) + nodeID, _ := nodeIDi.(int) + start := timeutil.Now() + maybeWait(ctx, i, nodeID) + defer func() { + if !log.ExpensiveLogEnabled(ctx, 2) { + return + } + log.VEventf( + ctx, 2, "tenant%d->%d opening stream %v to %v (%v->%v)", + i, nodeID, method, target, localities[i], localities[nodeID], + ) + }() + c, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + return wrappedStream{ + start: start, + ClientStream: c, + }, nil + } + }, + UnaryClientInterceptor: func(target string, class rpc.ConnectionClass) grpc.UnaryClientInterceptor { + nodeIDi, _ := addrsToNodeIDs.Load(target) + nodeID, _ := nodeIDi.(int) + return func( + ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + maybeWait(ctx, i, nodeID) + start := timeutil.Now() + defer func() { + log.VEventf( + ctx, 2, "tenant%d->%d %v->%v %s %v %v took %v", + i, nodeID, localities[i], localities[nodeID], + method, req, reply, timeutil.Since(start), + ) + }() + return invoker(ctx, method, req, reply, cc, opts...) + } + }, + }, + } + } + const password = "asdf" + { + tenant, tenantDB := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{ + TenantID: serverutils.TestTenantID(), + TestingKnobs: base.TestingKnobs{ + Server: tenantServerKnobs(0), + }, + Locality: localities[0], + }) + applyGlobalTables(t, tenantDB, true) + tdb := sqlutils.MakeSQLRunner(tenantDB) + tdb.Exec(t, "CREATE USER foo PASSWORD $1 LOGIN", password) + tdb.Exec(t, "GRANT admin TO foo") + + // Wait for the span configs to propagate. After we know they have + // propagated, we'll shut down the tenant and wait for them to get + // applied. + tdb.Exec(t, "CREATE TABLE after AS SELECT now() AS after") + tdb.CheckQueryResultsRetry(t, ` + WITH progress AS ( + SELECT crdb_internal.pb_to_json( + 'progress', + progress + )->'AutoSpanConfigReconciliation' AS p + FROM system.jobs + WHERE status = 'running' + ), + checkpoint AS ( + SELECT (p->'checkpoint'->>'wallTime')::FLOAT8 / 1e9 AS checkpoint + FROM progress + WHERE p IS NOT NULL + ) +SELECT checkpoint > extract(epoch from after) + FROM checkpoint, after`, + [][]string{{"true"}}) + tenant.Stopper().Stop(ctx) + } + // Wait for the configs to be applied. 
+	testutils.SucceedsWithin(t, func() error {
+		reporter := tc.Servers[0].Server.SpanConfigReporter()
+		report, err := reporter.SpanConfigConformance(ctx, []roachpb.Span{
+			{Key: keys.TableDataMin, EndKey: keys.TenantTableDataMax},
+		})
+		if err != nil {
+			return err
+		}
+		if !report.IsEmpty() {
+			var g errgroup.Group
+			for _, r := range report.ViolatingConstraints {
+				r := r // for closure
+				g.Go(func() error {
+					_, err := tc.Server(0).DB().AdminScatter(
+						ctx, r.RangeDescriptor.StartKey.AsRawKey(), 0,
+					)
+					return err
+				})
+			}
+			if err := g.Wait(); err != nil {
+				return err
+			}
+			return errors.Errorf("expected empty report, got: {over: %d, under: %d, violating: %d, unavailable: %d}",
+				len(report.OverReplicated),
+				len(report.UnderReplicated),
+				len(report.ViolatingConstraints),
+				len(report.Unavailable))
+		}
+		return nil
+	}, 5*time.Minute)
+
+	doTest := func(wg *sync.WaitGroup, qp *quotapool.IntPool, i int, duration *time.Duration) {
+		defer wg.Done()
+		r, _ := qp.Acquire(ctx, 1)
+		defer r.Release()
+		start := timeutil.Now()
+		sn := tenantServerKnobs(i)
+		tenant, err := tc.Server(i).StartTenant(ctx, base.TestTenantArgs{
+			TenantID:            serverutils.TestTenantID(),
+			DisableCreateTenant: true,
+			SkipTenantCheck:     true,
+			TestingKnobs: base.TestingKnobs{
+				Server: sn,
+			},
+			Locality: localities[i],
+		})
+		require.NoError(t, err)
+		defer tenant.Stopper().Stop(ctx)
+		pgURL, cleanup, err := sqlutils.PGUrlWithOptionalClientCertsE(
+			tenant.SQLAddr(), "tenantdata", url.UserPassword("foo", password),
+			false, // withClientCerts
+		)
+		if !assert.NoError(t, err) {
+			return
+		}
+		defer cleanup()
+		pgURL.Path = "defaultdb"
+		conn, err := pgx.Connect(ctx, pgURL.String())
+		if !assert.NoError(t, err) {
+			return
+		}
+		var one int
+		assert.NoError(t, conn.QueryRow(ctx, "SELECT 1").Scan(&one))
+		*duration = timeutil.Since(start)
+		t.Log("done", i, localities[i], *duration)
+	}
+	// This controls how many servers to start up at the same time. We've
+	// observed that starting more servers concurrently has a major impact
+	// on cold start latency, so we start them one at a time.
+	const concurrency = 1
+	runAllTests := func() []time.Duration {
+		qp := quotapool.NewIntPool("startup-concurrency", concurrency)
+		latencyResults := make([]time.Duration, len(localities))
+		var wg sync.WaitGroup
+		for i := range localities {
+			wg.Add(1)
+			go doTest(&wg, qp, i, &latencyResults[i])
+		}
+		wg.Wait()
+		return latencyResults
+	}
+
+	enableLatency()
+	t.Log("pre running test to allocate instance IDs")
+	t.Log("result", localities, runAllTests())
+	t.Log("running test with no connectivity from sql pods to remote regions")
+	blockCrossRegionTenantAccess.Store(true)
+	t.Log("result", localities, runAllTests())
+	blockCrossRegionTenantAccess.Store(false)
+}
+
+func makeLocalities(
+	lm regionlatency.LatencyMap, numNodes, azsPerRegion int,
+) (ret []roachpb.Locality) {
+	regions := lm.GetRegions()
+	for regionIdx, nodesInRegion := range distribute(numNodes, len(regions)) {
+		for azIdx, nodesInAZ := range distribute(nodesInRegion, azsPerRegion) {
+			for i := 0; i < nodesInAZ; i++ {
+				ret = append(ret, roachpb.Locality{
+					Tiers: []roachpb.Tier{
+						{Key: "region", Value: regions[regionIdx]},
+						{Key: "az", Value: string(rune('a' + azIdx))},
+					},
+				})
+			}
+		}
+	}
+	return ret
+}
+
+func distribute(total, num int) []int {
+	res := make([]int, num)
+	for i := range res {
+		// Use the rounded average of the remaining items.
+ div := len(res) - i + res[i] = (total + div/2) / div + total -= res[i] + } + return res +} + +type wrappedStream struct { + start time.Time + grpc.ClientStream +} + +func (w wrappedStream) RecvMsg(m interface{}) error { + if err := w.ClientStream.RecvMsg(m); err != nil { + return err + } + log.VEventf(w.ClientStream.Context(), 2, "stream received %T %v %v", m, timeutil.Since(w.start), m) + return nil +} diff --git a/pkg/server/server.go b/pkg/server/server.go index f5a884c0d05d..21ef9eed23dd 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -160,6 +160,7 @@ type Server struct { protectedtsProvider protectedts.Provider spanConfigSubscriber spanconfig.KVSubscriber + spanConfigReporter spanconfig.Reporter // pgL is the SQL listener. pgL net.Listener @@ -1074,6 +1075,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { replicationReporter: replicationReporter, protectedtsProvider: protectedtsProvider, spanConfigSubscriber: spanConfig.subscriber, + spanConfigReporter: spanConfig.reporter, pgPreServer: &pgPreServer, sqlServer: sqlServer, serverController: sc, @@ -1909,6 +1911,11 @@ func (s *Server) PGServer() *pgwire.Server { return s.sqlServer.pgServer } +// SpanConfigReporter returns the spanconfig.Reporter. Used by tests. +func (s *Server) SpanConfigReporter() spanconfig.Reporter { + return s.spanConfigReporter +} + // LogicalClusterID implements cli.serverStartupInterface. This // implementation exports the logical cluster ID of the system tenant. func (s *Server) LogicalClusterID() uuid.UUID {
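Usage note (illustrative): the UnaryClientInterceptor testing knob added in
the first patch is invoked once per dialed connection, and the interceptor it
returns then runs on every unary RPC over that connection. A minimal sketch of
how a test might install it to time RPCs per target (the timing and logging
here are an example, not code from this series):

    knobs := rpc.ContextTestingKnobs{
        UnaryClientInterceptor: func(
            target string, class rpc.ConnectionClass,
        ) grpc.UnaryClientInterceptor {
            return func(
                ctx context.Context, method string, req, reply interface{},
                cc *grpc.ClientConn, invoker grpc.UnaryInvoker,
                opts ...grpc.CallOption,
            ) error {
                // Time the call and log where it went.
                start := timeutil.Now()
                err := invoker(ctx, method, req, reply, cc, opts...)
                log.VEventf(ctx, 2, "%s to %s (%v) took %v",
                    method, target, class, timeutil.Since(start))
                return err
            }
        },
    }

The cold start latency test in the fourth patch uses this same shape to
attribute each unary RPC to a source and destination locality.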