From 10f488ba8d9fce512463dabc0e9f10e26c4cc231 Mon Sep 17 00:00:00 2001
From: David Hartunian
Date: Mon, 10 Jan 2022 18:51:56 -0500
Subject: [PATCH] tsdb: expand mem per worker based on sql pool size

Previously, the memory limit for all `tsdb` workers was set at a static
64 MiB. This cap created issues seen in #24018 where this limit was hit
on a 30-node cluster. To alleviate the issue, the number of workers was
reduced, raising the per-worker allocation.

We've recently hit this limit again as part of load testing with larger
clusters and have decided to make the per-query worker memory limit
dynamic. The per-worker limit is now raised based on the amount of
memory available to the SQL pool via the `MemoryPoolSize` configuration
variable. This is set to 25% of system memory by default. The `tsdb`
memory cap per-worker is now doubled until it reaches `1/128` of the
memory pool setting. For example, on a node with 256 GiB of memory (a
64 GiB SQL pool at the default 25%), this corresponds to 512 MiB
allocated for all running `tsdb` queries.
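As a rough sketch of the growth rule (illustration only; the actual
change lives in MakeServer in pkg/ts/server.go below, and the helper
name growQueryMemoryMax is hypothetical):

    // growQueryMemoryMax doubles the per-query limit in whole powers of
    // two until it reaches 1/128 of the SQL memory pool. Uses the
    // standard library math package.
    func growQueryMemoryMax(queryMemoryMax, memoryPoolSize int64) int64 {
            if queryMemoryMax < memoryPoolSize/128 {
                    queryMemoryMax <<= int(math.Log2((float64(memoryPoolSize) / 128) / float64(queryMemoryMax)))
            }
            return queryMemoryMax
    }

With the 64 MiB default, a 2 GiB pool stays at 64 MiB, while a 64 GiB
pool grows to 64 MiB << 3 = 512 MiB, matching the new test cases below.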
In addition, the ts server is now connected to the same `BytesMonitor`
instance as the SQL memory monitor, and workers will be capped at double
the query limit. Results are monitored as before, but a cap is not
introduced there since we didn't have one previously.

This behavior is gated behind a private cluster setting that's enabled
by default.

TODO(davidh): Can the tests be faster? They create a new server for
each case.
TODO(davidh): Is 1/128 a good setting? How do we validate this?

Resolves #72986

Release note (ops change): customers running clusters with 240 nodes or
more can effectively access tsdb metrics.
---
 pkg/server/server.go     | 18 +++++++----
 pkg/server/testserver.go |  8 +++++
 pkg/ts/server.go         | 69 +++++++++++++++++++++++++++++++---------
 pkg/ts/server_test.go    | 27 ++++++++++++++++
 4 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/pkg/server/server.go b/pkg/server/server.go
index 66da809a3230..eae2ad82c3a6 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -550,13 +550,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper,
 	)

-	tsDB := ts.NewDB(db, cfg.Settings)
-	registry.AddMetricStruct(tsDB.Metrics())
-	nodeCountFn := func() int64 {
-		return nodeLiveness.Metrics().LiveNodes.Value()
-	}
-	sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)
-
 	ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
 	stores := kvserver.NewStores(cfg.AmbientCtx, clock)
 	ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */)
@@ -603,6 +596,17 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			kvMemoryMonitor.Stop(ctx)
 		}))

+	tsDB := ts.NewDB(db, cfg.Settings)
+	registry.AddMetricStruct(tsDB.Metrics())
+	nodeCountFn := func() int64 {
+		return nodeLiveness.Metrics().LiveNodes.Value()
+	}
+	sTS := ts.MakeServer(
+		cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig,
+		cfg.MemoryPoolSize, sqlMonitorAndMetrics.rootSQLMemoryMonitor,
+		cfg.Settings, stopper,
+	)
+
 	storeCfg := kvserver.StoreConfig{
 		DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(),
 		Settings:          st,
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index 39eb3cfe3783..a26a1289ed01 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -450,6 +450,14 @@ func (ts *TestServer) TsDB() *ts.DB {
 	return nil
 }

+// TsServer returns the ts.Server instance used by the TestServer.
+func (ts *TestServer) TsServer() *ts.Server {
+	if ts != nil {
+		return ts.tsServer
+	}
+	return nil
+}
+
 // DB returns the client.DB instance used by the TestServer.
 func (ts *TestServer) DB() *kv.DB {
 	if ts != nil {
diff --git a/pkg/ts/server.go b/pkg/ts/server.go
index 74763e4090d7..e636866a98bd 100644
--- a/pkg/ts/server.go
+++ b/pkg/ts/server.go
@@ -17,6 +17,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/ts/catalog"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -46,6 +48,16 @@ const (
 	dumpBatchSize = 100
 )

+var TSDBAutoMemoryGrowthEnabled = func() *settings.BoolSetting {
+	s := settings.RegisterBoolSetting(
+		settings.TenantWritable,
+		"server.ts.auto_memory_growth.enabled",
+		"enables or disables automatic growth of memory allocated to the TSDB's query facility based on the SQL memory pool size",
+		true,
+	)
+	return s
+}()
+
 // ClusterNodeCountFn is a function that returns the number of nodes active on
 // the cluster.
 type ClusterNodeCountFn func() int64
@@ -104,9 +116,13 @@ func MakeServer(
 	db *DB,
 	nodeCountFn ClusterNodeCountFn,
 	cfg ServerConfig,
+	memoryPoolSize int64,
+	memoryMonitor *mon.BytesMonitor,
+	settings *cluster.Settings,
 	stopper *stop.Stopper,
 ) Server {
 	ambient.AddLogTag("ts-srv", nil)
+	ctx := ambient.AnnotateCtx(context.Background())

 	// Override default values from configuration.
 	queryWorkerMax := queryWorkerMax
@@ -114,40 +130,52 @@ func MakeServer(
 		queryWorkerMax = cfg.QueryWorkerMax
 	}
 	queryMemoryMax := queryMemoryMax
+	if TSDBAutoMemoryGrowthEnabled.Get(&settings.SV) {
+		// Double the limit until we hit 1/128 of the memory pool setting.
+		// The typical default of 64 MiB corresponds to a pool of 8 GiB,
+		// i.e. 32 GiB of system memory (assuming a default setting of 25%
+		// for the pool).
+		if queryMemoryMax < memoryPoolSize/128 {
+			queryMemoryMax = queryMemoryMax << int(math.Log2((float64(memoryPoolSize)/128)/float64(queryMemoryMax)))
+			log.Infof(ctx, "ts: setting query memory max to %d bytes", queryMemoryMax)
+		}
+	}
 	if cfg.QueryMemoryMax != 0 {
 		queryMemoryMax = cfg.QueryMemoryMax
 	}
 	workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax))
 	stopper.AddCloser(workerSem.Closer("stopper"))
-	return Server{
+	s := Server{
 		AmbientContext: ambient,
 		db:             db,
 		stopper:        stopper,
 		nodeCountFn:    nodeCountFn,
-		workerMemMonitor: mon.NewUnlimitedMonitor(
-			context.Background(),
+		workerMemMonitor: mon.NewMonitorInheritWithLimit(
 			"timeseries-workers",
-			mon.MemoryResource,
-			nil,
-			nil,
-			// Begin logging messages if we exceed our planned memory usage by
-			// more than double.
 			queryMemoryMax*2,
-			db.st,
+			memoryMonitor,
 		),
-		resultMemMonitor: mon.NewUnlimitedMonitor(
-			context.Background(),
+		resultMemMonitor: mon.NewMonitorInheritWithLimit(
 			"timeseries-results",
-			mon.MemoryResource,
-			nil,
-			nil,
 			math.MaxInt64,
-			db.st,
+			memoryMonitor,
 		),
 		queryMemoryMax: queryMemoryMax,
 		queryWorkerMax: queryWorkerMax,
 		workerSem:      workerSem,
 	}
+
+	s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
+	stopper.AddCloser(stop.CloserFn(func() {
+		s.workerMemMonitor.Stop(ctx)
+	}))
+
+	s.resultMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{})
+	stopper.AddCloser(stop.CloserFn(func() {
+		s.resultMemMonitor.Stop(ctx)
+	}))
+
+	return s
 }

 // RegisterService registers the GRPC service.
@@ -446,3 +474,14 @@ func dumpTimeseriesAllSources(
 	}
 	return nil
 }
+
+// GetQueryWorkerMax returns the maximum number of query workers; used by tests.
+func (s *Server) GetQueryWorkerMax() int64 {
+	return int64(s.queryWorkerMax)
+}
+
+// GetQueryMemoryMax returns the soft memory limit on all running
+// queries.
+func (s *Server) GetQueryMemoryMax() int64 {
+	return s.queryMemoryMax
+}
diff --git a/pkg/ts/server_test.go b/pkg/ts/server_test.go
index 238753dbf6a8..ef6967f463dc 100644
--- a/pkg/ts/server_test.go
+++ b/pkg/ts/server_test.go
@@ -294,6 +294,33 @@ func TestServerQueryStarvation(t *testing.T) {
 	}
 }

+func TestServerMemoryCap(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	MiB := 1024 * 1024
+	GiB := MiB * 1024
+	tcs := []struct {
+		poolSize              int64
+		expectedTsDBWorkerMax int64
+	}{
+		{poolSize: int64(2 * GiB), expectedTsDBWorkerMax: int64(64 * MiB)},
+		{poolSize: int64(32 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
+		{poolSize: int64(48 * GiB), expectedTsDBWorkerMax: int64(256 * MiB)},
+		{poolSize: int64(64 * GiB), expectedTsDBWorkerMax: int64(512 * MiB)},
+	}
+
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("%d pool should have %d worker max memory", tc.poolSize, tc.expectedTsDBWorkerMax),
+			func(t *testing.T) {
+				s, _, _ := serverutils.StartServer(t, base.TestServerArgs{SQLMemoryPoolSize: tc.poolSize})
+				defer s.Stopper().Stop(context.Background())
+
+				tsServer := s.(*server.TestServer).TsServer()
+				require.Equal(t, tc.expectedTsDBWorkerMax, tsServer.GetQueryMemoryMax())
+			})
+	}
+}
+
 // TestServerQueryMemoryManagement verifies that queries succeed under
 // constrained memory requirements.
 func TestServerQueryMemoryManagement(t *testing.T) {