diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index cbe67745bcc7..c35d6b1c8d07 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -147,6 +147,21 @@ percentage of physical memory (e.g. .25). If left unspecified, defaults to 25% o physical memory.`, } + TSDBMem = FlagInfo{ + Name: "max-tsdb-memory", + Description: ` +Maximum memory capacity available to store temporary data for use by the +time-series database to display metrics in the DB Console. Accepts numbers +interpreted as bytes, size suffixes (e.g. 1GB and 1GiB) or a +percentage of physical memory (e.g. 0.01). If left unspecified, defaults to +1% of physical memory or 64MiB whichever is greater. It maybe necessary to +manually increase this value on a cluster with hundreds of nodes where +individual nodes have very limited memory available. This can constrain +the ability of the DB Console to process time-series queries used to render +metrics for the entire cluster. This capacity constraint does not affect +SQL query execution.`, + } + SQLTempStorage = FlagInfo{ Name: "max-disk-temp-storage", Description: ` diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 9c79097ffe28..aabdb7a598c2 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -127,6 +128,16 @@ func setServerContextDefaults() { if bytes, _ := memoryPercentResolver(25); bytes != 0 { serverCfg.SQLConfig.MemoryPoolSize = bytes } + + // Attempt to set serverCfg.TimeSeriesServerConfig.QueryMemoryMax to + // the default (64MiB) or 1% of system memory, whichever is greater. + if bytes, _ := memoryPercentResolver(1); bytes != 0 { + if bytes > ts.DefaultQueryMemoryMax { + serverCfg.TimeSeriesServerConfig.QueryMemoryMax = bytes + } else { + serverCfg.TimeSeriesServerConfig.QueryMemoryMax = ts.DefaultQueryMemoryMax + } + } } // baseCfg points to the base.Config inside serverCfg. diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index dac02eaa2614..8a5e64b562ba 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -526,6 +526,7 @@ func init() { // Engine flags. varFlag(f, cacheSizeValue, cliflags.Cache) varFlag(f, sqlSizeValue, cliflags.SQLMem) + varFlag(f, tsdbSizeValue, cliflags.TSDBMem) // N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after // the stores flag has been parsed and the storage device that a percentage // refers to becomes known. @@ -986,6 +987,7 @@ func init() { // Engine flags. varFlag(f, sqlSizeValue, cliflags.SQLMem) + varFlag(f, tsdbSizeValue, cliflags.TSDBMem) // N.B. diskTempStorageSizeValue.ResolvePercentage() will be called after // the stores flag has been parsed and the storage device that a percentage // refers to becomes known. diff --git a/pkg/cli/flags_test.go b/pkg/cli/flags_test.go index 74aef5eba115..993b7fc29955 100644 --- a/pkg/cli/flags_test.go +++ b/pkg/cli/flags_test.go @@ -140,51 +140,61 @@ func TestClusterNameFlag(t *testing.T) { } } -func TestSQLMemoryPoolFlagValue(t *testing.T) { +func TestMemoryPoolFlagValues(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - // Avoid leaking configuration changes after the test ends. - defer initCLIDefaults() - - f := startCmd.Flags() - - // Check absolute values. - testCases := []struct { - value string - expected int64 + for _, tc := range []struct { + flag string + config *int64 }{ - {"100MB", 100 * 1000 * 1000}, - {".5GiB", 512 * 1024 * 1024}, - {"1.3", 1}, - } - for _, c := range testCases { - args := []string{"--max-sql-memory", c.value} - if err := f.Parse(args); err != nil { - t.Fatal(err) - } - if c.expected != serverCfg.MemoryPoolSize { - t.Errorf("expected %d, but got %d", c.expected, serverCfg.MemoryPoolSize) - } - } + {flag: "--max-sql-memory", config: &serverCfg.MemoryPoolSize}, + {flag: "--max-tsdb-memory", config: &serverCfg.TimeSeriesServerConfig.QueryMemoryMax}, + } { + t.Run(tc.flag, func(t *testing.T) { + // Avoid leaking configuration changes after the test ends. + defer initCLIDefaults() + + f := startCmd.Flags() + + // Check absolute values. + testCases := []struct { + value string + expected int64 + }{ + {"100MB", 100 * 1000 * 1000}, + {".5GiB", 512 * 1024 * 1024}, + {"1.3", 1}, + } + for _, c := range testCases { + args := []string{tc.flag, c.value} + if err := f.Parse(args); err != nil { + t.Fatal(err) + } + if c.expected != *tc.config { + t.Errorf("expected %d, but got %d", c.expected, tc.config) + } + } - for _, c := range []string{".30", "0.3"} { - args := []string{"--max-sql-memory", c} - if err := f.Parse(args); err != nil { - t.Fatal(err) - } + for _, c := range []string{".30", "0.3"} { + args := []string{tc.flag, c} + if err := f.Parse(args); err != nil { + t.Fatal(err) + } - // Check fractional values. - maxMem, err := status.GetTotalMemory(context.Background()) - if err != nil { - t.Logf("total memory unknown: %v", err) - return - } - expectedLow := (maxMem * 28) / 100 - expectedHigh := (maxMem * 32) / 100 - if serverCfg.MemoryPoolSize < expectedLow || serverCfg.MemoryPoolSize > expectedHigh { - t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, serverCfg.MemoryPoolSize) - } + // Check fractional values. + maxMem, err := status.GetTotalMemory(context.Background()) + if err != nil { + t.Logf("total memory unknown: %v", err) + return + } + expectedLow := (maxMem * 28) / 100 + expectedHigh := (maxMem * 32) / 100 + if *tc.config < expectedLow || *tc.config > expectedHigh { + t.Errorf("expected %d-%d, but got %d", expectedLow, expectedHigh, *tc.config) + } + } + }) } } diff --git a/pkg/cli/start.go b/pkg/cli/start.go index dc993456d1bb..90c371f8a5de 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -185,6 +185,7 @@ func initTraceDir(ctx context.Context, dir string) { var cacheSizeValue = newBytesOrPercentageValue(&serverCfg.CacheSize, memoryPercentResolver) var sqlSizeValue = newBytesOrPercentageValue(&serverCfg.MemoryPoolSize, memoryPercentResolver) var diskTempStorageSizeValue = newBytesOrPercentageValue(nil /* v */, nil /* percentResolver */) +var tsdbSizeValue = newBytesOrPercentageValue(&serverCfg.TimeSeriesServerConfig.QueryMemoryMax, memoryPercentResolver) func initExternalIODir(ctx context.Context, firstStore base.StoreSpec) (string, error) { externalIODir := startCtx.externalIODir @@ -1089,12 +1090,12 @@ func maybeWarnMemorySizes(ctx context.Context) { // Check that the total suggested "max" memory is well below the available memory. if maxMemory, err := status.GetTotalMemory(ctx); err == nil { - requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize + requestedMem := serverCfg.CacheSize + serverCfg.MemoryPoolSize + serverCfg.TimeSeriesServerConfig.QueryMemoryMax maxRecommendedMem := int64(.75 * float64(maxMemory)) if requestedMem > maxRecommendedMem { log.Ops.Shoutf(ctx, severity.WARNING, - "the sum of --max-sql-memory (%s) and --cache (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.", - sqlSizeValue, cacheSizeValue, humanizeutil.IBytes(maxRecommendedMem)) + "the sum of --max-sql-memory (%s), --cache (%s), and --max-tsdb-memory (%s) is larger than 75%% of total RAM (%s).\nThis server is running at increased risk of memory-related failures.", + sqlSizeValue, cacheSizeValue, tsdbSizeValue, humanizeutil.IBytes(maxRecommendedMem)) } } } diff --git a/pkg/server/server.go b/pkg/server/server.go index b4ffde2b3344..68951d85dc84 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -553,13 +553,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { cfg.AmbientCtx, st, nodeDialer, grpcServer.Server, stopper, ) - tsDB := ts.NewDB(db, cfg.Settings) - registry.AddMetricStruct(tsDB.Metrics()) - nodeCountFn := func() int64 { - return nodeLiveness.Metrics().LiveNodes.Value() - } - sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper) - ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer) stores := kvserver.NewStores(cfg.AmbientCtx, clock) ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */) @@ -609,6 +602,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { kvMemoryMonitor.Stop(ctx) })) + tsDB := ts.NewDB(db, cfg.Settings) + registry.AddMetricStruct(tsDB.Metrics()) + nodeCountFn := func() int64 { + return nodeLiveness.Metrics().LiveNodes.Value() + } + sTS := ts.MakeServer( + cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, + sqlMonitorAndMetrics.rootSQLMemoryMonitor, stopper, + ) + storeCfg := kvserver.StoreConfig{ DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), Settings: st, diff --git a/pkg/ts/server.go b/pkg/ts/server.go index 74763e4090d7..2e76d50f42a3 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -37,10 +37,10 @@ const ( // queryWorkerMax is the default maximum number of worker goroutines that // the time series server can use to service incoming queries. queryWorkerMax = 8 - // queryMemoryMax is a soft limit for the amount of total memory used by - // time series queries. This is not currently enforced, but is used for - // monitoring purposes. - queryMemoryMax = int64(64 * 1024 * 1024) // 64MiB + // DefaultQueryMemoryMax is a soft limit for the amount of total + // memory used by time series queries. This is not currently enforced, + // but is used for monitoring purposes. + DefaultQueryMemoryMax = int64(64 * 1024 * 1024) // 64MiB // dumpBatchSize is the number of keys processed in each batch by the dump // command. dumpBatchSize = 100 @@ -104,50 +104,54 @@ func MakeServer( db *DB, nodeCountFn ClusterNodeCountFn, cfg ServerConfig, + memoryMonitor *mon.BytesMonitor, stopper *stop.Stopper, ) Server { ambient.AddLogTag("ts-srv", nil) + ctx := ambient.AnnotateCtx(context.Background()) // Override default values from configuration. queryWorkerMax := queryWorkerMax if cfg.QueryWorkerMax != 0 { queryWorkerMax = cfg.QueryWorkerMax } - queryMemoryMax := queryMemoryMax - if cfg.QueryMemoryMax != 0 { + queryMemoryMax := DefaultQueryMemoryMax + if cfg.QueryMemoryMax > DefaultQueryMemoryMax { queryMemoryMax = cfg.QueryMemoryMax } workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax)) stopper.AddCloser(workerSem.Closer("stopper")) - return Server{ + s := Server{ AmbientContext: ambient, db: db, stopper: stopper, nodeCountFn: nodeCountFn, - workerMemMonitor: mon.NewUnlimitedMonitor( - context.Background(), + workerMemMonitor: mon.NewMonitorInheritWithLimit( "timeseries-workers", - mon.MemoryResource, - nil, - nil, - // Begin logging messages if we exceed our planned memory usage by - // more than double. queryMemoryMax*2, - db.st, + memoryMonitor, ), - resultMemMonitor: mon.NewUnlimitedMonitor( - context.Background(), + resultMemMonitor: mon.NewMonitorInheritWithLimit( "timeseries-results", - mon.MemoryResource, - nil, - nil, math.MaxInt64, - db.st, + memoryMonitor, ), queryMemoryMax: queryMemoryMax, queryWorkerMax: queryWorkerMax, workerSem: workerSem, } + + s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{}) + stopper.AddCloser(stop.CloserFn(func() { + s.workerMemMonitor.Stop(ctx) + })) + + s.resultMemMonitor.Start(ambient.AnnotateCtx(context.Background()), memoryMonitor, mon.BoundAccount{}) + stopper.AddCloser(stop.CloserFn(func() { + s.resultMemMonitor.Stop(ctx) + })) + + return s } // RegisterService registers the GRPC service.