Merge pull request #209 from fission/mem-cache
Constrain memory usage in the in-memory backend
erwinvaneyk authored Sep 21, 2018
2 parents a28076a + 9023aed commit c49dfc4
Showing 16 changed files with 398 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/fission-workflows-bundle/bundle/bundle.go
@@ -314,7 +314,7 @@ func run(ctx context.Context, opts *Options) error {
}()
defer func() {
err := httpApiSrv.Shutdown(ctx)
log.Info("Stopped HTTP API server: %v", err)
log.Infof("Stopped HTTP API server: %v", err)
}()

log.Info("Serving HTTP API gateway at: ", httpApiSrv.Addr)
12 changes: 8 additions & 4 deletions glide.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions glide.yaml
@@ -76,6 +76,8 @@ import:
- package: github.com/grpc-ecosystem/grpc-opentracing
subpackages:
- go/otgrpc
- package: github.com/hashicorp/golang-lru
version: v0.5.0
testImport:
- package: github.com/stretchr/testify
version: 1.1.4
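The backend changes below build on this new dependency. As a rough, self-contained sketch of the golang-lru API used there (NewWithEvict, Add, Get, Len) — the cache size, keys, and values here are illustrative, not taken from the commit:

package main

import (
	"fmt"

	"github.com/hashicorp/golang-lru"
)

func main() {
	// Cache holding at most 2 entries; the callback runs for every eviction,
	// mirroring how the mem backend hooks its metrics into evictions.
	cache, err := lru.NewWithEvict(2, func(key, value interface{}) {
		fmt.Printf("evicted %v\n", key)
	})
	if err != nil {
		panic(err)
	}

	cache.Add("a", 1)
	cache.Add("b", 2)
	cache.Add("c", 3) // evicts "a", the least recently used entry

	if v, ok := cache.Get("b"); ok {
		fmt.Println("b =", v)
	}
	fmt.Println("entries:", cache.Len())
}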
2 changes: 1 addition & 1 deletion pkg/apiserver/httpclient/httpclient.go
@@ -56,7 +56,7 @@ func callWithJSON(ctx context.Context, method string, url string, in proto.Messa
if err != nil {
logrus.Errorf("Failed to read debug data: %v", err)
}
logrus.Debug("body: '%v'", data)
logrus.Debugf("body: '%v'", data)
}
}
req, err := http.NewRequest(method, url, buf)
2 changes: 1 addition & 1 deletion pkg/apiserver/invocation.go
@@ -81,7 +81,7 @@ func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*Wo
// TODO make more efficient (by moving list queries to cache)
entity, err := gi.wfiCache.GetAggregate(aggregate)
if err != nil {
logrus.Error("List: failed to fetch %v from cache: %v", aggregate, err)
logrus.Errorf("List: failed to fetch %v from cache: %v", aggregate, err)
continue
}
wfi := entity.(*aggregates.WorkflowInvocation)
228 changes: 200 additions & 28 deletions pkg/fes/backend/mem/mem.go
@@ -1,81 +1,253 @@
// package mem contains an implementation of the fes backend using an in-memory cache.
//
// This implementation is typically used for development and test purposes. However,
// if you are targeting pure performance, you can use this backend to effectively trade
// in persistence-related guarantees (e.g. fault-tolerance) to avoid overhead introduced
// by other event stores, such as the NATS implementation.
package mem

import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"

"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

var (
ErrInvalidAggregate = errors.New("invalid aggregate")
ErrEventLimitExceeded = &fes.EventStoreErr{
S: "event limit exceeded",
}

eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "fes",
Subsystem: "mem",
Name: "events_appended_total",
Help: "Count of appended events (excluding any internal events).",
}, []string{"eventType"})

cacheKeys = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "fes",
Subsystem: "mem",
Name: "keys",
Help: "Number of keys in the store by entity type.",
}, []string{"type"})

cacheEvents = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "fes",
Subsystem: "mem",
Name: "events",
Help: "Number of events in the store by entity type.",
}, []string{"type"})
)

// An in-memory, fes backend for development and testing purposes
// Config contains the user-configurable options of the in-memory backend.
//
// To limit the memory consumption of the backend, you can make use of the MaxKeys and
// MaxEventsPerKey. The absolute maximum memory usage is the product of the two limits.
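// For example, with MaxKeys set to 1000 and MaxEventsPerKey set to 100, at most
// 100,000 events are retained in memory at any time.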
type Config struct {
// MaxKeys specifies the limit of keys (or aggregates) in the backend.
// If set to 0, MaxInt32 will be used as the limit.
MaxKeys int

// MaxEventsPerKey specifies a limit on the number of events stored per key (or aggregate).
MaxEventsPerKey int
}

// Backend is an in-memory, fes-compatible backend using a map for active entities with an LRU cache to store completed
// event streams, evicting the oldest ones if it runs out of space. Active entities are never deleted.
type Backend struct {
pubsub.Publisher
contents map[fes.Aggregate][]*fes.Event
lock sync.RWMutex
Config
buf *lru.Cache // map[fes.Aggregate][]*fes.Event
store map[fes.Aggregate][]*fes.Event
storeLock sync.RWMutex
entries *int32
}

func NewBackend() *Backend {
return &Backend{
func NewBackend(cfgs ...Config) *Backend {
cfg := Config{
MaxKeys: math.MaxInt32,
}
if len(cfgs) > 0 {
providedCfg := cfgs[0]
if providedCfg.MaxKeys <= 0 {
cfg.MaxKeys = math.MaxInt32
} else {
cfg.MaxKeys = providedCfg.MaxKeys
}
if providedCfg.MaxEventsPerKey > 0 {
cfg.MaxEventsPerKey = providedCfg.MaxEventsPerKey
}
}

e := int32(0)
b := &Backend{
Publisher: pubsub.NewPublisher(),
contents: map[fes.Aggregate][]*fes.Event{},
lock: sync.RWMutex{},
Config: cfg,
store: map[fes.Aggregate][]*fes.Event{},
entries: &e,
}

cache, err := lru.NewWithEvict(cfg.MaxKeys, b.evict)
if err != nil {
panic(err)
}
b.buf = cache
return b
}

func (b *Backend) Append(event *fes.Event) error {
if !fes.ValidateAggregate(event.Aggregate) {
return ErrInvalidAggregate
if err := fes.ValidateAggregate(event.Aggregate); err != nil {
return err
}

key := *event.Aggregate
b.lock.Lock()
defer b.lock.Unlock()

events, ok := b.contents[key]
b.storeLock.Lock()

var newEntry bool
events, ok, fromStore := b.get(key)
if !ok {
events = []*fes.Event{}
newEntry = true

// Verify that there is space for the new event
if !b.fitBuffer() {
b.storeLock.Unlock()
return fes.ErrEventStoreOverflow.WithAggregate(&key)
}
}

// Check if event stream is not out of limit
if b.MaxEventsPerKey > 0 && len(events) > b.MaxEventsPerKey {
b.storeLock.Unlock()
return ErrEventLimitExceeded.WithAggregate(&key)
}

if !fromStore {
b.promote(key)
}

b.store[key] = append(events, event)
logrus.Infof("Event appended: %s - %v", event.Aggregate.Format(), event.Type)

if event.GetHints().GetCompleted() {
b.demote(key)
}
b.storeLock.Unlock()
err := b.Publish(event)

if newEntry {
atomic.AddInt32(b.entries, 1)
cacheKeys.WithLabelValues(key.Type).Inc()
}
b.contents[key] = append(events, event)

eventsAppended.WithLabelValues(event.Type).Inc()
return b.Publish(event)
return err
}

func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&key) {
return nil, ErrInvalidAggregate
if err := fes.ValidateAggregate(&key); err != nil {
return nil, err
}
b.lock.RLock()
defer b.lock.RUnlock()
events, ok := b.contents[key]

events, ok, _ := b.get(key)
if !ok {
events = []*fes.Event{}
}
return events, nil
}

func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
b.lock.RLock()
defer b.lock.RUnlock()
func (b *Backend) Len() int {
return int(atomic.LoadInt32(b.entries))
}

func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
var results []fes.Aggregate
for k := range b.contents {
if matchFn(k.Type + k.Id) {
results = append(results, k)
for key := range b.store {
if matchFn(key.Type + key.Id) {
results = append(results, key)
}
}
return results, nil
}

// get looks up the events for a key, checking the active store first and then the
// LRU buffer; fromStore reports whether the entry came from the store.
func (b *Backend) get(key fes.Aggregate) (events []*fes.Event, ok bool, fromStore bool) {
// First check the store
i, ok := b.store[key]
if ok {
return assertEventList(i), ok, true
}

// Fallback: check the LRU buffer (buf)
e, ok := b.buf.Get(key)
if ok {
return assertEventList(e), ok, false
}
return nil, ok, false
}

// promote moves an entry from the LRU buffer (buf) to the store
func (b *Backend) promote(key fes.Aggregate) {
events, ok, fromStore := b.get(key)
if !ok || fromStore {
return
}
b.buf.Remove(key)
b.store[key] = events
}

// demote moves a store entry to the LRU buffer (buf)
func (b *Backend) demote(key fes.Aggregate) {
events, ok, fromStore := b.get(key)
if !ok || !fromStore {
return
}
delete(b.store, key)
b.buf.Add(key, events)
}

// fitBuffer evicts the oldest entries from the LRU buffer until the total number of
// keys drops below MaxKeys, and reports whether there is room for a new entry.
func (b *Backend) fitBuffer() bool {
last := -1
size := b.Len()
for size != last && size >= b.MaxKeys {
b.buf.RemoveOldest()
last = size
size = b.Len()
}
return size < b.MaxKeys
}

// evict is the LRU eviction callback: it updates the metrics and decrements the
// entry counter for the removed key.
func (b *Backend) evict(k, v interface{}) {
logrus.Debugf("Evicted: %v", k)

// Update gauges
t := assertAggregate(k).Type
events := assertEventList(v)
cacheKeys.WithLabelValues(t).Dec()
cacheEvents.WithLabelValues(t).Add(-1 * float64(len(events)))

// Decrement entries counter
atomic.AddInt32(b.entries, -1)
}

func assertEventList(i interface{}) []*fes.Event {
events, typeOk := i.([]*fes.Event)
if !typeOk {
panic(fmt.Sprintf("found unexpected value type in the cache: %T", i))
}
return events
}

func assertAggregate(i interface{}) fes.Aggregate {
key, ok := i.(fes.Aggregate)
if !ok {
panic(fmt.Sprintf("found unexpected key type in the cache: %T", i))
}
return key
}
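
For reference, a minimal usage sketch of the constrained backend introduced in this commit, based only on the API visible in the diff above. The mem import path, the aggregate/event values, and the fes.Event field names in the literal are assumptions for illustration:

package main

import (
	"github.com/fission/fission-workflows/pkg/fes"
	"github.com/fission/fission-workflows/pkg/fes/backend/mem"
	"github.com/sirupsen/logrus"
)

func main() {
	// Bound memory: at most 1000 aggregates, each with at most 100 events.
	backend := mem.NewBackend(mem.Config{
		MaxKeys:         1000,
		MaxEventsPerKey: 100,
	})

	// Illustrative aggregate and event; real callers use the fes types generated
	// elsewhere in the project.
	key := fes.Aggregate{Type: "invocation", Id: "example-id"}
	if err := backend.Append(&fes.Event{
		Aggregate: &key,
		Type:      "ExampleEvent",
	}); err != nil {
		logrus.Errorf("append failed: %v", err)
	}

	events, err := backend.Get(key)
	if err != nil {
		logrus.Errorf("get failed: %v", err)
	}
	logrus.Infof("stored %d events; %d active keys in the backend", len(events), backend.Len())
}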