diff --git a/network/p2p/throttler.go b/network/p2p/throttler.go
new file mode 100644
index 000000000000..de173a655266
--- /dev/null
+++ b/network/p2p/throttler.go
@@ -0,0 +1,103 @@
+// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package p2p
+
+import (
+	"sync"
+	"time"
+
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/utils/timer/mockable"
+)
+
+var _ Throttler = (*SlidingWindowThrottler)(nil)
+
+type Throttler interface {
+	// Handle returns true if a message from [nodeID] should be handled.
+	Handle(nodeID ids.NodeID) bool
+}
+
+// NewSlidingWindowThrottler returns a new instance of SlidingWindowThrottler.
+// Nodes are throttled if they exceed [limit] messages over any sliding
+// interval of length [period].
+// [period] and [limit] should both be > 0.
+func NewSlidingWindowThrottler(period time.Duration, limit int) *SlidingWindowThrottler {
+	now := time.Now()
+	return &SlidingWindowThrottler{
+		period: period,
+		limit:  float64(limit),
+		windows: [2]window{
+			{
+				start: now,
+				hits:  make(map[ids.NodeID]float64),
+			},
+			{
+				start: now.Add(-period),
+				hits:  make(map[ids.NodeID]float64),
+			},
+		},
+	}
+}
+
+// window is used internally by SlidingWindowThrottler to represent the number
+// of hits from a node in the evaluation period beginning at [start].
+type window struct {
+	start time.Time
+	hits  map[ids.NodeID]float64
+}
+
+// SlidingWindowThrottler is an implementation of the sliding window throttling
+// algorithm.
+type SlidingWindowThrottler struct {
+	period time.Duration
+	limit  float64
+	clock  mockable.Clock
+
+	lock    sync.Mutex
+	current int
+	windows [2]window
+}
+
+// Handle returns true if the number of calls received in the last [s.period]
+// is less than [s.limit].
+//
+// This is calculated by adding the current period's count to a weighted count
+// of the previous period.
+func (s *SlidingWindowThrottler) Handle(nodeID ids.NodeID) bool {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	// The current window becomes the previous window if the current evaluation
+	// period is over.
+	now := s.clock.Time()
+	sinceUpdate := now.Sub(s.windows[s.current].start)
+	if sinceUpdate >= 2*s.period {
+		s.rotate(now.Add(-s.period))
+	}
+	if sinceUpdate >= s.period {
+		s.rotate(now)
+		sinceUpdate = 0
+	}
+
+	currentHits := s.windows[s.current].hits
+	current := currentHits[nodeID]
+	previousFraction := float64(s.period-sinceUpdate) / float64(s.period)
+	previous := s.windows[1-s.current].hits[nodeID]
+	estimatedHits := current + previousFraction*previous
+	if estimatedHits >= s.limit {
+		// The peer has sent too many requests; drop this request.
+		return false
+	}
+
+	currentHits[nodeID]++
+	return true
+}
+
+func (s *SlidingWindowThrottler) rotate(t time.Time) {
+	s.current = 1 - s.current
+	s.windows[s.current] = window{
+		start: t,
+		hits:  make(map[ids.NodeID]float64),
+	}
+}
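The estimate in `Handle` interpolates between the two fixed windows. The standalone sketch below (toy values, not part of this change) reproduces the arithmetic for a peer a quarter of the way into the current window:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	period := time.Minute
	sinceUpdate := 15 * time.Second // 25% into the current window

	previousHits := 4.0 // hits recorded in the previous window
	currentHits := 2.0  // hits recorded so far in the current window

	// The previous window still covers 45s of the trailing minute,
	// so its hits are weighted by 45s/60s = 0.75.
	previousFraction := float64(period-sinceUpdate) / float64(period)
	estimated := currentHits + previousFraction*previousHits

	fmt.Printf("estimated hits: %.2f\n", estimated) // 2 + 0.75*4 = 5.00
	// With limit = 5, the next message is dropped because 5.00 >= 5.
}
```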
diff --git a/network/p2p/throttler_handler.go b/network/p2p/throttler_handler.go
new file mode 100644
index 000000000000..e7b4d8f26082
--- /dev/null
+++ b/network/p2p/throttler_handler.go
@@ -0,0 +1,39 @@
+// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package p2p
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/ava-labs/avalanchego/ids"
+)
+
+var (
+	ErrThrottled = errors.New("throttled")
+	_ Handler    = (*ThrottlerHandler)(nil)
+)
+
+type ThrottlerHandler struct {
+	Handler
+	Throttler Throttler
+}
+
+func (t ThrottlerHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
+	if !t.Throttler.Handle(nodeID) {
+		return fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
+	}
+
+	return t.Handler.AppGossip(ctx, nodeID, gossipBytes)
+}
+
+func (t ThrottlerHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
+	if !t.Throttler.Handle(nodeID) {
+		return nil, fmt.Errorf("dropping message from %s: %w", nodeID, ErrThrottled)
+	}
+
+	return t.Handler.AppRequest(ctx, nodeID, deadline, requestBytes)
+}
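Because `ThrottlerHandler` embeds the `Handler` it wraps, throttling composes as a plain decorator. A minimal sketch of the intended wiring, assuming the p2p package context (`newThrottledHandler` is illustrative; `NoOpHandler` stands in for a real application handler, as it does in the tests below):

```go
// newThrottledHandler allows each peer at most 10 messages per second;
// excess traffic is rejected with ErrThrottled before it reaches the
// wrapped handler.
func newThrottledHandler() Handler {
	return ThrottlerHandler{
		Handler:   NoOpHandler{}, // stand-in for a real application handler
		Throttler: NewSlidingWindowThrottler(time.Second, 10),
	}
}
```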
diff --git a/network/p2p/throttler_handler_test.go b/network/p2p/throttler_handler_test.go
new file mode 100644
index 000000000000..af9c3fda7194
--- /dev/null
+++ b/network/p2p/throttler_handler_test.go
@@ -0,0 +1,74 @@
+// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package p2p
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/ava-labs/avalanchego/ids"
+)
+
+func TestThrottlerHandlerAppGossip(t *testing.T) {
+	tests := []struct {
+		name        string
+		Throttler   Throttler
+		expectedErr error
+	}{
+		{
+			name:      "not throttled",
+			Throttler: NewSlidingWindowThrottler(time.Second, 1),
+		},
+		{
+			name:        "throttled",
+			Throttler:   NewSlidingWindowThrottler(time.Second, 0),
+			expectedErr: ErrThrottled,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require := require.New(t)
+
+			handler := ThrottlerHandler{
+				Handler:   NoOpHandler{},
+				Throttler: tt.Throttler,
+			}
+			err := handler.AppGossip(context.Background(), ids.GenerateTestNodeID(), []byte("foobar"))
+			require.ErrorIs(err, tt.expectedErr)
+		})
+	}
+}
+
+func TestThrottlerHandlerAppRequest(t *testing.T) {
+	tests := []struct {
+		name        string
+		Throttler   Throttler
+		expectedErr error
+	}{
+		{
+			name:      "not throttled",
+			Throttler: NewSlidingWindowThrottler(time.Second, 1),
+		},
+		{
+			name:        "throttled",
+			Throttler:   NewSlidingWindowThrottler(time.Second, 0),
+			expectedErr: ErrThrottled,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require := require.New(t)
+
+			handler := ThrottlerHandler{
+				Handler:   NoOpHandler{},
+				Throttler: tt.Throttler,
+			}
+			_, err := handler.AppRequest(context.Background(), ids.GenerateTestNodeID(), time.Time{}, []byte("foobar"))
+			require.ErrorIs(err, tt.expectedErr)
+		})
+	}
+}
diff --git a/network/p2p/throttler_test.go b/network/p2p/throttler_test.go
new file mode 100644
index 000000000000..c7b0153e671d
--- /dev/null
+++ b/network/p2p/throttler_test.go
@@ -0,0 +1,139 @@
+// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package p2p
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/ava-labs/avalanchego/ids"
+)
+
+func TestSlidingWindowThrottlerHandle(t *testing.T) {
+	period := time.Minute
+	previousWindowStartTime := time.Time{}
+	currentWindowStartTime := previousWindowStartTime.Add(period)
+
+	nodeID := ids.GenerateTestNodeID()
+
+	type call struct {
+		time      time.Time
+		throttled bool
+	}
+
+	tests := []struct {
+		name  string
+		limit int
+		calls []call
+	}{
+		{
+			name:  "throttled in current window",
+			limit: 1,
+			calls: []call{
+				{
+					time: currentWindowStartTime,
+				},
+				{
+					time:      currentWindowStartTime,
+					throttled: true,
+				},
+			},
+		},
+		{
+			name:  "throttled from previous window",
+			limit: 1,
+			calls: []call{
+				{
+					time: previousWindowStartTime,
+				},
+				{
+					time:      currentWindowStartTime,
+					throttled: true,
+				},
+			},
+		},
+		{
+			name:  "throttled over multiple evaluation periods",
+			limit: 5,
+			calls: []call{
+				{
+					time: currentWindowStartTime.Add(30 * time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(1 * time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(2 * time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(3 * time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(4 * time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(30 * time.Second),
+				},
+				{
+					time:      currentWindowStartTime.Add(period).Add(30 * time.Second),
+					throttled: true,
+				},
+				{
+					time: currentWindowStartTime.Add(5 * period),
+				},
+			},
+		},
+		{
+			name:  "one hit per period",
+			limit: 2,
+			calls: []call{
+				{
+					time: currentWindowStartTime,
+				},
+				{
+					time: currentWindowStartTime.Add(period).Add(time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(2 * period).Add(time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(3 * period).Add(time.Second),
+				},
+				{
+					time: currentWindowStartTime.Add(4 * period).Add(time.Second),
+				},
+			},
+		},
+		{
+			// If too much time has passed, the current window is no longer
+			// a valid previous window.
+			name:  "current window needs to be reset",
+			limit: 1,
+			calls: []call{
+				{
+					time: currentWindowStartTime,
+				},
+				{
+					time: currentWindowStartTime.Add(10 * period),
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require := require.New(t)
+			throttler := NewSlidingWindowThrottler(period, tt.limit)
+			throttler.windows[throttler.current].start = currentWindowStartTime
+			throttler.windows[1-throttler.current].start = previousWindowStartTime
+
+			for _, call := range tt.calls {
+				throttler.clock.Set(call.time)
+				require.Equal(call.throttled, !throttler.Handle(nodeID))
+			}
+		})
+	}
+}
diff --git a/x/merkledb/cache.go b/x/merkledb/cache.go
index 9d1d5f7a798c..57d674ed63ef 100644
--- a/x/merkledb/cache.go
+++ b/x/merkledb/cache.go
@@ -4,40 +4,40 @@
 package merkledb
 
 import (
+	"errors"
 	"sync"
 
 	"github.com/ava-labs/avalanchego/utils/linkedhashmap"
 	"github.com/ava-labs/avalanchego/utils/wrappers"
 )
 
+var errEmptyCacheTooLarge = errors.New("cache is empty yet still too large")
+
 // A cache that calls [onEviction] on the evicted element.
 type onEvictCache[K comparable, V any] struct {
-	lock    sync.RWMutex
-	maxSize int
-	fifo    linkedhashmap.LinkedHashmap[K, V]
+	lock        sync.RWMutex
+	maxSize     int
+	currentSize int
+	fifo        linkedhashmap.LinkedHashmap[K, V]
+	size        func(K, V) int
 
 	// Must not call any method that grabs [c.lock]
 	// because this would cause a deadlock.
 	onEviction func(K, V) error
 }
 
-func newOnEvictCache[K comparable, V any](maxSize int, onEviction func(K, V) error) onEvictCache[K, V] {
+func newOnEvictCache[K comparable, V any](
+	maxSize int,
+	size func(K, V) int,
+	onEviction func(K, V) error,
+) onEvictCache[K, V] {
 	return onEvictCache[K, V]{
 		maxSize:    maxSize,
 		fifo:       linkedhashmap.New[K, V](),
+		size:       size,
 		onEviction: onEviction,
 	}
 }
 
-// removeOldest returns and removes the oldest element from this cache.
-// Assumes [c.lock] is held.
-func (c *onEvictCache[K, V]) removeOldest() (K, V, bool) {
-	k, v, exists := c.fifo.Oldest()
-	if exists {
-		c.fifo.Delete(k)
-	}
-	return k, v, exists
-}
-
 // Get an element from this cache.
 func (c *onEvictCache[K, V]) Get(key K) (V, bool) {
 	c.lock.RLock()
@@ -53,14 +53,14 @@ func (c *onEvictCache[K, V]) Put(key K, value V) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
+	if oldValue, replaced := c.fifo.Get(key); replaced {
+		c.currentSize -= c.size(key, oldValue)
+	}
+
+	c.currentSize += c.size(key, value)
 	c.fifo.Put(key, value) // Mark as MRU
 
-	if c.fifo.Len() > c.maxSize {
-		oldestKey, oldestVal, _ := c.fifo.Oldest()
-		c.fifo.Delete(oldestKey)
-		return c.onEviction(oldestKey, oldestVal)
-	}
-	return nil
+	return c.resize(c.maxSize)
 }
 
 // Flush removes all elements from the cache.
@@ -74,16 +74,37 @@ func (c *onEvictCache[K, V]) Flush() error {
 		c.lock.Unlock()
 	}()
 
+	return c.resize(0)
+}
+
+// removeOldest returns and removes the oldest element from this cache.
+//
+// Assumes [c.lock] is held.
+func (c *onEvictCache[K, V]) removeOldest() (K, V, bool) {
+	k, v, exists := c.fifo.Oldest()
+	if exists {
+		c.currentSize -= c.size(k, v)
+		c.fifo.Delete(k)
+	}
+	return k, v, exists
+}
+
+// resize removes the oldest elements from the cache until the cache is not
+// larger than the provided target.
+//
+// Assumes [c.lock] is held.
+func (c *onEvictCache[K, V]) resize(target int) error {
 	// Note that we can't use [c.fifo]'s iterator because [c.onEviction]
 	// modifies [c.fifo], which violates the iterator's invariant.
 	var errs wrappers.Errs
-	for {
-		key, value, exists := c.removeOldest()
+	for c.currentSize > target {
+		k, v, exists := c.removeOldest()
 		if !exists {
-			// The cache is empty.
-			return errs.Err
+			// This should really never happen unless the size of an entry
+			// changed or the target size is negative.
+			return errEmptyCacheTooLarge
 		}
-
-		errs.Add(c.onEviction(key, value))
+		errs.Add(c.onEviction(k, v))
 	}
+	return errs.Err
 }
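With the `size` callback, `maxSize` becomes a budget in arbitrary units (bytes, in practice) rather than an entry count. A toy sketch of the accounting, assuming the merkledb package context with `fmt` imported (`demoSizedCache` and its values are illustrative only):

```go
func demoSizedCache() error {
	// Each entry costs len(value) units; the whole cache may hold 8.
	cache := newOnEvictCache(
		8,
		func(_ int, v string) int { return len(v) },
		func(k int, v string) error {
			fmt.Printf("evicted %d=%q\n", k, v)
			return nil
		},
	)

	_ = cache.Put(1, "aaaa")  // currentSize = 4
	_ = cache.Put(2, "bbbb")  // currentSize = 8, exactly at the budget
	return cache.Put(3, "cc") // would be 10 > 8, so key 1 is evicted first
}
```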
diff --git a/x/merkledb/cache_test.go b/x/merkledb/cache_test.go
index 782db6615048..e0939df9451d 100644
--- a/x/merkledb/cache_test.go
+++ b/x/merkledb/cache_test.go
@@ -16,13 +16,16 @@ func TestNewOnEvictCache(t *testing.T) {
 	require := require.New(t)
 
 	called := false
+	size := func(int, int) int {
+		return 1
+	}
 	onEviction := func(int, int) error {
 		called = true
 		return nil
 	}
 	maxSize := 10
 
-	cache := newOnEvictCache[int](maxSize, onEviction)
+	cache := newOnEvictCache(maxSize, size, onEviction)
 	require.Equal(maxSize, cache.maxSize)
 	require.NotNil(cache.fifo)
 	require.Zero(cache.fifo.Len())
@@ -40,6 +43,9 @@ func TestOnEvictCacheNoOnEvictionError(t *testing.T) {
 	evictedKey := []int{}
 	evictedValue := []int{}
+	size := func(int, int) int {
+		return 1
+	}
 	onEviction := func(k, n int) error {
 		evictedKey = append(evictedKey, k)
 		evictedValue = append(evictedValue, n)
@@ -47,7 +53,7 @@
 	}
 
 	maxSize := 3
-	cache := newOnEvictCache[int](maxSize, onEviction)
+	cache := newOnEvictCache(maxSize, size, onEviction)
 
 	// Get non-existent key
 	_, ok := cache.Get(0)
@@ -162,8 +168,11 @@ func TestOnEvictCacheNoOnEvictionError(t *testing.T) {
 // Note this test assumes the cache is FIFO.
 func TestOnEvictCacheOnEvictionError(t *testing.T) {
 	var (
-		require = require.New(t)
-		evicted = []int{}
+		require = require.New(t)
+		evicted = []int{}
+		size    = func(int, int) int {
+			return 1
+		}
 		onEviction = func(_, n int) error {
 			// Evicting even keys errors
 			evicted = append(evicted, n)
@@ -175,7 +184,7 @@
 		maxSize = 2
 	)
 
-	cache := newOnEvictCache[int](maxSize, onEviction)
+	cache := newOnEvictCache(maxSize, size, onEviction)
 
 	// Fill the cache
 	for i := 0; i < maxSize; i++ {
diff --git a/x/merkledb/db.go b/x/merkledb/db.go
index 3f5d4873843b..b4c172aa3653 100644
--- a/x/merkledb/db.go
+++ b/x/merkledb/db.go
@@ -31,8 +31,7 @@ import (
 )
 
 const (
-	DefaultEvictionBatchSize = 100
-	RootPath                 = EmptyPath
+	RootPath = EmptyPath // TODO: name better
 
 	rebuildViewSizeFractionOfCacheSize = 50
@@ -129,13 +128,15 @@ type Config struct {
 	// If 0 is specified, [runtime.NumCPU] will be used. If -1 is specified,
 	// no limit will be used.
 	RootGenConcurrency int
-	// The number of nodes that are evicted from the cache and written to
-	// disk at a time.
+	// The number of bytes of evicted intermediate nodes to batch together
+	// before they are written to disk.
 	EvictionBatchSize int
 	// The number of changes to the database that we store in memory in order to
 	// serve change proofs.
-	HistoryLength      int
-	ValueNodeCacheSize int
+	HistoryLength int
+	// The number of bytes to cache nodes with values.
+	ValueNodeCacheSize int
+	// The number of bytes to cache nodes without values.
 	IntermediateNodeCacheSize int
 	// If [Reg] is nil, metrics are collected locally but not exported through
 	// Prometheus.
@@ -1209,3 +1210,10 @@ func addPrefixToKey(bufferPool *sync.Pool, prefix []byte, key []byte) []byte {
 	copy(prefixedKey[prefixLen:], key)
 	return prefixedKey
 }
+
+func cacheEntrySize(p path, n *node) int {
+	if n == nil {
+		return len(p)
+	}
+	return len(p) + len(n.marshal())
+}
diff --git a/x/merkledb/db_test.go b/x/merkledb/db_test.go
index 91768a7d3b25..c69b2cf8503e 100644
--- a/x/merkledb/db_test.go
+++ b/x/merkledb/db_test.go
@@ -38,8 +38,8 @@ func newDefaultConfig() Config {
 	return Config{
 		EvictionBatchSize: 10,
 		HistoryLength:     defaultHistoryLength,
-		ValueNodeCacheSize:        1_000,
-		IntermediateNodeCacheSize: 1_000,
+		ValueNodeCacheSize:        units.MiB,
+		IntermediateNodeCacheSize: units.MiB,
 		Reg:    prometheus.NewRegistry(),
 		Tracer: trace.Noop,
 	}
diff --git a/x/merkledb/intermediate_node_db.go b/x/merkledb/intermediate_node_db.go
index 698c057890d3..a303250fba5a 100644
--- a/x/merkledb/intermediate_node_db.go
+++ b/x/merkledb/intermediate_node_db.go
@@ -45,7 +45,11 @@ func newIntermediateNodeDB(
 		bufferPool:        bufferPool,
 		evictionBatchSize: evictionBatchSize,
 	}
-	result.nodeCache = newOnEvictCache[path](size, result.onEviction)
+	result.nodeCache = newOnEvictCache(
+		size,
+		cacheEntrySize,
+		result.onEviction,
+	)
 	return result
 }
 
@@ -62,8 +66,7 @@ func (db *intermediateNodeDB) onEviction(key path, n *node) error {
 	// and write them to disk. We write a batch of them, rather than
 	// just [n], so that we don't immediately evict and write another
 	// node, because each time this method is called we do a disk write.
-	// we have already removed the passed n, so the remove count starts at 1
-	for removedCount := 1; removedCount < db.evictionBatchSize; removedCount++ {
+	for writeBatch.Size() < db.evictionBatchSize {
 		key, n, exists := db.nodeCache.removeOldest()
 		if !exists {
 			// The cache is empty.
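The eviction loop above now batches by bytes rather than by node count: `writeBatch.Size()` grows with each prefixed key and serialized node, so draining stops once `evictionBatchSize` bytes are pending. A rough standalone sketch of that loop shape (`drainToBatch` and `writeNode` are hypothetical helpers, not from this change):

```go
func drainToBatch(batch database.Batch, cache *onEvictCache[path, *node], budgetBytes int) error {
	// Keep evicting the oldest nodes until the pending batch holds at
	// least budgetBytes, or the cache runs dry.
	for batch.Size() < budgetBytes {
		key, n, exists := cache.removeOldest()
		if !exists {
			break
		}
		if err := writeNode(batch, key, n); err != nil { // hypothetical helper
			return err
		}
	}
	return batch.Write()
}
```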
diff --git a/x/merkledb/intermediate_node_db_test.go b/x/merkledb/intermediate_node_db_test.go
index 76e356a297de..63a7e0a67e94 100644
--- a/x/merkledb/intermediate_node_db_test.go
+++ b/x/merkledb/intermediate_node_db_test.go
@@ -23,7 +23,7 @@ import (
 func TestIntermediateNodeDB(t *testing.T) {
 	require := require.New(t)
 
-	cacheSize := 10
+	cacheSize := 100
 	evictionBatchSize := cacheSize
 	baseDB := memdb.New()
 	db := newIntermediateNodeDB(
@@ -70,22 +70,33 @@ func TestIntermediateNodeDB(t *testing.T) {
 	// Assert the key-node pair was deleted
 	require.Equal(database.ErrNotFound, err)
 
-	// Put [cacheSize] elements in the cache
-	for i := byte(0); i < byte(cacheSize); i++ {
-		key := newPath([]byte{i})
+	// Put elements in the cache until it is full.
+	expectedSize := cacheEntrySize(key, nil)
+	added := 0
+	for {
+		key := newPath([]byte{byte(added)})
 		node := &node{
 			dbNode: dbNode{
-				value: maybe.Some([]byte{i}),
+				value: maybe.Some([]byte{byte(added)}),
 			},
 		}
+		newExpectedSize := expectedSize + cacheEntrySize(key, node)
+		if newExpectedSize > cacheSize {
+			// Don't trigger eviction.
+			break
+		}
+
 		require.NoError(db.Put(key, node))
+		expectedSize = newExpectedSize
+		added++
 	}
 
 	// Assert cache has expected number of elements
-	require.Equal(cacheSize, db.nodeCache.fifo.Len())
+	require.Equal(added, db.nodeCache.fifo.Len())
 
 	// Put one more element in the cache, which should trigger an eviction
-	// of all but 1 element
+	// of all but 2 elements. Two elements remain rather than one because the
+	// key prefix added during eviction increases the size tracked by the batch.
 	key = newPath([]byte{byte(cacheSize)})
 	node := &node{
 		dbNode: dbNode{
@@ -95,11 +106,10 @@ func TestIntermediateNodeDB(t *testing.T) {
 	require.NoError(db.Put(key, node))
 
 	// Assert cache has expected number of elements
-	require.Equal(1, db.nodeCache.fifo.Len())
-	gotKey, gotNode, ok := db.nodeCache.fifo.Oldest()
+	require.Equal(2, db.nodeCache.fifo.Len())
+	gotKey, _, ok := db.nodeCache.fifo.Oldest()
 	require.True(ok)
-	require.Equal(key, gotKey)
-	require.Equal(node, gotNode)
+	require.Equal(newPath([]byte{byte(added - 1)}), gotKey)
 
 	// Get a node from the base database (not cache)
 	nodeRead, err := db.Get(newPath([]byte{0x03}))
@@ -112,7 +122,7 @@ func TestIntermediateNodeDB(t *testing.T) {
 	// Assert the cache is empty
 	require.Zero(db.nodeCache.fifo.Len())
 
-	// Assert all [cacheSize]+1 elements evicted were written to disk with prefix.
+	// Assert the evicted cache elements were written to disk with prefix.
 	it := baseDB.NewIteratorWithPrefix(intermediateNodePrefix)
 	defer it.Release()
 
@@ -121,5 +131,5 @@ func TestIntermediateNodeDB(t *testing.T) {
 		count++
 	}
 	require.NoError(it.Error())
-	require.Equal(cacheSize+1, count)
+	require.Equal(added+1, count)
 }
diff --git a/x/merkledb/proof.go b/x/merkledb/proof.go
index a86b5762fdfc..d4348ebd0262 100644
--- a/x/merkledb/proof.go
+++ b/x/merkledb/proof.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
 
 	"github.com/ava-labs/avalanchego/database"
 	"github.com/ava-labs/avalanchego/database/memdb"
@@ -19,7 +20,10 @@ import (
 	pb "github.com/ava-labs/avalanchego/proto/pb/sync"
 )
 
-const verificationCacheSize = 2_000
+const (
+	verificationEvictionBatchSize = 0
+	verificationCacheSize         = math.MaxInt
+)
 
 var (
 	ErrInvalidProof = errors.New("proof obtained an invalid root ID")
@@ -870,7 +874,7 @@ func getStandaloneTrieView(ctx context.Context, ops []database.BatchOp) (*trieVi
 		ctx,
 		memdb.New(),
 		Config{
-			EvictionBatchSize: DefaultEvictionBatchSize,
+			EvictionBatchSize:         verificationEvictionBatchSize,
 			Tracer:                    trace.Noop,
 			ValueNodeCacheSize:        verificationCacheSize,
 			IntermediateNodeCacheSize: verificationCacheSize,
diff --git a/x/merkledb/value_node_db.go b/x/merkledb/value_node_db.go
index adfe8c12b252..9082cf44b7a8 100644
--- a/x/merkledb/value_node_db.go
+++ b/x/merkledb/value_node_db.go
@@ -24,7 +24,7 @@ type valueNodeDB struct {
 
 	// If a value is nil, the corresponding key isn't in the trie.
 	// Paths in [nodeCache] aren't prefixed with [valueNodePrefix].
-	nodeCache cache.LRU[path, *node]
+	nodeCache cache.Cacher[path, *node]
 	metrics   merkleMetrics
 
 	closed utils.Atomic[bool]
@@ -35,7 +35,7 @@ func newValueNodeDB(db database.Database, bufferPool *sync.Pool, metrics merkleM
 		metrics:    metrics,
 		baseDB:     db,
 		bufferPool: bufferPool,
-		nodeCache:  cache.LRU[path, *node]{Size: size},
+		nodeCache:  cache.NewSizedLRU(size, cacheEntrySize),
 	}
}
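For value nodes, the same count-to-bytes switch goes through `cache.NewSizedLRU`, which returns a `cache.Cacher` that charges each entry via the size callback. A sketch of constructing one directly, assuming the merkledb package context with avalanchego's `cache` and `utils/units` packages imported (the 1 MiB budget is illustrative):

```go
// A 1 MiB value-node cache where each entry is charged its encoded size.
var nodeCache cache.Cacher[path, *node] = cache.NewSizedLRU(
	units.MiB,      // byte budget for the whole cache
	cacheEntrySize, // len(path) plus the node's serialized length
)
```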