Skip to content

Commit

Permalink
Remove errors from set methods on RemoteCacheClient interface (#466)
Browse files Browse the repository at this point in the history
This change does a few things:

* Removes the error from the set method on the `RemoteCacheClient`
  interface. The only error that can be returned was an error when
  the async queue used for `set` operations was full. All other errors
  were logged and a counter incremented. Even for this error, it was
  special-cased and `nil` was returned instead. Thus, the interface
  included an `error` return value that could never actually happen.
* Adds another method, `SetMultiAsync` to the interface for setting
  multiple key-value pairs at once. This was the only functionality
  that the `remoteCache` struct was providing beyond the cache clients
  it was wrapping. With this new method, there's no longer a reason
  to keep it around (will address in a follow-up PR).
* Uses relative TTLs for Memcached items. Memcached supports using
  a relative TTL (e.g. 300 for an item that expires after 5 minutes)
  or absolute TTL (a particular UNIX timestamp). We were converting
  `time.Duration` objects to an absolute TTL for no reason.

Part of #452

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters authored Jan 17, 2024
1 parent 824e75a commit b9a439d
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* [CHANGE] tenant: Remove `tenant.WithDefaultResolver()` and `SingleResolver` in favor of global functions `tenant.TenantID()`, `tenant.TenantIDs()`, or `MultiResolver`. #445
* [CHANGE] Cache: Remove legacy metrics from Memcached client that contained `_memcached_` in the name. #461
* [CHANGE] memberlist: Change default for `memberlist.stream-timeout` from `10s` to `2s`. #458
* [CHANGE] Cache: Remove errors from set methods on `RemoteCacheClient` interface. #466
* [CHANGE] Expose `BuildHTTPMiddleware` to enable dskit `Server` instrumentation addition on external `*mux.Router`s. #459
* [CHANGE] Remove `RouteHTTPToGRPC` option and related functionality #460
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
Expand Down
1 change: 1 addition & 0 deletions cache/async_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func newAsyncQueue(length, maxConcurrency int) *asyncQueue {
return q
}

// submit adds an operation to the queue or returns an error if the queue is full
func (q *asyncQueue) submit(op func()) error {
select {
case q.queueCh <- op:
Expand Down
3 changes: 2 additions & 1 deletion cache/async_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ func TestAsyncQueue_QueueFullError(t *testing.T) {
for i := 0; i < queueLength; i++ {
require.NoError(t, q.submit(func() {}))
}
require.Equal(t, errAsyncQueueFull, q.submit(func() {}))

require.ErrorIs(t, q.submit(func() {}), errAsyncQueueFull)
}
16 changes: 10 additions & 6 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ import (

// RemoteCacheClient is a high level client to interact with remote cache.
type RemoteCacheClient interface {
// GetMulti fetches multiple keys at once from remoteCache. In case of error,
// GetMulti fetches multiple keys at once from a cache. In case of error,
// an empty map is returned and the error tracked/logged. One or more Option
// instances may be passed to modify the behavior of this GetMulti call.
GetMulti(ctx context.Context, keys []string, opts ...Option) map[string][]byte

// SetAsync enqueues an asynchronous operation to store a key into memcached.
// Returns an error in case it fails to enqueue the operation. In case the
// underlying async operation will fail, the error will be tracked/logged.
SetAsync(key string, value []byte, ttl time.Duration) error
// SetAsync enqueues an asynchronous operation to store a key into a cache.
// In case the underlying async operation fails, the error will be tracked/logged.
SetAsync(key string, value []byte, ttl time.Duration)

// Delete deletes a key from memcached.
// SetMultiAsync enqueues asynchronous operations to store a keys and values
// into a cache. In case the underlying async operations fail, the error will
// be tracked/logged.
SetMultiAsync(data map[string][]byte, ttl time.Duration)

// Delete deletes a key from a cache.
// This is a synchronous operation. If an asynchronous set operation for key is still
// pending to be processed, it will wait for it to complete before performing deletion.
Delete(ctx context.Context, key string) error
Expand Down
14 changes: 9 additions & 5 deletions cache/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ func newBaseClient(
}
}

func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f func(key string, buf []byte, ttl time.Duration) error) error {
func (c *baseClient) setMultiAsync(data map[string][]byte, ttl time.Duration, f func(key string, buf []byte, ttl time.Duration) error) {
for key, val := range data {
c.setAsync(key, val, ttl, f)
}
}

func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f func(key string, buf []byte, ttl time.Duration) error) {
if c.maxItemSize > 0 && uint64(len(value)) > c.maxItemSize {
c.metrics.skipped.WithLabelValues(opSet, reasonMaxItemSize).Inc()
return nil
return
}

err := c.asyncQueue.submit(func() {
Expand All @@ -154,12 +160,10 @@ func (c *baseClient) setAsync(key string, value []byte, ttl time.Duration, f fun
c.metrics.duration.WithLabelValues(opSet).Observe(time.Since(start).Seconds())
})

if errors.Is(err, errAsyncQueueFull) {
if err != nil {
c.metrics.skipped.WithLabelValues(opSet, reasonAsyncBufferFull).Inc()
level.Debug(c.logger).Log("msg", "failed to store item to cache because the async buffer is full", "err", err, "size", c.asyncBuffSize)
return nil
}
return err
}

// wait submits an async task and blocks until it completes. This can be used during
Expand Down
18 changes: 14 additions & 4 deletions cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,22 @@ func (c *MemcachedClient) Stop() {
c.client.Close()
}

func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration) error {
return c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error {
func (c *MemcachedClient) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
c.setMultiAsync(data, ttl, func(key string, buf []byte, ttl time.Duration) error {
return c.client.Set(&memcache.Item{
Key: key,
Value: value,
Expiration: int32(time.Now().Add(ttl).Unix()),
Value: buf,
Expiration: int32(ttl.Seconds()),
})
})
}

func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration) {
c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error {
return c.client.Set(&memcache.Item{
Key: key,
Value: buf,
Expiration: int32(ttl.Seconds()),
})
})
}
Expand Down
4 changes: 2 additions & 2 deletions cache/memcached_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
t.Run("no allocator", func(t *testing.T) {
client, backend, err := setup()
require.NoError(t, err)
require.NoError(t, client.SetAsync("foo", []byte("bar"), 10*time.Second))
client.SetAsync("foo", []byte("bar"), 10*time.Second)
require.NoError(t, client.wait())

ctx := context.Background()
Expand All @@ -94,7 +94,7 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
t.Run("with allocator", func(t *testing.T) {
client, backend, err := setup()
require.NoError(t, err)
require.NoError(t, client.SetAsync("foo", []byte("bar"), 10*time.Second))
client.SetAsync("foo", []byte("bar"), 10*time.Second)
require.NoError(t, client.wait())

res := client.GetMulti(context.Background(), []string{"foo"}, WithAllocator(&nopAllocator{}))
Expand Down
16 changes: 12 additions & 4 deletions cache/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,23 @@ func NewRedisClient(logger log.Logger, name string, config RedisClientConfig, re
return c, nil
}

// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) error {
return c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error {
// SetMultiAsync implements RemoteCacheClient.
func (c *RedisClient) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
c.setMultiAsync(data, ttl, func(key string, value []byte, ttl time.Duration) error {
_, err := c.client.Set(context.Background(), key, value, ttl).Result()
return err
})
}

// GetMulti implement RemoteCacheClient.
// SetAsync implements RemoteCacheClient.
func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) {
c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error {
_, err := c.client.Set(context.Background(), key, buf, ttl).Result()
return err
})
}

// GetMulti implements RemoteCacheClient.
func (c *RedisClient) GetMulti(ctx context.Context, keys []string, _ ...Option) map[string][]byte {
if len(keys) == 0 {
return nil
Expand Down
4 changes: 2 additions & 2 deletions cache/redis_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestRedisClient(t *testing.T) {
defer c.Stop()
defer s.FlushAll()
for k, v := range tt.args.data {
_ = c.SetAsync(k, v, time.Hour)
c.SetAsync(k, v, time.Hour)
}

test.Poll(t, time.Second, true, func() interface{} {
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestRedisClientDelete(t *testing.T) {
key1 := "key1"
value1 := []byte{1}

_ = c.SetAsync(key1, value1, time.Hour)
c.SetAsync(key1, value1, time.Hour)

test.Poll(t, time.Second, true, func() interface{} {
hits := c.GetMulti(context.Background(), []string{key1})
Expand Down
18 changes: 1 addition & 17 deletions cache/remote_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,7 @@ func newRemoteCache(name string, logger log.Logger, remoteClient RemoteCacheClie
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *remoteCache) StoreAsync(data map[string][]byte, ttl time.Duration) {
var (
firstErr error
failed int
)

for key, val := range data {
if err := c.remoteClient.SetAsync(key, val, ttl); err != nil {
failed++
if firstErr == nil {
firstErr = err
}
}
}

if firstErr != nil {
level.Warn(c.logger).Log("msg", "failed to store one or more items into remote cache", "failed", failed, "firstErr", firstErr)
}
c.remoteClient.SetMultiAsync(data, ttl)
}

// Fetch fetches multiple keys and returns a map containing cache hits, along with a list of missing keys.
Expand Down

0 comments on commit b9a439d

Please sign in to comment.