From 844c727c9fe128a96043f3c9da86761a591c0022 Mon Sep 17 00:00:00 2001 From: Lorenzo Affetti Date: Tue, 31 Mar 2020 10:52:55 +0200 Subject: [PATCH] test(launcher): e2e test memory limits --- cmd/influxd/launcher/launcher.go | 58 ++++-- cmd/influxd/launcher/launcher_helpers.go | 11 +- cmd/influxd/launcher/query_test.go | 222 ++++++++++++++++++++--- query/control/controller.go | 12 +- query/control/memory.go | 26 ++- 5 files changed, 280 insertions(+), 49 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 45c26e81f83..b56edd1d573 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math" "net" nethttp "net/http" _ "net/http/pprof" // needed to add pprof to our binary. @@ -279,6 +278,36 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) { Default: false, Desc: "disables the task scheduler", }, + { + DestP: &l.concurrencyQuota, + Flag: "query-concurrency", + Default: 10, + Desc: "the number of queries that are allowed to execute concurrently", + }, + { + DestP: &l.initialMemoryBytesQuotaPerQuery, + Flag: "query-initial-memory-bytes", + Default: 0, + Desc: "the initial number of bytes allocated for a query when it is started. If this is unset, then query-memory-bytes will be used", + }, + { + DestP: &l.memoryBytesQuotaPerQuery, + Flag: "query-memory-bytes", + Default: 10 * 1024 * 1024, // 10MB + Desc: "maximum number of bytes a query is allowed to use at any given time. This must be greater or equal to query-initial-memory-bytes", + }, + { + DestP: &l.maxMemoryBytes, + Flag: "query-max-memory-bytes", + Default: 0, + Desc: "the maximum amount of memory used for queries. 
If this is unset, then this number is query-concurrency * query-memory-bytes", + }, + { + DestP: &l.queueSize, + Flag: "query-queue-size", + Default: 10, + Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected", + }, } cli.BindOptions(cmd, opts) @@ -309,6 +338,13 @@ type Launcher struct { enableNewMetaStore bool newMetaStoreReadOnly bool + // Query options. + concurrencyQuota int + initialMemoryBytesQuotaPerQuery int + memoryBytesQuotaPerQuery int + maxMemoryBytes int + queueSize int + boltClient *bolt.Client kvStore kv.Store kvService *kv.Service @@ -642,14 +678,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { backupService platform.BackupService = m.engine ) - // TODO(cwolff): Figure out a good default per-query memory limit: - // https://github.com/influxdata/influxdb/issues/13642 - const ( - concurrencyQuota = 10 - memoryBytesQuotaPerQuery = math.MaxInt64 - QueueSize = 10 - ) - deps, err := influxdb.NewDependencies( storageflux.NewReader(readservice.NewStore(m.engine)), m.engine, @@ -664,11 +692,13 @@ func (m *Launcher) run(ctx context.Context) (err error) { } m.queryController, err = control.New(control.Config{ - ConcurrencyQuota: concurrencyQuota, - MemoryBytesQuotaPerQuery: int64(memoryBytesQuotaPerQuery), - QueueSize: QueueSize, - Logger: m.log.With(zap.String("service", "storage-reads")), - ExecutorDependencies: []flux.Dependency{deps}, + ConcurrencyQuota: m.concurrencyQuota, + InitialMemoryBytesQuotaPerQuery: int64(m.initialMemoryBytesQuotaPerQuery), + MemoryBytesQuotaPerQuery: int64(m.memoryBytesQuotaPerQuery), + MaxMemoryBytes: int64(m.maxMemoryBytes), + QueueSize: m.queueSize, + Logger: m.log.With(zap.String("service", "storage-reads")), + ExecutorDependencies: []flux.Dependency{deps}, }) if err != nil { m.log.Error("Failed to create query controller", zap.Error(err)) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 
6cd12bde0ee..4f705f0dbef 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -83,7 +83,16 @@ func (tl *TestLauncher) Run(ctx context.Context, args ...string) error { args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename)) args = append(args, "--engine-path", filepath.Join(tl.Path, "engine")) args = append(args, "--http-bind-address", "127.0.0.1:0") - args = append(args, "--log-level", "debug") + logLevel := false + for _, arg := range args { + if arg == "--log-level" { + logLevel = true + break + } + } + if !logLevel { + args = append(args, "--log-level", "debug") + } return tl.Launcher.Run(ctx, args...) } diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index 37a77612d41..5c611e443f7 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -8,6 +8,7 @@ import ( "io" nethttp "net/http" "strings" + "sync" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/cmd/influxd/launcher" phttp "github.com/influxdata/influxdb/http" + "github.com/influxdata/influxdb/kit/prom" "github.com/influxdata/influxdb/query" ) @@ -105,28 +107,53 @@ func TestPipeline_WriteV2_Query(t *testing.T) { res.HasTableCount(t, 1) } -// This test initializes a default launcher; writes some data; queries the data (success); -// sets memory limits to the same read query; checks that the query fails because limits are exceeded. 
-func TestPipeline_QueryMemoryLimits(t *testing.T) { - t.Skip("setting memory limits in the client is not implemented yet") +func getMemoryUnused(t *testing.T, reg *prom.Registry) int64 { + t.Helper() - l := launcher.RunTestLauncherOrFail(t, ctx) - l.SetupOrFail(t) - defer l.ShutdownOrFail(t, ctx) + ms, err := reg.Gather() + if err != nil { + t.Fatal(err) + } + for _, m := range ms { + if m.GetName() == "query_control_memory_unused_bytes" { + return int64(*m.GetMetric()[0].Gauge.Value) + } + } + t.Errorf("query metric for unused memory not found") + return 0 +} - // write some points - for i := 0; i < 100; i++ { - l.WritePointsOrFail(t, fmt.Sprintf(`m,k=v1 f=%di %d`, i*100, time.Now().UnixNano())) +func writeBytes(t *testing.T, l *launcher.TestLauncher, tagValue string, bs int) int { + // every point is: + // 1 byte measurement ("m") + // + 2 bytes tag ("v1") + // + 64 bytes field + // + 64 bytes timestamp + // --------------------------- + // = 131 bytes + if bs < 131 { + bs = 131 + } + n := bs / 131 + if n*131 < bs { + n++ } + for i := 0; i < n; i++ { + l.WritePointsOrFail(t, fmt.Sprintf(`m,t=%s f=%di %d`, tagValue, i*100, time.Now().UnixNano())) + } + return n * 131 +} - // compile a from query and get the spec - qs := fmt.Sprintf(`from(bucket:"%s") |> range(start:-5m)`, l.Bucket.Name) +func queryPoints(t *testing.T, l *launcher.TestLauncher, tagValue string) error { + filterExpression := fmt.Sprintf("r.t == \"%s\"", tagValue) + if tagValue == "" { + filterExpression = "true" + } + qs := fmt.Sprintf(`from(bucket:"%s") |> range(start:-5m) |> filter(fn: (r) => %s)`, l.Bucket.Name, filterExpression) pkg, err := flux.Parse(qs) if err != nil { t.Fatal(err) } - - // we expect this request to succeed req := &query.Request{ Authorization: l.Auth, OrganizationID: l.Org.ID, @@ -134,23 +161,166 @@ func TestPipeline_QueryMemoryLimits(t *testing.T) { AST: pkg, }, } - if err := l.QueryAndNopConsume(context.Background(), req); err != nil { - t.Fatal(err) + return 
l.QueryAndNopConsume(context.Background(), req) +} + +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - queries the data; +// - verifies that the query fails (or not) and that the memory was de-allocated. +func TestPipeline_QueryMemoryLimits(t *testing.T) { + tcs := []struct { + name string + args []string + err bool + querySizeBytes int + // max_memory - initial_per_query_memory * concurrency + unusedMemoryBytes int + }{ + { + name: "ok - initial memory bytes, memory bytes, and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "100", + "--query-max-memory-bytes", "1048576", // 1MB + }, + querySizeBytes: 30000, + err: false, + unusedMemoryBytes: 1048476, + }, + { + name: "error - memory bytes and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-memory-bytes", "1", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 2, + err: true, + unusedMemoryBytes: 99, + }, + { + name: "error - initial memory bytes and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "1", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 101, + err: true, + unusedMemoryBytes: 99, + }, + { + name: "error - initial memory bytes, memory bytes, and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "1", + "--query-memory-bytes", "50", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 51, + err: true, + unusedMemoryBytes: 99, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + l := launcher.RunTestLauncherOrFail(t, ctx, tc.args...) 
+ l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + const tagValue = "t0" + writeBytes(t, l, tagValue, tc.querySizeBytes) + if err := queryPoints(t, l, tagValue); err != nil { + if tc.err { + if !strings.Contains(err.Error(), "allocation limit reached") { + t.Errorf("query errored with unexpected error: %v", err) + } + } else { + t.Errorf("unexpected error: %v", err) + } + } else if tc.err { + t.Errorf("expected error, got successful query execution") + } + + reg := l.Registry() + got := getMemoryUnused(t, reg) + want := int64(tc.unusedMemoryBytes) + if want != got { + t.Errorf("expected unused memory %d, got %d", want, got) + } + }) } +} - // ok, the first request went well, let's add memory limits: - // this query should error. - // spec.Resources = flux.ResourceManagement{ - // MemoryBytesQuota: 100, - // } +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - launches several queries that may exceed the memory limit; +// - verifies after each query run the used memory. +func TestPipeline_QueryMemoryLimits_MultipleQueries(t *testing.T) { + l := launcher.RunTestLauncherOrFail(t, ctx, + "--log-level", "error", + "--query-queue-size", "1024", + "--query-concurrency", "1", + "--query-initial-memory-bytes", "100", + "--query-memory-bytes", "50000", + "--query-max-memory-bytes", "200000", + ) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) - if err := l.QueryAndNopConsume(context.Background(), req); err != nil { - if !strings.Contains(err.Error(), "allocation limit reached") { - t.Fatalf("query errored with unexpected error: %v", err) + // one tag does not exceed memory. + const tOK = "t0" + writeBytes(t, l, tOK, 40000) + // the other does. + const tKO = "t1" + writeBytes(t, l, tKO, 100000) + + checkMemoryUsed := func() { + t.Helper() + // base memory used is equal to initial memory bytes * concurrency. 
+ const baseMemoryUsed = int64(100) + got := l.QueryController().GetUsedMemoryBytes() + if baseMemoryUsed != got { + t.Errorf("expected used memory %d, got %d", baseMemoryUsed, got) } - } else { - t.Fatal("expected error, got successful query execution") } + + runQueries := func(nOK, nKO int) { + // This flock of queries should run sequentially because concurrency quota is set to 1 + wg := sync.WaitGroup{} + wg.Add(nOK + nKO) + for i := 0; i < nOK; i++ { + go func(idx int) { + t.Logf("running query %d - OK", idx) + if err := queryPoints(t, l, tOK); err != nil { + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed() + wg.Done() + }(i) + } + for i := 0; i < nKO; i++ { + go func(idx int) { + t.Logf("running query %d - KO", idx) + if err := queryPoints(t, l, tKO); err == nil { + t.Errorf("expected error got none") + } else if !strings.Contains(err.Error(), "allocation limit reached") { + t.Errorf("got wrong error: %v", err) + } + checkMemoryUsed() + wg.Done() + }(i) + } + wg.Wait() + } + + runQueries(100, 0) + //runQueries(5, 5) + //runQueries(0, 50) } func TestPipeline_Query_LoadSecret_Success(t *testing.T) { diff --git a/query/control/controller.go b/query/control/controller.go index 3c30848c5f3..45a1ea29833 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -47,6 +47,7 @@ const orgLabel = "org" // Controller provides a central location to manage all incoming queries. // The controller is responsible for compiling, queueing, and executing queries. 
type Controller struct { + config Config lastID uint64 queriesMu sync.RWMutex queries map[QueryID]*Query @@ -173,6 +174,7 @@ func New(config Config) (*Controller, error) { mm.unlimited = true } ctrl := &Controller{ + config: c, queries: make(map[QueryID]*Query), queryQueue: make(chan *Query, c.QueueSize), done: make(chan struct{}), @@ -508,6 +510,14 @@ func (c *Controller) PrometheusCollectors() []prometheus.Collector { return collectors } +func (c *Controller) GetUnusedMemoryBytes() int64 { + return c.memory.getUnusedMemoryBytes() +} + +func (c *Controller) GetUsedMemoryBytes() int64 { + return c.config.MaxMemoryBytes - c.GetUnusedMemoryBytes() +} + // Query represents a single request. type Query struct { id QueryID @@ -564,7 +574,7 @@ func (q *Query) Results() <-chan flux.Result { } func (q *Query) recordUnusedMemory() { - unused := q.memoryManager.getUnusedMemoryBytes() + unused := q.c.GetUnusedMemoryBytes() q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused)) } diff --git a/query/control/memory.go b/query/control/memory.go index 283cbbf0bb6..bf1635b1f76 100644 --- a/query/control/memory.go +++ b/query/control/memory.go @@ -28,6 +28,22 @@ type memoryManager struct { unlimited bool } +func (m *memoryManager) getUnusedMemoryBytes() int64 { + return atomic.LoadInt64(&m.unusedMemoryBytes) +} + +func (m *memoryManager) getUsedMemoryBytes() int64 { + return atomic.LoadInt64(&m.unusedMemoryBytes) +} + +func (m *memoryManager) trySetUnusedMemoryBytes(old, new int64) bool { + return atomic.CompareAndSwapInt64(&m.unusedMemoryBytes, old, new) +} + +func (m *memoryManager) addUnusedMemoryBytes(amount int64) int64 { + return atomic.AddInt64(&m.unusedMemoryBytes, amount) +} + // createAllocator will construct an allocator and memory manager // for the given query. 
func (c *Controller) createAllocator(q *Query) { @@ -49,10 +65,6 @@ type queryMemoryManager struct { given int64 } -func (q *queryMemoryManager) getUnusedMemoryBytes() int64 { - return atomic.LoadInt64(&q.m.unusedMemoryBytes) -} - // RequestMemory will determine if the query can be given more memory // when it is requested. // @@ -72,7 +84,7 @@ func (q *queryMemoryManager) RequestMemory(want int64) (got int64, err error) { for { unused := int64(math.MaxInt64) if !q.m.unlimited { - unused = atomic.LoadInt64(&q.m.unusedMemoryBytes) + unused = q.m.getUnusedMemoryBytes() if unused < want { // We do not have the capacity for this query to // be given more memory. @@ -88,7 +100,7 @@ func (q *queryMemoryManager) RequestMemory(want int64) (got int64, err error) { // Reserve this memory for our own use. if !q.m.unlimited { - if !atomic.CompareAndSwapInt64(&q.m.unusedMemoryBytes, unused, unused-given) { + if !q.m.trySetUnusedMemoryBytes(unused, unused-given) { // The unused value has changed so someone may have taken // the memory that we wanted. Retry. continue @@ -142,7 +154,7 @@ func (q *queryMemoryManager) FreeMemory(bytes int64) { // memory manager. func (q *queryMemoryManager) Release() { if !q.m.unlimited { - atomic.AddInt64(&q.m.unusedMemoryBytes, q.given) + q.m.addUnusedMemoryBytes(q.given) } q.limit = q.m.initialBytesQuotaPerQuery q.given = 0