Size aware cache #924

Merged
merged 52 commits into from
Nov 27, 2024
Changes from 50 commits
02e9069
Added rate limiting.
cody-littley Nov 15, 2024
dc39c21
Properly handle blob sizes.
cody-littley Nov 15, 2024
d99fd39
Incremental progress.
cody-littley Nov 15, 2024
e39eaf9
Incremental progress.
cody-littley Nov 15, 2024
876ec69
Merge branch 'master' into relay-rate-limits2
cody-littley Nov 18, 2024
9500f4f
unit tests
cody-littley Nov 18, 2024
ffe18c6
Unit tests.
cody-littley Nov 18, 2024
79d9614
Fix tests.
cody-littley Nov 18, 2024
c1d83e6
Cleanup.
cody-littley Nov 18, 2024
ed4daca
Added get chunks request hashing.
cody-littley Nov 18, 2024
3438b92
Start work on authenticator.
cody-littley Nov 18, 2024
b5dc37c
Fix test issue.
cody-littley Nov 18, 2024
b420cca
Cleanup
cody-littley Nov 18, 2024
517b78e
Merge branch 'master' into relay-rate-limits2
cody-littley Nov 19, 2024
7982aec
Convert config to flag pattern.
cody-littley Nov 19, 2024
366fdf7
Simplify rate limiter classes.
cody-littley Nov 19, 2024
f2c10e4
Made suggested changes.
cody-littley Nov 19, 2024
0f95ce4
Merge branch 'relay-rate-limits2' into relay-authentication
cody-littley Nov 19, 2024
c2bb9c3
Shorten package name.
cody-littley Nov 19, 2024
151305b
Started testing
cody-littley Nov 19, 2024
29fd940
Finished unit tests.
cody-littley Nov 19, 2024
3993af4
Nil authenticator test.
cody-littley Nov 19, 2024
d094b80
Test with authentication saving disabled.
cody-littley Nov 19, 2024
d8691e1
Tie together config.
cody-littley Nov 19, 2024
8b9512b
Add method for convenient signing.
cody-littley Nov 19, 2024
fb7ec51
Made requested changes.
cody-littley Nov 19, 2024
6325f0c
Merge branch 'master' into relay-authentication
cody-littley Nov 19, 2024
5f853e0
Revert unintentional changes.
cody-littley Nov 19, 2024
f22544c
Fix bug.
cody-littley Nov 20, 2024
7a26c27
Made requested changes.
cody-littley Nov 20, 2024
d21e0bc
Update proto documentation.
cody-littley Nov 20, 2024
98a8469
Add key caching.
cody-littley Nov 20, 2024
5948e73
lint
cody-littley Nov 20, 2024
2f2209c
Added sane timeouts for relay.
cody-littley Nov 20, 2024
4204ea0
Added moar timeouts.
cody-littley Nov 20, 2024
0892c9c
Add flags.
cody-littley Nov 20, 2024
e6f4e8e
Unit tests.
cody-littley Nov 20, 2024
7f95615
Added base framework.
cody-littley Nov 20, 2024
e11bcf6
Add unit test for queue.
cody-littley Nov 20, 2024
0948a5c
Unit tests.
cody-littley Nov 20, 2024
a23b905
Merge branch 'master' into relay-timeouts
cody-littley Nov 21, 2024
3f06fff
fix bugs
cody-littley Nov 21, 2024
43d8c91
lint
cody-littley Nov 21, 2024
57ea072
Merge branch 'relay-timeouts' into size-aware-cache
cody-littley Nov 21, 2024
0903529
Rename cached accessor to cache accessor.
cody-littley Nov 21, 2024
04480ff
Integrate new cache.
cody-littley Nov 21, 2024
1db3a74
Merge branch 'master' into size-aware-cache
cody-littley Nov 25, 2024
9100eac
Use queue for authenticator because it's a lot clener.
cody-littley Nov 25, 2024
71a5aaa
Cleanup.
cody-littley Nov 25, 2024
f8327df
Use gods library for data structures.
cody-littley Nov 26, 2024
e925517
Made suggested changes.
cody-littley Nov 27, 2024
a9d3329
Cleanup
cody-littley Nov 27, 2024
1 change: 1 addition & 0 deletions go.mod
@@ -12,6 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.12
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
github.com/consensys/gnark-crypto v0.12.1
github.com/emirpasic/gods v1.18.1
github.com/ethereum/go-ethereum v1.14.8
github.com/fxamacker/cbor/v2 v2.5.0
github.com/gin-contrib/logger v0.2.6
2 changes: 2 additions & 0 deletions go.sum
@@ -165,6 +165,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA=
github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.14.8 h1:NgOWvXS+lauK+zFukEvi85UmmsS/OkV0N23UZ1VTIig=
21 changes: 11 additions & 10 deletions relay/auth/authenticator.go
@@ -6,6 +6,8 @@ import (
"fmt"
pb "github.com/Layr-Labs/eigenda/api/grpc/relay"
"github.com/Layr-Labs/eigenda/core"
"github.com/emirpasic/gods/queues"
"github.com/emirpasic/gods/queues/linkedlistqueue"
lru "github.com/hashicorp/golang-lru/v2"
"sync"
"time"
@@ -38,7 +40,7 @@ type requestAuthenticator struct {
authenticatedClients map[string]struct{}

// authenticationTimeouts is a list of authentications that have been performed, along with their expiration times.
- authenticationTimeouts []*authenticationTimeout
+ authenticationTimeouts queues.Queue

// authenticationTimeoutDuration is the duration for which an auth is valid.
// If this is zero, then auth saving is disabled, and each request will be authenticated independently.
@@ -67,7 +69,7 @@ func NewRequestAuthenticator(
authenticator := &requestAuthenticator{
ics: ics,
authenticatedClients: make(map[string]struct{}),
- authenticationTimeouts: make([]*authenticationTimeout, 0),
+ authenticationTimeouts: linkedlistqueue.New(),
authenticationTimeoutDuration: authenticationTimeoutDuration,
keyCache: keyCache,
}
@@ -170,7 +172,7 @@ func (a *requestAuthenticator) saveAuthenticationResult(now time.Time, origin st
defer a.savedAuthLock.Unlock()

a.authenticatedClients[origin] = struct{}{}
- a.authenticationTimeouts = append(a.authenticationTimeouts,
+ a.authenticationTimeouts.Enqueue(
&authenticationTimeout{
origin: origin,
expiration: now.Add(a.authenticationTimeoutDuration),
@@ -195,14 +197,13 @@ func (a *requestAuthenticator) isAuthenticationStillValid(now time.Time, address
// removeOldAuthentications removes any authentications that have expired.
// This method is not thread safe and should be called with the savedAuthLock held.
func (a *requestAuthenticator) removeOldAuthentications(now time.Time) {
- index := 0
- for ; index < len(a.authenticationTimeouts); index++ {
- if a.authenticationTimeouts[index].expiration.After(now) {
+ for a.authenticationTimeouts.Size() > 0 {
+ val, _ := a.authenticationTimeouts.Peek()
+ next := val.(*authenticationTimeout)
+ if next.expiration.After(now) {
break
}
- delete(a.authenticatedClients, a.authenticationTimeouts[index].origin)
- }
- if index > 0 {
- a.authenticationTimeouts = a.authenticationTimeouts[index:]
+ delete(a.authenticatedClients, next.origin)
+ a.authenticationTimeouts.Dequeue()
}
}
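
The expiry logic in this hunk relies on entries being enqueued in expiration order, so cleanup only ever needs to inspect the head of the queue. The following is a minimal standalone sketch of that pattern, not the PR's code: it swaps the gods `linkedlistqueue` for the standard library's `container/list` so it runs without dependencies, and the names `authTracker`, `timeoutEntry`, `save`, `removeExpired`, and `demo` are hypothetical.

```go
package main

import (
	"container/list"
	"fmt"
	"time"
)

// timeoutEntry records when a saved authentication for an origin expires.
type timeoutEntry struct {
	origin     string
	expiration time.Time
}

// authTracker is a hypothetical, simplified stand-in for requestAuthenticator.
// It is not thread safe; a real caller would hold a lock around these methods.
type authTracker struct {
	authenticated map[string]struct{}
	timeouts      *list.List // FIFO queue, oldest entry at the front
	ttl           time.Duration
}

func newAuthTracker(ttl time.Duration) *authTracker {
	return &authTracker{
		authenticated: make(map[string]struct{}),
		timeouts:      list.New(),
		ttl:           ttl,
	}
}

// save marks an origin as authenticated and enqueues its expiration.
func (a *authTracker) save(now time.Time, origin string) {
	a.authenticated[origin] = struct{}{}
	a.timeouts.PushBack(&timeoutEntry{origin: origin, expiration: now.Add(a.ttl)})
}

// removeExpired pops entries from the front of the queue until it finds one
// that has not yet expired. Entries behind it expire even later.
func (a *authTracker) removeExpired(now time.Time) {
	for a.timeouts.Len() > 0 {
		front := a.timeouts.Front()
		next := front.Value.(*timeoutEntry)
		if next.expiration.After(now) {
			break
		}
		delete(a.authenticated, next.origin)
		a.timeouts.Remove(front)
	}
}

// demo saves two origins five seconds apart with a ten second timeout,
// then cleans up twelve seconds after the first save.
func demo() (aValid bool, bValid bool) {
	start := time.Unix(0, 0)
	tracker := newAuthTracker(10 * time.Second)
	tracker.save(start, "a")
	tracker.save(start.Add(5*time.Second), "b")
	tracker.removeExpired(start.Add(12 * time.Second))
	_, aValid = tracker.authenticated["a"]
	_, bValid = tracker.authenticated["b"]
	return aValid, bValid
}

func main() {
	aValid, bValid := demo()
	fmt.Println(aValid, bValid) // prints "false true"
}
```

Compared with re-slicing (`timeouts = timeouts[index:]`), dequeuing from a linked structure releases each expired entry immediately instead of keeping it reachable through the slice's backing array.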
20 changes: 16 additions & 4 deletions relay/blob_provider.go
@@ -20,7 +20,7 @@ type blobProvider struct {
blobStore *blobstore.BlobStore

// blobCache is an LRU cache of blobs.
- blobCache cache.CachedAccessor[v2.BlobKey, []byte]
+ blobCache cache.CacheAccessor[v2.BlobKey, []byte]

// fetchTimeout is the maximum time to wait for a blob fetch operation to complete.
fetchTimeout time.Duration
@@ -31,7 +31,7 @@ func newBlobProvider(
ctx context.Context,
logger logging.Logger,
blobStore *blobstore.BlobStore,
- blobCacheSize int,
+ blobCacheSize uint64,
maxIOConcurrency int,
fetchTimeout time.Duration) (*blobProvider, error) {

@@ -42,15 +42,27 @@ func newBlobProvider(
fetchTimeout: fetchTimeout,
}

- c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, maxIOConcurrency, server.fetchBlob)
+ c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize)
+ err := c.WithWeightCalculator(computeBlobCacheWeight)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
- server.blobCache = c
+
+ cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
+ if err != nil {
+ return nil, fmt.Errorf("error creating blob cache: %w", err)
+ }
+ server.blobCache = cacheAccessor

return server, nil
}

+ // computeBlobCacheWeight computes the 'weight' of the blob for the cache. The weight of a blob
+ // is equal to its size, in bytes.
+ func computeBlobCacheWeight(_ v2.BlobKey, value []byte) uint64 {
+ return uint64(len(value))
+ }

// GetBlob retrieves a blob from the blob store.
func (s *blobProvider) GetBlob(ctx context.Context, blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobCache.Get(ctx, blobKey)
4 changes: 2 additions & 2 deletions relay/blob_provider_test.go
@@ -39,7 +39,7 @@ func TestReadWrite(t *testing.T) {
context.Background(),
logger,
blobStore,
- 10,
+ 1024*1024*32,
32,
10*time.Second)
require.NoError(t, err)
@@ -76,7 +76,7 @@ func TestNonExistentBlob(t *testing.T) {
context.Background(),
logger,
blobStore,
- 10,
+ 1024*1024*32,
32,
10*time.Second)
require.NoError(t, err)
30 changes: 30 additions & 0 deletions relay/cache/cache.go
@@ -0,0 +1,30 @@
package cache

// WeightCalculator is a function that calculates the weight of a key-value pair in a Cache.
// By default, the weight of a key-value pair is 1. Cache capacity is always specified in terms of
// the weight of the key-value pairs it can hold, rather than the number of key-value pairs.
//
// Unless otherwise noted, Cache implementations are not required to be thread safe.
Contributor

Why? Use cases will quite often need thread safety.

Also please move this line down to the Cache

Contributor Author

With the cache accessor, we are forced to use a mutex in order to perform atomic operations on the cache and the map of in-progress operations. Since that is currently our only use case, adding thread safety to the cache would be redundant and would provide no immediate benefit.

I agree that having a thread safe cache may be useful if we want to reuse it as a general purpose cache. I see two options:

  • we make the cache inherently thread safe, even though not all use cases may need thread safety.
  • we create a wrapper object that implements Cache that adds thread safety to another cache implementation, e.g. func NewThreadSafeCache(cache Cache) Cache. We could either write this now, or implement it later when we need it.

Of these options, which would you prefer?

Contributor

I thought this was intended to be a generic cache (per the comment on Cache). If its intended use case is the cache accessor, then it's fine to keep it as it is.

For the two options, they both look fine. Mutex locking is very cheap so there is no concern even if some use cases don't need concurrency.

Contributor Author

Currently there are no intended use cases outside of the cache accessor. Let's wait to add locking until we have a use case that justifies it.
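
For reference, the wrapper option raised in this thread (`NewThreadSafeCache`) could look roughly like the sketch below. It is not part of the PR. The `Cache` interface is reduced to four methods for brevity, and `mapCache` and `demo` are hypothetical helpers that exist only to make the example runnable.

```go
package main

import (
	"fmt"
	"sync"
)

// Cache is a trimmed-down copy of the interface under review.
type Cache[K comparable, V any] interface {
	Get(key K) (V, bool)
	Put(key K, value V)
	Size() int
	Weight() uint64
}

// mapCache is a hypothetical unbounded cache that is NOT thread safe.
type mapCache[K comparable, V any] struct{ data map[K]V }

func (m *mapCache[K, V]) Get(key K) (V, bool) { v, ok := m.data[key]; return v, ok }
func (m *mapCache[K, V]) Put(key K, value V)  { m.data[key] = value }
func (m *mapCache[K, V]) Size() int           { return len(m.data) }
func (m *mapCache[K, V]) Weight() uint64      { return uint64(len(m.data)) }

// threadSafeCache serializes every call to the wrapped cache with a mutex.
type threadSafeCache[K comparable, V any] struct {
	lock  sync.Mutex
	cache Cache[K, V]
}

// NewThreadSafeCache wraps an existing cache with locking.
func NewThreadSafeCache[K comparable, V any](cache Cache[K, V]) Cache[K, V] {
	return &threadSafeCache[K, V]{cache: cache}
}

func (t *threadSafeCache[K, V]) Get(key K) (V, bool) {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.cache.Get(key)
}

func (t *threadSafeCache[K, V]) Put(key K, value V) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.cache.Put(key, value)
}

func (t *threadSafeCache[K, V]) Size() int {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.cache.Size()
}

func (t *threadSafeCache[K, V]) Weight() uint64 {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.cache.Weight()
}

// demo writes 800 distinct keys from 8 goroutines and returns the final size.
func demo() int {
	c := NewThreadSafeCache[int, int](&mapCache[int, int]{data: make(map[int]int)})
	var wg sync.WaitGroup
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				c.Put(worker*100+i, i)
			}
		}(w)
	}
	wg.Wait()
	return c.Size()
}

func main() {
	fmt.Println(demo()) // prints "800"
}
```

A wrapper like this only makes individual calls atomic. The cache accessor needs a check of the cache and an update of its in-progress map to happen under one lock, which is why it keeps its own mutex regardless.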

type WeightCalculator[K comparable, V any] func(key K, value V) uint64
Contributor

Do we need this generality, or would it work to just give the cache a total memory limit?

Contributor Author

This generalization is needed to support the current use case. There isn't a trivial and cheap way we can determine the size of an arbitrary data structure, so we need the ability to compute the size for different data types. For things like blobs that are just an array of bytes, it's easy. For things like the chunks that are in a complex data structure, we need an abstraction like this.
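
To make the distinction concrete, here is a small sketch of two weight calculators, one for raw bytes and one for a nested structure. The `frame` type, its fields, and the byte estimates are assumptions for illustration only; they are not the relay's actual chunk types, and `string` stands in for `v2.BlobKey`.

```go
package main

import "fmt"

// WeightCalculator mirrors the type under review.
type WeightCalculator[K comparable, V any] func(key K, value V) uint64

// frame is a hypothetical stand-in for a chunk-like structure.
type frame struct {
	proof  [64]byte
	coeffs []uint64
}

// blobWeight is trivial: a blob's weight is its length in bytes.
func blobWeight(_ string, value []byte) uint64 {
	return uint64(len(value))
}

// framesWeight estimates the payload size of a slice of frames. It counts only
// the proof bytes and 8 bytes per coefficient, ignoring slice headers and
// pointer overhead, so it is an approximation rather than an exact footprint.
func framesWeight(_ string, frames []*frame) uint64 {
	var total uint64
	for _, f := range frames {
		total += uint64(len(f.proof)) + 8*uint64(len(f.coeffs))
	}
	return total
}

// Compile-time checks that both functions satisfy WeightCalculator.
var (
	_ WeightCalculator[string, []byte]   = blobWeight
	_ WeightCalculator[string, []*frame] = framesWeight
)

// demo returns the weight of a 1024 byte blob and of two frames holding
// 4 and 8 coefficients respectively.
func demo() (uint64, uint64) {
	blob := make([]byte, 1024)
	frames := []*frame{
		{coeffs: make([]uint64, 4)},
		{coeffs: make([]uint64, 8)},
	}
	return blobWeight("blob", blob), framesWeight("chunks", frames)
}

func main() {
	b, f := demo()
	fmt.Println(b, f) // prints "1024 224"
}
```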


// Cache is an interface for a generic cache.
type Cache[K comparable, V any] interface {
// Get returns the value associated with the key, and a boolean indicating whether the key was found in the cache.
Get(key K) (V, bool)

// Put adds a key-value pair to the cache. After this operation, values may be dropped if the total weight
// exceeds the configured maximum weight. Will ignore the new value if it exceeds the maximum weight
// of the cache in and of itself.
Put(key K, value V)

// WithWeightCalculator sets the weight calculator for the cache. May only be called
// when the cache is empty. The weight calculator should be an idempotent function that
// always returns the same output given the same input.
WithWeightCalculator(weightCalculator WeightCalculator[K, V]) error
Contributor

It'd be better to drop this method and pass the calculator in through the constructor, since it is required.

Contributor Author (@cody-littley, Nov 27, 2024)

If no weight calculator was provided, it would give every entry a weight of 1. But I can understand how the API is cleaner by moving it to the constructor. Change made.


// Size returns the number of key-value pairs in the cache.
Size() int

// Weight returns the total weight of the key-value pairs in the cache.
Weight() uint64
}
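
The FIFO implementation behind `cache.NewFIFOCache` is not shown in this part of the diff. As a rough illustration of the contract documented above, the sketch below implements a weight-bounded FIFO cache with the calculator passed to the constructor, as agreed in the review thread. The names `fifoCache`, `newFIFOCache`, and `demo` are hypothetical, and a plain slice stands in for whatever queue the real implementation uses.

```go
package main

import "fmt"

// fifoCache is a hypothetical weight-bounded FIFO cache. It is not thread safe.
type fifoCache[K comparable, V any] struct {
	maxWeight     uint64
	currentWeight uint64
	calculator    func(key K, value V) uint64
	data          map[K]V
	order         []K // keys in insertion order, oldest first
}

func newFIFOCache[K comparable, V any](
	maxWeight uint64,
	calculator func(key K, value V) uint64) *fifoCache[K, V] {

	return &fifoCache[K, V]{
		maxWeight:  maxWeight,
		calculator: calculator,
		data:       make(map[K]V),
	}
}

func (c *fifoCache[K, V]) Get(key K) (V, bool) {
	value, ok := c.data[key]
	return value, ok
}

// Put stores the value, then evicts the oldest entries until the total weight
// fits. A value heavier than the whole cache is ignored.
func (c *fifoCache[K, V]) Put(key K, value V) {
	weight := c.calculator(key, value)
	if weight > c.maxWeight {
		return
	}
	if old, ok := c.data[key]; ok {
		// Replacing a value keeps the key's original position in the queue.
		c.currentWeight -= c.calculator(key, old)
	} else {
		c.order = append(c.order, key)
	}
	c.data[key] = value
	c.currentWeight += weight

	for c.currentWeight > c.maxWeight {
		oldest := c.order[0]
		c.order = c.order[1:]
		c.currentWeight -= c.calculator(oldest, c.data[oldest])
		delete(c.data, oldest)
	}
}

func (c *fifoCache[K, V]) Size() int      { return len(c.data) }
func (c *fifoCache[K, V]) Weight() uint64 { return c.currentWeight }

// demo fills a cache of maximum weight 10 with three 4 byte values and then
// tries to insert an 11 byte value.
func demo() (size int, weight uint64, aFound bool) {
	c := newFIFOCache[string, []byte](10, func(_ string, v []byte) uint64 {
		return uint64(len(v))
	})
	c.Put("a", make([]byte, 4))
	c.Put("b", make([]byte, 4))
	c.Put("c", make([]byte, 4))     // total would be 12, so "a" is evicted
	c.Put("huge", make([]byte, 11)) // heavier than the cache, ignored
	_, aFound = c.Get("a")
	return c.Size(), c.Weight(), aFound
}

func main() {
	size, weight, aFound := demo()
	fmt.Println(size, weight, aFound) // prints "2 8 false"
}
```

Because eviction depends on the recorded weights, the calculator has to return the same weight for a given entry on insertion and on eviction, which matches the determinism requirement stated in the interface comment.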
41 changes: 18 additions & 23 deletions relay/cache/cached_accessor.go → relay/cache/cache_accessor.go
@@ -2,21 +2,20 @@ package cache

import (
"context"
lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/sync/semaphore"
"sync"
)

// CachedAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
// CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
// are expensive, and prevents multiple concurrent cache misses for the same key.
type CachedAccessor[K comparable, V any] interface {
type CacheAccessor[K comparable, V any] interface {
// Get returns the value for the given key. If the value is not in the cache, it will be fetched using the Accessor.
// If the context is cancelled, the function may abort early. If multiple goroutines request the same key,
// cancellation of one request will not affect the others.
Get(ctx context.Context, key K) (V, error)
}

// Accessor is a function capable of fetching a value from a resource. Used by CachedAccessor when there is a cache miss.
// Accessor is a function capable of fetching a value from a resource. Used by CacheAccessor when there is a cache miss.
type Accessor[K comparable, V any] func(key K) (V, error)

// accessResult is a struct that holds the result of an Accessor call.
@@ -29,23 +28,24 @@ type accessResult[V any] struct {
err error
}

var _ CachedAccessor[string, string] = &cachedAccessor[string, string]{}
var _ CacheAccessor[string, string] = &cacheAccessor[string, string]{}

// Future work: the cache used in this implementation is suboptimal when storing items that have a large
// variance in size. The current implementation uses a fixed size cache, which requires the cache to be
// sized to the largest item that will be stored. This cache should be replaced with an implementation
// whose size can be specified by memory footprint in bytes.

// cachedAccessor is an implementation of CachedAccessor.
type cachedAccessor[K comparable, V any] struct {
// cacheAccessor is an implementation of CacheAccessor.
type cacheAccessor[K comparable, V any] struct {

// lookupsInProgress has an entry for each key that is currently being looked up via the accessor. The value
// is written into the channel when it is eventually fetched. If a key is requested more than once while a
// lookup is in progress, the second (and following) requests will wait for the result of the first lookup
// to be written into the channel.
lookupsInProgress map[K]*accessResult[V]

// cache is the LRU cache used to store values fetched by the accessor.
cache *lru.Cache[K, V]
// cache is the underlying cache that this wrapper manages.
cache Cache[K, V]

// concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress.
concurrencyLimiter chan struct{}
@@ -57,20 +57,15 @@ type cachedAccessor[K comparable, V any] struct {
accessor Accessor[K, V]
}

// NewCachedAccessor creates a new CachedAccessor. The cacheSize parameter specifies the maximum number of items
// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items
// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent
// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess
// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor
// parameter is the function used to fetch values that are not in the cache.
func NewCachedAccessor[K comparable, V any](
cacheSize int,
func NewCacheAccessor[K comparable, V any](
cache Cache[K, V],
concurrencyLimit int,
accessor Accessor[K, V]) (CachedAccessor[K, V], error) {

cache, err := lru.New[K, V](cacheSize)
if err != nil {
return nil, err
}
accessor Accessor[K, V]) (CacheAccessor[K, V], error) {

lookupsInProgress := make(map[K]*accessResult[V])

@@ -79,7 +74,7 @@ func NewCachedAccessor[K comparable, V any](
concurrencyLimiter = make(chan struct{}, concurrencyLimit)
}

return &cachedAccessor[K, V]{
return &cacheAccessor[K, V]{
cache: cache,
concurrencyLimiter: concurrencyLimiter,
accessor: accessor,
@@ -95,7 +90,7 @@ func newAccessResult[V any]() *accessResult[V] {
return result
}

func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
c.cacheLock.Lock()

// first, attempt to get the value from the cache
@@ -126,7 +121,7 @@ func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
// waitForResult waits for the result of a lookup that was initiated by another requester and returns it
// when it becomes available. This method will return quickly if the provided context is cancelled.
// Doing so does not disrupt the other requesters that are also waiting for this result.
func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) {
func (c *cacheAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) {
err := result.sem.Acquire(ctx, 1)
if err != nil {
var zeroValue V
@@ -139,7 +134,7 @@ func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *access

// fetchResult fetches the value for the given key and returns it. If the context is cancelled before the value
// is fetched, the function will return early. If the fetch is successful, the value will be added to the cache.
func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) {
func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) {

// Perform the work in a background goroutine. This allows us to return early if the context is cancelled
// without disrupting the fetch operation that other requesters may be waiting for.
@@ -159,7 +154,7 @@ func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *a

// Update the cache if the fetch was successful.
if err == nil {
c.cache.Add(key, value)
c.cache.Put(key, value)
}

// Provide the result to all other goroutines that may be waiting for it.
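
Only fragments of the accessor are visible in these hunks, so the following is a simplified standalone sketch of the behavior they describe: concurrent misses for the same key share one fetch, the fetch runs in a background goroutine so a cancelled caller does not disrupt other waiters, and a buffered channel caps concurrent fetches. It is not the PR's implementation. It signals completion by closing a channel instead of using `golang.org/x/sync/semaphore`, it uses a plain map instead of the `Cache` interface, and `dedupAccessor`, `pendingLookup`, `runLookup`, and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// pendingLookup holds the eventual result of a fetch that is in progress.
type pendingLookup[V any] struct {
	done  chan struct{} // closed once value and err are set
	value V
	err   error
}

type dedupAccessor[K comparable, V any] struct {
	lock    sync.Mutex // guards cache and pending together
	cache   map[K]V
	pending map[K]*pendingLookup[V]
	limiter chan struct{} // nil means no concurrency limit
	fetch   func(key K) (V, error)
}

func newDedupAccessor[K comparable, V any](
	concurrencyLimit int,
	fetch func(key K) (V, error)) *dedupAccessor[K, V] {

	a := &dedupAccessor[K, V]{
		cache:   make(map[K]V),
		pending: make(map[K]*pendingLookup[V]),
		fetch:   fetch,
	}
	if concurrencyLimit > 0 {
		a.limiter = make(chan struct{}, concurrencyLimit)
	}
	return a
}

func (a *dedupAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
	a.lock.Lock()
	if value, ok := a.cache[key]; ok {
		a.lock.Unlock()
		return value, nil
	}
	lookup, inProgress := a.pending[key]
	if !inProgress {
		lookup = &pendingLookup[V]{done: make(chan struct{})}
		a.pending[key] = lookup
		go a.runLookup(key, lookup) // keeps running even if this caller cancels
	}
	a.lock.Unlock()

	select {
	case <-lookup.done:
		return lookup.value, lookup.err
	case <-ctx.Done():
		var zero V
		return zero, ctx.Err()
	}
}

func (a *dedupAccessor[K, V]) runLookup(key K, lookup *pendingLookup[V]) {
	if a.limiter != nil {
		a.limiter <- struct{}{} // blocks while the limit is reached
		defer func() { <-a.limiter }()
	}
	lookup.value, lookup.err = a.fetch(key)

	a.lock.Lock()
	if lookup.err == nil {
		a.cache[key] = lookup.value
	}
	delete(a.pending, key)
	a.lock.Unlock()

	close(lookup.done) // wakes every waiter at once
}

// demo issues ten concurrent requests for one key and reports how many times
// the fetch function ran and whether every caller saw the expected value.
func demo() (fetches int32, allMatch bool) {
	var calls, mismatches int32
	a := newDedupAccessor[string, string](2, func(key string) (string, error) {
		atomic.AddInt32(&calls, 1)
		time.Sleep(10 * time.Millisecond) // simulate an expensive miss
		return "value-" + key, nil
	})
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err := a.Get(context.Background(), "k")
			if err != nil || v != "value-k" {
				atomic.AddInt32(&mismatches, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&calls), atomic.LoadInt32(&mismatches) == 0
}

func main() {
	fetches, allMatch := demo()
	fmt.Println(fetches, allMatch) // prints "1 true"
}
```

The cache write and the removal from `pending` happen under the same lock, so a later caller always finds either the in-progress lookup or the cached value and never triggers a second fetch for a key that succeeded.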