ccl/sqlproxyccl: rate limit the number of rebalances per tenant
This commit rate limits the number of rebalances per tenant to once every
15 seconds (i.e. 1/2 of the rebalance loop interval). The main purpose of
this is to prevent a burst of pod events for the same tenant from causing
multiple rebalances, which could move a lot of connections around.

Release note: None
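
For context, the throttling logic this commit introduces (see canRebalanceTenant in the diff below) boils down to a mutex-guarded map of per-tenant timestamps. The following is a minimal, self-contained sketch of that pattern, using the standard library time and sync packages in place of the timeutil and syncutil wrappers used by the actual code, and a plain uint64 in place of roachpb.TenantID:

package main

import (
	"fmt"
	"sync"
	"time"
)

// throttler rate limits an operation per key (here, per tenant). A single
// mutex guards both the read and the update of the last-run timestamp, so
// concurrent triggers (e.g. the rebalance loop and the pod watcher) cannot
// both pass the check at the same time.
type throttler struct {
	mu    sync.Mutex
	delay time.Duration
	last  map[uint64]time.Time
}

func newThrottler(delay time.Duration) *throttler {
	return &throttler{delay: delay, last: make(map[uint64]time.Time)}
}

// canRun reports whether at least delay has elapsed since the last
// successful call for tenantID, and records the current time if so. The
// zero time for an unseen tenant means the first call always succeeds.
func (t *throttler) canRun(tenantID uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := time.Now()
	if now.Sub(t.last[tenantID]) < t.delay {
		return false
	}
	t.last[tenantID] = now
	return true
}

func main() {
	th := newThrottler(15 * time.Second)
	fmt.Println(th.canRun(10)) // true: first call always passes
	fmt.Println(th.canRun(10)) // false: throttled for the next 15s
	fmt.Println(th.canRun(11)) // true: the limit is per tenant
}

Holding one mutex across both the check and the timestamp update is what makes the operation atomic; checking and recording under separate lock acquisitions would let two concurrent triggers both proceed.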
jaylim-crl committed May 24, 2022
1 parent 29771f0 commit 62021aa
Showing 2 changed files with 200 additions and 52 deletions.
148 changes: 96 additions & 52 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -38,6 +38,12 @@ const (
// DRAINING state before the proxy starts moving connections away from it.
minDrainPeriod = 1 * time.Minute

// defaultRebalanceDelay is the minimum amount of time that must elapse
// between rebalance operations. This was deliberately chosen to be half of
// rebalanceInterval, and is mainly used to rate limit effects due to events
// from the pod watcher.
defaultRebalanceDelay = 15 * time.Second

// rebalancePercentDeviation defines the percentage threshold that the
// current number of assignments can deviate away from the mean. Having a
// 15% "deadzone" reduces frequent transfers especially when load is
@@ -79,6 +85,7 @@ type balancerOptions struct {
noRebalanceLoop bool
timeSource timeutil.TimeSource
rebalanceRate float32
rebalanceDelay time.Duration
}

// Option defines an option that can be passed to NewBalancer in order to
@@ -110,14 +117,25 @@ func TimeSource(ts timeutil.TimeSource) Option {
}
}

// RebalanceRate defines the rate of rebalancing across pods. Set to -1 to
// disable rebalancing (i.e. connection transfers).
// RebalanceRate defines the rate of rebalancing across pods. This must be
// between 0 and 1 inclusive. 0 means no rebalancing will occur.
func RebalanceRate(rate float32) Option {
return func(opts *balancerOptions) {
opts.rebalanceRate = rate
}
}

// RebalanceDelay specifies the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. This delay has the effect of throttling
// RebalanceTenant calls to avoid constantly moving connections around.
//
// RebalanceDelay defaults to defaultRebalanceDelay. Use -1 to never throttle.
func RebalanceDelay(delay time.Duration) Option {
return func(opts *balancerOptions) {
opts.rebalanceDelay = delay
}
}

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
@@ -152,6 +170,19 @@ type Balancer struct {

// rebalanceRate represents the rate of rebalancing connections.
rebalanceRate float32

// rebalanceDelay is the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. Defaults to defaultRebalanceDelay.
rebalanceDelay time.Duration

// lastRebalance tracks the last time each tenant was rebalanced. This is
// used to rate limit the number of rebalances per tenant. Synchronization
// is needed since rebalance operations can be triggered by the rebalance
// loop or the pod watcher.
lastRebalance struct {
syncutil.Mutex
tenants map[roachpb.TenantID]time.Time
}
}

// NewBalancer constructs a new Balancer instance that is responsible for
@@ -164,22 +195,15 @@ func NewBalancer(
opts ...Option,
) (*Balancer, error) {
// Handle options.
options := &balancerOptions{}
options := &balancerOptions{
maxConcurrentRebalances: defaultMaxConcurrentRebalances,
timeSource: timeutil.DefaultTimeSource{},
rebalanceRate: defaultRebalanceRate,
rebalanceDelay: defaultRebalanceDelay,
}
for _, opt := range opts {
opt(options)
}
if options.maxConcurrentRebalances == 0 {
options.maxConcurrentRebalances = defaultMaxConcurrentRebalances
}
if options.timeSource == nil {
options.timeSource = timeutil.DefaultTimeSource{}
}
if options.rebalanceRate == 0 {
options.rebalanceRate = defaultRebalanceRate
}
if options.rebalanceRate == -1 {
options.rebalanceRate = 0
}

// Ensure that ctx gets cancelled on stopper's quiescing.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
@@ -197,7 +221,10 @@ func NewBalancer(
processSem: semaphore.New(options.maxConcurrentRebalances),
timeSource: options.timeSource,
rebalanceRate: options.rebalanceRate,
rebalanceDelay: options.rebalanceDelay,
}
b.lastRebalance.tenants = make(map[roachpb.TenantID]time.Time)

b.connTracker, err = NewConnTracker(ctx, b.stopper, b.timeSource)
if err != nil {
return nil, err
@@ -218,6 +245,46 @@ func NewBalancer(
return b, nil
}

// RebalanceTenant rebalances connections to the given tenant. If no RUNNING
// pod exists for the given tenant, or the tenant has been recently rebalanced,
// this is a no-op.
func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
// If rebalanced recently, no-op.
if !b.canRebalanceTenant(tenantID) {
return
}

tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
log.Errorf(ctx, "could not rebalance tenant %s: %v", tenantID, err.Error())
return
}

// Construct a map so we can easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, this
// case would happen if we're scaling down from 1 to 0, in which case we
// can't transfer connections anywhere. In practice, we will never scale
// a tenant from 1 to 0 while there are still active connections, so this
// case should not occur.
if !hasRunningPod {
return
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}

// SelectTenantPod selects a tenant pod from the given list based on a weighted
// CPU load algorithm. It is expected that all pods within the list belong to
// the same tenant. If no pods are available, this returns ErrNoAvailablePods.
@@ -318,6 +385,20 @@ func (b *Balancer) rebalanceLoop(ctx context.Context) {
}
}

// canRebalanceTenant returns true if it has been at least `rebalanceDelay`
// since the last time the given tenant was rebalanced, or false otherwise.
func (b *Balancer) canRebalanceTenant(tenantID roachpb.TenantID) bool {
b.lastRebalance.Lock()
defer b.lastRebalance.Unlock()

now := b.timeSource.Now()
if now.Sub(b.lastRebalance.tenants[tenantID]) < b.rebalanceDelay {
return false
}
b.lastRebalance.tenants[tenantID] = now
return true
}

// rebalance attempts to rebalance connections for all tenants within the proxy.
func (b *Balancer) rebalance(ctx context.Context) {
// getTenantIDs ensures that tenants will have at least one connection.
@@ -327,43 +408,6 @@ func (b *Balancer) rebalance(ctx context.Context) {
}
}

// RebalanceTenant rebalances connections for the given tenant. If no RUNNING
// pod exists for the given tenant, this is a no-op.
//
// TODO(jaylim-crl): Rate limit the number of rebalances per tenant for requests
// coming from the pod watcher.
func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
log.Errorf(ctx, "could not rebalance tenant %s: %v", tenantID, err.Error())
return
}

// Construct a map so we can easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, this
// case would happen if we're scaling down from 1 to 0, in which case we
// can't transfer connections anywhere. In practice, we will never scale
// a tenant from 1 to 0 while there are still active connections, so this
// case should not occur.
if !hasRunningPod {
return
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}

// rebalancePartition rebalances the given assignments partition.
func (b *Balancer) rebalancePartition(
pods map[string]*tenant.Pod, assignments []*ServerAssignment,
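
A side note on the refactor of NewBalancer above: defaults are now seeded in the options struct literal before the caller's options are applied, instead of being patched in afterwards with zero-value checks (the deleted if blocks, including the -1 sentinel for rebalanceRate). A minimal sketch of this functional-options pattern follows, with simplified names that are not part of the actual package and a rebalance rate of 0.5 inferred from the test below:

package main

import (
	"fmt"
	"time"
)

type options struct {
	rebalanceRate  float32
	rebalanceDelay time.Duration
}

// Option mutates the options struct, mirroring the package's Option type.
type Option func(*options)

func RebalanceDelay(d time.Duration) Option {
	return func(o *options) { o.rebalanceDelay = d }
}

func newOptions(opts ...Option) *options {
	// Seed defaults up front; any caller-supplied option applied below
	// simply overwrites them, so no post-hoc zero-value checks are needed,
	// and explicit zero or negative values pass through unchanged.
	o := &options{
		rebalanceRate:  0.5,              // stand-in for defaultRebalanceRate
		rebalanceDelay: 15 * time.Second, // defaultRebalanceDelay
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	fmt.Println(newOptions())                   // &{0.5 15s}: the defaults
	fmt.Println(newOptions(RebalanceDelay(-1))) // negative delay disables throttling
}

This is also why the updated test below can pass RebalanceDelay(-1) to turn throttling off: with a negative delay, now.Sub(lastRebalance) is never below it, so canRebalanceTenant always returns true.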
104 changes: 104 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
@@ -365,6 +365,7 @@ func TestRebalancer_rebalance(t *testing.T) {
directoryCache,
NoRebalanceLoop(),
TimeSource(timeSource),
RebalanceDelay(-1),
)
require.NoError(t, err)

@@ -715,6 +716,109 @@
}
}

func TestBalancer_RebalanceTenant_WithDefaultDelay(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// Use a custom time source for testing.
t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
timeSource := timeutil.NewManualTime(t0)

metrics := NewMetrics()
directoryCache := newTestDirectoryCache()

b, err := NewBalancer(
ctx,
stopper,
metrics,
directoryCache,
NoRebalanceLoop(),
TimeSource(timeSource),
)
require.NoError(t, err)

tenantID := roachpb.MakeTenantID(10)
pods := []*tenant.Pod{
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:80", State: tenant.DRAINING},
{TenantID: tenantID.ToUint64(), Addr: "127.0.0.30:81", State: tenant.RUNNING},
}
for _, pod := range pods {
require.True(t, directoryCache.upsertPod(pod))
}

// Create 100 active connections, all to the draining pod.
const numConns = 100
var mu syncutil.Mutex
assignments := make([]*ServerAssignment, numConns)
makeTestConnHandle := func(idx int) *testConnHandle {
var handle *testConnHandle
handle = &testConnHandle{
onTransferConnection: func() error {
mu.Lock()
defer mu.Unlock()
assignments[idx].Close()
assignments[idx] = NewServerAssignment(
tenantID, b.connTracker, handle, pods[1].Addr,
)
return nil
},
}
return handle
}
var handles []ConnectionHandle
for i := 0; i < numConns; i++ {
handle := makeTestConnHandle(i)
handles = append(handles, handle)
assignments[i] = NewServerAssignment(
tenantID, b.connTracker, handle, pods[0].Addr,
)
}

waitFor := func(numTransfers int) {
testutils.SucceedsSoon(t, func() error {
count := 0
for i := 0; i < 100; i++ {
count += handles[i].(*testConnHandle).transferConnectionCount()
}
if count != numTransfers {
return errors.Newf("expected %d transfers, but got %d", numTransfers, count)
}
return nil
})
}

// Attempt the rebalance, and wait until 50 connections are moved
// (i.e. 100 * defaultRebalanceRate).
b.RebalanceTenant(ctx, tenantID)
waitFor(50)

// Run the rebalance again.
b.RebalanceTenant(ctx, tenantID)

// Queue should be empty, and no additional connections should be moved.
b.queue.mu.Lock()
queueLen := b.queue.queue.Len()
b.queue.mu.Unlock()
require.Equal(t, 0, queueLen)
waitFor(50)

// Advance time, rebalance, and wait until 75 (i.e. 50 + 25) connections
// get moved.
timeSource.Advance(defaultRebalanceDelay)
b.RebalanceTenant(ctx, tenantID)
waitFor(75)

// Advance time, rebalance, and wait until 88 (i.e. 75 + 13) connections
// get moved.
timeSource.Advance(defaultRebalanceDelay)
b.RebalanceTenant(ctx, tenantID)
waitFor(88)
}

func TestEnqueueRebalanceRequests(t *testing.T) {
defer leaktest.AfterTest(t)()

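
On the transfer counts asserted in TestBalancer_RebalanceTenant_WithDefaultDelay above (50, then 75, then 88): each permitted rebalance moves roughly half of the connections still sitting on the draining pod, assuming a defaultRebalanceRate of 0.5 (inferred from the first pass moving 50 of 100) with the per-pass count rounded up. A quick sketch of the cumulative totals:

package main

import (
	"fmt"
	"math"
)

func main() {
	const rate = 0.5 // assumed defaultRebalanceRate, inferred from the test
	remaining := 100 // connections still on the draining pod
	total := 0       // cumulative transfers, as checked by waitFor
	for i := 0; i < 3; i++ {
		moved := int(math.Ceil(float64(remaining) * rate))
		total += moved
		remaining -= moved
		fmt.Println(total) // prints 50, 75, 88 in turn
	}
}

The third pass moves ceil(25 * 0.5) = 13 connections rather than 12, which is why the final assertion waits for 88.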
