Skip to content

Commit

Permalink
test(launcher): e2e test memory limits
Browse files Browse the repository at this point in the history
  • Loading branch information
affo committed Mar 31, 2020
1 parent 5915fd3 commit 844c727
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 49 deletions.
58 changes: 44 additions & 14 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net"
nethttp "net/http"
_ "net/http/pprof" // needed to add pprof to our binary.
Expand Down Expand Up @@ -279,6 +278,36 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
Default: false,
Desc: "disables the task scheduler",
},
{
DestP: &l.concurrencyQuota,
Flag: "query-concurrency",
Default: 10,
Desc: "the number of queries that are allowed to execute concurrently",
},
{
DestP: &l.initialMemoryBytesQuotaPerQuery,
Flag: "query-initial-memory-bytes",
Default: 0,
Desc: "the initial number of bytes allocated for a query when it is started. If this is unset, then query-memory-bytes will be used",
},
{
DestP: &l.memoryBytesQuotaPerQuery,
Flag: "query-memory-bytes",
Default: 10 * 1024 * 1024, // 10MB
Desc: "maximum number of bytes a query is allowed to use at any given time. This must be greater or equal to query-initial-memory-bytes",
},
{
DestP: &l.maxMemoryBytes,
Flag: "query-max-memory-bytes",
Default: 0,
Desc: "the maximum amount of memory used for queries. If this is unset, then this number is query-concurrency * query-memory-bytes",
},
{
DestP: &l.queueSize,
Flag: "query-queue-size",
Default: 10,
Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected",
},
}

cli.BindOptions(cmd, opts)
Expand Down Expand Up @@ -309,6 +338,13 @@ type Launcher struct {
enableNewMetaStore bool
newMetaStoreReadOnly bool

// Query options.
concurrencyQuota int
initialMemoryBytesQuotaPerQuery int
memoryBytesQuotaPerQuery int
maxMemoryBytes int
queueSize int

boltClient *bolt.Client
kvStore kv.Store
kvService *kv.Service
Expand Down Expand Up @@ -642,14 +678,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
backupService platform.BackupService = m.engine
)

// TODO(cwolff): Figure out a good default per-query memory limit:
// https://github.com/influxdata/influxdb/issues/13642
const (
concurrencyQuota = 10
memoryBytesQuotaPerQuery = math.MaxInt64
QueueSize = 10
)

deps, err := influxdb.NewDependencies(
storageflux.NewReader(readservice.NewStore(m.engine)),
m.engine,
Expand All @@ -664,11 +692,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}

m.queryController, err = control.New(control.Config{
ConcurrencyQuota: concurrencyQuota,
MemoryBytesQuotaPerQuery: int64(memoryBytesQuotaPerQuery),
QueueSize: QueueSize,
Logger: m.log.With(zap.String("service", "storage-reads")),
ExecutorDependencies: []flux.Dependency{deps},
ConcurrencyQuota: m.concurrencyQuota,
InitialMemoryBytesQuotaPerQuery: int64(m.initialMemoryBytesQuotaPerQuery),
MemoryBytesQuotaPerQuery: int64(m.memoryBytesQuotaPerQuery),
MaxMemoryBytes: int64(m.maxMemoryBytes),
QueueSize: m.queueSize,
Logger: m.log.With(zap.String("service", "storage-reads")),
ExecutorDependencies: []flux.Dependency{deps},
})
if err != nil {
m.log.Error("Failed to create query controller", zap.Error(err))
Expand Down
11 changes: 10 additions & 1 deletion cmd/influxd/launcher/launcher_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,16 @@ func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
args = append(args, "--engine-path", filepath.Join(tl.Path, "engine"))
args = append(args, "--http-bind-address", "127.0.0.1:0")
args = append(args, "--log-level", "debug")
// Default to debug logging, but only when the caller has not already supplied
// a --log-level flag of their own.
// NOTE(review): the scan matches the exact token "--log-level" only, so the
// "--log-level=value" spelling would not be detected and debug would be
// appended as well — confirm how the flag parser resolves a repeated flag.
logLevel := false
for _, arg := range args {
if arg == "--log-level" {
logLevel = true
break
}
}
if !logLevel {
args = append(args, "--log-level", "debug")
}
return tl.Launcher.Run(ctx, args...)
}

Expand Down
222 changes: 196 additions & 26 deletions cmd/influxd/launcher/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
nethttp "net/http"
"strings"
"sync"
"testing"
"time"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
phttp "github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/kit/prom"
"github.com/influxdata/influxdb/query"
)

Expand Down Expand Up @@ -105,52 +107,220 @@ func TestPipeline_WriteV2_Query(t *testing.T) {
res.HasTableCount(t, 1)
}

// This test initializes a default launcher; writes some data; queries the data (success);
// sets memory limits to the same read query; checks that the query fails because limits are exceeded.
func TestPipeline_QueryMemoryLimits(t *testing.T) {
t.Skip("setting memory limits in the client is not implemented yet")
// getMemoryUnused gathers the metrics in reg and returns the value of the
// first sample of the "query_control_memory_unused_bytes" gauge, converted to
// int64. A Gather error fails the test immediately; if no metric family with
// that name is present the test is marked as failed and 0 is returned.
func getMemoryUnused(t *testing.T, reg *prom.Registry) int64 {
t.Helper()

// NOTE(review): the three launcher lines below look like removed-line residue
// from the previous TestPipeline_QueryMemoryLimits body in this diff view;
// presumably they are not part of this helper — confirm against the real file.
l := launcher.RunTestLauncherOrFail(t, ctx)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
ms, err := reg.Gather()
if err != nil {
t.Fatal(err)
}
for _, m := range ms {
if m.GetName() == "query_control_memory_unused_bytes" {
// Only the first sample of the family is read.
// NOTE(review): assumes the family has at least one metric and that its
// gauge value is set; an empty family would panic here — TODO confirm.
return int64(*m.GetMetric()[0].Gauge.Value)
}
}
// Errorf (not Fatal) is used, so the caller keeps running and sees 0.
t.Errorf("query metric for unused memory not found")
return 0
}

// write some points
for i := 0; i < 100; i++ {
l.WritePointsOrFail(t, fmt.Sprintf(`m,k=v1 f=%di %d`, i*100, time.Now().UnixNano()))
// writeBytes writes points tagged t=tagValue through l.WritePointsOrFail until
// at least bs bytes are accounted for, and returns the number of bytes it
// accounted for.
//
// Every point is budgeted at 131 bytes (see the estimate below). The requested
// size is rounded up to a whole number of points and at least one point is
// always written, so the result is a multiple of 131 that is >= bs.
func writeBytes(t *testing.T, l *launcher.TestLauncher, tagValue string, bs int) int {
	// Report failures at the caller's line, like the other helpers in this file.
	t.Helper()

	// Size budget for every point:
	//   1 byte measurement ("m")
	// + 2 bytes tag ("v1")
	// + 64 bytes field
	// + 64 bytes timestamp
	// ---------------------------
	// = 131 bytes
	// NOTE(review): the field and timestamp are presumably 64-bit values, so
	// this per-point estimate may be generous — confirm before tightening any
	// limits that were tuned against it.
	const pointSize = 131

	// Always write at least one point.
	if bs < pointSize {
		bs = pointSize
	}
	// Round up to a whole number of points.
	n := bs / pointSize
	if n*pointSize < bs {
		n++
	}
	for i := 0; i < n; i++ {
		// Each point gets a distinct field value and the current wall-clock timestamp.
		l.WritePointsOrFail(t, fmt.Sprintf(`m,t=%s f=%di %d`, tagValue, i*100, time.Now().UnixNano()))
	}
	return n * pointSize
}

// compile a from query and get the spec
qs := fmt.Sprintf(`from(bucket:"%s") |> range(start:-5m)`, l.Bucket.Name)
// queryPoints runs a Flux query over the last five minutes of the launcher's
// bucket, filtered to rows whose tag t equals tagValue, and returns the error
// (if any) from l.QueryAndNopConsume. An empty tagValue disables the filter by
// using the constant predicate "true". A Flux parse failure aborts the test
// via t.Fatal instead of being returned.
func queryPoints(t *testing.T, l *launcher.TestLauncher, tagValue string) error {
// Filter predicate on the tag; replaced by "true" when no tag value is given.
filterExpression := fmt.Sprintf("r.t == \"%s\"", tagValue)
if tagValue == "" {
filterExpression = "true"
}
qs := fmt.Sprintf(`from(bucket:"%s") |> range(start:-5m) |> filter(fn: (r) => %s)`, l.Bucket.Name, filterExpression)
pkg, err := flux.Parse(qs)
if err != nil {
t.Fatal(err)
}

// Build the request from the parsed AST, using the launcher's auth and org.
req := &query.Request{
Authorization: l.Auth,
OrganizationID: l.Org.ID,
Compiler: lang.ASTCompiler{
AST: pkg,
},
}
// NOTE(review): the next two lines look like removed-line residue from the
// old test body in this diff view (they leave the braces unbalanced);
// presumably only the return statement belongs to this helper — confirm.
if err := l.QueryAndNopConsume(context.Background(), req); err != nil {
t.Fatal(err)
return l.QueryAndNopConsume(context.Background(), req)
}

// TestPipeline_QueryMemoryLimits exercises the query memory flags end to end.
// For every case it:
//   - starts a launcher with the case's memory-limit flags;
//   - writes querySizeBytes worth of points under a single tag value;
//   - queries those points back and checks that the query fails with
//     "allocation limit reached" exactly when the case expects a failure;
//   - reads the unused-memory gauge afterwards and compares it with the
//     expected value, i.e. checks that the query's memory was given back.
func TestPipeline_QueryMemoryLimits(t *testing.T) {
	// All cases write and query the same tag value.
	const tagValue = "t0"

	cases := []struct {
		name           string
		args           []string
		err            bool
		querySizeBytes int
		// Expected gauge value once the query is done:
		// max memory - (initial per-query memory * concurrency).
		unusedMemoryBytes int
	}{
		{
			name: "ok - initial memory bytes, memory bytes, and max memory set",
			args: []string{
				"--query-concurrency", "1",
				"--query-initial-memory-bytes", "100",
				"--query-max-memory-bytes", "1048576", // 1MB
			},
			querySizeBytes:    30000,
			err:               false,
			unusedMemoryBytes: 1048476,
		},
		{
			name: "error - memory bytes and max memory set",
			args: []string{
				"--query-concurrency", "1",
				"--query-memory-bytes", "1",
				"--query-max-memory-bytes", "100",
			},
			querySizeBytes:    2,
			err:               true,
			unusedMemoryBytes: 99,
		},
		{
			name: "error - initial memory bytes and max memory set",
			args: []string{
				"--query-concurrency", "1",
				"--query-initial-memory-bytes", "1",
				"--query-max-memory-bytes", "100",
			},
			querySizeBytes:    101,
			err:               true,
			unusedMemoryBytes: 99,
		},
		{
			name: "error - initial memory bytes, memory bytes, and max memory set",
			args: []string{
				"--query-concurrency", "1",
				"--query-initial-memory-bytes", "1",
				"--query-memory-bytes", "50",
				"--query-max-memory-bytes", "100",
			},
			querySizeBytes:    51,
			err:               true,
			unusedMemoryBytes: 99,
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			l := launcher.RunTestLauncherOrFail(t, ctx, tt.args...)
			l.SetupOrFail(t)
			defer l.ShutdownOrFail(t, ctx)

			writeBytes(t, l, tagValue, tt.querySizeBytes)

			// Compare the query outcome with the case's expectation.
			queryErr := queryPoints(t, l, tagValue)
			switch {
			case queryErr == nil && tt.err:
				t.Errorf("expected error, got successful query execution")
			case queryErr != nil && !tt.err:
				t.Errorf("unexpected error: %v", queryErr)
			case queryErr != nil && !strings.Contains(queryErr.Error(), "allocation limit reached"):
				// A failure was expected, but not this one.
				t.Errorf("query errored with unexpected error: %v", queryErr)
			}

			// Whatever the outcome, the unused-memory gauge must match the expectation.
			want := int64(tt.unusedMemoryBytes)
			if got := getMemoryUnused(t, l.Registry()); want != got {
				t.Errorf("expected unused memory %d, got %d", want, got)
			}
		})
	}
}

// ok, the first request went well, let's add memory limits:
// this query should error.
// spec.Resources = flux.ResourceManagement{
// MemoryBytesQuota: 100,
// }
// This test:
// - starts a launcher with explicit memory limits (concurrency 1, 100 bytes
//   initial per query, 50000 bytes per query, 200000 bytes overall);
// - writes some data under two tag values, one small and one large;
// - launches several queries that may exceed the memory limit;
// - verifies after each query run the used memory.
func TestPipeline_QueryMemoryLimits_MultipleQueries(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx,
"--log-level", "error",
"--query-queue-size", "1024",
"--query-concurrency", "1",
"--query-initial-memory-bytes", "100",
"--query-memory-bytes", "50000",
"--query-max-memory-bytes", "200000",
)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)

// NOTE(review): the next three lines look like removed-line residue from the
// old single-query test in this diff view (req is not defined in this
// function); presumably they are not part of this test — confirm.
if err := l.QueryAndNopConsume(context.Background(), req); err != nil {
if !strings.Contains(err.Error(), "allocation limit reached") {
t.Fatalf("query errored with unexpected error: %v", err)
// one tag does not exceed memory.
const tOK = "t0"
writeBytes(t, l, tOK, 40000)
// the other does.
const tKO = "t1"
writeBytes(t, l, tKO, 100000)

// checkMemoryUsed reports an error unless the query controller's used memory
// equals the expected baseline.
checkMemoryUsed := func() {
t.Helper()
// base memory used is equal to initial memory bytes * concurrency.
const baseMemoryUsed = int64(100)
got := l.QueryController().GetUsedMemoryBytes()
if baseMemoryUsed != got {
// NOTE(review): the message says "unused" but the value compared is the
// used memory reported by GetUsedMemoryBytes.
t.Errorf("expected unused memory %d, got %d", baseMemoryUsed, got)
}
// NOTE(review): the "} else {" / t.Fatal lines below look like removed-line
// residue from the old test body — confirm against the real file.
} else {
t.Fatal("expected error, got successful query execution")
}
}

// runQueries starts nOK queries on the small tag and nKO queries on the large
// tag, each in its own goroutine, and waits for all of them to finish.
// NOTE(review): queryPoints can call t.Fatal, and here it runs on goroutines
// other than the test goroutine; the testing package requires FailNow to be
// called from the goroutine running the test — confirm this is acceptable.
runQueries := func(nOK, nKO int) {
// This flock of queries should run sequentially because concurrency quota is set to 1
wg := sync.WaitGroup{}
wg.Add(nOK + nKO)
for i := 0; i < nOK; i++ {
go func(idx int) {
t.Logf("running query %d - OK", idx)
if err := queryPoints(t, l, tOK); err != nil {
t.Errorf("unexpected error: %v", err)
}
checkMemoryUsed()
wg.Done()
}(i)
}
for i := 0; i < nKO; i++ {
go func(idx int) {
t.Logf("running query %d - KO", idx)
// Queries on the large tag must fail, and with the allocation-limit error.
if err := queryPoints(t, l, tKO); err == nil {
t.Errorf("expected error got none")
} else if !strings.Contains(err.Error(), "allocation limit reached") {
t.Errorf("got wrong error: %v", err)
}
checkMemoryUsed()
wg.Done()
}(i)
}
wg.Wait()
}

// Only the all-OK scenario is enabled; the mixed and all-KO runs are commented out.
runQueries(100, 0)
//runQueries(5, 5)
//runQueries(0, 50)
}

func TestPipeline_Query_LoadSecret_Success(t *testing.T) {
Expand Down
Loading

0 comments on commit 844c727

Please sign in to comment.