ccl/sqlproxyccl: add --disable-connection-rebalancing flag to "mt start-proxy" #81712

Merged
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/BUILD.bazel
@@ -54,7 +54,7 @@ go_library(

go_test(
name = "sqlproxyccl_test",
size = "small",
size = "medium",
srcs = [
"authentication_test.go",
"conn_migration_test.go",
42 changes: 32 additions & 10 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -86,6 +86,7 @@ type balancerOptions struct {
timeSource timeutil.TimeSource
rebalanceRate float32
rebalanceDelay time.Duration
disableRebalancing bool
}

// Option defines an option that can be passed to NewBalancer in order to
@@ -136,6 +137,16 @@ func RebalanceDelay(delay time.Duration) Option {
}
}

// DisableRebalancing disables all rebalancing operations within the balancer.
// Unlike NoRebalanceLoop, which only disables automated rebalancing, this also
// causes RebalanceTenant to no-op when called by the pod watcher. Using this
// option implicitly disables the rebalance loop as well.
func DisableRebalancing() Option {
return func(opts *balancerOptions) {
opts.disableRebalancing = true
}
}

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
@@ -175,6 +186,10 @@ type Balancer struct {
// attempts to rebalance a given tenant. Defaults to defaultRebalanceDelay.
rebalanceDelay time.Duration

// disableRebalancing indicates that all rebalancing operations within the
// balancer are disabled.
disableRebalancing bool

// lastRebalance is the last time the tenants are rebalanced. This is used
// to rate limit the number of rebalances per tenant. Synchronization is
// needed since rebalance operations can be triggered by the rebalance loop,
@@ -204,6 +219,11 @@ func NewBalancer(
for _, opt := range opts {
opt(options)
}
// Since all rebalancing operations are disabled, there is no point in
// running the rebalance loop.
if options.disableRebalancing {
options.noRebalanceLoop = true
}

// Ensure that ctx gets cancelled on stopper's quiescing.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
@@ -214,14 +234,15 @@
}

b := &Balancer{
stopper: stopper,
metrics: metrics,
directoryCache: directoryCache,
queue: q,
processSem: semaphore.New(options.maxConcurrentRebalances),
timeSource: options.timeSource,
rebalanceRate: options.rebalanceRate,
rebalanceDelay: options.rebalanceDelay,
stopper: stopper,
metrics: metrics,
directoryCache: directoryCache,
queue: q,
processSem: semaphore.New(options.maxConcurrentRebalances),
timeSource: options.timeSource,
rebalanceRate: options.rebalanceRate,
rebalanceDelay: options.rebalanceDelay,
disableRebalancing: options.disableRebalancing,
}
b.lastRebalance.tenants = make(map[roachpb.TenantID]time.Time)

@@ -249,8 +270,9 @@ func NewBalancer(
// pod exists for the given tenant, or the tenant has been recently rebalanced,
// this is a no-op.
func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
// If rebalanced recently, no-op.
if !b.canRebalanceTenant(tenantID) {
// If rebalancing is disabled, or the tenant was rebalanced recently, then
// RebalanceTenant is a no-op.
if b.disableRebalancing || !b.canRebalanceTenant(tenantID) {
return
}

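Taken together, the balancer changes let a caller turn off every rebalancing path with a single option. A minimal sketch of such a call, mirroring the NewBalancer invocation in the test below; ctx, stopper, metrics, directoryCache, and tenantID are assumed to be set up by the caller, and error handling is abbreviated:

b, err := balancer.NewBalancer(
	ctx,            // cancelled on the stopper's quiescence inside NewBalancer
	stopper,        // *stop.Stopper owned by the caller
	metrics,        // from balancer.NewMetrics()
	directoryCache, // tenant directory cache used for pod lookups
	balancer.DisableRebalancing(), // also implies NoRebalanceLoop
)
if err != nil {
	return err
}
// With the option set, RebalanceTenant is a no-op even when invoked by
// the pod watcher.
b.RebalanceTenant(ctx, tenantID)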
77 changes: 76 additions & 1 deletion pkg/ccl/sqlproxyccl/balancer/balancer_test.go
@@ -716,6 +716,81 @@ func TestRebalancer_rebalance(t *testing.T) {
}
}

func TestBalancer_RebalanceTenant_WithRebalancingDisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// Use a custom time source for testing.
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
timeSource := timeutil.NewManualTime(t0)

metrics := NewMetrics()
directoryCache := newTestDirectoryCache()

b, err := NewBalancer(
ctx,
stopper,
metrics,
directoryCache,
DisableRebalancing(),
TimeSource(timeSource),
)
require.NoError(t, err)

tenantID := roachpb.MakeTenantID(10)
pods := []*tenant.Pod{
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:80", State: tenant.DRAINING},
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:81", State: tenant.RUNNING},
}
for _, pod := range pods {
require.True(t, directoryCache.upsertPod(pod))
}

// Create 100 active connections, all to the draining pod.
const numConns = 100
var handles []ConnectionHandle
for i := 0; i < numConns; i++ {
handle := makeTestHandle()
handles = append(handles, handle)
_ = NewServerAssignment(tenantID, b.connTracker, handle, pods[0].Addr)
}

assertZeroTransfers := func() {
count := 0
for i := 0; i < numConns; i++ {
count += handles[i].(*testConnHandle).transferConnectionCount()
}
require.Equal(t, 0, count)
}

// Attempt the rebalance, and wait for a while. No rebalancing should occur.
b.RebalanceTenant(ctx, tenantID)
time.Sleep(1 * time.Second)

// Queue should be empty, and no additional connections should be moved.
b.queue.mu.Lock()
queueLen := b.queue.queue.Len()
b.queue.mu.Unlock()
require.Equal(t, 0, queueLen)
assertZeroTransfers()

// Advance the timer by a couple of rebalance intervals. No rebalancing
// should occur since the rebalance loop is disabled.
timeSource.Advance(2 * rebalanceInterval)
time.Sleep(1 * time.Second)

// Queue should be empty, and no additional connections should be moved.
b.queue.mu.Lock()
queueLen = b.queue.queue.Len()
b.queue.mu.Unlock()
require.Equal(t, 0, queueLen)
assertZeroTransfers()
}

func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -781,7 +856,7 @@ func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
waitFor := func(numTransfers int) {
testutils.SucceedsSoon(t, func() error {
count := 0
for i := 0; i < 100; i++ {
for i := 0; i < numConns; i++ {
count += handles[i].(*testConnHandle).transferConnectionCount()
}
if count != numTransfers {
5 changes: 5 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
@@ -98,6 +98,8 @@ type ProxyOptions struct {
// ThrottleBaseDelay is the initial exponential backoff triggered in
// response to the first connection failure.
ThrottleBaseDelay time.Duration
// DisableConnectionRebalancing disables connection rebalancing for tenants.
DisableConnectionRebalancing bool

// testingKnobs are knobs used for testing.
testingKnobs struct {
@@ -263,6 +265,9 @@ func newProxyHandler(
balancerMetrics := balancer.NewMetrics()
registry.AddMetricStruct(balancerMetrics)
var balancerOpts []balancer.Option
if handler.DisableConnectionRebalancing {
balancerOpts = append(balancerOpts, balancer.DisableRebalancing())
}
if handler.testingKnobs.balancerOpts != nil {
balancerOpts = append(balancerOpts, handler.testingKnobs.balancerOpts...)
}
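The new ProxyOptions field is what the --disable-connection-rebalancing flag on "mt start-proxy" ultimately sets; the flag registration itself is outside this diff. As a hypothetical sketch only (flagSet and proxyOpts are illustrative names, not code from this PR), a boolean CLI flag would be bound to the field roughly as follows:

// Hypothetical wiring: bind the CLI flag to the proxy options struct.
var proxyOpts sqlproxyccl.ProxyOptions
flagSet.BoolVar(
	&proxyOpts.DisableConnectionRebalancing,
	"disable-connection-rebalancing",
	false, // rebalancing remains enabled by default
	"disable connection rebalancing for tenants",
)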
152 changes: 130 additions & 22 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
@@ -765,7 +765,7 @@ func TestDirectoryConnect(t *testing.T) {
})
}

func TestPodWatcher(t *testing.T) {
func TestConnectionRebalancingDisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
defer log.Scope(t).Close(t)
@@ -777,31 +777,103 @@
_, err := mainDB.Exec("ALTER TENANT ALL SET CLUSTER SETTING server.user_login.session_revival_token.enabled = true")
require.NoError(t, err)

// Start four SQL pods for the test tenant.
var addresses []string
// Start two SQL pods for the test tenant.
const podCount = 2
tenantID := serverutils.TestTenantID()
const podCount = 4
for i := 0; i < podCount; i++ {
params := tests.CreateTestTenantParams(tenantID)
// The first SQL pod will create the tenant keyspace in the host.
if i != 0 {
params.Existing = true
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount)
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
tenant, tenantDB := serverutils.StartTenant(t, s, params)
tenant.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true)
defer tenant.Stopper().Stop(ctx)
}()

// Create a test user. We only need to do it once.
if i == 0 {
_, err = tenantDB.Exec("CREATE USER testuser WITH PASSWORD 'hunter2'")
require.NoError(t, err)
_, err = tenantDB.Exec("GRANT admin TO testuser")
require.NoError(t, err)
// Register one SQL pod in the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(s.Stopper(), nil /* timeSource */)
tds.CreateTenant(tenantID, "tenant-cluster")
tds.AddPod(tenantID, &tenant.Pod{
TenantID: tenantID.ToUint64(),
Addr: tenants[0].SQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})
require.NoError(t, tds.Start(ctx))

opts := &ProxyOptions{SkipVerify: true, DisableConnectionRebalancing: true}
opts.testingKnobs.directoryServer = tds
proxy, addr := newSecureProxyServer(ctx, t, s.Stopper(), opts)
connectionString := fmt.Sprintf("postgres://testuser:hunter2@%s/?sslmode=require&options=--cluster=tenant-cluster-%s", addr, tenantID)

// Open 12 connections to the first pod.
dist := map[string]int{}
var conns []*gosql.DB
for i := 0; i < 12; i++ {
db, err := gosql.Open("postgres", connectionString)
db.SetMaxOpenConns(1)
defer db.Close()
require.NoError(t, err)
addr := queryAddr(ctx, t, db)
dist[addr]++
conns = append(conns, db)
}
require.Len(t, dist, 1)

// Add a second SQL pod.
tds.AddPod(tenantID, &tenant.Pod{
TenantID: tenantID.ToUint64(),
Addr: tenants[1].SQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})

// Wait until the update gets propagated to the directory cache.
testutils.SucceedsSoon(t, func() error {
pods, err := proxy.handler.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
return err
}
tenantDB.Close()
if len(pods) != 2 {
return errors.Newf("expected 2 pods, but got %d", len(pods))
}
return nil
})

// The update above should trigger the pod watcher. Regardless, we'll invoke
// rebalancing directly as well. There should be no rebalancing attempts.
proxy.handler.balancer.RebalanceTenant(ctx, tenantID)
time.Sleep(2 * time.Second)

addresses = append(addresses, tenant.SQLAddr())
require.Equal(t, int64(0), proxy.metrics.ConnMigrationAttemptedCount.Count())

// Reset distribution and count again.
dist = map[string]int{}
for _, c := range conns {
addr := queryAddr(ctx, t, c)
dist[addr]++
}
require.Len(t, dist, 1)
}

func TestPodWatcher(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
defer log.Scope(t).Close(t)

// Start KV server, and enable session migration.
params, _ := tests.CreateTestServerParams()
s, mainDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
_, err := mainDB.Exec("ALTER TENANT ALL SET CLUSTER SETTING server.user_login.session_revival_token.enabled = true")
require.NoError(t, err)

// Start four SQL pods for the test tenant.
const podCount = 4
tenantID := serverutils.TestTenantID()
tenants := startTestTenantPods(ctx, t, s, tenantID, podCount)
defer func() {
for _, tenant := range tenants {
tenant.Stopper().Stop(ctx)
}
}()

// Register only 3 SQL pods in the directory server. We will add the 4th
// once the watcher has been established.
@@ -810,7 +882,7 @@ func TestPodWatcher(t *testing.T) {
for i := 0; i < 3; i++ {
tds.AddPod(tenantID, &tenant.Pod{
TenantID: tenantID.ToUint64(),
Addr: addresses[i],
Addr: tenants[i].SQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})
@@ -851,7 +923,7 @@ func TestPodWatcher(t *testing.T) {
// to the new pod. Note that for testing, we set customRebalanceRate to 1.0.
tds.AddPod(tenantID, &tenant.Pod{
TenantID: tenantID.ToUint64(),
Addr: addresses[3],
Addr: tenants[3].SQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})
@@ -1734,3 +1806,39 @@ func queryAddr(ctx context.Context, t *testing.T, db queryer) string {
`).Scan(&host, &port))
return fmt.Sprintf("%s:%s", host, port)
}

// startTestTenantPods starts count SQL pods for the given tenant, and returns
// a list of tenant servers. Note that a default admin testuser with the
// password hunter2 will be created.
func startTestTenantPods(
ctx context.Context,
t *testing.T,
ts serverutils.TestServerInterface,
tenantID roachpb.TenantID,
count int,
) []serverutils.TestTenantInterface {
t.Helper()

var tenants []serverutils.TestTenantInterface
for i := 0; i < count; i++ {
params := tests.CreateTestTenantParams(tenantID)
// The first SQL pod will create the tenant keyspace in the host.
if i != 0 {
params.Existing = true
}
tenant, tenantDB := serverutils.StartTenant(t, ts, params)
tenant.PGServer().(*pgwire.Server).TestingSetTrustClientProvidedRemoteAddr(true)

// Create a test user. We only need to do it once.
if i == 0 {
_, err := tenantDB.Exec("CREATE USER testuser WITH PASSWORD 'hunter2'")
require.NoError(t, err)
_, err = tenantDB.Exec("GRANT admin TO testuser")
require.NoError(t, err)
}
tenantDB.Close()

tenants = append(tenants, tenant)
}
return tenants
}
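Both tests above use the helper in the same way: call it once, then stop each returned tenant's stopper when the test ends. For example:

tenants := startTestTenantPods(ctx, t, s, serverutils.TestTenantID(), 4 /* count */)
defer func() {
	for _, tenant := range tenants {
		tenant.Stopper().Stop(ctx)
	}
}()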