workload: jitter the teardown of connections to prevent thundering herd
This change upgrades workload's use of pgx from v4 to v5 so that the teardown
of connections can be jittered. It sets a max connection age of 5 minutes and
jitters the teardown by up to 30 seconds. Upgrading to pgx v5 also brings
non-blocking pgxpool connection acquisition.

Release note (cli change): workload jitters the teardown of connections to
prevent a thundering herd from impacting P99 latency results.
sean- committed Mar 16, 2023
1 parent d336a62 commit 1e2efd5
Showing 22 changed files with 184 additions and 92 deletions.
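The jitter comes from two pgxpool v5 settings that the diff below wires into workload's pool construction. A minimal standalone sketch of those knobs — the connection string and values here are illustrative, not taken from the commit:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	// Illustrative URL; any pgx-compatible connection string works.
	cfg, err := pgxpool.ParseConfig("postgres://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	// Retire each connection after at most 5 minutes, shortened by a random
	// jitter of up to 30 seconds, so workers don't tear down and re-dial
	// their connections in lockstep (the thundering herd in the title).
	cfg.MaxConnLifetime = 5 * time.Minute
	cfg.MaxConnLifetimeJitter = 30 * time.Second

	pool, err := pgxpool.NewWithConfig(context.Background(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
}
```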
2 changes: 2 additions & 0 deletions go.mod
@@ -153,6 +153,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/guptarohit/asciigraph v0.5.5
github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6
github.com/jackc/pgx/v5 v5.2.0
github.com/jaegertracing/jaeger v1.18.1
github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible
github.com/jordanlewis/gcassert v0.0.0-20221027203946-81f097ad35a0
@@ -308,6 +309,7 @@ require (
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -1381,11 +1381,15 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.16.1 h1:JzTglcal01DrghUqt+PmzWsZx/Yh7SC/CTQmSBMTd0Y=
github.com/jackc/pgx/v4 v4.16.1/go.mod h1:SIhx0D5hoADaiXZVyv+3gSm3LCIIINTVO0PficsvWGQ=
github.com/jackc/pgx/v5 v5.2.0 h1:NdPpngX0Y6z6XDFKqmFQaE+bCtkqzvQIOt1wvBlAqs8=
github.com/jackc/pgx/v5 v5.2.0/go.mod h1:Ptn7zmohNsWEsdxRawMzk3gaKma2obW+NWTnKa0S4nk=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.1.2 h1:0f7vaaXINONKTsxYDn4otOAiJanX/BMeAtY//BXqzlg=
github.com/jackc/puddle/v2 v2.1.2/go.mod h1:2lpufsF5mRHO6SuZkm0fNYxM6SWHfvyFj62KwNzgels=
github.com/jaegertracing/jaeger v1.18.1 h1:eFqjEpTKq2FfiZ/YX53oxeCePdIZyWvDfXaTAGj0r5E=
github.com/jaegertracing/jaeger v1.18.1/go.mod h1:WRzMFH62rje1VgbShlgk6UbWUNoo08uFFvs/x50aZKk=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
8 changes: 5 additions & 3 deletions pkg/workload/BUILD.bazel
@@ -28,10 +28,12 @@ go_library(
"//pkg/util/timeutil",
"//pkg/workload/histogram",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v4//pgxpool",
"@com_github_jackc_pgx_v5//:pgx",
"@com_github_jackc_pgx_v5//pgconn",
"@com_github_jackc_pgx_v5//pgxpool",
"@com_github_jackc_pgx_v5//tracelog",
"@com_github_lib_pq//:pq",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_spf13_pflag//:pflag",
"@org_golang_x_sync//errgroup",
],
4 changes: 2 additions & 2 deletions pkg/workload/kv/BUILD.bazel
@@ -18,8 +18,8 @@ go_library(
"//pkg/workload",
"//pkg/workload/histogram",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v5//:pgx",
"@com_github_jackc_pgx_v5//pgconn",
"@com_github_spf13_pflag//:pflag",
],
)
12 changes: 9 additions & 3 deletions pkg/workload/kv/kv.go
@@ -32,8 +32,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/spf13/pflag"
)

@@ -598,7 +598,13 @@ func (o *kvOp) run(ctx context.Context) (retErr error) {
// that each run call makes 1 attempt, so that rate limiting in workerRun
// behaves as expected.
var tx pgx.Tx
if tx, err = o.mcp.Get().Begin(ctx); err != nil {
pl := o.mcp.Get()
conn, err := pl.Acquire(ctx)
if err != nil {
return err
}

if tx, err = conn.Begin(ctx); err != nil {
return err
}
defer func() {
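The hunk above replaces a direct Begin on the pool with an explicit Acquire followed by Begin on the acquired connection. A sketch of the complete lifecycle this implies — the helper and query are hypothetical, and in the committed code the Release presumably happens in the truncated defer:

```go
import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// runTxn is a hypothetical helper showing the acquire/begin/release pattern.
func runTxn(ctx context.Context, pool *pgxpool.Pool) error {
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release() // hand the connection back to the pool

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	// Rollback returns pgx.ErrTxClosed after a successful Commit; safe to ignore.
	defer func() { _ = tx.Rollback(ctx) }()

	if _, err := tx.Exec(ctx, `SELECT 1`); err != nil {
		return err
	}
	return tx.Commit(ctx)
}
```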
175 changes: 126 additions & 49 deletions pkg/workload/pgx_helpers.go
@@ -14,11 +14,14 @@ import (
"context"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/tracelog"
"github.com/marusama/semaphore"
"golang.org/x/sync/errgroup"
)

@@ -48,16 +51,34 @@ type MultiConnPoolCfg struct {
// If 0, there is no per-pool maximum (other than the total maximum number of
// connections which still applies).
MaxConnsPerPool int

// WarmupConns specifies the number of connections to prewarm when
// initializing a MultiConnPool. A value of 0 warms up the maximum number
// of connections per pool. A value less than 0 skips the connection
// warmup phase.
WarmupConns int

// MaxConnLifetime specifies the max age of individual connections in
// connection pools. If 0, a default value of 5 minutes is used.
MaxConnLifetime time.Duration

// MaxConnLifetimeJitter shortens the max age of a connection by a random
// duration less than the specified jitter. If 0, a default value of 30
// seconds is used.
MaxConnLifetimeJitter time.Duration

// LogLevel specifies the log level (default: warn).
LogLevel tracelog.LogLevel
}

// pgxLogger implements the tracelog.Logger interface.
type pgxLogger struct{}

var _ pgx.Logger = pgxLogger{}
var _ tracelog.Logger = pgxLogger{}

// Log implements the tracelog.Logger interface.
func (p pgxLogger) Log(
ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{},
ctx context.Context, level tracelog.LogLevel, msg string, data map[string]interface{},
) {
if ctx.Err() != nil {
// Don't log anything from pgx if the context was canceled by the workload
@@ -76,7 +97,7 @@ func (p pgxLogger) Log(
return
}
}
log.Infof(ctx, "pgx logger [%s]: %s logParams=%v", level.String(), msg, data)
log.VInfof(ctx, log.Level(level), "pgx logger [%s]: %s logParams=%v", level.String(), msg, data)
}
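Putting the new config fields together, a caller-side sketch (the values are illustrative, and NewMultiConnPool's truncated signature is assumed to be (ctx, cfg, urls ...string)):

```go
cfg := workload.MultiConnPoolCfg{
	MaxTotalConnections:   64,
	MaxConnsPerPool:       16,
	WarmupConns:           0,                // 0: warm up MaxConns per pool
	MaxConnLifetime:       5 * time.Minute,  // 0 falls back to the 5m default
	MaxConnLifetimeJitter: 30 * time.Second, // 0 falls back to the 30s default
	LogLevel:              tracelog.LogLevelWarn,
}
mcp, err := workload.NewMultiConnPool(ctx, cfg, urls...)
if err != nil {
	return err
}
defer mcp.Close()
```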

// NewMultiConnPool creates a new MultiConnPool.
@@ -92,27 +113,36 @@ func NewMultiConnPool(
m := &MultiConnPool{}
m.mu.preparedStatements = map[string]string{}

logLevel := tracelog.LogLevelWarn
if cfg.LogLevel != 0 {
logLevel = cfg.LogLevel
}
maxConnLifetime := 5 * time.Minute
if cfg.MaxConnLifetime > 0 {
maxConnLifetime = cfg.MaxConnLifetime
}
maxConnLifetimeJitter := 30 * time.Second
if cfg.MaxConnLifetimeJitter > 0 {
maxConnLifetimeJitter = cfg.MaxConnLifetimeJitter
}

connsPerURL := distribute(cfg.MaxTotalConnections, len(urls))
maxConnsPerPool := cfg.MaxConnsPerPool
if maxConnsPerPool == 0 {
maxConnsPerPool = cfg.MaxTotalConnections
}

var warmupConns [][]*pgxpool.Conn
for i := range urls {
connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool)
for _, numConns := range connsPerPool {
connCfg, err := pgxpool.ParseConfig(urls[i])
poolCfg, err := pgxpool.ParseConfig(urls[i])
if err != nil {
return nil, err
}
// Disable the automatic prepared statement cache. We've seen a lot of
// churn in this cache since workloads create many different queries.
connCfg.ConnConfig.BuildStatementCache = nil
connCfg.ConnConfig.LogLevel = pgx.LogLevelWarn
connCfg.ConnConfig.Logger = pgxLogger{}
connCfg.MaxConns = int32(numConns)
connCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
poolCfg.MaxConnLifetime = maxConnLifetime
poolCfg.MaxConnLifetimeJitter = maxConnLifetimeJitter
poolCfg.MaxConns = int32(numConns)
poolCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
m.mu.RLock()
defer m.mu.RUnlock()
for name, sql := range m.mu.preparedStatements {
@@ -126,51 +156,29 @@ }
}
return true
}
p, err := pgxpool.ConnectConfig(ctx, connCfg)

// Disable the automatic prepared statement cache. We've seen a lot of
// churn in this cache since workloads create many different queries.
connCfg := poolCfg.ConnConfig
connCfg.DefaultQueryExecMode = pgx.QueryExecModeDescribeExec
connCfg.StatementCacheCapacity = 0
connCfg.DescriptionCacheCapacity = 0
connCfg.Tracer = &tracelog.TraceLog{
Logger: &pgxLogger{},
LogLevel: logLevel,
}
p, err := pgxpool.NewWithConfig(ctx, poolCfg)
if err != nil {
return nil, err
}

warmupConns = append(warmupConns, make([]*pgxpool.Conn, numConns))
m.Pools = append(m.Pools, p)
}
}

// "Warm up" the pools so we don't have to establish connections later (which
// would affect the observed latencies of the first requests, especially when
// prepared statements are used). We do this by
// acquiring connections (in parallel), then releasing them back to the
// pool.
var g errgroup.Group
// Limit concurrent connection establishment. Allowing this to run
// at maximum parallelism would trigger syn flood protection on the
// host, which combined with any packet loss could cause Acquire to
// return an error and fail the whole function. The value 100 is
// chosen because it is less than the default value for SOMAXCONN
// (128).
sem := make(chan struct{}, 100)
for i, p := range m.Pools {
p := p
conns := warmupConns[i]
for j := range conns {
j := j
sem <- struct{}{}
g.Go(func() error {
var err error
conns[j], err = p.Acquire(ctx)
<-sem
return err
})
}
}
if err := g.Wait(); err != nil {
if err := m.WarmupConns(ctx, cfg.WarmupConns); err != nil {
return nil, err
}
for i := range m.Pools {
for _, c := range warmupConns[i] {
c.Release()
}
}

return m, nil
}
@@ -200,6 +208,75 @@ func (m *MultiConnPool) Close() {
}
}

// WarmupConns warms up numConns connections across all pools contained within
// the MultiConnPool. If numConns is 0, the maximum number of connections in
// each pool is warmed up; if numConns is negative, warmup is skipped.
func (m *MultiConnPool) WarmupConns(ctx context.Context, numConns int) error {
if numConns < 0 {
return nil
}

// "Warm up" the pools so we don't have to establish connections later (which
// would affect the observed latencies of the first requests, especially when
// prepared statements are used). We do this by
// acquiring connections (in parallel), then releasing them back to the
// pool.
var g errgroup.Group
// Limit concurrent connection establishment. Allowing this to run
// at maximum parallelism would trigger SYN flood protection on the
// host, which combined with any packet loss could cause Acquire to
// return an error and fail the whole function. The value 100 is
// chosen because it is less than the default value for SOMAXCONN
// (128).
sem := semaphore.New(100)

warmupCtx, cancel := context.WithCancel(ctx)
defer cancel()

var numWarmupConns int
if numConns <= 0 {
for _, p := range m.Pools {
numWarmupConns += int(p.Config().MaxConns)
}
} else {
numWarmupConns = numConns
}

warmupConns := make(chan struct{}, numWarmupConns)

for _, p := range m.Pools {
p := p
for j := 0; j < int(p.Config().MaxConns); j++ {
g.Go(func() error {
if err := sem.Acquire(warmupCtx, 1); err != nil {
warmupConns <- struct{}{}
return err
}
conn, err := p.Acquire(warmupCtx)
if err != nil {
sem.Release(1)
warmupConns <- struct{}{}
return err
}
sem.Release(1)
warmupConns <- struct{}{}
<-warmupCtx.Done()
conn.Release()
return err
})
}
}
for i := 0; i < numWarmupConns; i++ {
<-warmupConns
}
cancel()
if err := g.Wait(); err != nil {
return err
}

return nil
}
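Since WarmupConns is exported, a caller that skipped warmup at construction time (WarmupConns: -1) could warm the pools later; a hedged usage sketch:

```go
// Warm every pool up to its MaxConns; per the doc comment above, 0 means
// "the maximum number of connections".
if err := mcp.WarmupConns(ctx, 0); err != nil {
	return err
}
```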

// distribute returns a slice of <num> integers that add up to <total> and are
// within +/-1 of each other.
func distribute(total, num int) []int {
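distribute's body is collapsed in this view; the following sketch is consistent with its doc comment but is an assumption, not necessarily the committed implementation:

```go
// distribute returns num integers that sum to total and differ by at most 1:
// the first total%num entries each carry one extra unit.
func distribute(total, num int) []int {
	res := make([]int, num)
	for i := range res {
		res[i] = total / num
		if i < total%num {
			res[i]++
		}
	}
	return res
}
```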
6 changes: 3 additions & 3 deletions pkg/workload/schemachange/BUILD.bazel
@@ -35,9 +35,9 @@ go_library(
"//pkg/workload",
"//pkg/workload/histogram",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v4//pgxpool",
"@com_github_jackc_pgx_v5//:pgx",
"@com_github_jackc_pgx_v5//pgconn",
"@com_github_jackc_pgx_v5//pgxpool",
"@com_github_lib_pq//oid",
"@com_github_spf13_pflag//:pflag",
],
4 changes: 2 additions & 2 deletions pkg/workload/schemachange/error_screening.go
@@ -21,8 +21,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

func (og *operationGenerator) tableExists(
4 changes: 2 additions & 2 deletions pkg/workload/schemachange/operation_generator.go
@@ -34,8 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

// seqNum may be shared across multiple instances of this, so it should only
4 changes: 2 additions & 2 deletions pkg/workload/schemachange/schemachange.go
@@ -30,8 +30,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/spf13/pflag"
)

2 changes: 1 addition & 1 deletion pkg/workload/schemachange/type_resolver.go
@@ -19,7 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5"
"github.com/lib/pq/oid"
)

(The remaining 11 changed files are not shown.)
