tsdb: expand mem per worker based on sql pool size
Previously, the memory limit for all `tsdb` workers was set at a static
64 MiB. This cap created issues, seen in cockroachdb#24018, where the limit was
hit on a 30-node cluster. To alleviate the issue, the number of workers was
reduced, raising the per-worker allocation.

We've recently hit this limit again during load testing with larger
clusters and have decided to make the per-query worker memory limit
dynamic. The per-worker limit is now raised based on the amount of memory
available to the SQL memory pool via the `MemoryPoolSize` configuration
variable, which defaults to 25% of system memory. The `tsdb` per-worker
memory cap is doubled until it reaches `1/128` of the memory pool setting.

For example, on nodes with 128 - 256 GiB of memory, this corresponds to
256 - 512 MiB allocated for all running `tsdb` queries.
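
To illustrate (a standalone sketch, not part of the change itself): the sizing
loop below mirrors the logic added to `pkg/ts/server.go` in this commit, with
hypothetical pool sizes, and assumes the default 1/128 ratio and the 64 MiB
starting cap.

```go
package main

import "fmt"

func main() {
	const mib = int64(1) << 20
	const gib = int64(1) << 30

	// Hypothetical SQL pool sizes (cfg.MemoryPoolSize, 25% of system memory by default).
	for _, poolSize := range []int64{2 * gib, 32 * gib, 64 * gib} {
		queryMemoryMax := 64 * mib  // static default cap
		memoryCap := poolSize / 128 // target: 1/128 of the SQL memory pool
		// Double until the next doubling would overshoot the target.
		for queryMemoryMax <= memoryCap/2 {
			queryMemoryMax *= 2
		}
		fmt.Printf("pool %3d GiB -> tsdb query memory max %3d MiB\n",
			poolSize/gib, queryMemoryMax/mib)
	}
	// Output:
	// pool   2 GiB -> tsdb query memory max  64 MiB
	// pool  32 GiB -> tsdb query memory max 256 MiB
	// pool  64 GiB -> tsdb query memory max 512 MiB
}
```

The three printed results match the expectations encoded in the new
`TestServerMemoryCap` test below.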

In addition, the ts server is now connected to the same `BytesMonitor`
instance as the SQL memory monitor, and workers will be capped at double
the query limit. Results are monitored as before, but no cap is introduced
there since none was present previously.
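
For intuition only, here is a minimal sketch of what sharing the
`BytesMonitor` means, using the `pkg/util/mon` calls that appear in the diff
(`NewMonitorInheritWithLimit`, `Start`) plus a hypothetical
`reserveQueryMemory` helper and `estimatedBytes` parameter that are not part
of this commit; the actual server wiring is in `pkg/ts/server.go` below.

```go
package tssketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// reserveQueryMemory is a hypothetical helper: the worker monitor is a child
// of the SQL root monitor, so tsdb reservations count against the shared SQL
// pool, while the child's own limit caps tsdb usage at 2x the per-query max.
// (Monitor shutdown via the stopper is omitted here.)
func reserveQueryMemory(
	ctx context.Context, root *mon.BytesMonitor, queryMemoryMax, estimatedBytes int64,
) (*mon.BoundAccount, error) {
	workerMon := mon.NewMonitorInheritWithLimit("timeseries-workers", queryMemoryMax*2, root)
	workerMon.Start(ctx, root, mon.BoundAccount{})

	acc := workerMon.MakeBoundAccount()
	if err := acc.Grow(ctx, estimatedBytes); err != nil {
		// Either the 2x cap or the shared SQL pool was exhausted.
		return nil, err
	}
	return &acc, nil
}
```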

This behavior is gated behind a private cluster setting that's enabled
by default and sets the ratio at 1/128 of the SQL memory pool.

Resolves cockroachdb#72986

Release note (ops change): customers running clusters with 240 nodes or
more can effectively access tsdb metrics.
dhartunian committed Jan 27, 2022
1 parent e1ceeda commit bcaec7e
Showing 4 changed files with 124 additions and 22 deletions.
18 changes: 11 additions & 7 deletions pkg/server/server.go
@@ -553,13 +553,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper,
)

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)

ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
stores := kvserver.NewStores(cfg.AmbientCtx, clock)
ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
@@ -609,6 +602,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
kvMemoryMonitor.Stop(ctx)
}))

tsDB := ts.NewDB(db, cfg.Settings)
registry.AddMetricStruct(tsDB.Metrics())
nodeCountFn := func() int64 {
return nodeLiveness.Metrics().LiveNodes.Value()
}
sTS := ts.MakeServer(
cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig,
cfg.MemoryPoolSize, sqlMonitorAndMetrics.rootSQLMemoryMonitor,
cfg.Settings, stopper,
)

storeCfg := kvserver.StoreConfig{
DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(),
Settings: st,
8 changes: 8 additions & 0 deletions pkg/server/testserver.go
@@ -451,6 +451,14 @@ func (ts *TestServer) TsDB() *ts.DB {
return nil
}

// TsServer returns the ts.TsServer instance used by the TestServer.
func (ts *TestServer) TsServer() *ts.Server {
if ts != nil {
return ts.tsServer
}
return nil
}

// DB returns the client.DB instance used by the TestServer.
func (ts *TestServer) DB() *kv.DB {
if ts != nil {
80 changes: 65 additions & 15 deletions pkg/ts/server.go
@@ -12,11 +12,14 @@ package ts

import (
"context"
"github.com/cockroachdb/errors"
"math"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/ts/catalog"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -46,6 +49,24 @@
dumpBatchSize = 100
)

var TSDBRatioOfSQLMemoryPool = func() *settings.IntSetting {
s := settings.RegisterIntSetting(
settings.TenantWritable,
"server.ts.ratio_of_sql_memory_pool",
"sets the target memory allocation to the tsdb as a fraction of the SQL memory pool "+
"(a setting of 128 sets the memory pool to be 1/128 of the SQL mem pool). "+
"Set to zero to disable memory growth for the tsdb and cap at 64MiB. This setting is experimental.",
128,
func(i int64) error {
if i < 0 {
return errors.New("value should be non-negative.")
}
return nil
},
)
return s
}()

// ClusterNodeCountFn is a function that returns the number of nodes active on
// the cluster.
type ClusterNodeCountFn func() int64
@@ -104,50 +125,68 @@ func MakeServer(
db *DB,
nodeCountFn ClusterNodeCountFn,
cfg ServerConfig,
memoryPoolSize int64,
memoryMonitor *mon.BytesMonitor,
settings *cluster.Settings,
stopper *stop.Stopper,
) Server {
ambient.AddLogTag("ts-srv", nil)
ctx := ambient.AnnotateCtx(context.Background())

// Override default values from configuration.
queryWorkerMax := queryWorkerMax
if cfg.QueryWorkerMax != 0 {
queryWorkerMax = cfg.QueryWorkerMax
}
queryMemoryMax := queryMemoryMax
targetMemRatio := TSDBRatioOfSQLMemoryPool.Get(&settings.SV)
if targetMemRatio > 0 {
// Double size until we hit 1/128 of the memory pool setting. Our
// typical default here is 64 MiB which corresponds to a pool of 2 GiB
// which corresponds to 8 GiB of system memory (assuming a default
// setting of 25% for the pool).
memoryCap := memoryPoolSize / targetMemRatio
for queryMemoryMax <= memoryCap/2 {
queryMemoryMax = queryMemoryMax * 2
}
log.Infof(ctx, "ts: setting query memory max to %d bytes", queryMemoryMax)
}
if cfg.QueryMemoryMax != 0 {
queryMemoryMax = cfg.QueryMemoryMax
}
workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax))
stopper.AddCloser(workerSem.Closer("stopper"))
return Server{
s := Server{
AmbientContext: ambient,
db: db,
stopper: stopper,
nodeCountFn: nodeCountFn,
workerMemMonitor: mon.NewUnlimitedMonitor(
context.Background(),
workerMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-workers",
mon.MemoryResource,
nil,
nil,
// Begin logging messages if we exceed our planned memory usage by
// more than double.
queryMemoryMax*2,
db.st,
memoryMonitor,
),
resultMemMonitor: mon.NewUnlimitedMonitor(
context.Background(),
resultMemMonitor: mon.NewMonitorInheritWithLimit(
"timeseries-results",
mon.MemoryResource,
nil,
nil,
math.MaxInt64,
db.st,
memoryMonitor,
),
queryMemoryMax: queryMemoryMax,
queryWorkerMax: queryWorkerMax,
workerSem: workerSem,
}

s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.workerMemMonitor.Stop(ctx)
}))

s.resultMemMonitor.Start(ambient.AnnotateCtx(context.Background()), memoryMonitor, mon.BoundAccount{})
stopper.AddCloser(stop.CloserFn(func() {
s.resultMemMonitor.Stop(ctx)
}))

return s
}

// RegisterService registers the GRPC service.
@@ -446,3 +485,14 @@ func dumpTimeseriesAllSources(
}
return nil
}

// GetQueryWorkerMax is used by tests to verify the memory caps.
func (s *Server) GetQueryWorkerMax() int64 {
return s.queryMemoryMax
}

// GetQueryMemoryMax returns the soft memory limit on all running
// queries.
func (s *Server) GetQueryMemoryMax() int64 {
return s.queryMemoryMax
}
40 changes: 40 additions & 0 deletions pkg/ts/server_test.go
@@ -13,6 +13,7 @@ package ts_test
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"io"
"reflect"
"sort"
@@ -294,6 +295,45 @@ func TestServerQueryStarvation(t *testing.T) {
}
}

func TestServerMemoryCap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())
tsDB := ts.NewDB(s.DB(), s.ClusterSettings())
monitor := &mon.BytesMonitor{}

MiB := 1024 * 1024
GiB := MiB * 1024
tcs := []struct {
poolSize int64
expectedTsDBWorkerMax int64
clusterConfigOverride int64
}{
// Default target is 1/128 of pool size.
{poolSize: int64(2 * GiB), expectedTsDBWorkerMax: int64(64 * MiB)},
{poolSize: int64(32 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
{poolSize: int64(48 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
{poolSize: int64(64 * GiB), expectedTsDBWorkerMax: int64(512 * MiB)},
// In these cases, we override the ratio to half the size expected above.
{poolSize: int64(32 * GiB), expectedTsDBWorkerMax: int64(128 * MiB), clusterConfigOverride: 256},
{poolSize: int64(64 * GiB), expectedTsDBWorkerMax: int64(256 * MiB), clusterConfigOverride: 256},
}

for _, tc := range tcs {
t.Run(fmt.Sprintf("%d pool should have %d worker max memory", tc.poolSize, tc.expectedTsDBWorkerMax),
func(t *testing.T) {
if tc.clusterConfigOverride > 0 {
_, err := db.Exec(fmt.Sprintf("set cluster setting server.ts.ratio_of_sql_memory_pool = %d", tc.clusterConfigOverride))
require.NoError(t, err)
}
s := ts.MakeServer(s.AmbientCtx(), tsDB, nil, ts.ServerConfig{}, tc.poolSize, monitor, s.ClusterSettings(), s.Stopper())
require.Equal(t, tc.expectedTsDBWorkerMax, s.GetQueryWorkerMax())
})
}
}

// TestServerQueryMemoryManagement verifies that queries succeed under
// constrained memory requirements.
func TestServerQueryMemoryManagement(t *testing.T) {
