Constrain memory usage in the in-memory backend #209

Merged · 1 commit · Sep 21, 2018
2 changes: 1 addition & 1 deletion cmd/fission-workflows-bundle/bundle/bundle.go
@@ -314,7 +314,7 @@ func run(ctx context.Context, opts *Options) error {
 	}()
 	defer func() {
 		err := httpApiSrv.Shutdown(ctx)
-		log.Info("Stopped HTTP API server: %v", err)
+		log.Infof("Stopped HTTP API server: %v", err)
 	}()

 	log.Info("Serving HTTP API gateway at: ", httpApiSrv.Addr)
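For context on this one-line change (and the matching ones in httpclient.go and invocation.go below): the non-f logrus variants do not interpret printf verbs, so the %v placeholder was previously written to the log literally. A minimal illustration of the difference, not part of this change:

package main

import (
	"errors"

	"github.com/sirupsen/logrus"
)

func main() {
	err := errors.New("connection closed")

	// Info uses fmt.Sprint semantics: the %v verb is not expanded,
	// the error is simply concatenated after the literal format string.
	logrus.Info("Stopped HTTP API server: %v", err)

	// Infof uses fmt.Sprintf semantics: %v is replaced by err.
	logrus.Infof("Stopped HTTP API server: %v", err)
}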
12 changes: 8 additions & 4 deletions glide.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions glide.yaml
@@ -76,6 +76,8 @@ import:
 - package: github.com/grpc-ecosystem/grpc-opentracing
   subpackages:
   - go/otgrpc
+- package: github.com/hashicorp/golang-lru
+  version: v0.5.0
 testImport:
 - package: github.com/stretchr/testify
   version: 1.1.4
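The golang-lru dependency added above provides the evicting cache that the reworked in-memory backend (pkg/fes/backend/mem/mem.go, below) builds on. Purely for orientation, a minimal sketch of the calls the backend relies on (NewWithEvict, Add, Get, Len), assuming the v0.5.0 API pinned here:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

func main() {
	// A cache holding at most two keys; the callback fires for every evicted entry.
	cache, err := lru.NewWithEvict(2, func(key, value interface{}) {
		fmt.Printf("evicted: %v\n", key)
	})
	if err != nil {
		panic(err)
	}

	cache.Add("a", 1)
	cache.Add("b", 2)
	cache.Add("c", 3) // evicts "a", the least recently used entry

	if v, ok := cache.Get("b"); ok {
		fmt.Println("b =", v)
	}
	fmt.Println("len =", cache.Len()) // 2
}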
2 changes: 1 addition & 1 deletion pkg/apiserver/httpclient/httpclient.go
@@ -56,7 +56,7 @@ func callWithJSON(ctx context.Context, method string, url string, in proto.Messa
 			if err != nil {
 				logrus.Errorf("Failed to read debug data: %v", err)
 			}
-			logrus.Debug("body: '%v'", data)
+			logrus.Debugf("body: '%v'", data)
 		}
 	}
 	req, err := http.NewRequest(method, url, buf)
2 changes: 1 addition & 1 deletion pkg/apiserver/invocation.go
@@ -81,7 +81,7 @@ func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*Wo
 		// TODO make more efficient (by moving list queries to cache)
 		entity, err := gi.wfiCache.GetAggregate(aggregate)
 		if err != nil {
-			logrus.Error("List: failed to fetch %v from cache: %v", aggregate, err)
+			logrus.Errorf("List: failed to fetch %v from cache: %v", aggregate, err)
 			continue
 		}
 		wfi := entity.(*aggregates.WorkflowInvocation)
228 changes: 200 additions & 28 deletions pkg/fes/backend/mem/mem.go
@@ -1,81 +1,253 @@
// Package mem contains an implementation of the fes backend using an in-memory cache.
//
// This implementation is typically used for development and test purposes. However,
// if you are targeting pure performance, you can use this backend to effectively trade
// away persistence-related guarantees (e.g. fault tolerance) to avoid the overhead
// introduced by other event stores, such as the NATS implementation.
package mem

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"

	"github.com/fission/fission-workflows/pkg/fes"
	"github.com/fission/fission-workflows/pkg/util/pubsub"
	"github.com/hashicorp/golang-lru"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

var (
	ErrInvalidAggregate   = errors.New("invalid aggregate")
	ErrEventLimitExceeded = &fes.EventStoreErr{
		S: "event limit exceeded",
	}

	eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "fes",
		Subsystem: "mem",
		Name:      "events_appended_total",
		Help:      "Count of appended events (excluding any internal events).",
	}, []string{"eventType"})

	cacheKeys = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "fes",
		Subsystem: "mem",
		Name:      "keys",
		Help:      "Number of keys in the store by entity type.",
	}, []string{"type"})

	cacheEvents = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "fes",
		Subsystem: "mem",
		Name:      "events",
		Help:      "Number of events in the store by entity type.",
	}, []string{"type"})
)

-// An in-memory, fes backend for development and testing purposes
// Config contains the user-configurable options of the in-memory backend.
//
// To limit the memory consumption of the backend, you can make use of MaxKeys and
// MaxEventsPerKey. The absolute maximum memory usage is the product of the two limits.
type Config struct {
	// MaxKeys specifies the limit on the number of keys (or aggregates) in the backend.
	// If set to 0, MaxInt32 will be used as the limit.
	MaxKeys int

	// MaxEventsPerKey specifies the limit on the number of events per key.
	MaxEventsPerKey int
}

// Backend is an in-memory, fes-compatible backend using a map for active entities and an LRU cache
// to store completed event streams, evicting the oldest ones if it runs out of space. Active
// entities will never be deleted.
type Backend struct {
	pubsub.Publisher
-	contents map[fes.Aggregate][]*fes.Event
-	lock sync.RWMutex
	Config
	buf       *lru.Cache // map[fes.Aggregate][]*fes.Event
	store     map[fes.Aggregate][]*fes.Event
	storeLock sync.RWMutex
	entries   *int32
}

-func NewBackend() *Backend {
-	return &Backend{
func NewBackend(cfgs ...Config) *Backend {
	cfg := Config{
		MaxKeys: math.MaxInt32,
	}
	if len(cfgs) > 0 {
		providedCfg := cfgs[0]
		if providedCfg.MaxKeys <= 0 {
			cfg.MaxKeys = math.MaxInt32
		} else {
			cfg.MaxKeys = providedCfg.MaxKeys
		}
		if providedCfg.MaxEventsPerKey > 0 {
			cfg.MaxEventsPerKey = providedCfg.MaxEventsPerKey
		}
	}

	e := int32(0)
	b := &Backend{
		Publisher: pubsub.NewPublisher(),
-		contents: map[fes.Aggregate][]*fes.Event{},
-		lock: sync.RWMutex{},
		Config:    cfg,
		store:     map[fes.Aggregate][]*fes.Event{},
		entries:   &e,
	}

	cache, err := lru.NewWithEvict(cfg.MaxKeys, b.evict)
	if err != nil {
		panic(err)
	}
	b.buf = cache
	return b
}

func (b *Backend) Append(event *fes.Event) error {
-	if !fes.ValidateAggregate(event.Aggregate) {
-		return ErrInvalidAggregate
	if err := fes.ValidateAggregate(event.Aggregate); err != nil {
		return err
	}

	key := *event.Aggregate
-	b.lock.Lock()
-	defer b.lock.Unlock()

-	events, ok := b.contents[key]
	b.storeLock.Lock()

	var newEntry bool
	events, ok, fromStore := b.get(key)
	if !ok {
		events = []*fes.Event{}
		newEntry = true

		// Verify that there is space for the new event
		if !b.fitBuffer() {
			b.storeLock.Unlock()
			return fes.ErrEventStoreOverflow.WithAggregate(&key)
		}
	}

	// Check that the event stream has not exceeded its limit
	if b.MaxEventsPerKey > 0 && len(events) > b.MaxEventsPerKey {
		b.storeLock.Unlock()
		return ErrEventLimitExceeded.WithAggregate(&key)
	}

	if !fromStore {
		b.promote(key)
	}

	b.store[key] = append(events, event)
	logrus.Infof("Event appended: %s - %v", event.Aggregate.Format(), event.Type)

	if event.GetHints().GetCompleted() {
		b.demote(key)
	}
	b.storeLock.Unlock()
	err := b.Publish(event)

	if newEntry {
		atomic.AddInt32(b.entries, 1)
		cacheKeys.WithLabelValues(key.Type).Inc()
	}
-	b.contents[key] = append(events, event)

	eventsAppended.WithLabelValues(event.Type).Inc()
-	return b.Publish(event)
	return err
}

func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) {
-	if !fes.ValidateAggregate(&key) {
-		return nil, ErrInvalidAggregate
	if err := fes.ValidateAggregate(&key); err != nil {
		return nil, err
	}
-	b.lock.RLock()
-	defer b.lock.RUnlock()
-	events, ok := b.contents[key]

	events, ok, _ := b.get(key)
	if !ok {
		events = []*fes.Event{}
	}
	return events, nil
}

-func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
-	b.lock.RLock()
-	defer b.lock.RUnlock()
func (b *Backend) Len() int {
	return int(atomic.LoadInt32(b.entries))
}

func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
	var results []fes.Aggregate
-	for k := range b.contents {
-		if matchFn(k.Type + k.Id) {
-			results = append(results, k)
	for key := range b.store {
		if matchFn(key.Type + key.Id) {
			results = append(results, key)
		}
	}
	return results, nil
}

func (b *Backend) get(key fes.Aggregate) (events []*fes.Event, ok bool, fromStore bool) {
	// First check the store
	i, ok := b.store[key]
	if ok {
		return assertEventList(i), ok, true
	}

	// Fall back to the LRU buffer
	e, ok := b.buf.Get(key)
	if ok {
		return assertEventList(e), ok, false
	}
	return nil, ok, false
}

// promote moves an entry from the LRU buffer to the store
func (b *Backend) promote(key fes.Aggregate) {
	events, ok, fromStore := b.get(key)
	if !ok || fromStore {
		return
	}
	b.buf.Remove(key)
	b.store[key] = events
}

// demote moves an entry from the store to the LRU buffer
func (b *Backend) demote(key fes.Aggregate) {
	events, ok, fromStore := b.get(key)
	if !ok || !fromStore {
		return
	}
	delete(b.store, key)
	b.buf.Add(key, events)
}

func (b *Backend) fitBuffer() bool {
	last := -1
	size := b.Len()
	for size != last && size >= b.MaxKeys {
		b.buf.RemoveOldest()
		last = size
		size = b.Len()
	}
	return size < b.MaxKeys
}

func (b *Backend) evict(k, v interface{}) {
	logrus.Debugf("Evicted: %v", k)

	// Update gauges
	t := assertAggregate(k).Type
	events := assertEventList(v)
	cacheKeys.WithLabelValues(t).Dec()
	cacheEvents.WithLabelValues(t).Add(-1 * float64(len(events)))

	// Decrement the entries counter
	atomic.AddInt32(b.entries, -1)
}

func assertEventList(i interface{}) []*fes.Event {
	events, typeOk := i.([]*fes.Event)
	if !typeOk {
		panic(fmt.Sprintf("found unexpected value type in the cache: %T", i))
	}
	return events
}

func assertAggregate(i interface{}) fes.Aggregate {
	key, ok := i.(fes.Aggregate)
	if !ok {
		panic(fmt.Sprintf("found unexpected key type in the cache: %T", i))
	}
	return key
}
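The Config and Backend comments above describe how MaxKeys and MaxEventsPerKey bound the backend's memory use, with the worst case being roughly the product of the two limits. As a usage sketch (not part of this PR): the fes.Event and fes.Aggregate field names are taken from the diff above, while the event type and IDs are made up for illustration, and whether such a bare event passes fes.ValidateAggregate is not shown in this diff.

package main

import (
	"fmt"

	"github.com/fission/fission-workflows/pkg/fes"
	"github.com/fission/fission-workflows/pkg/fes/backend/mem"
)

func main() {
	// Cap the store at 1000 aggregates with at most 100 events each,
	// so memory usage is bounded by roughly the product of the two limits.
	backend := mem.NewBackend(mem.Config{
		MaxKeys:         1000,
		MaxEventsPerKey: 100,
	})

	// Hypothetical event; real event types are defined elsewhere in the repository.
	event := &fes.Event{
		Type:      "EXAMPLE_EVENT",
		Aggregate: &fes.Aggregate{Type: "invocation", Id: "inv-1"},
	}
	if err := backend.Append(event); err != nil {
		fmt.Println("append failed:", err)
	}

	events, err := backend.Get(fes.Aggregate{Type: "invocation", Id: "inv-1"})
	fmt.Println(len(events), err, backend.Len())
}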