Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Factor out cache implementation to standalone package. #557

Merged
merged 3 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 21 additions & 53 deletions internal/authorizedapp/database_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package authorizedapp
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"

authorizedappdb "github.com/google/exposure-notifications-server/internal/authorizedapp/database"
"github.com/google/exposure-notifications-server/internal/authorizedapp/model"
"github.com/google/exposure-notifications-server/internal/cache"
"github.com/google/exposure-notifications-server/internal/database"

"github.com/google/exposure-notifications-server/internal/logging"
Expand All @@ -39,13 +40,7 @@ type DatabaseProvider struct {
secretManager secrets.SecretManager
cacheDuration time.Duration

cache map[string]*cacheItem
cacheLock sync.RWMutex
}

type cacheItem struct {
value *model.AuthorizedApp
cachedAt time.Time
cache *cache.Cache
}

// DatabaseProviderOption is used as input to the database provider.
Expand All @@ -61,10 +56,14 @@ func WithSecretManager(sm secrets.SecretManager) DatabaseProviderOption {

// NewDatabaseProvider creates a new Provider that reads from a database.
func NewDatabaseProvider(ctx context.Context, db *database.DB, config *Config, opts ...DatabaseProviderOption) (Provider, error) {
cache, err := cache.New(config.CacheDuration)
if err != nil {
return nil, fmt.Errorf("cache.New: %w", err)
}
provider := &DatabaseProvider{
database: db,
cacheDuration: config.CacheDuration,
cache: make(map[string]*cacheItem),
cache: cache,
}

// Apply options.
Expand All @@ -75,25 +74,6 @@ func NewDatabaseProvider(ctx context.Context, db *database.DB, config *Config, o
return provider, nil
}

// checkCache checks the local cache within a read lock.
// The bool on return is true if there was a hit (And an error is a valid hit)
// or false if there was a miss (or expiry) and the data source should be queried again.
func (p *DatabaseProvider) checkCache(name string) (*model.AuthorizedApp, bool, error) {
// Acquire a read lock first, which allows concurrent readers, to check if
// there's an item in the cache.
p.cacheLock.RLock()
defer p.cacheLock.RUnlock()

item, ok := p.cache[name]
if ok && time.Since(item.cachedAt) <= p.cacheDuration {
if item.value == nil {
return nil, true, ErrAppNotFound
}
return item.value, true, nil
}
return nil, false, nil
}

// AppConfig returns the config for the given app package name.
func (p *DatabaseProvider) AppConfig(ctx context.Context, name string) (*model.AuthorizedApp, error) {
logger := logging.FromContext(ctx)
Expand All @@ -102,42 +82,30 @@ func (p *DatabaseProvider) AppConfig(ctx context.Context, name string) (*model.A
// cacher does not. To maximize cache hits, convert to lowercase.
name = strings.ToLower(name)

data, cacheHit, error := p.checkCache(name)
if cacheHit {
return data, error
}

// Acquire a more aggressive lock now because we're about to mutate. However,
// it's possible that a concurrent routine has already mutated between our
// read and write locks, so we have to check again.
p.cacheLock.Lock()
defer p.cacheLock.Unlock()
item, ok := p.cache[name]
if ok && time.Since(item.cachedAt) <= p.cacheDuration {
if item.value == nil {
return nil, ErrAppNotFound
lookup := func() (interface{}, error) {
// Load config.
config, err := p.loadAuthorizedAppFromDatabase(ctx, name)
if err != nil {
return nil, fmt.Errorf("authorizedapp: %w", err)
}
return item.value, nil
logger.Infof("authorizedapp: loaded %v, caching for %s", name, p.cacheDuration)
return config, nil
}
cached, err := p.cache.WriteThruLookup(name, lookup)

// Load config.
config, err := p.loadAuthorizedAppFromDatabase(ctx, name)
// Indicates an error on the write thru lookup.
if err != nil {
return nil, fmt.Errorf("authorizedapp: %w", err)
}

// Cache configs.
logger.Infof("authorizedapp: loaded %v, caching for %s", name, p.cacheDuration)
p.cache[name] = &cacheItem{
value: config,
cachedAt: time.Now(),
return nil, err
}

// Handle not found.
config := cached.(*model.AuthorizedApp)
if config == nil {
return nil, ErrAppNotFound
}

log.Printf("AppConfig: %+v %v", config, err)

// Returned config.
return config, nil
}
Expand Down
141 changes: 141 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package cache implements an inmemory cache for any interface{} object.
package cache

import (
"errors"
"sync"
"time"
)

var ErrInvalidDuration = errors.New("expireAfter duration cannot be negative")

type Func func() (interface{}, error)

type Cache struct {
data map[string]item
expireAfter time.Duration
mu sync.RWMutex
}

type item struct {
object interface{}
expiresAt int64
}

func (i *item) expired() bool {
return i.expiresAt < time.Now().UnixNano()
}

// New creates a new in memory cache.
func New(expireAfter time.Duration) (*Cache, error) {
if expireAfter < 0 {
return nil, ErrInvalidDuration
}

return &Cache{
data: make(map[string]item),
expireAfter: expireAfter,
}, nil
}

// Removes an item by name and expiry time when the purge was scheduled.
// If there is a race, and the item has been refreshed, it will not be purged.
func (c *Cache) purgeExpired(name string, expectedExpiryTime int64) {
c.mu.Lock()
defer c.mu.Unlock()

if item, ok := c.data[name]; ok && item.expiresAt == expectedExpiryTime {
// found, and the expiry time is still the same as when the purge was requested.
delete(c.data, name)
}
}

// Size returns the number of items in the cache.
func (c *Cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.data)
}

// WriteThruLookup checks the cache for the value associated with name,
// and if not found or expired, invokes the provided primaryLookup function
// to local the value.
func (c *Cache) WriteThruLookup(name string, primaryLookup Func) (interface{}, error) {
// This call takes a read lock.
val, hit := c.Lookup(name)
if hit {
return val, nil
}

// Escalate the lock to a RW lock.
c.mu.Lock()
defer c.mu.Unlock()
// double check that the value hasn't been set by another goroutine.
if val, hit := c.data[name]; hit && !val.expired() {
return val, nil
}
// Either a miss, or hit w/ expired value.

// Value does indeed need to be refreshed. Used the provided fucntion.
newData, err := primaryLookup()
if err != nil {
return nil, err
}

// save the newData in the cache. newData may be nil, if that's what the WriteThruFunction provided.
c.data[name] = item{
object: newData,
expiresAt: time.Now().Add(c.expireAfter).UnixNano(),
}
return newData, nil

}

// Lookup checks the cache for a non-expired object by the supplied key name.
// The bool return informs the caller if there was a cache hit or not.
// A return of nil, true means that nil is in the cache.
// Where nil, false indicates a cache miss or that the value is expired and should
// be refreshed.
func (c *Cache) Lookup(name string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

if item, ok := c.data[name]; ok && item.expired() {
// Cache hit, but expired. The removal from the cache is deferred.
go c.purgeExpired(name, item.expiresAt)
return nil, false
} else if ok {
// Cache hit, not expired.
return item.object, true
}
// Cache miss.
return nil, false
}

// Set saves the current value of an object in the cache, with the supplied
// durintion until the object expires.
func (c *Cache) Set(name string, object interface{}) error {
c.mu.Lock()
defer c.mu.Unlock()

c.data[name] = item{
object: object,
expiresAt: time.Now().Add(c.expireAfter).UnixNano(),
}

return nil
}
Loading