smartconnpool: do not allow connections to starve
Signed-off-by: Vicent Marti <[email protected]>
vmg authored and mhamza15 committed Feb 3, 2025
1 parent 9b1f5c0 commit 890610e
Showing 5 changed files with 205 additions and 24 deletions.
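In short: a client can join the waitlist just after idle connections were pushed onto the settings stacks, leaving it parked even though connections are available. To close that gap, the expire worker now runs every 100ms instead of every second, and expire() reports how many waiters look like they have never been served (age == 0); the pool then pops up to that many idle connections and hands them to waiters directly. The diff also adds a MaxIdleCount cap on idle connections and migrates the benchmarking code to math/rand/v2. A condensed sketch of the new worker body, using the names that appear in pool.go below:

pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
	// wake expired waiters and count those that may be starving
	maybeStarving := pool.wait.expire(false)
	// hand idle connections directly to potentially starving waiters
	for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
	}
	return true
})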
4 changes: 2 additions & 2 deletions go/pools/smartconnpool/benchmarking/legacy/resource_pool.go
@@ -20,7 +20,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"math/rand/v2"
"sync"
"sync/atomic"
"time"
@@ -576,7 +576,7 @@ func (rp *ResourcePool) extendedMaxLifetime() time.Duration {
if maxLifetime == 0 {
return 0
}
return time.Duration(maxLifetime + rand.Int63n(maxLifetime))
return time.Duration(maxLifetime + rand.Int64N(maxLifetime))
}

// MaxLifetimeClosed returns the count of resources closed due to refresh timeout.
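This file and the next are a mechanical migration from math/rand to math/rand/v2, whose top-level functions are randomly seeded and follow a Go-style naming scheme: Int63n becomes Int64N, both returning a uniform value in [0, n). A minimal standalone sketch of the jittered-lifetime computation above:

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

func main() {
	maxLifetime := time.Hour.Nanoseconds()
	// spread expirations over [maxLifetime, 2*maxLifetime) so that
	// connections created together do not all expire together
	extended := time.Duration(maxLifetime + rand.Int64N(maxLifetime))
	fmt.Println(extended)
}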
2 changes: 1 addition & 1 deletion go/pools/smartconnpool/benchmarking/load_test.go
@@ -21,7 +21,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand"
"math/rand/v2"
"os"
"sort"
"sync"
104 changes: 84 additions & 20 deletions go/pools/smartconnpool/pool.go
@@ -18,12 +18,12 @@ package smartconnpool

import (
"context"
"math/rand/v2"
"slices"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/hack"
"vitess.io/vitess/go/vt/log"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
@@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error)

type Config[C Connection] struct {
Capacity int64
MaxIdleCount int64
IdleTimeout time.Duration
MaxLifetime time.Duration
RefreshInterval time.Duration
@@ -123,6 +124,8 @@ type ConnPool[C Connection] struct {
active atomic.Int64
// capacity is the maximum number of connections that this pool can open
capacity atomic.Int64
// idleCount is the maximum number of idle connections allowed in the pool
idleCount atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
workers sync.WaitGroup
@@ -138,6 +141,8 @@ type ConnPool[C Connection] struct {
// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
// maxIdleCount is the maximum number of idle connections in the pool
maxIdleCount int64
// maxLifetime is the maximum time a connection can be open
maxLifetime atomic.Int64
// idleTimeout is the maximum time a connection can remain idle
@@ -156,8 +161,8 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxIdleCount = config.MaxIdleCount
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds())
@@ -192,11 +197,18 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration
func (pool *ConnPool[C]) open() {
pool.close = make(chan struct{})
pool.capacity.Store(pool.config.maxCapacity)
pool.setIdleCount()

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
pool.wait.expire(false)
pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
maybeStarving := pool.wait.expire(false)

// Do not allow connections to starve; if there are waiters in the queue
// and connections in the stack, we could be starving those waiters.
// Try popping out a connection and handing it over directly.
for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
}
return true
})

@@ -315,6 +327,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 {
return pool.config.maxCapacity
}

func (pool *ConnPool[C]) setIdleCount() {
capacity := pool.Capacity()
maxIdleCount := pool.config.maxIdleCount
if maxIdleCount == 0 || maxIdleCount > capacity {
pool.idleCount.Store(capacity)
} else {
pool.idleCount.Store(maxIdleCount)
}
}

// InUse returns the number of connections that the pool has lent out to clients and that
// haven't been returned yet.
func (pool *ConnPool[C]) InUse() int64 {
@@ -340,6 +362,10 @@ func (pool *ConnPool[C]) SetIdleTimeout(duration time.Duration) {
pool.config.idleTimeout.Store(duration.Nanoseconds())
}

func (pool *ConnPool[D]) IdleCount() int64 {
return pool.idleCount.Load()
}

func (pool *ConnPool[D]) RefreshInterval() time.Duration {
return time.Duration(pool.config.refreshInterval.Load())
}
@@ -395,14 +421,52 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}
}

if !pool.wait.tryReturnConn(conn) {
connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
} else {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
pool.tryReturnConn(conn)
}

func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
if pool.wait.tryReturnConn(conn) {
return true
}
if pool.closeOnIdleLimitReached(conn) {
return false
}
connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
} else {
stack := connSetting.bucket & stackMask
pool.settings[stack].Push(conn)
pool.freshSettingsStack.Store(int64(stack))
}
return false
}

func (pool *ConnPool[C]) tryReturnAnyConn() bool {
if conn, ok := pool.clean.Pop(); ok {
return pool.tryReturnConn(conn)
}
for u := 0; u <= stackMask; u++ {
if conn, ok := pool.settings[u].Pop(); ok {
return pool.tryReturnConn(conn)
}
}
return false
}

// closeOnIdleLimitReached closes a connection if the number of idle connections (active - in use) in the pool
// exceeds the idleCount limit. It returns true if it closed the connection, false otherwise.
func (pool *ConnPool[C]) closeOnIdleLimitReached(conn *Pooled[C]) bool {
for {
open := pool.active.Load()
idle := open - pool.borrowed.Load()
if idle <= pool.idleCount.Load() {
return false
}
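// try to release one slot; if another goroutine changed active in the
// meantime, the CompareAndSwap fails and we re-read the counters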
if pool.active.CompareAndSwap(open, open-1) {
pool.Metrics.idleClosed.Add(1)
conn.Close()
return true
}
}
}
@@ -412,8 +476,7 @@ func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
if maxLifetime == 0 {
return 0
}
extended := hack.FastRand() % uint32(maxLifetime)
return time.Duration(maxLifetime) + time.Duration(extended)
return time.Duration(maxLifetime) + time.Duration(rand.Uint32N(uint32(maxLifetime)))
}

func (pool *ConnPool[C]) connReopen(ctx context.Context, dbconn *Pooled[C], now time.Time) error {
@@ -443,14 +506,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
}

func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
fresh := pool.freshSettingsStack.Load()
if fresh < 0 {
return nil
}

var start uint32
if setting == nil {
start = uint32(fresh)
start = uint32(pool.freshSettingsStack.Load())
} else {
start = setting.bucket
}
@@ -630,6 +688,9 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if oldcap == newcap {
return nil
}
// update the idle count to match the new capacity if necessary; this is
// deferred so that, when reducing capacity, it runs only after we have
// waited for connections to be returned to the pool
defer pool.setIdleCount()

const delay = 10 * time.Millisecond

@@ -733,6 +794,9 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {
// the smartconnpool doesn't have a maximum capacity
return pool.Capacity()
})
stats.NewGaugeFunc(name+"IdleAllowed", "Tablet server conn pool idle allowed limit", func() int64 {
return pool.IdleCount()
})
stats.NewCounterFunc(name+"WaitCount", "Tablet server conn pool wait count", func() int64 {
return pool.Metrics.WaitCount()
})
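The loop in closeOnIdleLimitReached is a standard lock-free pattern: load the counters, decide, and retry when CompareAndSwap detects a concurrent update. A self-contained sketch of the same pattern under assumed names (tryReleaseSlot is hypothetical; in the pool itself the caller closes the connection when this returns true):

package main

import "sync/atomic"

// tryReleaseSlot decrements active only while idle (active - borrowed)
// exceeds limit; it reports whether it performed the decrement.
func tryReleaseSlot(active, borrowed *atomic.Int64, limit int64) bool {
	for {
		open := active.Load()
		idle := open - borrowed.Load()
		if idle <= limit {
			return false // within the idle budget
		}
		if active.CompareAndSwap(open, open-1) {
			return true // we own this slot; one idle connection may be closed
		}
		// raced with another goroutine; re-read and retry
	}
}

func main() {
	var active, borrowed atomic.Int64
	active.Store(5)
	borrowed.Store(2) // 3 idle connections
	_ = tryReleaseSlot(&active, &borrowed, 2)
}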
113 changes: 113 additions & 0 deletions go/pools/smartconnpool/pool_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@@ -746,6 +747,51 @@ func TestExtendedLifetimeTimeout(t *testing.T) {
}
}

// TestMaxIdleCount tests the MaxIdleCount setting, checking that the pool
// closes idle connections once their number exceeds the limit.
func TestMaxIdleCount(t *testing.T) {
testMaxIdleCount := func(t *testing.T, setting *Setting, maxIdleCount int64, expClosedConn int) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
MaxIdleCount: maxIdleCount,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

defer p.Close()

var conns []*Pooled[*TestConn]
for i := 0; i < 5; i++ {
r, err := p.Get(ctx, setting)
require.NoError(t, err)
assert.EqualValues(t, i+1, state.open.Load())
assert.EqualValues(t, 0, p.Metrics.IdleClosed())

conns = append(conns, r)
}

for _, conn := range conns {
p.put(conn)
}

closedConn := 0
for _, conn := range conns {
if conn.Conn.IsClosed() {
closedConn++
}
}
assert.EqualValues(t, expClosedConn, closedConn)
assert.EqualValues(t, expClosedConn, p.Metrics.IdleClosed())
}

t.Run("WithoutSettings", func(t *testing.T) { testMaxIdleCount(t, nil, 2, 3) })
t.Run("WithSettings", func(t *testing.T) { testMaxIdleCount(t, sFoo, 2, 3) })
t.Run("WithoutSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, nil, 0, 0) })
t.Run("WithSettings-MaxIdleCount-Zero", func(t *testing.T) { testMaxIdleCount(t, sFoo, 0, 0) })
}

func TestCreateFail(t *testing.T) {
var state TestState
state.chaos.failConnect = true
@@ -1080,3 +1126,70 @@ func TestApplySettingsFailure(t *testing.T) {
p.put(r)
}
}

func TestGetSpike(t *testing.T) {
var state TestState

ctx := context.Background()
p := NewPool(&Config[*TestConn]{
Capacity: 5,
IdleTimeout: time.Second,
LogWait: state.LogWait,
}).Open(newConnector(&state), nil)

var resources [10]*Pooled[*TestConn]
var r *Pooled[*TestConn]
var err error

// Ensure we have a pool with 5 available resources
for i := 0; i < 5; i++ {
r, err = p.Get(ctx, nil)

require.NoError(t, err)
resources[i] = r
assert.EqualValues(t, 5-i-1, p.Available())
assert.Zero(t, p.Metrics.WaitCount())
assert.Zero(t, len(state.waits))
assert.Zero(t, p.Metrics.WaitTime())
assert.EqualValues(t, i+1, state.lastID.Load())
assert.EqualValues(t, i+1, state.open.Load())
}

for i := 0; i < 5; i++ {
p.put(resources[i])
}

assert.EqualValues(t, 5, p.Available())
assert.EqualValues(t, 5, p.Active())
assert.EqualValues(t, 0, p.InUse())

for i := 0; i < 2000; i++ {
wg := sync.WaitGroup{}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

errs := make(chan error, 80)

for j := 0; j < 80; j++ {
wg.Add(1)

go func() {
defer wg.Done()
// fetch into local variables (the outer r and err would race across
// goroutines) and only return the connection if Get succeeded
r, err := p.Get(ctx, nil)
if err != nil {
errs <- err
return
}
defer p.put(r)
}()
}
wg.Wait()

if len(errs) > 0 {
t.Errorf("Error getting connection: %v", <-errs)
}

close(errs)
}
}
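TestGetSpike drives a 5-connection pool through 2000 rounds of 80 concurrent Get calls under a one-second timeout; before the starvation fix, some of those waiters could remain parked until the timeout fired. Assuming a standard repository checkout, both new tests can be run with the race detector enabled:

go test -race -run 'TestMaxIdleCount|TestGetSpike' ./go/pools/smartconnpool/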
6 changes: 5 additions & 1 deletion go/pools/smartconnpool/waitlist.go
@@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool

// expire removes and wakes any expired waiter in the waitlist.
// if force is true, it'll wake and remove all the waiters.
func (wl *waitlist[C]) expire(force bool) {
func (wl *waitlist[C]) expire(force bool) (maybeStarving int) {
if wl.list.Len() == 0 {
return
}
@@ -91,6 +91,9 @@
expired = append(expired, e)
continue
}
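// a waiter with age == 0 looks like it has never been handed a
// connection; report it so the pool can try serving it directly
// (see the worker loop in pool.go)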
if e.Value.age == 0 {
maybeStarving++
}
}
// remove the expired waiters from the waitlist after traversing it
for _, e := range expired {
@@ -102,6 +105,7 @@
for _, e := range expired {
e.Value.sema.notify(false)
}
return
}

// tryReturnConn tries handing over a connection to one of the waiters in the pool.
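Taken together, MaxIdleCount becomes a public knob on the pool. A minimal usage sketch; Config, Capacity, MaxIdleCount, IdleTimeout, NewPool and Open come from this diff, while MyConn and myConnector are hypothetical placeholders for a caller's Connection implementation and connector:

// Cap the pool at 100 open connections, but close idle connections
// down to at most 20. A MaxIdleCount of zero keeps the previous
// behaviour: idle connections may occupy the full capacity.
pool := smartconnpool.NewPool(&smartconnpool.Config[*MyConn]{
	Capacity:     100,
	MaxIdleCount: 20,
	IdleTimeout:  time.Minute,
}).Open(myConnector, nil)
defer pool.Close()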
