diff --git a/go.mod b/go.mod index 45e0bbe49a..dc262e397b 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.13.12 github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6 github.com/consensys/gnark-crypto v0.12.1 + github.com/emirpasic/gods v1.18.1 github.com/ethereum/go-ethereum v1.14.8 github.com/fxamacker/cbor/v2 v2.5.0 github.com/gin-contrib/logger v0.2.6 diff --git a/go.sum b/go.sum index 4762b276fb..d3b4dde0bf 100644 --- a/go.sum +++ b/go.sum @@ -165,6 +165,8 @@ github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6 github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/ethereum/c-kzg-4844 v1.0.0 h1:0X1LBXxaEtYD9xsyj9B9ctQEZIpnvVDeoBx8aHEwTNA= github.com/ethereum/c-kzg-4844 v1.0.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.14.8 h1:NgOWvXS+lauK+zFukEvi85UmmsS/OkV0N23UZ1VTIig= diff --git a/relay/auth/authenticator.go b/relay/auth/authenticator.go index 2e89c83d18..c45ee4da47 100644 --- a/relay/auth/authenticator.go +++ b/relay/auth/authenticator.go @@ -6,6 +6,8 @@ import ( "fmt" pb "github.com/Layr-Labs/eigenda/api/grpc/relay" "github.com/Layr-Labs/eigenda/core" + "github.com/emirpasic/gods/queues" + "github.com/emirpasic/gods/queues/linkedlistqueue" lru "github.com/hashicorp/golang-lru/v2" "sync" "time" @@ -38,7 +40,7 @@ type requestAuthenticator struct { authenticatedClients map[string]struct{} // authenticationTimeouts is a list of authentications that have been performed, along with their expiration times. - authenticationTimeouts []*authenticationTimeout + authenticationTimeouts queues.Queue // authenticationTimeoutDuration is the duration for which an auth is valid. // If this is zero, then auth saving is disabled, and each request will be authenticated independently. @@ -67,7 +69,7 @@ func NewRequestAuthenticator( authenticator := &requestAuthenticator{ ics: ics, authenticatedClients: make(map[string]struct{}), - authenticationTimeouts: make([]*authenticationTimeout, 0), + authenticationTimeouts: linkedlistqueue.New(), authenticationTimeoutDuration: authenticationTimeoutDuration, keyCache: keyCache, } @@ -170,7 +172,7 @@ func (a *requestAuthenticator) saveAuthenticationResult(now time.Time, origin st defer a.savedAuthLock.Unlock() a.authenticatedClients[origin] = struct{}{} - a.authenticationTimeouts = append(a.authenticationTimeouts, + a.authenticationTimeouts.Enqueue( &authenticationTimeout{ origin: origin, expiration: now.Add(a.authenticationTimeoutDuration), @@ -195,14 +197,13 @@ func (a *requestAuthenticator) isAuthenticationStillValid(now time.Time, address // removeOldAuthentications removes any authentications that have expired. // This method is not thread safe and should be called with the savedAuthLock held. func (a *requestAuthenticator) removeOldAuthentications(now time.Time) { - index := 0 - for ; index < len(a.authenticationTimeouts); index++ { - if a.authenticationTimeouts[index].expiration.After(now) { + for a.authenticationTimeouts.Size() > 0 { + val, _ := a.authenticationTimeouts.Peek() + next := val.(*authenticationTimeout) + if next.expiration.After(now) { break } - delete(a.authenticatedClients, a.authenticationTimeouts[index].origin) - } - if index > 0 { - a.authenticationTimeouts = a.authenticationTimeouts[index:] + delete(a.authenticatedClients, next.origin) + a.authenticationTimeouts.Dequeue() } } diff --git a/relay/blob_provider.go b/relay/blob_provider.go index 9b9863bfda..70cc310665 100644 --- a/relay/blob_provider.go +++ b/relay/blob_provider.go @@ -20,7 +20,7 @@ type blobProvider struct { blobStore *blobstore.BlobStore // blobCache is an LRU cache of blobs. - blobCache cache.CachedAccessor[v2.BlobKey, []byte] + blobCache cache.CacheAccessor[v2.BlobKey, []byte] // fetchTimeout is the maximum time to wait for a blob fetch operation to complete. fetchTimeout time.Duration @@ -31,7 +31,7 @@ func newBlobProvider( ctx context.Context, logger logging.Logger, blobStore *blobstore.BlobStore, - blobCacheSize int, + blobCacheSize uint64, maxIOConcurrency int, fetchTimeout time.Duration) (*blobProvider, error) { @@ -42,15 +42,23 @@ func newBlobProvider( fetchTimeout: fetchTimeout, } - c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, maxIOConcurrency, server.fetchBlob) + c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight) + + cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob) if err != nil { return nil, fmt.Errorf("error creating blob cache: %w", err) } - server.blobCache = c + server.blobCache = cacheAccessor return server, nil } +// computeChunkCacheWeight computes the 'weight' of the blob for the cache. The weight of a blob +// is equal to its size, in bytes. +func computeBlobCacheWeight(_ v2.BlobKey, value []byte) uint64 { + return uint64(len(value)) +} + // GetBlob retrieves a blob from the blob store. func (s *blobProvider) GetBlob(ctx context.Context, blobKey v2.BlobKey) ([]byte, error) { data, err := s.blobCache.Get(ctx, blobKey) diff --git a/relay/blob_provider_test.go b/relay/blob_provider_test.go index 9309461c65..22368a5d5b 100644 --- a/relay/blob_provider_test.go +++ b/relay/blob_provider_test.go @@ -39,7 +39,7 @@ func TestReadWrite(t *testing.T) { context.Background(), logger, blobStore, - 10, + 1024*1024*32, 32, 10*time.Second) require.NoError(t, err) @@ -76,7 +76,7 @@ func TestNonExistentBlob(t *testing.T) { context.Background(), logger, blobStore, - 10, + 1024*1024*32, 32, 10*time.Second) require.NoError(t, err) diff --git a/relay/cache/cache.go b/relay/cache/cache.go new file mode 100644 index 0000000000..1d3c7f1a04 --- /dev/null +++ b/relay/cache/cache.go @@ -0,0 +1,25 @@ +package cache + +// WeightCalculator is a function that calculates the weight of a key-value pair in a Cache. +// By default, the weight of a key-value pair is 1. Cache capacity is always specified in terms of +// the weight of the key-value pairs it can hold, rather than the number of key-value pairs. +type WeightCalculator[K comparable, V any] func(key K, value V) uint64 + +// Cache is an interface for a generic cache. +// +// Unless otherwise noted, Cache implementations are not required to be thread safe. +type Cache[K comparable, V any] interface { + // Get returns the value associated with the key, and a boolean indicating whether the key was found in the cache. + Get(key K) (V, bool) + + // Put adds a key-value pair to the cache. After this operation, values may be dropped if the total weight + // exceeds the configured maximum weight. Will ignore the new value if it exceeds the maximum weight + // of the cache in and of itself. + Put(key K, value V) + + // Size returns the number of key-value pairs in the cache. + Size() int + + // Weight returns the total weight of the key-value pairs in the cache. + Weight() uint64 +} diff --git a/relay/cache/cached_accessor.go b/relay/cache/cache_accessor.go similarity index 81% rename from relay/cache/cached_accessor.go rename to relay/cache/cache_accessor.go index d131229082..a6389538b4 100644 --- a/relay/cache/cached_accessor.go +++ b/relay/cache/cache_accessor.go @@ -2,21 +2,20 @@ package cache import ( "context" - lru "github.com/hashicorp/golang-lru/v2" "golang.org/x/sync/semaphore" "sync" ) -// CachedAccessor is an interface for accessing a resource that is cached. It assumes that cache misses +// CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses // are expensive, and prevents multiple concurrent cache misses for the same key. -type CachedAccessor[K comparable, V any] interface { +type CacheAccessor[K comparable, V any] interface { // Get returns the value for the given key. If the value is not in the cache, it will be fetched using the Accessor. // If the context is cancelled, the function may abort early. If multiple goroutines request the same key, // cancellation of one request will not affect the others. Get(ctx context.Context, key K) (V, error) } -// Accessor is function capable of fetching a value from a resource. Used by CachedAccessor when there is a cache miss. +// Accessor is function capable of fetching a value from a resource. Used by CacheAccessor when there is a cache miss. type Accessor[K comparable, V any] func(key K) (V, error) // accessResult is a struct that holds the result of an Accessor call. @@ -29,23 +28,24 @@ type accessResult[V any] struct { err error } -var _ CachedAccessor[string, string] = &cachedAccessor[string, string]{} +var _ CacheAccessor[string, string] = &cacheAccessor[string, string]{} // Future work: the cache used in this implementation is suboptimal when storing items that have a large // variance in size. The current implementation uses a fixed size cache, which requires the cached to be // sized to the largest item that will be stored. This cache should be replaced with an implementation // whose size can be specified by memory footprint in bytes. -// cachedAccessor is an implementation of CachedAccessor. -type cachedAccessor[K comparable, V any] struct { +// cacheAccessor is an implementation of CacheAccessor. +type cacheAccessor[K comparable, V any] struct { + // lookupsInProgress has an entry for each key that is currently being looked up via the accessor. The value // is written into the channel when it is eventually fetched. If a key is requested more than once while a // lookup in progress, the second (and following) requests will wait for the result of the first lookup // to be written into the channel. lookupsInProgress map[K]*accessResult[V] - // cache is the LRU cache used to store values fetched by the accessor. - cache *lru.Cache[K, V] + // cache is the underlying cache that this wrapper manages. + cache Cache[K, V] // concurrencyLimiter is a channel used to limit the number of concurrent lookups that can be in progress. concurrencyLimiter chan struct{} @@ -57,20 +57,15 @@ type cachedAccessor[K comparable, V any] struct { accessor Accessor[K, V] } -// NewCachedAccessor creates a new CachedAccessor. The cacheSize parameter specifies the maximum number of items +// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items // that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent // lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess // lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor // parameter is the function used to fetch values that are not in the cache. -func NewCachedAccessor[K comparable, V any]( - cacheSize int, +func NewCacheAccessor[K comparable, V any]( + cache Cache[K, V], concurrencyLimit int, - accessor Accessor[K, V]) (CachedAccessor[K, V], error) { - - cache, err := lru.New[K, V](cacheSize) - if err != nil { - return nil, err - } + accessor Accessor[K, V]) (CacheAccessor[K, V], error) { lookupsInProgress := make(map[K]*accessResult[V]) @@ -79,7 +74,7 @@ func NewCachedAccessor[K comparable, V any]( concurrencyLimiter = make(chan struct{}, concurrencyLimit) } - return &cachedAccessor[K, V]{ + return &cacheAccessor[K, V]{ cache: cache, concurrencyLimiter: concurrencyLimiter, accessor: accessor, @@ -95,7 +90,7 @@ func newAccessResult[V any]() *accessResult[V] { return result } -func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { +func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { c.cacheLock.Lock() // first, attempt to get the value from the cache @@ -126,7 +121,7 @@ func (c *cachedAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { // waitForResult waits for the result of a lookup that was initiated by another requester and returns it // when it becomes is available. This method will return quickly if the provided context is cancelled. // Doing so does not disrupt the other requesters that are also waiting for this result. -func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) { +func (c *cacheAccessor[K, V]) waitForResult(ctx context.Context, result *accessResult[V]) (V, error) { err := result.sem.Acquire(ctx, 1) if err != nil { var zeroValue V @@ -139,7 +134,7 @@ func (c *cachedAccessor[K, V]) waitForResult(ctx context.Context, result *access // fetchResult fetches the value for the given key and returns it. If the context is cancelled before the value // is fetched, the function will return early. If the fetch is successful, the value will be added to the cache. -func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) { +func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *accessResult[V]) (V, error) { // Perform the work in a background goroutine. This allows us to return early if the context is cancelled // without disrupting the fetch operation that other requesters may be waiting for. @@ -159,7 +154,7 @@ func (c *cachedAccessor[K, V]) fetchResult(ctx context.Context, key K, result *a // Update the cache if the fetch was successful. if err == nil { - c.cache.Add(key, value) + c.cache.Put(key, value) } // Provide the result to all other goroutines that may be waiting for it. diff --git a/relay/cache/cached_accessor_test.go b/relay/cache/cache_accessor_test.go similarity index 89% rename from relay/cache/cached_accessor_test.go rename to relay/cache/cache_accessor_test.go index 9048e3d88a..0f2ac501d4 100644 --- a/relay/cache/cached_accessor_test.go +++ b/relay/cache/cache_accessor_test.go @@ -32,8 +32,11 @@ func TestRandomOperationsSingleThread(t *testing.T) { return &str, nil } cacheSize := rand.Intn(dataSize) + 1 + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) for i := 0; i < dataSize; i++ { @@ -80,7 +83,11 @@ func TestCacheMisses(t *testing.T) { return &str, nil } - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) // Get the first cacheSize keys. This should fill the cache. @@ -143,7 +150,11 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) { } cacheSize := rand.Intn(dataSize) + 1 - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -184,7 +195,7 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) { require.Equal(t, uint64(1), cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cacheAccessor[int, *string]).lookupsInProgress)) } func TestParallelAccess(t *testing.T) { @@ -212,7 +223,11 @@ func TestParallelAccessWithError(t *testing.T) { } cacheSize := 100 - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -253,7 +268,7 @@ func TestParallelAccessWithError(t *testing.T) { require.Equal(t, count+1, cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cacheAccessor[int, *string]).lookupsInProgress)) } func TestConcurrencyLimiter(t *testing.T) { @@ -284,7 +299,11 @@ func TestConcurrencyLimiter(t *testing.T) { } cacheSize := 100 - ca, err := NewCachedAccessor(cacheSize, maxConcurrency, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, maxConcurrency, accessor) require.NoError(t, err) wg := sync.WaitGroup{} @@ -338,7 +357,11 @@ func TestOriginalRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -397,7 +420,7 @@ func TestOriginalRequesterTimesOut(t *testing.T) { require.Equal(t, uint64(1), cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cacheAccessor[int, *string]).lookupsInProgress)) } func TestSecondaryRequesterTimesOut(t *testing.T) { @@ -426,7 +449,11 @@ func TestSecondaryRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - ca, err := NewCachedAccessor(cacheSize, 0, accessor) + c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { + return 1 + }) + + ca, err := NewCacheAccessor[int, *string](c, 0, accessor) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -489,5 +516,5 @@ func TestSecondaryRequesterTimesOut(t *testing.T) { require.Equal(t, uint64(1), cacheMissCount.Load()) // The internal lookupsInProgress map should no longer contain the key. - require.Equal(t, 0, len(ca.(*cachedAccessor[int, *string]).lookupsInProgress)) + require.Equal(t, 0, len(ca.(*cacheAccessor[int, *string]).lookupsInProgress)) } diff --git a/relay/cache/fifo-cache.go b/relay/cache/fifo-cache.go new file mode 100644 index 0000000000..1c2e7c6abd --- /dev/null +++ b/relay/cache/fifo-cache.go @@ -0,0 +1,73 @@ +package cache + +import ( + "github.com/emirpasic/gods/queues" + "github.com/emirpasic/gods/queues/linkedlistqueue" +) + +var _ Cache[string, string] = &FIFOCache[string, string]{} + +// FIFOCache is a cache that evicts the least recently added item when the cache is full. Useful for situations +// where time of addition is a better predictor of future access than time of most recent access. +type FIFOCache[K comparable, V any] struct { + weightCalculator WeightCalculator[K, V] + + currentWeight uint64 + maxWeight uint64 + data map[K]V + expirationQueue queues.Queue +} + +// NewFIFOCache creates a new FIFOCache. +func NewFIFOCache[K comparable, V any](maxWeight uint64, calculator WeightCalculator[K, V]) *FIFOCache[K, V] { + return &FIFOCache[K, V]{ + maxWeight: maxWeight, + data: make(map[K]V), + weightCalculator: calculator, + expirationQueue: linkedlistqueue.New(), + } +} + +func (f *FIFOCache[K, V]) Get(key K) (V, bool) { + val, ok := f.data[key] + return val, ok +} + +func (f *FIFOCache[K, V]) Put(key K, value V) { + weight := f.weightCalculator(key, value) + if weight > f.maxWeight { + // this item won't fit in the cache no matter what we evict + return + } + + old, ok := f.data[key] + f.currentWeight += weight + f.data[key] = value + if ok { + oldWeight := f.weightCalculator(key, old) + f.currentWeight -= oldWeight + } else { + f.expirationQueue.Enqueue(key) + } + + if f.currentWeight < f.maxWeight { + // no need to evict anything + return + } + + for f.currentWeight > f.maxWeight { + val, _ := f.expirationQueue.Dequeue() + keyToEvict := val.(K) + weightToEvict := f.weightCalculator(keyToEvict, f.data[keyToEvict]) + delete(f.data, keyToEvict) + f.currentWeight -= weightToEvict + } +} + +func (f *FIFOCache[K, V]) Size() int { + return len(f.data) +} + +func (f *FIFOCache[K, V]) Weight() uint64 { + return f.currentWeight +} diff --git a/relay/cache/fifo_cache_test.go b/relay/cache/fifo_cache_test.go new file mode 100644 index 0000000000..da4de5ad1f --- /dev/null +++ b/relay/cache/fifo_cache_test.go @@ -0,0 +1,138 @@ +package cache + +import ( + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + "testing" +) + +func TestExpirationOrder(t *testing.T) { + tu.InitializeRandom() + + maxWeight := uint64(10 + rand.Intn(10)) + c := NewFIFOCache[int, int](maxWeight, func(key int, value int) uint64 { + return 1 + }) + + require.Equal(t, uint64(0), c.Weight()) + require.Equal(t, 0, c.Size()) + + expectedValues := make(map[int]int) + + // Fill up the cache. Everything should have weight 1. + for i := 1; i <= int(maxWeight); i++ { + + value := rand.Int() + expectedValues[i] = value + + // The value shouldn't be present yet + v, ok := c.Get(i) + require.False(t, ok) + require.Equal(t, 0, v) + + c.Put(i, value) + + require.Equal(t, uint64(i), c.Weight()) + require.Equal(t, i, c.Size()) + } + + // Verify that all expected values are present. + for k, v := range expectedValues { + value, ok := c.Get(k) + require.True(t, ok) + require.Equal(t, v, value) + } + + // Push the old values out of the queue one at a time. + for i := 1; i <= int(maxWeight); i++ { + value := rand.Int() + expectedValues[-i] = value + delete(expectedValues, i) + + // The value shouldn't be present yet + v, ok := c.Get(-i) + require.False(t, ok) + require.Equal(t, 0, v) + + c.Put(-i, value) + + require.Equal(t, maxWeight, c.Weight()) + require.Equal(t, int(maxWeight), c.Size()) + + // verify that the purged value is specifically not present + _, ok = c.Get(i) + require.False(t, ok) + + // verify that only the expected values have been purged. Has the added benefit of randomly + // reading all the values in the cache, which for a FIFO cache should not influence the order + // that we purge values. + for kk, vv := range expectedValues { + value, ok = c.Get(kk) + require.True(t, ok) + require.Equal(t, vv, value) + } + } +} + +func TestWeightedValues(t *testing.T) { + tu.InitializeRandom() + + maxWeight := uint64(100 + rand.Intn(100)) + + // For this test, weight is simply the key. + weightCalculator := func(key int, value int) uint64 { + return uint64(key) + } + + c := NewFIFOCache[int, int](maxWeight, weightCalculator) + + expectedValues := make(map[int]int) + + require.Equal(t, uint64(0), c.Weight()) + require.Equal(t, 0, c.Size()) + + highestUndeletedKey := 0 + expectedWeight := uint64(0) + for nextKey := 0; nextKey <= int(maxWeight); nextKey++ { + value := rand.Int() + c.Put(nextKey, value) + expectedValues[nextKey] = value + expectedWeight += uint64(nextKey) + + // simulate the expected removal + for expectedWeight > maxWeight { + delete(expectedValues, highestUndeletedKey) + expectedWeight -= uint64(highestUndeletedKey) + highestUndeletedKey++ + } + + require.Equal(t, expectedWeight, c.Weight()) + require.Equal(t, len(expectedValues), c.Size()) + + // Update a random existing key. Shouldn't affect the weight or removal order. + for k := range expectedValues { + value = rand.Int() + c.Put(k, value) + expectedValues[k] = value + break + } + + // verify that all expected values are present + for k, v := range expectedValues { + var ok bool + value, ok = c.Get(k) + require.True(t, ok) + require.Equal(t, v, value) + } + } + + // Attempting to insert a value that exceeds the max weight should have no effect. + c.Put(int(maxWeight)+1, rand.Int()) + + for k, v := range expectedValues { + value, ok := c.Get(k) + require.True(t, ok) + require.Equal(t, v, value) + } +} diff --git a/relay/chunk_provider.go b/relay/chunk_provider.go index 48ece7c3cd..5bc2926732 100644 --- a/relay/chunk_provider.go +++ b/relay/chunk_provider.go @@ -20,7 +20,7 @@ type chunkProvider struct { // metadataCache is an LRU cache of blob metadata. Each relay is authorized to serve data assigned to one or more // relay IDs. Blobs that do not belong to one of the relay IDs assigned to this server will not be in the cache. - frameCache cache.CachedAccessor[blobKeyWithMetadata, []*encoding.Frame] + frameCache cache.CacheAccessor[blobKeyWithMetadata, []*encoding.Frame] // chunkReader is used to read chunks from the chunk store. chunkReader chunkstore.ChunkReader @@ -47,7 +47,7 @@ func newChunkProvider( ctx context.Context, logger logging.Logger, chunkReader chunkstore.ChunkReader, - cacheSize int, + cacheSize uint64, maxIOConcurrency int, proofFetchTimeout time.Duration, coefficientFetchTimeout time.Duration) (*chunkProvider, error) { @@ -60,14 +60,16 @@ func newChunkProvider( coefficientFetchTimeout: coefficientFetchTimeout, } - c, err := cache.NewCachedAccessor[blobKeyWithMetadata, []*encoding.Frame]( - cacheSize, + c := cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight) + + cacheAccessor, err := cache.NewCacheAccessor[blobKeyWithMetadata, []*encoding.Frame]( + c, maxIOConcurrency, server.fetchFrames) if err != nil { return nil, err } - server.frameCache = c + server.frameCache = cacheAccessor return server, nil } @@ -75,6 +77,12 @@ func newChunkProvider( // frameMap is a map of blob keys to frames. type frameMap map[v2.BlobKey][]*encoding.Frame +// computeFramesCacheWeight computes the 'weight' of the frames for the cache. The weight of a list of frames +// is equal to the size required to store the data, in bytes. +func computeFramesCacheWeight(key blobKeyWithMetadata, frames []*encoding.Frame) uint64 { + return uint64(len(frames)) * uint64(key.metadata.chunkSizeBytes) +} + // GetFrames retrieves the frames for a blob. func (s *chunkProvider) GetFrames(ctx context.Context, mMap metadataMap) (frameMap, error) { diff --git a/relay/chunk_provider_test.go b/relay/chunk_provider_test.go index 8615ad7d23..06ec215b80 100644 --- a/relay/chunk_provider_test.go +++ b/relay/chunk_provider_test.go @@ -49,7 +49,7 @@ func TestFetchingIndividualBlobs(t *testing.T) { context.Background(), logger, chunkReader, - 10, + 1024*1024*32, 32, 10*time.Second, 10*time.Second) @@ -136,7 +136,7 @@ func TestFetchingBatchedBlobs(t *testing.T) { context.Background(), logger, chunkReader, - 10, + 1024*1024*32, 32, 10*time.Second, 10*time.Second) diff --git a/relay/cmd/config.go b/relay/cmd/config.go index 154c4c2bd2..ff1513d172 100644 --- a/relay/cmd/config.go +++ b/relay/cmd/config.go @@ -60,9 +60,9 @@ func NewConfig(ctx *cli.Context) (Config, error) { MaxGRPCMessageSize: ctx.Int(flags.MaxGRPCMessageSizeFlag.Name), MetadataCacheSize: ctx.Int(flags.MetadataCacheSizeFlag.Name), MetadataMaxConcurrency: ctx.Int(flags.MetadataMaxConcurrencyFlag.Name), - BlobCacheSize: ctx.Int(flags.BlobCacheSizeFlag.Name), + BlobCacheBytes: ctx.Uint64(flags.BlobCacheBytes.Name), BlobMaxConcurrency: ctx.Int(flags.BlobMaxConcurrencyFlag.Name), - ChunkCacheSize: ctx.Int(flags.ChunkCacheSizeFlag.Name), + ChunkCacheSize: ctx.Uint64(flags.ChunkCacheSizeFlag.Name), ChunkMaxConcurrency: ctx.Int(flags.ChunkMaxConcurrencyFlag.Name), RateLimits: limiter.Config{ MaxGetBlobOpsPerSecond: ctx.Float64(flags.MaxGetBlobOpsPerSecondFlag.Name), diff --git a/relay/cmd/flags/flags.go b/relay/cmd/flags/flags.go index baed1fbcf4..0bb24ce2ae 100644 --- a/relay/cmd/flags/flags.go +++ b/relay/cmd/flags/flags.go @@ -60,12 +60,12 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "METADATA_MAX_CONCURRENCY"), Value: 32, } - BlobCacheSizeFlag = cli.IntFlag{ - Name: common.PrefixFlag(FlagPrefix, "blob-cache-size"), - Usage: "Max number of items in the blob cache", + BlobCacheBytes = cli.Uint64Flag{ + Name: common.PrefixFlag(FlagPrefix, "blob-cache-bytes"), + Usage: "The size of the blob cache, in bytes.", Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "BLOB_CACHE_SIZE"), - Value: 32, + Value: 1024 * 1024 * 1024, } BlobMaxConcurrencyFlag = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "blob-max-concurrency"), @@ -74,12 +74,12 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "BLOB_MAX_CONCURRENCY"), Value: 32, } - ChunkCacheSizeFlag = cli.IntFlag{ + ChunkCacheSizeFlag = cli.Int64Flag{ Name: common.PrefixFlag(FlagPrefix, "chunk-cache-size"), - Usage: "Max number of items in the chunk cache", + Usage: "Size of the chunk cache, in bytes.", Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_CACHE_SIZE"), - Value: 32, + Value: 4 * 1024 * 1024 * 1024, } ChunkMaxConcurrencyFlag = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "chunk-max-concurrency"), @@ -297,7 +297,7 @@ var optionalFlags = []cli.Flag{ MaxGRPCMessageSizeFlag, MetadataCacheSizeFlag, MetadataMaxConcurrencyFlag, - BlobCacheSizeFlag, + BlobCacheBytes, BlobMaxConcurrencyFlag, ChunkCacheSizeFlag, ChunkMaxConcurrencyFlag, diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go index 8f3f43ed86..e1f188bb9e 100644 --- a/relay/metadata_provider.go +++ b/relay/metadata_provider.go @@ -36,7 +36,7 @@ type metadataProvider struct { // metadataCache is an LRU cache of blob metadata. Blobs that do not belong to one of the relay shards // assigned to this server will not be in the cache. - metadataCache cache.CachedAccessor[v2.BlobKey, *blobMetadata] + metadataCache cache.CacheAccessor[v2.BlobKey, *blobMetadata] // relayIDSet is the set of relay IDs assigned to this relay. This relay will refuse to serve metadata for blobs // that are not assigned to one of these IDs. @@ -74,8 +74,13 @@ func newMetadataProvider( } server.blobParamsMap.Store(blobParamsMap) - metadataCache, err := cache.NewCachedAccessor[v2.BlobKey, *blobMetadata]( - metadataCacheSize, + c := cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize), + func(key v2.BlobKey, value *blobMetadata) uint64 { + return uint64(1) + }) + + metadataCache, err := cache.NewCacheAccessor[v2.BlobKey, *blobMetadata]( + c, maxIOConcurrency, server.fetchMetadata) if err != nil { diff --git a/relay/server.go b/relay/server.go index 540b46b0b0..eb00709e9f 100644 --- a/relay/server.go +++ b/relay/server.go @@ -78,14 +78,14 @@ type Config struct { // goroutines. MetadataMaxConcurrency int - // BlobCacheSize is the maximum number of items in the blob cache. - BlobCacheSize int + // BlobCacheBytes is the maximum size of the blob cache, in bytes. + BlobCacheBytes uint64 // BlobMaxConcurrency puts a limit on the maximum number of concurrent blob fetches actively running on goroutines. BlobMaxConcurrency int - // ChunkCacheSize is the maximum number of items in the chunk cache. - ChunkCacheSize int + // ChunkCacheSize is the maximum size of the chunk cache, in bytes. + ChunkCacheSize uint64 // ChunkMaxConcurrency is the size of the work pool for fetching chunks. Note that this does not // impact concurrency utilized by the s3 client to upload/download fragmented files. @@ -153,7 +153,7 @@ func NewServer( ctx, logger, blobStore, - config.BlobCacheSize, + config.BlobCacheBytes, config.BlobMaxConcurrency, config.Timeouts.InternalGetBlobTimeout) if err != nil { diff --git a/relay/server_test.go b/relay/server_test.go index 3e16c624c3..58b8893714 100644 --- a/relay/server_test.go +++ b/relay/server_test.go @@ -25,9 +25,9 @@ func defaultConfig() *Config { MaxGRPCMessageSize: 1024 * 1024 * 300, MetadataCacheSize: 1024 * 1024, MetadataMaxConcurrency: 32, - BlobCacheSize: 32, + BlobCacheBytes: 1024 * 1024, BlobMaxConcurrency: 32, - ChunkCacheSize: 32, + ChunkCacheSize: 1024 * 1024, ChunkMaxConcurrency: 32, MaxKeysPerGetChunksRequest: 1024, RateLimits: limiter.Config{