Skip to content

Commit

Permalink
Merge pull request #409 from oasisprotocol/andrew7234/cache-fix
Browse files Browse the repository at this point in the history
bugfix: KVStore (analyzer cache): Fix retrieval and error handling
  • Loading branch information
mitjat authored May 9, 2023
2 parents c587c3c + 1a06549 commit 4d3a6ad
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 27 deletions.
2 changes: 1 addition & 1 deletion storage/oasis/nodeapi/file/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewFileConsensusApiLite(cacheDir string, consensusApi nodeapi.ConsensusApiL
return nil, err
}
return &FileConsensusApiLite{
db: *db,
db: db,
consensusApi: consensusApi,
}, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,53 @@
package file

import (
"errors"
"fmt"

"github.com/akrylysov/pogreb"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-indexer/log"
)

type NodeApiMethod func() (interface{}, error)

var ErrUnstableRPCMethod = errors.New("this method is not cacheable because the RPC return value is not constant")

// A key in the KVStore.
type CacheKey []byte

func generateCacheKey(methodName string, params ...interface{}) CacheKey {
return CacheKey(cbor.Marshal([]interface{}{methodName, params}))
}

type KVStore struct {
// A key-value store. Additional method-like functions that give a typed interface
// to the store (i.e. with typed values/keys instead of []byte) are provided below,
// taking KVStore as the first argument so they can use generics.
type KVStore interface {
Has(key []byte) (bool, error)
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Close() error
}

type pogrebKVStore struct {
*pogreb.DB

path string
logger *log.Logger
}

func (s KVStore) Close() error {
var _ KVStore = (*pogrebKVStore)(nil)

func (s pogrebKVStore) Close() error {
s.logger.Info("closing KVStore", "path", s.path)
return s.DB.Close()
}

func OpenKVStore(logger *log.Logger, path string) (*KVStore, error) {
func OpenKVStore(logger *log.Logger, path string) (KVStore, error) {
logger.Info("(re)opening KVStore", "path", path)
db, err := pogreb.Open(path, &pogreb.Options{BackgroundSyncInterval: -1})
if err != nil {
return nil, err
}
logger.Info(fmt.Sprintf("KVStore has %d entries", db.Count()))

return &KVStore{DB: db, logger: logger, path: path}, nil
return &pogrebKVStore{DB: db, logger: logger, path: path}, nil
}

// Pretty() returns a pretty-printed, human-readable version of the cache key.
Expand All @@ -61,6 +68,29 @@ func (cacheKey CacheKey) Pretty() string {
return pretty
}

var errNoSuchKey = fmt.Errorf("no such key")

// fetchTypedValue fetches the value of `cacheKey` from the cache, interpreted as a `Value`.
func fetchTypedValue[Value any](cache KVStore, key CacheKey, value *Value) error {
isCached, err := cache.Has(key)
if err != nil {
return err
}
if !isCached {
return errNoSuchKey
}
raw, err := cache.Get(key)
if err != nil {
return fmt.Errorf("failed to fetch key %s from cache: %v", key.Pretty(), err)
}
err = cbor.Unmarshal(raw, value)
if err != nil {
return fmt.Errorf("failed to unmarshal key %s from cache into %T: %v", key.Pretty(), value, err)
}

return nil
}

// getFromCacheOrCall fetches the value of `cacheKey` from the cache if it exists,
// interpreted as a `Value`. If it does not exist, it calls `valueFunc` to get the
// value, and caches it before returning it.
Expand All @@ -72,30 +102,27 @@ func GetFromCacheOrCall[Value any](cache KVStore, volatile bool, key CacheKey, v
}

// If the value is cached, return it.
isCached, err := cache.Has(key)
if err != nil {
return nil, err
}
if isCached {
raw, err2 := cache.Get(key)
if err2 != nil {
cache.logger.Warn(fmt.Sprintf("failed to fetch key %s from cache: %v", key.Pretty(), err2))
}
var result *Value
err2 = cbor.Unmarshal(raw, &result)
if err2 != nil {
cache.logger.Warn(fmt.Sprintf("failed to unmarshal key %s from cache into %T: %v", key.Pretty(), result, err2))
var cached Value
switch err := fetchTypedValue(cache, key, &cached); err {
case nil:
return &cached, nil
case errNoSuchKey: // Regular cache miss; continue below.
default:
// Log unexpected error and continue to call the backing API.
loggingCache, ok := cache.(*pogrebKVStore)
if ok {
loggingCache.logger.Warn(fmt.Sprintf("error fetching %s from cache: %v", key.Pretty(), err))
}
}

// Otherwise, the value is not cached or couldn't be restored from the cache. Call the backing API to get it.
result, err := valueFunc()
// The value is not cached or couldn't be restored from the cache. Call the backing API to get it.
computed, err := valueFunc()
if err != nil {
return nil, err
}

// Store value in cache for later use.
return result, cache.Put(key, cbor.Marshal(result))
return computed, cache.Put(key, cbor.Marshal(computed))
}

// Like getFromCacheOrCall, but for slice-typed return values.
Expand Down
112 changes: 112 additions & 0 deletions storage/oasis/nodeapi/file/kvstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package file

import (
"fmt"
"os"
"testing"

"github.com/oasisprotocol/oasis-indexer/log"
"github.com/stretchr/testify/require"
)

// Returns an open KVStore, and a function to clean it up.
func openTestKVStore(t *testing.T) (KVStore, func()) {
path, err := os.MkdirTemp("", "indexer-kv-test")
require.NoError(t, err)
kv, err := OpenKVStore(log.NewDefaultLogger("unit-test"), path)
require.NoError(t, err)
return kv, func() {
err := kv.Close()
os.RemoveAll(path)
require.NoError(t, err)
}
}

func TestHappyPath(t *testing.T) {
kv, closer := openTestKVStore(t)
defer closer()

has, err := kv.Has([]byte("mykey"))
require.NoError(t, err)
require.False(t, has)

err = kv.Put([]byte("mykey"), []byte("myval"))
require.NoError(t, err)

has, err = kv.Has([]byte("mykey"))
require.NoError(t, err)
require.True(t, has)

val, err := kv.Get([]byte("mykey"))
require.NoError(t, err)
require.Equal(t, []byte("myval"), val)
}

func TestGetFromCacheOrCallSuccess(t *testing.T) {
kv, closer := openTestKVStore(t)
defer closer()

var callCount int
generator := func() (*string, error) {
callCount++
val := "myval"
return &val, nil
}

// First call should call the function.
val, err := GetFromCacheOrCall(kv, false /*volatile*/, generateCacheKey("mykey"), generator)
require.NoError(t, err)
require.Equal(t, "myval", *val)
require.Equal(t, 1, callCount)

// Second call should not call the function.
val, err = GetFromCacheOrCall(kv, false /*volatile*/, generateCacheKey("mykey"), generator)
require.NoError(t, err)
require.Equal(t, "myval", *val)
require.Equal(t, 1, callCount)

// If a key is marked volatile, the function should be called every time.
val, err = GetFromCacheOrCall(kv, true /*volatile*/, generateCacheKey("mykey"), generator)
require.NoError(t, err)
require.Equal(t, "myval", *val)
require.Equal(t, 2, callCount)
}

func TestGetFromCacheOrCallError(t *testing.T) {
kv, closer := openTestKVStore(t)
defer closer()

generator := func() (*int, error) {
one := 1
return &one, fmt.Errorf("myerr")
}

// The call should propagate the generator function error.
val, err := GetFromCacheOrCall(kv, false /*volatile*/, generateCacheKey("mykey"), generator)
require.Error(t, err)
require.Nil(t, val)
}

func TestGetFromCacheOrCallTypeMismatch(t *testing.T) {
kv, closer := openTestKVStore(t)
defer closer()

stringGenerator := func() (*string, error) {
val := "myval"
return &val, nil
}
intGenerator := func() (*int, error) {
val := 123
return &val, nil
}

// Put a string into the cache.
_, err := GetFromCacheOrCall(kv, false /*volatile*/, generateCacheKey("mykey"), stringGenerator)
require.NoError(t, err)

// Try to fetch it and interpret it as an int.
// It should log a warning but recover by calling the generator function.
myInt, err := GetFromCacheOrCall(kv, false /*volatile*/, generateCacheKey("mykey"), intGenerator)
require.NoError(t, err)
require.Equal(t, 123, *myInt)
}
2 changes: 1 addition & 1 deletion storage/oasis/nodeapi/file/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewFileRuntimeApiLite(runtime common.Runtime, cacheDir string, runtimeApi n
}
return &FileRuntimeApiLite{
runtime: runtime,
db: *db,
db: db,
runtimeApi: runtimeApi,
}, nil
}
Expand Down

0 comments on commit 4d3a6ad

Please sign in to comment.