From 44ffafa07ff2e7ecce990f9e63897756f19202cc Mon Sep 17 00:00:00 2001
From: Yiling-J
Date: Thu, 22 Aug 2024 09:47:51 +0800
Subject: [PATCH] add stats API (#45)

* add stats API

* update Readme
---
 README.md              |  9 ++++++---
 cache.go               | 11 +++++++++++
 internal/counter.go    | 35 ++++++++++++++-------------------
 internal/stats.go      | 29 ++++++++++++++++++++++++++++
 internal/store.go      | 10 +++++++---
 internal/store_test.go |  4 ++--
 internal/tlfu.go       | 43 ++++++++++++++++++++++++-----------------
 stats_test.go          | 44 ++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 139 insertions(+), 46 deletions(-)
 create mode 100644 internal/stats.go
 create mode 100644 stats_test.go

diff --git a/README.md b/README.md
index 429add0..c5375c4 100644
--- a/README.md
+++ b/README.md
@@ -127,6 +127,9 @@ client.Range(func(key, value int) bool {
 // returns an estimation of the cache size usage
 client.EstimatedSize()
 
+// returns the cache stats (in-memory cache only), including hits, misses and hit ratio
+client.Stats()
+
 // close client, set hashmaps in shard to nil and close all goroutines
 client.Close()
 
@@ -190,7 +193,7 @@ Theine will save checksum when persisting cache and verify checksum first when l
 ## Benchmarks
 
 Source: https://github.com/maypok86/benchmarks
- 
+
 ### throughput
 
 ```
@@ -256,7 +259,7 @@ Items start their lifetime on DRAM. As an item becomes cold it gets evicted from DRAM
 
 Same as CacheLib, Theine hybrid cache also has **BigHash** and **Block Cache**; it's highly recommended to read the CacheLib architecture design before using hybrid cache. Here is a short introduction to these 2 engines (copied from CacheLib):
 
-- **BigHash** is effectively a giant fixed-bucket hash map on the device. To read or write, the entire bucket is read (in case of write, updated and written back). Bloom filter used to reduce number of IO. When bucket is full, items evicted in FIFO manner. You don't pay any RAM price here (except Bloom filter, which is 2GB for 1TB BigHash, tunable). 
+- **BigHash** is effectively a giant fixed-bucket hash map on the device. To read or write, the entire bucket is read (in case of write, updated and written back). Bloom filter used to reduce number of IO. When bucket is full, items evicted in FIFO manner. You don't pay any RAM price here (except Bloom filter, which is 2GB for 1TB BigHash, tunable).
 - **Block Cache**, on the other hand, divides device into equally sized regions (16MB, tunable) and fills a region with items of same size class, or, in case of log-mode fills regions sequentially with items of different size. Sometimes we call log-mode “stack alloc”. BC stores compact index in memory: key hash to offset. We do not store full key in memory and if collision happens (super rare), old item will look like evicted. In your calculations, use 12 bytes overhead per item to estimate RAM usage. For example, if your average item size is 4KB and cache size is 500GB you'll need around 1.4GB of memory.
 
 #### Using Hybrid Cache
@@ -320,7 +323,7 @@ After you call `Hybrid(...)` in a cache builder. Theine will convert current bui
 * `Workers` default 2
 
   Theine evicts entries in a separate policy goroutine, but inserts to NVM can be done in parallel. To make this work, Theine sends evicted entries to workers, and the workers sync the data to the NVM cache. This setting controls how many workers are used to sync data.
- 
+
 * `AdmProbability` default 1
 
   This is an admission policy, for endurance and performance reasons. When entries are evicted from the DRAM cache, this policy will be used to control the insertion percentage. A value of 1 means that all entries evicted from DRAM will be inserted into NVM. Values should be in the range of [0, 1].
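Before the code changes, for reference: a minimal sketch of the new `Stats` API that the README addition above documents. It mirrors the `stats_test.go` added at the end of this patch, with builder options left at their defaults:

```go
package main

import (
	"fmt"

	"github.com/Yiling-J/theine-go"
)

func main() {
	// In-memory cache with a maximum weighted size of 1000.
	client, err := theine.NewBuilder[int, int](1000).Build()
	if err != nil {
		panic(err)
	}
	defer client.Close()

	client.Set(1, 1, 1) // key 1, value 1, cost 1
	client.Get(1)       // hit
	client.Get(2)       // miss

	st := client.Stats()
	fmt.Println(st.Hits(), st.Misses(), st.HitRatio()) // 1 1 0.5
}
```

`Stats()` just sums the striped counters the eviction policy already maintains (see `internal/store.go` below), so it should be cheap enough to poll periodically for metrics.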
diff --git a/cache.go b/cache.go
index 4490eab..6e8bcc9 100644
--- a/cache.go
+++ b/cache.go
@@ -16,6 +16,7 @@ var VersionMismatch = internal.VersionMismatch
 type RemoveReason = internal.RemoveReason
 type DataBlock = internal.DataBlock[any]
+type Stats = internal.Stats
 
 type Loaded[V any] struct {
 	Value V
@@ -94,6 +95,11 @@ func (c *Cache[K, V]) LoadCache(version uint64, reader io.Reader) error {
 	return c.store.Recover(version, reader)
 }
 
+// Stats returns the cache stats.
+func (c *Cache[K, V]) Stats() Stats {
+	return c.store.Stats()
+}
+
 type LoadingCache[K comparable, V any] struct {
 	store *internal.LoadingStore[K, V]
 }
@@ -146,6 +152,11 @@ func (c *LoadingCache[K, V]) LoadCache(version uint64, reader io.Reader) error {
 	return c.store.Recover(version, reader)
 }
 
+// Stats returns the cache stats.
+func (c *LoadingCache[K, V]) Stats() Stats {
+	return c.store.Stats()
+}
+
 // Close closes all goroutines created by cache.
 func (c *LoadingCache[K, V]) Close() {
 	c.store.Close()
diff --git a/internal/counter.go b/internal/counter.go
index 16ef2db..a8b28e3 100644
--- a/internal/counter.go
+++ b/internal/counter.go
@@ -37,27 +37,27 @@ type ptoken struct {
 	pad [xruntime.CacheLineSize - 4]byte
 }
 
-// A Counter is a striped int64 counter.
+// An UnsignedCounter is a striped uint64 counter.
 //
 // Should be preferred over a single atomically updated int64
 // counter in high contention scenarios.
 //
 // A Counter must not be copied after first use.
-type Counter struct {
+type UnsignedCounter struct {
 	stripes []cstripe
 	mask    uint32
 }
 
 type cstripe struct {
-	c int64
+	c uint64
 	//lint:ignore U1000 prevents false sharing
 	pad [xruntime.CacheLineSize - 8]byte
 }
 
-// NewCounter creates a new Counter instance.
-func NewCounter() *Counter {
+// NewUnsignedCounter creates a new UnsignedCounter instance.
+func NewUnsignedCounter() *UnsignedCounter {
 	nstripes := RoundUpPowerOf2(xruntime.Parallelism())
-	c := Counter{
+	c := UnsignedCounter{
 		stripes: make([]cstripe, nstripes),
 		mask:    nstripes - 1,
 	}
@@ -65,17 +65,12 @@
 
 // Inc increments the counter by 1.
-func (c *Counter) Inc() {
+func (c *UnsignedCounter) Inc() {
 	c.Add(1)
 }
 
-// Dec decrements the counter by 1.
-func (c *Counter) Dec() {
-	c.Add(-1)
-}
-
 // Add adds the delta to the counter.
-func (c *Counter) Add(delta int64) {
+func (c *UnsignedCounter) Add(delta uint64) {
 	t, ok := ptokenPool.Get().(*ptoken)
 	if !ok {
 		t = new(ptoken)
 	}
@@ -83,8 +78,8 @@
 	for {
 		stripe := &c.stripes[t.idx&c.mask]
-		cnt := atomic.LoadInt64(&stripe.c)
-		if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
+		cnt := atomic.LoadUint64(&stripe.c)
+		if atomic.CompareAndSwapUint64(&stripe.c, cnt, cnt+delta) {
 			break
 		}
 		// Give a try with another randomly selected stripe.
@@ -96,11 +91,11 @@
 // Value returns the current counter value.
 // The returned value may not include all of the latest operations in
 // presence of concurrent modifications of the counter.
-func (c *Counter) Value() int64 {
-	v := int64(0)
+func (c *UnsignedCounter) Value() uint64 {
+	v := uint64(0)
 	for i := 0; i < len(c.stripes); i++ {
 		stripe := &c.stripes[i]
-		v += atomic.LoadInt64(&stripe.c)
+		v += atomic.LoadUint64(&stripe.c)
 	}
 	return v
 }
@@ -108,9 +103,9 @@
 // Reset resets the counter to zero.
 // This method should only be used when it is known that there are
 // no concurrent modifications of the counter.
-func (c *Counter) Reset() {
+func (c *UnsignedCounter) Reset() {
 	for i := 0; i < len(c.stripes); i++ {
 		stripe := &c.stripes[i]
-		atomic.StoreInt64(&stripe.c, 0)
+		atomic.StoreUint64(&stripe.c, 0)
 	}
 }
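The counter rewrite above is the subtlest part of the patch, so here is a self-contained sketch of the striping idea. It is deliberately simplified (fixed stripe count, stripe chosen by a caller-supplied id, plain atomic adds); the real `UnsignedCounter` sizes the stripe slice from runtime parallelism, hands out per-goroutine stripe indexes through a `sync.Pool`, and retries a failed CAS on a different stripe:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stripe pads its counter out to a typical 64-byte cache line so neighboring
// stripes never share a line; that is the point of striping: concurrent
// writers stop invalidating each other's cache lines.
type stripe struct {
	n uint64
	_ [56]byte
}

type stripedCounter struct{ stripes []stripe }

// add routes each writer to "its own" stripe instead of one shared word.
func (c *stripedCounter) add(id int, delta uint64) {
	atomic.AddUint64(&c.stripes[id%len(c.stripes)].n, delta)
}

// value sums the stripes; like UnsignedCounter.Value, the result may miss
// concurrent in-flight updates.
func (c *stripedCounter) value() uint64 {
	var total uint64
	for i := range c.stripes {
		total += atomic.LoadUint64(&c.stripes[i].n)
	}
	return total
}

func main() {
	c := &stripedCounter{stripes: make([]stripe, 8)}
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 100000; i++ {
				c.add(id, 1)
			}
		}(g)
	}
	wg.Wait()
	fmt.Println(c.value()) // 800000
}
```

Dropping `Dec` and switching from `int64` to `uint64` fits the new use: hit and miss totals only ever grow.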
diff --git a/internal/stats.go b/internal/stats.go
new file mode 100644
index 0000000..aab63d8
--- /dev/null
+++ b/internal/stats.go
@@ -0,0 +1,29 @@
+package internal
+
+type Stats struct {
+	hits   uint64
+	misses uint64
+}
+
+func newStats(hits uint64, misses uint64) Stats {
+	return Stats{
+		hits:   hits,
+		misses: misses,
+	}
+}
+
+func (s Stats) Hits() uint64 {
+	return s.hits
+}
+
+func (s Stats) Misses() uint64 {
+	return s.misses
+}
+
+func (s Stats) HitRatio() float64 {
+	total := s.hits + s.misses
+	if total == 0 {
+		return 0.0
+	}
+	return float64(s.hits) / float64(total)
+}
diff --git a/internal/store.go b/internal/store.go
index 38cd012..0633173 100644
--- a/internal/store.go
+++ b/internal/store.go
@@ -224,13 +224,13 @@ func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, b
 		expire := entry.expire.Load()
 		if expire != 0 && expire <= s.timerwheel.clock.NowNano() {
 			ok = false
-			s.policy.miss.Add(1)
+			s.policy.misses.Add(1)
 		} else {
-			s.policy.hit.Add(1)
+			s.policy.hits.Add(1)
 			value = entry.value
 		}
 	} else {
-		s.policy.miss.Add(1)
+		s.policy.misses.Add(1)
 	}
 	shard.mu.RUnlock(tk)
 
@@ -648,6 +648,10 @@ func (s *Store[K, V]) Range(f func(key K, value V) bool) {
 	}
 }
 
+func (s *Store[K, V]) Stats() Stats {
+	return newStats(s.policy.hits.Value(), s.policy.misses.Value())
+}
+
 func (s *Store[K, V]) Close() {
 	for _, s := range s.shards {
 		tk := s.mu.RLock()
diff --git a/internal/store_test.go b/internal/store_test.go
index ae1fdfb..39fd70e 100644
--- a/internal/store_test.go
+++ b/internal/store_test.go
@@ -129,6 +129,6 @@ func TestPolicyCounter(t *testing.T) {
 		store.Get(10000)
 	}
 
-	require.Equal(t, int64(1600), store.policy.hit.Value())
-	require.Equal(t, int64(1600), store.policy.miss.Value())
+	require.Equal(t, uint64(1600), store.policy.hits.Value())
+	require.Equal(t, uint64(1600), store.policy.misses.Value())
 }
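As a quick sanity check of the `HitRatio` arithmetic in `internal/stats.go` (and of the 1600/1600 expectations in `TestPolicyCounter` above), here is a standalone version; the type is re-declared locally because `internal/` packages cannot be imported from outside the module:

```go
package main

import "fmt"

// Local re-statement of internal/stats.go's hit-ratio logic.
type stats struct{ hits, misses uint64 }

func (s stats) hitRatio() float64 {
	total := s.hits + s.misses
	if total == 0 {
		return 0.0 // a never-queried cache reports 0, not NaN
	}
	return float64(s.hits) / float64(total)
}

func main() {
	fmt.Println(stats{}.hitRatio())                         // 0
	fmt.Println(stats{hits: 1600, misses: 1600}.hitRatio()) // 0.5
	fmt.Println(stats{hits: 2000}.hitRatio())               // 1
}
```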
diff --git a/internal/tlfu.go b/internal/tlfu.go
index 93e1768..17af025 100644
--- a/internal/tlfu.go
+++ b/internal/tlfu.go
@@ -5,17 +5,19 @@ import (
 )
 
 type TinyLfu[K comparable, V any] struct {
-	slru      *Slru[K, V]
-	sketch    *CountMinSketch
-	hasher    *Hasher[K]
-	size      uint
-	counter   uint
-	miss      *Counter
-	hit       *Counter
-	hr        float32
-	threshold atomic.Int32
-	lruFactor uint8
-	step      int8
+	slru       *Slru[K, V]
+	sketch     *CountMinSketch
+	hasher     *Hasher[K]
+	size       uint
+	counter    uint
+	misses     *UnsignedCounter
+	hits       *UnsignedCounter
+	hitsPrev   uint64
+	missesPrev uint64
+	hr         float32
+	threshold  atomic.Int32
+	lruFactor  uint8
+	step       int8
 }
 
 func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V] {
@@ -25,8 +27,8 @@ func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V
 		sketch: NewCountMinSketch(),
 		step:   1,
 		hasher: hasher,
-		miss:   NewCounter(),
-		hit:    NewCounter(),
+		misses: NewUnsignedCounter(),
+		hits:   NewUnsignedCounter(),
 	}
 	// default threshold to -1 so all entries are admitted until cache is full
 	tlfu.threshold.Store(-1)
@@ -34,9 +36,16 @@ func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V
 }
 
 func (t *TinyLfu[K, V]) climb() {
-	miss := t.miss.Value()
-	hit := t.hit.Value()
-	current := float32(hit) / float32(hit+miss)
+	hits := t.hits.Value()
+	misses := t.misses.Value()
+
+	hitsInc := hits - t.hitsPrev
+	missesInc := misses - t.missesPrev
+
+	t.hitsPrev = hits
+	t.missesPrev = misses
+
+	current := float32(hitsInc) / float32(hitsInc+missesInc)
 	delta := current - t.hr
 	var diff int8
 	if delta > 0.0 {
@@ -76,8 +85,6 @@ func (t *TinyLfu[K, V]) climb() {
 	}
 	t.threshold.Add(-int32(diff))
 	t.hr = current
-	t.hit.Reset()
-	t.miss.Reset()
 }
 
 func (t *TinyLfu[K, V]) Set(entry *Entry[K, V]) *Entry[K, V] {
diff --git a/stats_test.go b/stats_test.go
new file mode 100644
index 0000000..64bc8a1
--- /dev/null
+++ b/stats_test.go
@@ -0,0 +1,44 @@
+package theine_test
+
+import (
+	"testing"
+
+	"github.com/Yiling-J/theine-go"
+	"github.com/stretchr/testify/require"
+)
+
+func TestStats(t *testing.T) {
+	client, err := theine.NewBuilder[int, int](1000).Build()
+	require.Nil(t, err)
+	st := client.Stats()
+	require.Equal(t, uint64(0), st.Hits())
+	require.Equal(t, uint64(0), st.Misses())
+	require.Equal(t, float64(0), st.HitRatio())
+
+	client.Set(1, 1, 1)
+	for i := 0; i < 2000; i++ {
+		_, ok := client.Get(1)
+		require.True(t, ok)
+	}
+
+	st = client.Stats()
+	require.Equal(t, uint64(2000), st.Hits())
+	require.Equal(t, uint64(0), st.Misses())
+	require.Equal(t, float64(1), st.HitRatio())
+
+	for i := 0; i < 10000; i++ {
+		_, ok := client.Get(1)
+		require.True(t, ok)
+	}
+
+	for i := 0; i < 10000; i++ {
+		_, ok := client.Get(2)
+		require.False(t, ok)
+	}
+
+	st = client.Stats()
+	require.Equal(t, uint64(12000), st.Hits())
+	require.Equal(t, uint64(10000), st.Misses())
+	require.Equal(t, float64(12000)/float64(12000+10000), st.HitRatio())
+
+}
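One design note on the `climb` change above: the hit and miss counters now back the public `Stats()` API, so resetting them every adaptation cycle would zero the reported totals. Instead, `climb` keeps `hitsPrev`/`missesPrev` and adapts on the deltas since its previous run. A minimal sketch of that windowed-delta pattern follows; the names are local to the sketch, and the zero-traffic guard is added here for the standalone example (the patch itself divides unconditionally):

```go
package main

import "fmt"

// window remembers the cumulative totals seen at the previous adaptation
// step so each step can compute a hit ratio over just the latest interval.
type window struct{ hitsPrev, missesPrev uint64 }

func (w *window) ratio(hits, misses uint64) float64 {
	hitsInc := hits - w.hitsPrev
	missesInc := misses - w.missesPrev
	w.hitsPrev, w.missesPrev = hits, misses
	if hitsInc+missesInc == 0 {
		return 0 // no lookups since the last step
	}
	return float64(hitsInc) / float64(hitsInc+missesInc)
}

func main() {
	var w window
	fmt.Println(w.ratio(80, 20))  // first window: 80 hits, 20 misses -> 0.8
	fmt.Println(w.ratio(170, 30)) // next window: 90 hits, 10 misses -> 0.9
}
```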