Skip to content

Commit

Permalink
Review comments #1.
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 27, 2020
1 parent 853961d commit 4bc7384
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 56 deletions.
67 changes: 32 additions & 35 deletions balancer/rls/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/status"
)

// Key represents the cache key used to uniquely identify a cache entry.
Expand All @@ -44,9 +43,9 @@ type Key struct {
// Entry wraps all the data to be stored in a cache entry.
type Entry struct {
// Mu synchronizes access to this particular cache entry. The LB policy
// will also hold another mutex to synchornize access to the cache as a
// will also hold another mutex to synchronize access to the cache as a
// whole. To avoid holding the top-level mutex for the whole duration for
// which one particular cache entry is acted up, we use this entry mutex.
// which one particular cache entry is acted upon, we use this entry mutex.
Mu sync.Mutex
// ExpiryTime is the absolute time at which the data cached as part of this
// entry stops being valid. When an RLS request succeeds, this is set to
Expand Down Expand Up @@ -74,9 +73,10 @@ type Entry struct {
// new entry added to the cache is not evicted before the RLS response
// arrives (usually when the cache is too small).
EarliestEvictTime time.Time
// CallStatus stores the RPC status of the previous RLS request. Picks for
// entries with a non-OK status are failed with the error stored here.
CallStatus *status.Status
// CallStatus stores the RPC status of the previous RLS request for this
// entry. Picks for entries with a non-nil value for this field are failed
// with the error stored here.
CallStatus error
// Backoff contains all backoff related state. When an RLS request
// succeeds, backoff state is reset.
Backoff *BackoffState
Expand All @@ -85,6 +85,16 @@ type Entry struct {
HeaderData string
// TODO(easwars): Add support to store the ChildPolicy here. Need a
// balancerWrapper type to be implemented for this.

// size stores the size of this cache entry. Uses only a subset of the
// fields. See `entrySize` for how this is computed.
size int
// key contains the cache key corresponding to this entry. This is required
// by methods like `removeElement` which only have a pointer to the
// list.Element which contains a reference to the cache.Entry. But these
// methods need the cache.Key to be able to remove the entry from the
// underlying map.
key Key
}

// BackoffState wraps all backoff related state associated with a cache entry.
Expand All @@ -104,15 +114,6 @@ type BackoffState struct {
Callback func()
}

// lruEntry is the actual entry which is stored in the LRU cache. It requires
// the key (in addition to the actual entry) as well, since the onEvicted
// callback and expiry timer would need the key to perform their job.
type lruEntry struct {
key Key
val *Entry
size int
}

// LRU is a cache with a least recently used eviction policy.
//
// It is not safe for concurrent access. The RLS LB policy will provide a mutex
Expand Down Expand Up @@ -160,6 +161,7 @@ func entrySize(key Key, value *Entry) int {
// removeToFit removes older entries from the cache to make room for a new
// entry of size newSize.
func (lru *LRU) removeToFit(newSize int) {
now := time.Now()
for lru.usedSize+newSize > lru.maxSize {
elem := lru.ll.Back()
if elem == nil {
Expand All @@ -169,15 +171,15 @@ func (lru *LRU) removeToFit(newSize int) {
return
}

entry := elem.Value.(*lruEntry).val
if t := entry.EarliestEvictTime; !t.IsZero() && t.Before(time.Now()) {
entry := elem.Value.(*Entry)
if t := entry.EarliestEvictTime; !t.IsZero() && t.Before(now) {
// When the oldest entry is too new (it hasn't even spent a default
// minimum amount of time in the cache), we abort and allow the
// cache to grow bigger than the configured maxSize.
grpclog.Info("rls: LRU eviction finds oldest entry to be too new. Allowing cache to exceed maxSize momentarily")
return
}
lru.removeOldest()
lru.removeElement(elem)
}
}

Expand All @@ -188,16 +190,18 @@ func (lru *LRU) Add(key Key, value *Entry) {
if !ok {
lru.removeToFit(size)
lru.usedSize += size
elem := lru.ll.PushFront(&lruEntry{key, value, size})
value.size = size
value.key = key
elem := lru.ll.PushFront(value)
lru.cache[key] = elem
return
}

lruE := elem.Value.(*lruEntry)
sizeDiff := size - lruE.size
existing := elem.Value.(*Entry)
sizeDiff := size - existing.size
lru.removeToFit(sizeDiff)
lruE.val = value
lruE.size = size
value.size = size
elem.Value = value
lru.ll.MoveToFront(elem)
lru.usedSize += sizeDiff
}
Expand All @@ -209,20 +213,13 @@ func (lru *LRU) Remove(key Key) {
}
}

func (lru *LRU) removeOldest() {
elem := lru.ll.Back()
if elem != nil {
lru.removeElement(elem)
}
}

func (lru *LRU) removeElement(e *list.Element) {
lruE := e.Value.(*lruEntry)
entry := e.Value.(*Entry)
lru.ll.Remove(e)
delete(lru.cache, lruE.key)
lru.usedSize -= lruE.size
delete(lru.cache, entry.key)
lru.usedSize -= entry.size
if lru.onEvicted != nil {
lru.onEvicted(lruE.key, lruE.val)
lru.onEvicted(entry.key, entry)
}
}

Expand All @@ -233,5 +230,5 @@ func (lru *LRU) Get(key Key) *Entry {
return nil
}
lru.ll.MoveToFront(elem)
return elem.Value.(*lruEntry).val
return elem.Value.(*Entry)
}
26 changes: 5 additions & 21 deletions balancer/rls/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ func TestGet(t *testing.T) {
for i, key := range test.keysToAdd {
lru.Add(key, test.valsToAdd[i])
}
if gotEntry := lru.Get(test.keyToGet); !cmp.Equal(gotEntry, test.wantEntry, cmpopts.IgnoreInterfaces(struct{ sync.Locker }{})) {
opts := []cmp.Option{
cmpopts.IgnoreInterfaces(struct{ sync.Locker }{}),
cmpopts.IgnoreUnexported(Entry{}),
}
if gotEntry := lru.Get(test.keyToGet); !cmp.Equal(gotEntry, test.wantEntry, opts...) {
t.Errorf("lru.Get(%+v) = %+v, want %+v", test.keyToGet, gotEntry, test.wantEntry)
}
})
Expand All @@ -108,26 +112,6 @@ func TestRemove(t *testing.T) {
}
}

// TestRemove verifies the removeOldest method.
func TestGetWithRemoveOldest(t *testing.T) {
keys := []Key{
{Path: "/service1/method1", KeyMap: "k1=v1,k2=v2"},
{Path: "/service2/method2", KeyMap: "k1=v1,k2=v2"},
{Path: "/service3/method3", KeyMap: "k1=v1,k2=v2"},
}

lru := NewLRU(testCacheMaxSize, nil)
for _, key := range keys {
lru.Add(key, &Entry{})
}
for _, key := range keys {
lru.removeOldest()
if entry := lru.Get(key); entry != nil {
t.Fatalf("lru.Get(%+v) after a call to lru.removeOldest succeeds, should have failed", key)
}
}
}

// TestExceedingSizeCausesEviction verifies the case where adding a new entry
// to the cache leads to eviction of old entries to make space for the new one.
func TestExceedingSizeCausesEviction(t *testing.T) {
Expand Down

0 comments on commit 4bc7384

Please sign in to comment.