Skip to content

Commit

Permalink
Avoid starting goroutines for memcached batch requests before gate (#…
Browse files Browse the repository at this point in the history
…5301)

Use the doWithBatch function to avoid starting goroutines to fetch batched
results from memcached before they are allowed to run via the concurrency
Gate. This avoids starting many goroutines which cannot make any progress
due to a concurrency limit.

Fixes #4967

Signed-off-by: Nick Pillitteri <[email protected]>
Signed-off-by: Wiard van Rij <[email protected]>
  • Loading branch information
56quarters authored and wiardvanrij committed May 5, 2022
1 parent 048b603 commit 18d1955
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 63 deletions.
42 changes: 42 additions & 0 deletions pkg/cacheutil/cacheutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cacheutil

import (
"context"

"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/gate"
)

// doWithBatch do func with batch and gate. batchSize==0 means one batch. gate==nil means no gate.
func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
if totalSize == 0 {
return nil
}
if batchSize <= 0 {
return f(0, totalSize)
}
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < totalSize; i += batchSize {
j := i + batchSize
if j > totalSize {
j = totalSize
}
if ga != nil {
if err := ga.Start(ctx); err != nil {
return nil
}
}
startIndex, endIndex := i, j
g.Go(func() error {
if ga != nil {
defer ga.Done()
}
return f(startIndex, endIndex)
})
}
return g.Wait()
}
49 changes: 20 additions & 29 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (c *memcachedClient) GetMulti(ctx context.Context, keys []string) map[strin
func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([]map[string]*memcache.Item, error) {
// Do not batch if the input keys are less than the max batch size.
if (c.config.MaxGetMultiBatchSize <= 0) || (len(keys) <= c.config.MaxGetMultiBatchSize) {
items, err := c.getMultiSingle(ctx, keys)
items, err := c.getMultiSingle(keys)
if err != nil {
return nil, err
}
Expand All @@ -457,29 +457,29 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
numResults++
}

// Spawn a goroutine for each batch request. The max concurrency will be
// enforced by getMultiSingle().
// If max concurrency is disabled, use a nil gate for the doWithBatch method which will
// not apply any limit to the number goroutines started to make batch requests in that case.
var getMultiGate gate.Gate
if c.config.MaxGetMultiConcurrency > 0 {
getMultiGate = c.getMultiGate
}

// Allocate a channel to store results for each batch request. The max concurrency will be
// enforced by doWithBatch.
results := make(chan *memcachedGetMultiResult, numResults)
defer close(results)

for batchStart := 0; batchStart < len(keys); batchStart += batchSize {
batchEnd := batchStart + batchSize
if batchEnd > len(keys) {
batchEnd = len(keys)
}
// Ignore the error here since it can only be returned by our provided function which
// always returns nil.
_ = doWithBatch(ctx, len(keys), c.config.MaxGetMultiBatchSize, getMultiGate, func(startIndex, endIndex int) error {
batchKeys := keys[startIndex:endIndex]

batchKeys := keys[batchStart:batchEnd]
res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(batchKeys)

c.workers.Add(1)
go func() {
defer c.workers.Done()

res := &memcachedGetMultiResult{}
res.items, res.err = c.getMultiSingle(ctx, batchKeys)

results <- res
}()
}
results <- res
return nil
})

// Wait for all batch results. In case of error, we keep
// track of the last error occurred.
Expand All @@ -499,16 +499,7 @@ func (c *memcachedClient) getMultiBatched(ctx context.Context, keys []string) ([
return items, lastErr
}

func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (items map[string]*memcache.Item, err error) {
// Wait until we get a free slot from the gate, if the max
// concurrency should be enforced.
if c.config.MaxGetMultiConcurrency > 0 {
if err := c.getMultiGate.Start(ctx); err != nil {
return nil, errors.Wrapf(err, "failed to wait for turn. Instance: %s", c.name)
}
defer c.getMultiGate.Done()
}

func (c *memcachedClient) getMultiSingle(keys []string) (items map[string]*memcache.Item, err error) {
start := time.Now()
c.operations.WithLabelValues(opGetMulti).Inc()
items, err = c.client.GetMulti(keys)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cacheutil/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestMemcachedClient_SetAsync(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2"), time.Second))
testutil.Ok(t, backendMock.waitItems(2))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
actual, err := client.getMultiSingle([]string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, []byte("value-2"), actual["key-2"].Value)
Expand All @@ -166,7 +166,7 @@ func TestMemcachedClient_SetAsyncWithCustomMaxItemSize(t *testing.T) {
testutil.Ok(t, client.SetAsync(ctx, "key-2", []byte("value-2-too-long-to-be-stored"), time.Second))
testutil.Ok(t, backendMock.waitItems(1))

actual, err := client.getMultiSingle(ctx, []string{"key-1", "key-2"})
actual, err := client.getMultiSingle([]string{"key-1", "key-2"})
testutil.Ok(t, err)
testutil.Equals(t, []byte("value-1"), actual["key-1"].Value)
testutil.Equals(t, (*memcache.Item)(nil), actual["key-2"])
Expand Down
34 changes: 2 additions & 32 deletions pkg/cacheutil/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
)

var (
Expand Down Expand Up @@ -276,36 +276,6 @@ func stringToBytes(s string) []byte {
))
}

// doWithBatch do func with batch and gate. batchSize==0 means one batch. gate==nil means no gate.
func doWithBatch(ctx context.Context, totalSize int, batchSize int, ga gate.Gate, f func(startIndex, endIndex int) error) error {
if totalSize == 0 {
return nil
}
if batchSize <= 0 {
return f(0, totalSize)
}
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < totalSize; i += batchSize {
j := i + batchSize
if j > totalSize {
j = totalSize
}
if ga != nil {
if err := ga.Start(ctx); err != nil {
return nil
}
}
startIndex, endIndex := i, j
g.Go(func() error {
if ga != nil {
defer ga.Done()
}
return f(startIndex, endIndex)
})
}
return g.Wait()
}

// parseRedisClientConfig unmarshals a buffer into a RedisClientConfig with default values.
func parseRedisClientConfig(conf []byte) (RedisClientConfig, error) {
config := DefaultRedisClientConfig
Expand Down

0 comments on commit 18d1955

Please sign in to comment.