add stats API (#45)
* add stats API

* update Readme
Yiling-J authored Aug 22, 2024
1 parent bcfb834 commit 44ffafa
Showing 8 changed files with 139 additions and 46 deletions.
9 changes: 6 additions & 3 deletions README.md
@@ -127,6 +127,9 @@ client.Range(func(key, value int) bool {
// returns an estimation of the cache size usage
client.EstimatedSize()

// get cache stats (in-memory cache only), including hits, misses and hit ratio
client.Stats()

// close client, set hashmaps in shard to nil and close all goroutines
client.Close()
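
A minimal end-to-end sketch of the new stats API, using only the builder, `Set`/`Get`, `Stats` and `Close` calls shown in this commit and its test file (the printed values are what this particular sequence should produce):

```go
package main

import (
	"fmt"

	"github.com/Yiling-J/theine-go"
)

func main() {
	// in-memory cache with a capacity of 1000
	client, err := theine.NewBuilder[string, int](1000).Build()
	if err != nil {
		panic(err)
	}
	defer client.Close()

	client.Set("foo", 1, 1)
	client.Get("foo") // hit
	client.Get("bar") // miss

	st := client.Stats()
	fmt.Println(st.Hits(), st.Misses(), st.HitRatio()) // expected: 1 1 0.5
}
```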

@@ -190,7 +193,7 @@ Theine will save checksum when persisting cache and verify checksum first when l
## Benchmarks

Source: https://github.com/maypok86/benchmarks

### throughput

```
@@ -256,7 +259,7 @@ Items start their lifetime on DRAM. As an item becomes cold it gets evicted from

Like CacheLib, Theine's hybrid cache also has **BigHash** and **Block Cache**. It's highly recommended to read the CacheLib architecture design before using the hybrid cache; here is a short introduction to these two engines (copied from CacheLib):

- **BigHash** is effectively a giant fixed-bucket hash map on the device. To read or write, the entire bucket is read (in case of write, updated and written back). Bloom filter used to reduce number of IO. When bucket is full, items evicted in FIFO manner. You don't pay any RAM price here (except Bloom filter, which is 2GB for 1TB BigHash, tunable).
- **Block Cache**, on the other hand, divides device into equally sized regions (16MB, tunable) and fills a region with items of same size class, or, in case of log-mode fills regions sequentially with items of different size. Sometimes we call log-mode “stack alloc”. BC stores compact index in memory: key hash to offset. We do not store full key in memory and if collision happens (super rare), old item will look like evicted. In your calculations, use 12 bytes overhead per item to estimate RAM usage. For example, if your average item size is 4KB and cache size is 500GB you'll need around 1.4GB of memory.

#### Using Hybrid Cache
@@ -320,7 +323,7 @@ After you call `Hybrid(...)` in a cache builder. Theine will convert current bui
* `Workers` default 2

Theine evicts entries in a separate policy goroutine, but inserts to NVM can be done in parallel. To make this work, Theine sends evicted entries to workers, and each worker syncs data to the NVM cache. This setting controls how many workers are used to sync data.

* `AdmProbability` default 1

This is an admission policy, used for endurance and performance reasons. When entries are evicted from the DRAM cache, this policy controls what percentage of them are inserted into NVM. A value of 1 means that all entries evicted from DRAM are inserted into NVM. Values should be in the range [0, 1]; see the sketch below.
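
Conceptually, the admission decision is a per-entry coin flip. A hedged sketch of the idea (illustrative only; `shouldAdmit` is a hypothetical helper, not Theine's internal code):

```go
package main

import (
	"fmt"
	"math/rand"
)

// shouldAdmit is a hypothetical helper: admit an entry evicted from DRAM
// into NVM with probability admProbability, where admProbability is in [0, 1].
func shouldAdmit(admProbability float64) bool {
	return rand.Float64() < admProbability
}

func main() {
	admitted := 0
	for i := 0; i < 100000; i++ {
		if shouldAdmit(0.5) {
			admitted++
		}
	}
	fmt.Println(admitted) // roughly 50000: about half of the evictions reach NVM
}
```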
11 changes: 11 additions & 0 deletions cache.go
@@ -16,6 +16,7 @@ var VersionMismatch = internal.VersionMismatch

type RemoveReason = internal.RemoveReason
type DataBlock = internal.DataBlock[any]
type Stats = internal.Stats

type Loaded[V any] struct {
Value V
@@ -94,6 +95,11 @@ func (c *Cache[K, V]) LoadCache(version uint64, reader io.Reader) error {
return c.store.Recover(version, reader)
}

// Stats returns the in-memory cache stats (hits, misses and hit ratio).
func (c *Cache[K, V]) Stats() Stats {
return c.store.Stats()
}

type LoadingCache[K comparable, V any] struct {
store *internal.LoadingStore[K, V]
}
@@ -146,6 +152,11 @@ func (c *LoadingCache[K, V]) LoadCache(version uint64, reader io.Reader) error {
return c.store.Recover(version, reader)
}

// Stats returns the in-memory cache stats (hits, misses and hit ratio).
func (c *LoadingCache[K, V]) Stats() Stats {
return c.store.Stats()
}

// Close closes all goroutines created by cache.
func (c *LoadingCache[K, V]) Close() {
c.store.Close()
35 changes: 15 additions & 20 deletions internal/counter.go
@@ -37,54 +37,49 @@ type ptoken struct {
pad [xruntime.CacheLineSize - 4]byte
}

// A Counter is a striped int64 counter.
// An UnsignedCounter is an unsigned striped uint64 counter.
//
// Should be preferred over a single atomically updated int64
// counter in high contention scenarios.
//
// An UnsignedCounter must not be copied after first use.
type Counter struct {
type UnsignedCounter struct {
stripes []cstripe
mask uint32
}

type cstripe struct {
c int64
c uint64
//lint:ignore U1000 prevents false sharing
pad [xruntime.CacheLineSize - 8]byte
}

// NewCounter creates a new Counter instance.
func NewCounter() *Counter {
// NewUnsignedCounter creates a new UnsignedCounter instance.
func NewUnsignedCounter() *UnsignedCounter {
nstripes := RoundUpPowerOf2(xruntime.Parallelism())
c := Counter{
c := UnsignedCounter{
stripes: make([]cstripe, nstripes),
mask: nstripes - 1,
}
return &c
}

// Inc increments the counter by 1.
func (c *Counter) Inc() {
func (c *UnsignedCounter) Inc() {
c.Add(1)
}

// Dec decrements the counter by 1.
func (c *Counter) Dec() {
c.Add(-1)
}

// Add adds the delta to the counter.
func (c *Counter) Add(delta int64) {
func (c *UnsignedCounter) Add(delta uint64) {
t, ok := ptokenPool.Get().(*ptoken)
if !ok {
t = new(ptoken)
t.idx = xruntime.Fastrand()
}
for {
stripe := &c.stripes[t.idx&c.mask]
cnt := atomic.LoadInt64(&stripe.c)
if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
cnt := atomic.LoadUint64(&stripe.c)
if atomic.CompareAndSwapUint64(&stripe.c, cnt, cnt+delta) {
break
}
// Give a try with another randomly selected stripe.
@@ -96,21 +91,21 @@ func (c *Counter) Add(delta int64) {
// Value returns the current counter value.
// The returned value may not include all of the latest operations in
// presence of concurrent modifications of the counter.
func (c *Counter) Value() int64 {
v := int64(0)
func (c *UnsignedCounter) Value() uint64 {
v := uint64(0)
for i := 0; i < len(c.stripes); i++ {
stripe := &c.stripes[i]
v += atomic.LoadInt64(&stripe.c)
v += atomic.LoadUint64(&stripe.c)
}
return v
}

// Reset resets the counter to zero.
// This method should only be used when it is known that there are
// no concurrent modifications of the counter.
func (c *Counter) Reset() {
func (c *UnsignedCounter) Reset() {
for i := 0; i < len(c.stripes); i++ {
stripe := &c.stripes[i]
atomic.StoreInt64(&stripe.c, 0)
atomic.StoreUint64(&stripe.c, 0)
}
}
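
For context, a sketch of how the striped counter might be exercised; `TestUnsignedCounterConcurrent` is a hypothetical test name inside the `internal` package, using only the `NewUnsignedCounter`, `Inc` and `Value` methods added above:

```go
package internal

import (
	"sync"
	"testing"
)

// Concurrent increments are spread across stripes via a randomly seeded
// token, so hot counters avoid contending on a single cache line.
func TestUnsignedCounterConcurrent(t *testing.T) {
	c := NewUnsignedCounter()
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.Inc()
			}
		}()
	}
	wg.Wait()
	if got := c.Value(); got != 8000 {
		t.Fatalf("expected 8000, got %d", got)
	}
}
```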
29 changes: 29 additions & 0 deletions internal/stats.go
@@ -0,0 +1,29 @@
package internal

type Stats struct {
hits uint64
misses uint64
}

func newStats(hits uint64, misses uint64) Stats {
return Stats{
hits: hits,
misses: misses,
}
}

func (s Stats) Hits() uint64 {
return s.hits
}

func (s Stats) Misses() uint64 {
return s.misses
}

func (s Stats) HitRatio() float64 {
total := s.hits + s.misses
if total == 0 {
return 0.0
}
return float64(s.hits) / float64(total)
}
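
A quick check of the ratio logic as it might appear in an internal test (`TestStatsHitRatio` is a hypothetical name; it relies on the unexported `newStats` constructor, so it has to live in the `internal` package):

```go
package internal

import "testing"

func TestStatsHitRatio(t *testing.T) {
	// 90 hits and 10 misses -> 0.9 hit ratio.
	if got := newStats(90, 10).HitRatio(); got != 0.9 {
		t.Fatalf("expected 0.9, got %v", got)
	}
	// No traffic yet -> the ratio is defined as 0 rather than NaN.
	if got := newStats(0, 0).HitRatio(); got != 0.0 {
		t.Fatalf("expected 0, got %v", got)
	}
}
```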
10 changes: 7 additions & 3 deletions internal/store.go
@@ -224,13 +224,13 @@ func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, b
expire := entry.expire.Load()
if expire != 0 && expire <= s.timerwheel.clock.NowNano() {
ok = false
s.policy.miss.Add(1)
s.policy.misses.Add(1)
} else {
s.policy.hit.Add(1)
s.policy.hits.Add(1)
value = entry.value
}
} else {
s.policy.miss.Add(1)
s.policy.misses.Add(1)
}
shard.mu.RUnlock(tk)

@@ -648,6 +648,10 @@ func (s *Store[K, V]) Range(f func(key K, value V) bool) {
}
}

func (s *Store[K, V]) Stats() Stats {
return newStats(s.policy.hits.Value(), s.policy.misses.Value())
}

func (s *Store[K, V]) Close() {
for _, s := range s.shards {
tk := s.mu.RLock()
4 changes: 2 additions & 2 deletions internal/store_test.go
@@ -129,6 +129,6 @@ func TestPolicyCounter(t *testing.T) {
store.Get(10000)
}

require.Equal(t, int64(1600), store.policy.hit.Value())
require.Equal(t, int64(1600), store.policy.miss.Value())
require.Equal(t, uint64(1600), store.policy.hits.Value())
require.Equal(t, uint64(1600), store.policy.misses.Value())
}
43 changes: 25 additions & 18 deletions internal/tlfu.go
Expand Up @@ -5,17 +5,19 @@ import (
)

type TinyLfu[K comparable, V any] struct {
slru *Slru[K, V]
sketch *CountMinSketch
hasher *Hasher[K]
size uint
counter uint
miss *Counter
hit *Counter
hr float32
threshold atomic.Int32
lruFactor uint8
step int8
slru *Slru[K, V]
sketch *CountMinSketch
hasher *Hasher[K]
size uint
counter uint
misses *UnsignedCounter
hits *UnsignedCounter
hitsPrev uint64
missesPrev uint64
hr float32
threshold atomic.Int32
lruFactor uint8
step int8
}

func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V] {
@@ -25,18 +27,25 @@ func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V
sketch: NewCountMinSketch(),
step: 1,
hasher: hasher,
miss: NewCounter(),
hit: NewCounter(),
misses: NewUnsignedCounter(),
hits: NewUnsignedCounter(),
}
// default threshold to -1 so all entries are admitted until cache is full
tlfu.threshold.Store(-1)
return tlfu
}

func (t *TinyLfu[K, V]) climb() {
miss := t.miss.Value()
hit := t.hit.Value()
current := float32(hit) / float32(hit+miss)
hits := t.hits.Value()
misses := t.misses.Value()

hitsInc := hits - t.hitsPrev
missesInc := misses - t.missesPrev

t.hitsPrev = hits
t.missesPrev = misses

current := float32(hitsInc) / float32(hitsInc+missesInc)
delta := current - t.hr
var diff int8
if delta > 0.0 {
@@ -76,8 +85,6 @@ }
}
t.threshold.Add(-int32(diff))
t.hr = current
t.hit.Reset()
t.miss.Reset()
}

func (t *TinyLfu[K, V]) Set(entry *Entry[K, V]) *Entry[K, V] {
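
Because the hit/miss counters now also back the public `Stats()` API, `climb()` no longer resets them each adaptation cycle; instead it snapshots the cumulative totals and adapts on per-window deltas. The pattern in isolation (a hedged sketch with illustrative names, not the exact `tlfu.go` code):

```go
package main

import "fmt"

// windowRatio keeps the previous cumulative hit/miss snapshots and reports
// the hit ratio of only the traffic observed since the last call, without
// ever resetting the cumulative counters.
type windowRatio struct {
	hitsPrev, missesPrev uint64
}

func (w *windowRatio) observe(hits, misses uint64) float64 {
	hitsInc := hits - w.hitsPrev
	missesInc := misses - w.missesPrev
	w.hitsPrev, w.missesPrev = hits, misses
	if hitsInc+missesInc == 0 {
		return 0
	}
	return float64(hitsInc) / float64(hitsInc+missesInc)
}

func main() {
	var w windowRatio
	fmt.Println(w.observe(80, 20))   // first window: 80 hits, 20 misses -> 0.8
	fmt.Println(w.observe(180, 120)) // next window: 100 hits, 100 misses -> 0.5
}
```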
44 changes: 44 additions & 0 deletions stats_test.go
@@ -0,0 +1,44 @@
package theine_test

import (
"testing"

"github.com/Yiling-J/theine-go"
"github.com/stretchr/testify/require"
)

func TestStats(t *testing.T) {
client, err := theine.NewBuilder[int, int](1000).Build()
require.Nil(t, err)
st := client.Stats()
require.Equal(t, uint64(0), st.Hits())
require.Equal(t, uint64(0), st.Misses())
require.Equal(t, float64(0), st.HitRatio())

client.Set(1, 1, 1)
for i := 0; i < 2000; i++ {
_, ok := client.Get(1)
require.True(t, ok)
}

st = client.Stats()
require.Equal(t, uint64(2000), st.Hits())
require.Equal(t, uint64(0), st.Misses())
require.Equal(t, float64(1), st.HitRatio())

for i := 0; i < 10000; i++ {
_, ok := client.Get(1)
require.True(t, ok)
}

for i := 0; i < 10000; i++ {
_, ok := client.Get(2)
require.False(t, ok)
}

st = client.Stats()
require.Equal(t, uint64(12000), st.Hits())
require.Equal(t, uint64(10000), st.Misses())
require.Equal(t, float64(12000)/float64(12000+10000), st.HitRatio())

}
