From 0fcb82262f53b73a00de8f056c9c2236d127a4e2 Mon Sep 17 00:00:00 2001
From: Jay
Date: Mon, 23 May 2022 22:58:37 -0400
Subject: [PATCH] ccl/sqlproxyccl: add --disable-connection-rebalancing flag to
 "mt start-proxy"

Previously, we added the connection rebalancing feature to the proxy, and it
is enabled automatically. This feature will not work with < v22.1 clusters
since it relies on the session migration work that was added recently. In
theory, the proxy will still work, but the balancer will constantly attempt
to rebalance connections even though we know those attempts will fail, and
this adds to the overall latency of the connection since we suspend the
processors during connection migration.

To allow a smooth transition to v22.1, this commit introduces a new flag,
`--disable-connection-rebalancing`, to the `mt start-proxy` subcommand. When
that flag is set, all connection rebalancing operations will be disabled.
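
For illustration only (the remaining flags are deployment-specific and not
part of this change), an operator would opt out of rebalancing by starting
the proxy with something like:

    cockroach mt start-proxy --disable-connection-rebalancing [existing proxy flags...]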
We have to roll out sqlproxy to CC before the clusters get their major
upgrade since the new proxy contains some auth work that is a prerequisite
for v22.1. To avoid these latency issues, sqlproxy will be rolled out with
the flag set. Once clusters have been upgraded to v22.1, the flag will be
removed from the proxy's startup arguments.

No release notes since `mt start-proxy` is internal only.

Release note: None
---
 pkg/ccl/sqlproxyccl/BUILD.bazel               |   2 +-
 pkg/ccl/sqlproxyccl/balancer/balancer.go      |  42 +++--
 pkg/ccl/sqlproxyccl/balancer/balancer_test.go |  77 ++++++++-
 pkg/ccl/sqlproxyccl/proxy_handler.go          |   5 +
 pkg/ccl/sqlproxyccl/proxy_handler_test.go     | 152 +++++++++++++++---
 pkg/cli/cliflags/flags_mt.go                  |   5 +
 pkg/cli/context.go                            |   1 +
 pkg/cli/flags.go                              |   2 +
 8 files changed, 252 insertions(+), 34 deletions(-)

diff --git a/pkg/ccl/sqlproxyccl/BUILD.bazel b/pkg/ccl/sqlproxyccl/BUILD.bazel
index 81ce3bf7228b..4125a98e4fca 100644
--- a/pkg/ccl/sqlproxyccl/BUILD.bazel
+++ b/pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -54,7 +54,7 @@ go_library(
 
 go_test(
     name = "sqlproxyccl_test",
-    size = "small",
+    size = "medium",
     srcs = [
         "authentication_test.go",
         "conn_migration_test.go",
diff --git a/pkg/ccl/sqlproxyccl/balancer/balancer.go b/pkg/ccl/sqlproxyccl/balancer/balancer.go
index f000bc20d440..01632e812f34 100644
--- a/pkg/ccl/sqlproxyccl/balancer/balancer.go
+++ b/pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -86,6 +86,7 @@ type balancerOptions struct {
     timeSource              timeutil.TimeSource
     rebalanceRate           float32
     rebalanceDelay          time.Duration
+    disableRebalancing      bool
 }
 
 // Option defines an option that can be passed to NewBalancer in order to
@@ -136,6 +137,16 @@ func RebalanceDelay(delay time.Duration) Option {
     }
 }
 
+// DisableRebalancing disables all rebalancing operations within the balancer.
+// Unlike NoRebalanceLoop, which only disables automated rebalancing, this also
+// causes RebalanceTenant to no-op if called by the pod watcher. Using this
+// option implicitly disables the rebalance loop as well.
+func DisableRebalancing() Option {
+    return func(opts *balancerOptions) {
+        opts.disableRebalancing = true
+    }
+}
+
 // Balancer handles load balancing of SQL connections within the proxy.
 // All methods on the Balancer instance are thread-safe.
 type Balancer struct {
@@ -175,6 +186,10 @@ type Balancer struct {
     // attempts to rebalance a given tenant. Defaults to defaultRebalanceDelay.
     rebalanceDelay time.Duration
 
+    // disableRebalancing is used to indicate that all rebalancing operations
+    // will be disabled.
+    disableRebalancing bool
+
     // lastRebalance is the last time the tenants are rebalanced. This is used
     // to rate limit the number of rebalances per tenant. Synchronization is
     // needed since rebalance operations can be triggered by the rebalance loop,
@@ -204,6 +219,11 @@ func NewBalancer(
     for _, opt := range opts {
         opt(options)
     }
+    // Since we want to disable all rebalancing operations, there is no point
+    // in having the rebalance loop running.
+    if options.disableRebalancing {
+        options.noRebalanceLoop = true
+    }
 
     // Ensure that ctx gets cancelled on stopper's quiescing.
     ctx, _ = stopper.WithCancelOnQuiesce(ctx)
@@ -214,14 +234,15 @@ func NewBalancer(
     }
 
     b := &Balancer{
-        stopper:        stopper,
-        metrics:        metrics,
-        directoryCache: directoryCache,
-        queue:          q,
-        processSem:     semaphore.New(options.maxConcurrentRebalances),
-        timeSource:     options.timeSource,
-        rebalanceRate:  options.rebalanceRate,
-        rebalanceDelay: options.rebalanceDelay,
+        stopper:            stopper,
+        metrics:            metrics,
+        directoryCache:     directoryCache,
+        queue:              q,
+        processSem:         semaphore.New(options.maxConcurrentRebalances),
+        timeSource:         options.timeSource,
+        rebalanceRate:      options.rebalanceRate,
+        rebalanceDelay:     options.rebalanceDelay,
+        disableRebalancing: options.disableRebalancing,
     }
     b.lastRebalance.tenants = make(map[roachpb.TenantID]time.Time)
 
@@ -249,8 +270,9 @@ func NewBalancer(
 // pod exists for the given tenant, or the tenant has been recently rebalanced,
 // this is a no-op.
 func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
-    // If rebalanced recently, no-op.
-    if !b.canRebalanceTenant(tenantID) {
+    // If rebalancing is disabled, or the tenant was rebalanced recently, then
+    // RebalanceTenant is a no-op.
+    if b.disableRebalancing || !b.canRebalanceTenant(tenantID) {
         return
     }
 
diff --git a/pkg/ccl/sqlproxyccl/balancer/balancer_test.go b/pkg/ccl/sqlproxyccl/balancer/balancer_test.go
index fa59896d3f37..ea246a082303 100644
--- a/pkg/ccl/sqlproxyccl/balancer/balancer_test.go
+++ b/pkg/ccl/sqlproxyccl/balancer/balancer_test.go
@@ -716,6 +716,81 @@ func TestRebalancer_rebalance(t *testing.T) {
     }
 }
 
+func TestBalancer_RebalanceTenant_WithRebalancingDisabled(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    stopper := stop.NewStopper()
+    defer stopper.Stop(ctx)
+
+    // Use a custom time source for testing.
+    t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
+    timeSource := timeutil.NewManualTime(t0)
+
+    metrics := NewMetrics()
+    directoryCache := newTestDirectoryCache()
+
+    b, err := NewBalancer(
+        ctx,
+        stopper,
+        metrics,
+        directoryCache,
+        DisableRebalancing(),
+        TimeSource(timeSource),
+    )
+    require.NoError(t, err)
+
+    tenantID := roachpb.MakeTenantID(10)
+    pods := []*tenant.Pod{
+        {TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:80", State: tenant.DRAINING},
+        {TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:81", State: tenant.RUNNING},
+    }
+    for _, pod := range pods {
+        require.True(t, directoryCache.upsertPod(pod))
+    }
+
+    // Create 100 active connections, all to the draining pod.
+    const numConns = 100
+    var handles []ConnectionHandle
+    for i := 0; i < numConns; i++ {
+        handle := makeTestHandle()
+        handles = append(handles, handle)
+        _ = NewServerAssignment(tenantID, b.connTracker, handle, pods[0].Addr)
+    }
+
+    assertZeroTransfers := func() {
+        count := 0
+        for i := 0; i < numConns; i++ {
+            count += handles[i].(*testConnHandle).transferConnectionCount()
+        }
+        require.Equal(t, 0, count)
+    }
+
+    // Attempt the rebalance, and wait for a while. No rebalancing should occur.
+    b.RebalanceTenant(ctx, tenantID)
+    time.Sleep(1 * time.Second)
+
+    // Queue should be empty, and no additional connections should be moved.
+    b.queue.mu.Lock()
+    queueLen := b.queue.queue.Len()
+    b.queue.mu.Unlock()
+    require.Equal(t, 0, queueLen)
+    assertZeroTransfers()
+
+    // Advance the timer by some rebalance interval. No rebalancing should
+    // occur since the loop is disabled.
+    timeSource.Advance(2 * rebalanceInterval)
+    time.Sleep(1 * time.Second)
+
+    // Queue should be empty, and no additional connections should be moved.
+    b.queue.mu.Lock()
+    queueLen = b.queue.queue.Len()
+    b.queue.mu.Unlock()
+    require.Equal(t, 0, queueLen)
+    assertZeroTransfers()
+}
+
 func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
@@ -781,7 +856,7 @@ func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
     waitFor := func(numTransfers int) {
         testutils.SucceedsSoon(t, func() error {
             count := 0
-            for i := 0; i < 100; i++ {
+            for i := 0; i < numConns; i++ {
                 count += handles[i].(*testConnHandle).transferConnectionCount()
             }
             if count != numTransfers {
diff --git a/pkg/ccl/sqlproxyccl/proxy_handler.go b/pkg/ccl/sqlproxyccl/proxy_handler.go
index c438da17b3bd..4f13591a54db 100644
--- a/pkg/ccl/sqlproxyccl/proxy_handler.go
+++ b/pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -98,6 +98,8 @@ type ProxyOptions struct {
     // ThrottleBaseDelay is the initial exponential backoff triggered in
     // response to the first connection failure.
     ThrottleBaseDelay time.Duration
+    // DisableConnectionRebalancing disables connection rebalancing for tenants.
+    DisableConnectionRebalancing bool
 
     // testingKnobs are knobs used for testing.
     testingKnobs struct {
@@ -263,6 +265,9 @@ func newProxyHandler(
     balancerMetrics := balancer.NewMetrics()
     registry.AddMetricStruct(balancerMetrics)
     var balancerOpts []balancer.Option
+    if handler.DisableConnectionRebalancing {
+        balancerOpts = append(balancerOpts, balancer.DisableRebalancing())
+    }
     if handler.testingKnobs.balancerOpts != nil {
         balancerOpts = append(balancerOpts, handler.testingKnobs.balancerOpts...)
     }
diff --git a/pkg/ccl/sqlproxyccl/proxy_handler_test.go b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
index deba6d3c1ac4..18ce1a8fc0e9 100644
--- a/pkg/ccl/sqlproxyccl/proxy_handler_test.go
+++ b/pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -765,7 +765,7 @@ func TestDirectoryConnect(t *testing.T) {
     })
 }
 
-func TestPodWatcher(t *testing.T) {
+func TestConnectionRebalancingDisabled(t *testing.T) {
     defer leaktest.AfterTest(t)()
     ctx := context.Background()
     defer log.Scope(t).Close(t)
@@ -777,31 +777,103 @@ func TestPodWatcher(t *testing.T) {
     _, err := mainDB.Exec("ALTER TENANT ALL SET CLUSTER SETTING server.user_login.session_revival_token.enabled = true")
     require.NoError(t, err)
 
-    // Start four SQL pods for the test tenant.
-    var addresses []string
+    // Start two SQL pods for the test tenant.
+    const podCount = 2
     tenantID := serverutils.TestTenantID()
-    const podCount = 4
-    for i := 0; i < podCount; i++ {
-        params := tests.CreateTestTenantParams(tenantID)
-        // The first SQL pod will create the tenant keyspace in the host.
-        if i != 0 {
-            params.Existing = true
+    tenants := startTestTenantPods(ctx, t, s, tenantID, podCount)
+    defer func() {
+        for _, tenant := range tenants {
+            tenant.Stopper().Stop(ctx)
         }
-        tenant, tenantDB := serverutils.StartTenant(t, s, params)
-        tenant.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true)
-        defer tenant.Stopper().Stop(ctx)
+    }()
 
-        // Create a test user. We only need to do it once.
-        if i == 0 {
-            _, err = tenantDB.Exec("CREATE USER testuser WITH PASSWORD 'hunter2'")
-            require.NoError(t, err)
-            _, err = tenantDB.Exec("GRANT admin TO testuser")
-            require.NoError(t, err)
+    // Register one SQL pod in the directory server.
+    tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
+    tds.CreateTenant(tenantID, "tenant-cluster")
+    tds.AddPod(tenantID, &tenant.Pod{
+        TenantID:       tenantID.ToUint64(),
+        Addr:           tenants[0].SQLAddr(),
+        State:          tenant.RUNNING,
+        StateTimestamp: timeutil.Now(),
+    })
+    require.NoError(t, tds.Start(ctx))
+
+    opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
+    opts.testingKnobs.directoryServer = tds
+    proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
+    connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)
+
+    // Open 12 connections to the first pod.
+    dist := map[string]int{}
+    var conns []*gosql.DB
+    for i := 0; i < 12; i++ {
+        db, err := gosql.Open("postgres", connectionString)
+        require.NoError(t, err)
+        db.SetMaxOpenConns(1)
+        defer db.Close()
+        addr := queryAddr(ctx, t, db)
+        dist[addr]++
+        conns = append(conns, db)
+    }
+    require.Len(t, dist, 1)
+
+    // Add a second SQL pod.
+    tds.AddPod(tenantID, &tenant.Pod{
+        TenantID:       tenantID.ToUint64(),
+        Addr:           tenants[1].SQLAddr(),
+        State:          tenant.RUNNING,
+        StateTimestamp: timeutil.Now(),
+    })
+
+    // Wait until the update gets propagated to the directory cache.
+    testutils.SucceedsSoon(t, func() error {
+        pods, err := proxy.handler.directoryCache.TryLookupTenantPods(ctx, tenantID)
+        if err != nil {
+            return err
         }
-        tenantDB.Close()
+        if len(pods) != 2 {
+            return errors.Newf("expected 2 pods, but got %d", len(pods))
+        }
+        return nil
+    })
+
+    // The update above should trigger the pod watcher. Regardless, we'll invoke
+    // rebalancing directly as well. There should be no rebalancing attempts.
+    proxy.handler.balancer.RebalanceTenant(ctx, tenantID)
+    time.Sleep(2 * time.Second)
 
-        addresses = append(addresses, tenant.SQLAddr())
+    require.Equal(t, int64(0), proxy.metrics.ConnMigrationAttemptedCount.Count())
+
+    // Reset distribution and count again.
+    dist = map[string]int{}
+    for _, c := range conns {
+        addr := queryAddr(ctx, t, c)
+        dist[addr]++
     }
+    require.Len(t, dist, 1)
+}
+
+func TestPodWatcher(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    ctx := context.Background()
+    defer log.Scope(t).Close(t)
+
+    // Start KV server, and enable session migration.
+    params, _ := tests.CreateTestServerParams()
+    s, mainDB, _ := serverutils.StartServer(t, params)
+    defer s.Stopper().Stop(ctx)
+    _, err := mainDB.Exec("ALTER TENANT ALL SET CLUSTER SETTING server.user_login.session_revival_token.enabled = true")
+    require.NoError(t, err)
+
+    // Start four SQL pods for the test tenant.
+    const podCount = 4
+    tenantID := serverutils.TestTenantID()
+    tenants := startTestTenantPods(ctx, t, s, tenantID, podCount)
+    defer func() {
+        for _, tenant := range tenants {
+            tenant.Stopper().Stop(ctx)
+        }
+    }()
 
     // Register only 3 SQL pods in the directory server. We will add the 4th
     // once the watcher has been established.
@@ -810,7 +882,7 @@ func TestPodWatcher(t *testing.T) {
     for i := 0; i < 3; i++ {
         tds.AddPod(tenantID, &tenant.Pod{
             TenantID:       tenantID.ToUint64(),
-            Addr:           addresses[i],
+            Addr:           tenants[i].SQLAddr(),
             State:          tenant.RUNNING,
             StateTimestamp: timeutil.Now(),
         })
@@ -851,7 +923,7 @@ func TestPodWatcher(t *testing.T) {
     // to the new pod. Note that for testing, we set customRebalanceRate to 1.0.
     tds.AddPod(tenantID, &tenant.Pod{
         TenantID:       tenantID.ToUint64(),
-        Addr:           addresses[3],
+        Addr:           tenants[3].SQLAddr(),
         State:          tenant.RUNNING,
         StateTimestamp: timeutil.Now(),
     })
@@ -1734,3 +1806,39 @@ func queryAddr(ctx context.Context, t *testing.T, db queryer) string {
     `).Scan(&host, &port))
     return fmt.Sprintf("%s:%s", host, port)
 }
+
+// startTestTenantPods starts count SQL pods for the given tenant, and returns
+// a list of tenant servers. Note that a default admin testuser with the
+// password hunter2 will be created.
+func startTestTenantPods(
+    ctx context.Context,
+    t *testing.T,
+    ts serverutils.TestServerInterface,
+    tenantID roachpb.TenantID,
+    count int,
+) []serverutils.TestTenantInterface {
+    t.Helper()
+
+    var tenants []serverutils.TestTenantInterface
+    for i := 0; i < count; i++ {
+        params := tests.CreateTestTenantParams(tenantID)
+        // The first SQL pod will create the tenant keyspace in the host.
+        if i != 0 {
+            params.Existing = true
+        }
+        tenant, tenantDB := serverutils.StartTenant(t, ts, params)
+        tenant.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true)
+
+        // Create a test user. We only need to do it once.
+        if i == 0 {
+            _, err := tenantDB.Exec("CREATE USER testuser WITH PASSWORD 'hunter2'")
+            require.NoError(t, err)
+            _, err = tenantDB.Exec("GRANT admin TO testuser")
+            require.NoError(t, err)
+        }
+        tenantDB.Close()
+
+        tenants = append(tenants, tenant)
+    }
+    return tenants
+}
diff --git a/pkg/cli/cliflags/flags_mt.go b/pkg/cli/cliflags/flags_mt.go
index 731e79751eca..5ded2caf3490 100644
--- a/pkg/cli/cliflags/flags_mt.go
+++ b/pkg/cli/cliflags/flags_mt.go
@@ -76,6 +76,11 @@ var (
         Description: "If true, use insecure connection to the backend.",
     }
 
+    DisableConnectionRebalancing = FlagInfo{
+        Name:        "disable-connection-rebalancing",
+        Description: "If true, proxy will not attempt to rebalance connections.",
+    }
+
     RatelimitBaseDelay = FlagInfo{
         Name:        "ratelimit-base-delay",
         Description: "Initial backoff after a failed login attempt. Set to 0 to disable rate limiting.",
diff --git a/pkg/cli/context.go b/pkg/cli/context.go
index 78839a907d0b..8c47c7213981 100644
--- a/pkg/cli/context.go
+++ b/pkg/cli/context.go
@@ -650,6 +650,7 @@ func setProxyContextDefaults() {
     proxyContext.ValidateAccessInterval = 30 * time.Second
     proxyContext.PollConfigInterval = 30 * time.Second
     proxyContext.ThrottleBaseDelay = time.Second
+    proxyContext.DisableConnectionRebalancing = false
 }
 
 var testDirectorySvrContext struct {
diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go
index b6506c3374d3..3b31fac711db 100644
--- a/pkg/cli/flags.go
+++ b/pkg/cli/flags.go
@@ -1033,7 +1033,9 @@ func init() {
         durationFlag(f, &proxyContext.ValidateAccessInterval, cliflags.ValidateAccessInterval)
         durationFlag(f, &proxyContext.PollConfigInterval, cliflags.PollConfigInterval)
         durationFlag(f, &proxyContext.ThrottleBaseDelay, cliflags.ThrottleBaseDelay)
+        boolFlag(f, &proxyContext.DisableConnectionRebalancing, cliflags.DisableConnectionRebalancing)
     }
+
     // Multi-tenancy test directory command flags.
     {
         f := mtTestDirectorySvr.Flags()