From 13fc4f335deb419473d6a54b7d740a6bea87c3d8 Mon Sep 17 00:00:00 2001 From: Lorenzo Affetti Date: Tue, 31 Mar 2020 10:52:55 +0200 Subject: [PATCH] test(launcher): e2e test memory limits --- cmd/influxd/launcher/launcher.go | 58 +++- cmd/influxd/launcher/launcher_helpers.go | 13 +- cmd/influxd/launcher/query_test.go | 399 +++++++++++++++++++++-- http/document_test.go | 2 +- query/control/controller.go | 12 +- query/control/memory.go | 22 +- 6 files changed, 448 insertions(+), 58 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 45c26e81f83..b56edd1d573 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math" "net" nethttp "net/http" _ "net/http/pprof" // needed to add pprof to our binary. @@ -279,6 +278,36 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) { Default: false, Desc: "disables the task scheduler", }, + { + DestP: &l.concurrencyQuota, + Flag: "query-concurrency", + Default: 10, + Desc: "the number of queries that are allowed to execute concurrently", + }, + { + DestP: &l.initialMemoryBytesQuotaPerQuery, + Flag: "query-initial-memory-bytes", + Default: 0, + Desc: "the initial number of bytes allocated for a query when it is started. If this is unset, then query-memory-bytes will be used", + }, + { + DestP: &l.memoryBytesQuotaPerQuery, + Flag: "query-memory-bytes", + Default: 10 * 1024 * 1024, // 10MB + Desc: "maximum number of bytes a query is allowed to use at any given time. This must be greater or equal to query-initial-memory-bytes", + }, + { + DestP: &l.maxMemoryBytes, + Flag: "query-max-memory-bytes", + Default: 0, + Desc: "the maximum amount of memory used for queries. If this is unset, then this number is query-concurrency * query-memory-bytes", + }, + { + DestP: &l.queueSize, + Flag: "query-queue-size", + Default: 10, + Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected", + }, } cli.BindOptions(cmd, opts) @@ -309,6 +338,13 @@ type Launcher struct { enableNewMetaStore bool newMetaStoreReadOnly bool + // Query options. + concurrencyQuota int + initialMemoryBytesQuotaPerQuery int + memoryBytesQuotaPerQuery int + maxMemoryBytes int + queueSize int + boltClient *bolt.Client kvStore kv.Store kvService *kv.Service @@ -642,14 +678,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { backupService platform.BackupService = m.engine ) - // TODO(cwolff): Figure out a good default per-query memory limit: - // https://github.com/influxdata/influxdb/issues/13642 - const ( - concurrencyQuota = 10 - memoryBytesQuotaPerQuery = math.MaxInt64 - QueueSize = 10 - ) - deps, err := influxdb.NewDependencies( storageflux.NewReader(readservice.NewStore(m.engine)), m.engine, @@ -664,11 +692,13 @@ func (m *Launcher) run(ctx context.Context) (err error) { } m.queryController, err = control.New(control.Config{ - ConcurrencyQuota: concurrencyQuota, - MemoryBytesQuotaPerQuery: int64(memoryBytesQuotaPerQuery), - QueueSize: QueueSize, - Logger: m.log.With(zap.String("service", "storage-reads")), - ExecutorDependencies: []flux.Dependency{deps}, + ConcurrencyQuota: m.concurrencyQuota, + InitialMemoryBytesQuotaPerQuery: int64(m.initialMemoryBytesQuotaPerQuery), + MemoryBytesQuotaPerQuery: int64(m.memoryBytesQuotaPerQuery), + MaxMemoryBytes: int64(m.maxMemoryBytes), + QueueSize: m.queueSize, + Logger: m.log.With(zap.String("service", "storage-reads")), + ExecutorDependencies: []flux.Dependency{deps}, }) if err != nil { m.log.Error("Failed to create query controller", zap.Error(err)) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 6559adc464b..153cca789b9 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -78,12 +78,15 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, args ...string) * } // Run executes the program with additional arguments to set paths and ports. +// Passed arguments will overwrite/add to the default ones. func (tl *TestLauncher) Run(ctx context.Context, args ...string) error { - args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename)) - args = append(args, "--engine-path", filepath.Join(tl.Path, "engine")) - args = append(args, "--http-bind-address", "127.0.0.1:0") - args = append(args, "--log-level", "debug") - return tl.Launcher.Run(ctx, args...) + largs := make([]string, 0, len(args)+8) + largs = append(largs, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename)) + largs = append(largs, "--engine-path", filepath.Join(tl.Path, "engine")) + largs = append(largs, "--http-bind-address", "127.0.0.1:0") + largs = append(largs, "--log-level", "debug") + largs = append(largs, args...) + return tl.Launcher.Run(ctx, largs...) } // Shutdown stops the program and cleans up temporary paths. diff --git a/cmd/influxd/launcher/query_test.go b/cmd/influxd/launcher/query_test.go index 37a77612d41..03d6ce0f1f5 100644 --- a/cmd/influxd/launcher/query_test.go +++ b/cmd/influxd/launcher/query_test.go @@ -5,9 +5,12 @@ import ( "context" "errors" "fmt" + "html/template" "io" + "math/rand" nethttp "net/http" "strings" + "sync" "testing" "time" @@ -19,10 +22,11 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/cmd/influxd/launcher" phttp "github.com/influxdata/influxdb/http" + "github.com/influxdata/influxdb/kit/prom" "github.com/influxdata/influxdb/query" ) -func TestPipeline_Write_Query_FieldKey(t *testing.T) { +func TestLauncher_Write_Query_FieldKey(t *testing.T) { be := launcher.RunTestLauncherOrFail(t, ctx) be.SetupOrFail(t) defer be.ShutdownOrFail(t, ctx) @@ -68,7 +72,7 @@ mem,server=b value=45.2`)) // This test initialises a default launcher writes some data, // and checks that the queried results contain the expected number of tables // and expected number of columns. -func TestPipeline_WriteV2_Query(t *testing.T) { +func TestLauncher_WriteV2_Query(t *testing.T) { be := launcher.RunTestLauncherOrFail(t, ctx) be.SetupOrFail(t) defer be.ShutdownOrFail(t, ctx) @@ -105,28 +109,116 @@ func TestPipeline_WriteV2_Query(t *testing.T) { res.HasTableCount(t, 1) } -// This test initializes a default launcher; writes some data; queries the data (success); -// sets memory limits to the same read query; checks that the query fails because limits are exceeded. -func TestPipeline_QueryMemoryLimits(t *testing.T) { - t.Skip("setting memory limits in the client is not implemented yet") +func getMemoryUnused(t *testing.T, reg *prom.Registry) int64 { + t.Helper() - l := launcher.RunTestLauncherOrFail(t, ctx) - l.SetupOrFail(t) - defer l.ShutdownOrFail(t, ctx) + ms, err := reg.Gather() + if err != nil { + t.Fatal(err) + } + for _, m := range ms { + if m.GetName() == "query_control_memory_unused_bytes" { + return int64(*m.GetMetric()[0].Gauge.Value) + } + } + t.Errorf("query metric for unused memory not found") + return 0 +} - // write some points - for i := 0; i < 100; i++ { - l.WritePointsOrFail(t, fmt.Sprintf(`m,k=v1 f=%di %d`, i*100, time.Now().UnixNano())) +func checkMemoryUsed(t *testing.T, l *launcher.TestLauncher, concurrency, initial int) { + t.Helper() + + got := l.QueryController().GetUsedMemoryBytes() + // base memory used is equal to initial memory bytes * concurrency. + if want := int64(concurrency * initial); want != got { + t.Errorf("expected used memory %d, got %d", want, got) } +} - // compile a from query and get the spec - qs := fmt.Sprintf(`from(bucket:"%s") |> range(start:-5m)`, l.Bucket.Name) +func writeBytes(t *testing.T, l *launcher.TestLauncher, tagValue string, bs int) int { + // When represented in Flux, every point is: + // 1 byte _measurement ("m") + // + 1 byte _field ("f") + // + 8 bytes _value + // + len(tagValue) bytes + // + 8 bytes _time + // + 8 bytes _start + // + 8 bytes _stop + // --------------------------- + // = 34 + len(tag) bytes + pointSize := 34 + len(tagValue) + if bs < pointSize { + bs = pointSize + } + n := bs / pointSize + if n*pointSize < bs { + n++ + } + sb := strings.Builder{} + for i := 0; i < n; i++ { + sb.WriteString(fmt.Sprintf(`m,t=%s f=%di %d`, tagValue, i*100, time.Now().UnixNano())) + sb.WriteRune('\n') + } + l.WritePointsOrFail(t, sb.String()) + return n * pointSize +} + +type data struct { + Bucket string + TagValue string + Sleep string + verbose bool +} + +type queryOption func(d *data) + +func withTagValue(tv string) queryOption { + return func(d *data) { + d.TagValue = tv + } +} + +func withSleep(s time.Duration) queryOption { + return func(d *data) { + d.Sleep = flux.ConvertDuration(s).String() + } +} + +func queryPoints(ctx context.Context, t *testing.T, l *launcher.TestLauncher, opts ...queryOption) error { + d := &data{ + Bucket: l.Bucket.Name, + } + for _, opt := range opts { + opt(d) + } + tmpls := `from(bucket: "{{ .Bucket }}") + |> range(start:-5m) + {{- if .TagValue }} + // this must be pushed down to avoid unnecessary memory allocations. + |> filter(fn: (r) => r.t == "{{ .TagValue }}") + {{- end}} + // ensure we load everything into memory. + |> sort(columns: ["_time"]) + {{- if .Sleep }} + // now that you have everything in memory, you can sleep. + |> sleep(duration: {{ .Sleep }}) + {{- end}}` + tmpl, err := template.New("test-query").Parse(tmpls) + if err != nil { + return err + } + bs := new(bytes.Buffer) + if err := tmpl.Execute(bs, d); err != nil { + return err + } + qs := bs.String() + if d.verbose { + t.Logf("query:\n%s", qs) + } pkg, err := flux.Parse(qs) if err != nil { t.Fatal(err) } - - // we expect this request to succeed req := &query.Request{ Authorization: l.Auth, OrganizationID: l.Org.ID, @@ -134,26 +226,273 @@ func TestPipeline_QueryMemoryLimits(t *testing.T) { AST: pkg, }, } - if err := l.QueryAndNopConsume(context.Background(), req); err != nil { - t.Fatal(err) + return l.QueryAndNopConsume(ctx, req) +} + +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - queries the data; +// - verifies that the query fails (or not) and that the memory was de-allocated. +func TestLauncher_QueryMemoryLimits(t *testing.T) { + tcs := []struct { + name string + args []string + err bool + querySizeBytes int + // max_memory - per_query_memory * concurrency + unusedMemoryBytes int + }{ + { + name: "ok - initial memory bytes, memory bytes, and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "100", + "--query-max-memory-bytes", "1048576", // 1MB + }, + querySizeBytes: 30000, + err: false, + unusedMemoryBytes: 1048476, + }, + { + name: "error - memory bytes and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-memory-bytes", "1", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 2, + err: true, + unusedMemoryBytes: 99, + }, + { + name: "error - initial memory bytes and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "1", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 101, + err: true, + unusedMemoryBytes: 99, + }, + { + name: "error - initial memory bytes, memory bytes, and max memory set", + args: []string{ + "--query-concurrency", "1", + "--query-initial-memory-bytes", "1", + "--query-memory-bytes", "50", + "--query-max-memory-bytes", "100", + }, + querySizeBytes: 51, + err: true, + unusedMemoryBytes: 99, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + l := launcher.RunTestLauncherOrFail(t, ctx, tc.args...) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + const tagValue = "t0" + writeBytes(t, l, tagValue, tc.querySizeBytes) + if err := queryPoints(context.Background(), t, l, withTagValue(tagValue)); err != nil { + if tc.err { + if !strings.Contains(err.Error(), "allocation limit reached") { + t.Errorf("query errored with unexpected error: %v", err) + } + } else { + t.Errorf("unexpected error: %v", err) + } + } else if tc.err { + t.Errorf("expected error, got successful query execution") + } + + reg := l.Registry() + got := getMemoryUnused(t, reg) + want := int64(tc.unusedMemoryBytes) + if want != got { + t.Errorf("expected unused memory %d, got %d", want, got) + } + }) } +} + +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - launches a query that does not error; +// - launches a query that gets canceled while executing; +// - launches a query that does not error; +// - verifies after each query run the used memory. +func TestLauncher_QueryMemoryManager_ExceedMemory(t *testing.T) { + t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\" on OK query") + + l := launcher.RunTestLauncherOrFail(t, ctx, + "--log-level", "error", + "--query-concurrency", "1", + "--query-initial-memory-bytes", "100", + "--query-memory-bytes", "50000", + "--query-max-memory-bytes", "200000", + ) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) - // ok, the first request went well, let's add memory limits: - // this query should error. - // spec.Resources = flux.ResourceManagement{ - // MemoryBytesQuota: 100, - // } + // One tag does not exceed memory. + const tOK = "t0" + writeBytes(t, l, tOK, 10000) + // The other does. + const tKO = "t1" + writeBytes(t, l, tKO, 50001) - if err := l.QueryAndNopConsume(context.Background(), req); err != nil { + if err := queryPoints(context.Background(), t, l, withTagValue(tOK)); err != nil { + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed(t, l, 1, 100) + if err := queryPoints(context.Background(), t, l, withTagValue(tKO)); err != nil { if !strings.Contains(err.Error(), "allocation limit reached") { - t.Fatalf("query errored with unexpected error: %v", err) + t.Errorf("query errored with unexpected error: %v", err) } } else { - t.Fatal("expected error, got successful query execution") + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed(t, l, 1, 100) + if err := queryPoints(context.Background(), t, l, withTagValue(tOK)); err != nil { + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed(t, l, 1, 100) +} + +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - launches a query that does not error; +// - launches a query and cancels its context; +// - launches a query that does not error; +// - verifies after each query run the used memory. +func TestLauncher_QueryMemoryManager_ContextCanceled(t *testing.T) { + t.Skip("this test is flaky, occasionally get error: \"memory allocation limit reached\"") + + l := launcher.RunTestLauncherOrFail(t, ctx, + "--log-level", "error", + "--query-concurrency", "1", + "--query-initial-memory-bytes", "100", + "--query-memory-bytes", "50000", + "--query-max-memory-bytes", "200000", + ) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + const tag = "t0" + writeBytes(t, l, tag, 10000) + + if err := queryPoints(context.Background(), t, l, withTagValue(tag)); err != nil { + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed(t, l, 1, 100) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := queryPoints(ctx, t, l, withSleep(4*time.Second)); err == nil { + t.Errorf("expected error got none") + } + checkMemoryUsed(t, l, 1, 100) + if err := queryPoints(context.Background(), t, l, withTagValue(tag)); err != nil { + t.Errorf("unexpected error: %v", err) + } + checkMemoryUsed(t, l, 1, 100) +} + +// This test: +// - initializes a default launcher and sets memory limits; +// - writes some data; +// - launches (concurrently) a mixture of +// - OK queries; +// - queries that exceed the memory limit; +// - queries that get canceled; +// - verifies the used memory. +// Concurrency limit is set to 1, so only 1 query runs at a time and the others are queued. +// OK queries do not overcome the soft limit, so that they can run concurrently with the ones that exceed limits. +// The aim of this test is to verify that memory tracking works properly in the controller, +// even in the case of concurrent/queued queries. +func TestLauncher_QueryMemoryManager_ConcurrentQueries(t *testing.T) { + t.Skip("this test is flaky, occasionally get error: \"dial tcp 127.0.0.1:59654: connect: connection reset by peer\"") + + l := launcher.RunTestLauncherOrFail(t, ctx, + "--log-level", "error", + "--query-queue-size", "1024", + "--query-concurrency", "1", + "--query-initial-memory-bytes", "10000", + "--query-memory-bytes", "50000", + "--query-max-memory-bytes", "200000", + ) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + // One tag does not exceed memory. + // The size is below the soft limit, so that querying this bucket never fail. + const tSmall = "t0" + writeBytes(t, l, tSmall, 9000) + // The other exceeds memory per query. + const tBig = "t1" + writeBytes(t, l, tBig, 100000) + + const nOK = 100 + const nMemExceeded = 100 + const nContextCanceled = 100 + nTotalQueries := nOK + nMemExceeded + nContextCanceled + + // In order to increase the variety of the load, store and shuffle queries. + qs := make([]func(), 0, nTotalQueries) + // Flock of OK queries. + for i := 0; i < nOK; i++ { + qs = append(qs, func() { + if err := queryPoints(context.Background(), t, l, withTagValue(tSmall)); err != nil { + t.Errorf("unexpected error (ok-query %d): %v", i, err) + } + }) + } + // Flock of big queries. + for i := 0; i < nMemExceeded; i++ { + qs = append(qs, func() { + if err := queryPoints(context.Background(), t, l, withTagValue(tBig)); err == nil { + t.Errorf("expected error got none (high-memory-query %d)", i) + } else if !strings.Contains(err.Error(), "allocation limit reached") { + t.Errorf("got wrong error (high-memory-query %d): %v", i, err) + } + }) + } + // Flock of context canceled queries. + for i := 0; i < nContextCanceled; i++ { + qs = append(qs, func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := queryPoints(ctx, t, l, withTagValue(tSmall), withSleep(4*time.Second)); err == nil { + t.Errorf("expected error got none (context-canceled-query %d)", i) + } else if !strings.Contains(err.Error(), "context") { + t.Errorf("got wrong error (context-canceled-query %d): %v", i, err) + } + }) + } + rand.Shuffle(len(qs), func(i, j int) { qs[i], qs[j] = qs[j], qs[i] }) + + wg := sync.WaitGroup{} + wg.Add(nTotalQueries) + for i, q := range qs { + qs[i] = func() { + defer wg.Done() + q() + } + } + for _, q := range qs { + go q() } + wg.Wait() + checkMemoryUsed(t, l, 1, 10000) } -func TestPipeline_Query_LoadSecret_Success(t *testing.T) { +func TestLauncher_Query_LoadSecret_Success(t *testing.T) { l := launcher.RunTestLauncherOrFail(t, ctx) l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) @@ -203,7 +542,7 @@ from(bucket: "%s") } } -func TestPipeline_Query_LoadSecret_Forbidden(t *testing.T) { +func TestLauncher_Query_LoadSecret_Forbidden(t *testing.T) { l := launcher.RunTestLauncherOrFail(t, ctx) l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) @@ -262,7 +601,7 @@ from(bucket: "%s") // written, and tableFind would complain not finding the tables. // This will change once we make side effects drive execution and remove from/to concurrency in our e2e tests. // See https://github.com/influxdata/flux/issues/1799. -func TestPipeline_DynamicQuery(t *testing.T) { +func TestLauncher_DynamicQuery(t *testing.T) { l := launcher.RunTestLauncherOrFail(t, ctx) l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) @@ -337,7 +676,7 @@ stream2 |> filter(fn: (r) => contains(value: r._value, set: col)) |> group() |> } } -func TestPipeline_Query_ExperimentalTo(t *testing.T) { +func TestLauncher_Query_ExperimentalTo(t *testing.T) { l := launcher.RunTestLauncherOrFail(t, ctx) l.SetupOrFail(t) defer l.ShutdownOrFail(t, ctx) diff --git a/http/document_test.go b/http/document_test.go index d9e9d58fb75..51ce1504197 100644 --- a/http/document_test.go +++ b/http/document_test.go @@ -93,7 +93,7 @@ var ( }, Content: "content5", } - doc6 = influxdb.Document{ + doc6 = influxdb.Document{ ID: doc6ID, Meta: influxdb.DocumentMeta{ Name: "doc6", diff --git a/query/control/controller.go b/query/control/controller.go index 3c30848c5f3..45a1ea29833 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -47,6 +47,7 @@ const orgLabel = "org" // Controller provides a central location to manage all incoming queries. // The controller is responsible for compiling, queueing, and executing queries. type Controller struct { + config Config lastID uint64 queriesMu sync.RWMutex queries map[QueryID]*Query @@ -173,6 +174,7 @@ func New(config Config) (*Controller, error) { mm.unlimited = true } ctrl := &Controller{ + config: c, queries: make(map[QueryID]*Query), queryQueue: make(chan *Query, c.QueueSize), done: make(chan struct{}), @@ -508,6 +510,14 @@ func (c *Controller) PrometheusCollectors() []prometheus.Collector { return collectors } +func (c *Controller) GetUnusedMemoryBytes() int64 { + return c.memory.getUnusedMemoryBytes() +} + +func (c *Controller) GetUsedMemoryBytes() int64 { + return c.config.MaxMemoryBytes - c.GetUnusedMemoryBytes() +} + // Query represents a single request. type Query struct { id QueryID @@ -564,7 +574,7 @@ func (q *Query) Results() <-chan flux.Result { } func (q *Query) recordUnusedMemory() { - unused := q.memoryManager.getUnusedMemoryBytes() + unused := q.c.GetUnusedMemoryBytes() q.c.metrics.memoryUnused.WithLabelValues(q.labelValues...).Set(float64(unused)) } diff --git a/query/control/memory.go b/query/control/memory.go index 283cbbf0bb6..609ad43739d 100644 --- a/query/control/memory.go +++ b/query/control/memory.go @@ -28,6 +28,18 @@ type memoryManager struct { unlimited bool } +func (m *memoryManager) getUnusedMemoryBytes() int64 { + return atomic.LoadInt64(&m.unusedMemoryBytes) +} + +func (m *memoryManager) trySetUnusedMemoryBytes(old, new int64) bool { + return atomic.CompareAndSwapInt64(&m.unusedMemoryBytes, old, new) +} + +func (m *memoryManager) addUnusedMemoryBytes(amount int64) int64 { + return atomic.AddInt64(&m.unusedMemoryBytes, amount) +} + // createAllocator will construct an allocator and memory manager // for the given query. func (c *Controller) createAllocator(q *Query) { @@ -49,10 +61,6 @@ type queryMemoryManager struct { given int64 } -func (q *queryMemoryManager) getUnusedMemoryBytes() int64 { - return atomic.LoadInt64(&q.m.unusedMemoryBytes) -} - // RequestMemory will determine if the query can be given more memory // when it is requested. // @@ -72,7 +80,7 @@ func (q *queryMemoryManager) RequestMemory(want int64) (got int64, err error) { for { unused := int64(math.MaxInt64) if !q.m.unlimited { - unused = atomic.LoadInt64(&q.m.unusedMemoryBytes) + unused = q.m.getUnusedMemoryBytes() if unused < want { // We do not have the capacity for this query to // be given more memory. @@ -88,7 +96,7 @@ func (q *queryMemoryManager) RequestMemory(want int64) (got int64, err error) { // Reserve this memory for our own use. if !q.m.unlimited { - if !atomic.CompareAndSwapInt64(&q.m.unusedMemoryBytes, unused, unused-given) { + if !q.m.trySetUnusedMemoryBytes(unused, unused-given) { // The unused value has changed so someone may have taken // the memory that we wanted. Retry. continue @@ -142,7 +150,7 @@ func (q *queryMemoryManager) FreeMemory(bytes int64) { // memory manager. func (q *queryMemoryManager) Release() { if !q.m.unlimited { - atomic.AddInt64(&q.m.unusedMemoryBytes, q.given) + q.m.addUnusedMemoryBytes(q.given) } q.limit = q.m.initialBytesQuotaPerQuery q.given = 0