diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go
index 3c5d5a03..ad8305cf 100644
--- a/cmd/fission-workflows-bundle/bundle/bundle.go
+++ b/cmd/fission-workflows-bundle/bundle/bundle.go
@@ -314,7 +314,7 @@ func run(ctx context.Context, opts *Options) error {
 	}()
 	defer func() {
 		err := httpApiSrv.Shutdown(ctx)
-		log.Info("Stopped HTTP API server: %v", err)
+		log.Infof("Stopped HTTP API server: %v", err)
 	}()
 
 	log.Info("Serving HTTP API gateway at: ", httpApiSrv.Addr)
diff --git a/glide.lock b/glide.lock
index 8d37e8ee..275e49d3 100644
--- a/glide.lock
+++ b/glide.lock
@@ -1,5 +1,5 @@
-hash: 6903c5d9f84153e543483015f749a22afd7ca331b979a124c825ed977a4026d3
-updated: 2018-09-03T16:53:54.242728+02:00
+hash: 8625631121e3da0aa6118e13145d87b9efb1659cb820d24b2c2f781af212556a
+updated: 2018-09-20T14:16:47.976902+02:00
 imports:
 - name: cloud.google.com/go
   version: 3b1ae45394a234c385be014e9a488f2bb6eef821
@@ -68,6 +68,10 @@ imports:
   - types
 - name: github.com/golang/glog
   version: 44145f04b68cf362d9c4df2182967c2275eaefed
+- name: github.com/golang/groupcache
+  version: 24b0969c4cb722950103eed87108c8d291a8df00
+  subpackages:
+  - lru
 - name: github.com/golang/protobuf
   version: aa810b61a9c79d51363740d207bb46cf8e620ed5
   subpackages:
@@ -128,7 +132,7 @@ imports:
 - name: github.com/hashicorp/go-multierror
   version: 3d5d8f294aa03d8e98859feac328afbdf1ae0703
 - name: github.com/hashicorp/golang-lru
-  version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
+  version: 20f1fb78b0740ba8c3cb143a61e86ba5c8669768
   subpackages:
   - simplelru
 - name: github.com/howeyc/gopass
@@ -155,7 +159,7 @@
   subpackages:
   - pb
 - name: github.com/nats-io/nats-streaming-server
-  version: 63e2c334b66dba3edade0047625a25c0d8f18f80
+  version: 8910c0c347bc51cc87227aeb27bef19d409bf5c2
   subpackages:
   - spb
   - util
diff --git a/glide.yaml b/glide.yaml
index 286e84d7..41208fa3 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -76,6 +76,8 @@ import:
 - package: github.com/grpc-ecosystem/grpc-opentracing
   subpackages:
   - go/otgrpc
+- package: github.com/hashicorp/golang-lru
+  version: v0.5.0
 testImport:
 - package: github.com/stretchr/testify
   version: 1.1.4
diff --git a/pkg/apiserver/httpclient/httpclient.go b/pkg/apiserver/httpclient/httpclient.go
index 73b03bb7..2ed07dcf 100644
--- a/pkg/apiserver/httpclient/httpclient.go
+++ b/pkg/apiserver/httpclient/httpclient.go
@@ -56,7 +56,7 @@ func callWithJSON(ctx context.Context, method string, url string, in proto.Messa
 			if err != nil {
 				logrus.Errorf("Failed to read debug data: %v", err)
 			}
-			logrus.Debug("body: '%v'", data)
+			logrus.Debugf("body: '%v'", data)
 		}
 	}
 	req, err := http.NewRequest(method, url, buf)
diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go
index e059d236..8e8b208c 100644
--- a/pkg/apiserver/invocation.go
+++ b/pkg/apiserver/invocation.go
@@ -81,7 +81,7 @@ func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*Wo
 		// TODO make more efficient (by moving list queries to cache)
 		entity, err := gi.wfiCache.GetAggregate(aggregate)
 		if err != nil {
-			logrus.Error("List: failed to fetch %v from cache: %v", aggregate, err)
+			logrus.Errorf("List: failed to fetch %v from cache: %v", aggregate, err)
 			continue
 		}
 		wfi := entity.(*aggregates.WorkflowInvocation)
diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go
index 73afa719..977c1a5c 100644
--- a/pkg/fes/backend/mem/mem.go
+++ b/pkg/fes/backend/mem/mem.go
@@ -1,16 +1,28 @@
+// Package mem contains an implementation of the fes backend using an in-memory cache.
+//
+// This implementation is typically used for development and testing purposes. However,
+// if you are targeting raw performance, you can also use this backend to trade away
+// persistence-related guarantees (e.g. fault tolerance) and avoid the overhead
+// introduced by other event stores, such as the NATS implementation.
 package mem
 
 import (
-	"errors"
+	"fmt"
+	"math"
 	"sync"
+	"sync/atomic"
 
 	"github.com/fission/fission-workflows/pkg/fes"
 	"github.com/fission/fission-workflows/pkg/util/pubsub"
+	"github.com/hashicorp/golang-lru"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 )
 
 var (
-	ErrInvalidAggregate = errors.New("invalid aggregate")
+	ErrEventLimitExceeded = &fes.EventStoreErr{
+		S: "event limit exceeded",
+	}
 
 	eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "fes",
@@ -18,64 +30,224 @@
 		Name:      "events_appended_total",
 		Help:      "Count of appended events (excluding any internal events).",
 	}, []string{"eventType"})
+
+	cacheKeys = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "fes",
+		Subsystem: "mem",
+		Name:      "keys",
+		Help:      "Number of keys in the store by entity type.",
+	}, []string{"type"})
+
+	cacheEvents = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "fes",
+		Subsystem: "mem",
+		Name:      "events",
+		Help:      "Number of events in the store by entity type.",
+	}, []string{"type"})
 )
 
-// An in-memory, fes backend for development and testing purposes
+// Config contains the user-configurable options of the in-memory backend.
+//
+// To limit the memory consumption of the backend, you can make use of MaxKeys and
+// MaxEventsPerKey. The maximum number of stored events is the product of the two limits.
+type Config struct {
+	// MaxKeys specifies the limit of keys (or aggregates) in the backend.
+	// If set to 0, MaxInt32 will be used as the limit.
+	MaxKeys int
+
+	// MaxEventsPerKey specifies a limit on the number of events stored per key.
+	MaxEventsPerKey int
+}
+
+// Backend is an in-memory, fes-compatible backend that uses a map for active entities and an LRU cache for completed
+// event streams, evicting the oldest streams when it runs out of space. Active entities are never evicted.
 type Backend struct {
 	pubsub.Publisher
-	contents map[fes.Aggregate][]*fes.Event
-	lock     sync.RWMutex
+	Config
+	buf       *lru.Cache // map[fes.Aggregate][]*fes.Event
+	store     map[fes.Aggregate][]*fes.Event
+	storeLock sync.RWMutex
+	entries   *int32
 }
 
-func NewBackend() *Backend {
-	return &Backend{
+func NewBackend(cfgs ...Config) *Backend {
+	cfg := Config{
+		MaxKeys: math.MaxInt32,
+	}
+	if len(cfgs) > 0 {
+		providedCfg := cfgs[0]
+		if providedCfg.MaxKeys <= 0 {
+			cfg.MaxKeys = math.MaxInt32
+		} else {
+			cfg.MaxKeys = providedCfg.MaxKeys
+		}
+		if providedCfg.MaxEventsPerKey > 0 {
+			cfg.MaxEventsPerKey = providedCfg.MaxEventsPerKey
+		}
+	}
+
+	e := int32(0)
+	b := &Backend{
 		Publisher: pubsub.NewPublisher(),
-		contents:  map[fes.Aggregate][]*fes.Event{},
-		lock:      sync.RWMutex{},
+		Config:    cfg,
+		store:     map[fes.Aggregate][]*fes.Event{},
+		entries:   &e,
 	}
+
+	cache, err := lru.NewWithEvict(cfg.MaxKeys, b.evict)
+	if err != nil {
+		panic(err)
+	}
+	b.buf = cache
+	return b
 }
 
 func (b *Backend) Append(event *fes.Event) error {
-	if !fes.ValidateAggregate(event.Aggregate) {
-		return ErrInvalidAggregate
+	if err := fes.ValidateAggregate(event.Aggregate); err != nil {
+		return err
 	}
 	key := *event.Aggregate
-	b.lock.Lock()
-	defer b.lock.Unlock()
-	events, ok := b.contents[key]
+	b.storeLock.Lock()
+
+	var newEntry bool
+	events, ok, fromStore := b.get(key)
 	if !ok {
 		events = []*fes.Event{}
+		newEntry = true
+
+		// Verify that there is space for the new entry
+		if !b.fitBuffer() {
+			b.storeLock.Unlock()
+			return fes.ErrEventStoreOverflow.WithAggregate(&key)
+		}
+	}
+
+	// Check that the event stream does not exceed the per-key event limit
+	if b.MaxEventsPerKey > 0 && len(events) >= b.MaxEventsPerKey {
+		b.storeLock.Unlock()
+		return ErrEventLimitExceeded.WithAggregate(&key)
+	}
+
+	if !fromStore {
+		b.promote(key)
+	}
+
+	b.store[key] = append(events, event)
+	logrus.Infof("Event appended: %s - %v", event.Aggregate.Format(), event.Type)
+
+	if event.GetHints().GetCompleted() {
+		b.demote(key)
+	}
+	b.storeLock.Unlock()
+	err := b.Publish(event)
+
+	if newEntry {
+		atomic.AddInt32(b.entries, 1)
+		cacheKeys.WithLabelValues(key.Type).Inc()
 	}
-	b.contents[key] = append(events, event)
 	eventsAppended.WithLabelValues(event.Type).Inc()
-	return b.Publish(event)
+	return err
 }
 
 func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) {
-	if !fes.ValidateAggregate(&key) {
-		return nil, ErrInvalidAggregate
+	if err := fes.ValidateAggregate(&key); err != nil {
+		return nil, err
 	}
-	b.lock.RLock()
-	defer b.lock.RUnlock()
-	events, ok := b.contents[key]
+
+	events, ok, _ := b.get(key)
 	if !ok {
 		events = []*fes.Event{}
 	}
 	return events, nil
 }
 
-func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
-	b.lock.RLock()
-	defer b.lock.RUnlock()
+func (b *Backend) Len() int {
+	return int(atomic.LoadInt32(b.entries))
+}
 
+func (b *Backend) List(matchFn fes.StringMatcher) ([]fes.Aggregate, error) {
 	var results []fes.Aggregate
-	for k := range b.contents {
-		if matchFn(k.Type + k.Id) {
-			results = append(results, k)
+	for key := range b.store {
+		if matchFn(key.Type + key.Id) {
+			results = append(results, key)
 		}
 	}
 	return results, nil
 }
+
+func (b *Backend) get(key fes.Aggregate) (events []*fes.Event, ok bool, fromStore bool) {
+	// First check the store
+	i, ok := b.store[key]
+	if ok {
+		return assertEventList(i), ok, true
+	}
+
+	// Fallback: check the LRU buffer
+	e, ok := b.buf.Get(key)
+	if ok {
+		return assertEventList(e), ok, false
+	}
+	return nil, ok, false
+}
+
+// promote moves an entry from the LRU buffer to the store
+func (b *Backend) promote(key fes.Aggregate) {
+	events, ok, fromStore := b.get(key)
+	if !ok || fromStore {
+		return
+	}
+	b.buf.Remove(key)
+	b.store[key] = events
+}
+
+// demote moves a store entry to the LRU buffer
+func (b *Backend) demote(key fes.Aggregate) {
+	events, ok, fromStore := b.get(key)
+	if !ok || !fromStore {
+		return
+	}
+	delete(b.store, key)
+	b.buf.Add(key, events)
+}
+
+func (b *Backend) fitBuffer() bool {
+	last := -1
+	size := b.Len()
+	for size != last && size >= b.MaxKeys {
+		b.buf.RemoveOldest()
+		last = size
+		size = b.Len()
+	}
+	return size < b.MaxKeys
+}
+
+func (b *Backend) evict(k, v interface{}) {
+	logrus.Debugf("Evicted: %v", k)
+
+	// Update gauges
+	t := assertAggregate(k).Type
+	events := assertEventList(v)
+	cacheKeys.WithLabelValues(t).Dec()
+	cacheEvents.WithLabelValues(t).Add(-1 * float64(len(events)))
+
+	// Decrement entries counter
+	atomic.AddInt32(b.entries, -1)
+}
+
+func assertEventList(i interface{}) []*fes.Event {
+	events, typeOk := i.([]*fes.Event)
+	if !typeOk {
+		panic(fmt.Sprintf("found unexpected value type in the cache: %T", i))
+	}
+	return events
+}
+
+func assertAggregate(i interface{}) fes.Aggregate {
+	key, ok := i.(fes.Aggregate)
+	if !ok {
+		panic(fmt.Sprintf("found unexpected key type in the cache: %T", i))
+	}
+	return key
+}
diff --git a/pkg/fes/backend/mem/mem_test.go b/pkg/fes/backend/mem/mem_test.go
index 2b4d5181..b0e693a3 100644
--- a/pkg/fes/backend/mem/mem_test.go
+++ b/pkg/fes/backend/mem/mem_test.go
@@ -1,6 +1,8 @@
 package mem
 
 import (
+	"fmt"
+	"math"
 	"testing"
 
 	"github.com/fission/fission-workflows/pkg/fes"
@@ -20,18 +22,134 @@ func newEvent(a fes.Aggregate, data []byte) *fes.Event {
 	return event
 }
 
+func setupBackend() *Backend {
+	return NewBackend()
+}
+
+// 2018-09-20: 0.5 ms
+func BenchmarkRoundtripSingleKey(b *testing.B) {
+	mem := setupBackend()
+	sub := mem.Subscribe()
+
+	// Generate test data
+	events := make([]*fes.Event, b.N)
+	for i := 0; i < b.N; i++ {
+		key := fes.NewAggregate("type", "id")
+		events[i] = newEvent(key, []byte(fmt.Sprintf("event-%d", i)))
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := mem.Append(events[i])
+		if err != nil {
+			panic(err)
+		}
+		<-sub.Ch
+	}
+}
+
+// 2018-09-20: 2 ms
+func BenchmarkRoundtripManyKeys(b *testing.B) {
+	mem := setupBackend()
+	sub := mem.Subscribe()
+
+	// Generate test data
+	events := make([]*fes.Event, b.N)
+	for i := 0; i < b.N; i++ {
+		key := fes.NewAggregate("type", fmt.Sprintf("%d", i))
+		events[i] = newEvent(key, []byte(fmt.Sprintf("event-%d", i)))
+	}
+
+	// Run benchmark based on roundtrip
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := mem.Append(events[i])
+		if err != nil {
+			panic(err)
+		}
+		<-sub.Ch
+	}
+}
+
+// 2018-09-20: 2-3 ms
+func BenchmarkRoundtripManyEvictions(b *testing.B) {
+	mem := setupBackend()
+	sub := mem.Subscribe()
+
+	mem.MaxKeys = int(math.Max(float64(b.N)/100, 1))
+
+	// Generate test data
+	events := make([]*fes.Event, b.N)
+	for i := 0; i < b.N; i++ {
+		key := fes.NewAggregate("type", fmt.Sprintf("%d", i))
+		events[i] = newEvent(key, []byte(fmt.Sprintf("event-%d", i)))
+		events[i].Hints = &fes.EventHints{
+			Completed: true,
+		}
+	}
+
+	// Run benchmark based on roundtrip
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := mem.Append(events[i])
+		if err != nil {
+			panic(err)
+		}
+		<-sub.Ch
+	}
+}
+
+func TestBackendStoreFull(t *testing.T) {
+	mem := setupBackend()
+	mem.MaxKeys = 2
+	n := 3
+	events := make([]*fes.Event, n)
+	for i := 0; i < n; i++ {
+		key := fes.NewAggregate("entity", fmt.Sprintf("%d", i))
+		events[i] = newEvent(key, []byte(fmt.Sprintf("event-%d", i)))
+	}
+	for _, event := range events {
+		err := mem.Append(event)
+		if err != nil {
+			assert.EqualError(t, err, fes.ErrEventStoreOverflow.WithAggregate(events[2].Aggregate).Error())
+		}
+	}
+}
+
+func TestBackendBufferEvict(t *testing.T) {
+	mem := setupBackend()
+	mem.MaxKeys = 3
+	n := 10
+	events := make([]*fes.Event, n)
+	mem.Append(newEvent(fes.NewAggregate("active", "1"), []byte("active stream")))
+	for i := 0; i < n; i++ {
+		key := fes.NewAggregate("entity", fmt.Sprintf("%d", i))
+		events[i] = newEvent(key, []byte(fmt.Sprintf("event-%d", i)))
+		events[i].Hints = &fes.EventHints{
+			Completed: true,
+		}
+	}
+	for _, event := range events {
+		err := mem.Append(event)
+		assert.NoError(t, err)
+	}
+	assert.Equal(t, 2, mem.buf.Len())
+	assert.Equal(t, 1, len(mem.store))
+	assert.Equal(t, 3, mem.Len())
+}
+
 func TestBackend_Append(t *testing.T) {
-	mem := NewBackend()
+	mem := setupBackend()
 	event := newEvent(fes.NewAggregate("type", "id"), []byte("event 1"))
 	err := mem.Append(event)
 	assert.NoError(t, err)
-	assert.Len(t, mem.contents, 1)
+	assert.Equal(t, mem.Len(), 1)
 
 	event2 := newEvent(fes.Aggregate{}, []byte("event 1"))
 	err = mem.Append(event2)
-	assert.EqualError(t, err, ErrInvalidAggregate.Error())
-	assert.Len(t, mem.contents, 1)
+	assert.EqualError(t, err, fes.ErrInvalidAggregate.Error())
+	assert.Equal(t, mem.Len(), 1)
 
 	// Event under existing aggregate
 	event3, err := fes.NewEvent(fes.NewAggregate("type", "id"), &wrappers.BytesValue{
@@ -40,8 +158,8 @@ func TestBackend_Append(t *testing.T) {
 	assert.NoError(t, err)
 	err = mem.Append(event3)
 	assert.NoError(t, err)
-	assert.Len(t, mem.contents, 1)
-	assert.Len(t, mem.contents[fes.NewAggregate("type", "id")], 2)
+	assert.Equal(t, mem.Len(), 1)
+	assert.Equal(t, len(mem.mustGet(fes.NewAggregate("type", "id"))), 2)
 
 	// Event under new aggregate
 	event4, err := fes.NewEvent(fes.NewAggregate("Type", "other"), &wrappers.BytesValue{
@@ -50,13 +168,13 @@ func TestBackend_Append(t *testing.T) {
 	assert.NoError(t, err)
 	err = mem.Append(event4)
 	assert.NoError(t, err)
-	assert.Len(t, mem.contents, 2)
-	assert.Len(t, mem.contents[fes.NewAggregate("Type", "other")], 1)
-	assert.Len(t, mem.contents[fes.NewAggregate("type", "id")], 2)
+	assert.Equal(t, mem.Len(), 2)
+	assert.Equal(t, len(mem.mustGet(fes.NewAggregate("Type", "other"))), 1)
+	assert.Equal(t, len(mem.mustGet(fes.NewAggregate("type", "id"))), 2)
 }
 
 func TestBackend_GetMultiple(t *testing.T) {
-	mem := NewBackend()
+	mem := setupBackend()
 	key := fes.NewAggregate("type", "id")
 	events := []*fes.Event{
 		newEvent(key, []byte("event 1")),
@@ -75,7 +193,7 @@
 	}
 }
 
 func TestBackend_GetNonexistent(t *testing.T) {
-	mem := NewBackend()
+	mem := setupBackend()
 	key := fes.NewAggregate("type", "id")
 	getEvents, err := mem.Get(key)
 	assert.NoError(t, err)
@@ -83,7 +201,7 @@
 }
 
 func TestBackend_Subscribe(t *testing.T) {
-	mem := NewBackend()
+	mem := setupBackend()
 	key := fes.NewAggregate("type", "id")
 	sub := mem.Subscribe(pubsub.SubscriptionOptions{
 		LabelMatcher: labels.In(fes.PubSubLabelAggregateType, key.Type),
@@ -108,3 +226,11 @@
 	}
 	assert.EqualValues(t, events, receivedEvents)
 }
+
+func (b *Backend) mustGet(key fes.Aggregate) []*fes.Event {
+	val, ok, _ := b.get(key)
+	if !ok {
+		panic(fmt.Sprintf("expected value present for key %s", key))
+	}
+	return val
+}
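
As a usage sketch for the new configurable backend (this snippet is not part of the patch; the wrappers import path is assumed from the test file above, and the limit values are arbitrary):

package main

import (
	"fmt"

	"github.com/fission/fission-workflows/pkg/fes"
	"github.com/fission/fission-workflows/pkg/fes/backend/mem"
	"github.com/golang/protobuf/ptypes/wrappers"
)

func main() {
	// Bound the store to at most 1000 aggregates with at most 100 events each.
	// Completed streams are demoted to the LRU buffer and may be evicted;
	// active streams are never evicted.
	backend := mem.NewBackend(mem.Config{
		MaxKeys:         1000,
		MaxEventsPerKey: 100,
	})

	event, err := fes.NewEvent(fes.NewAggregate("invocation", "example-1"),
		&wrappers.BytesValue{Value: []byte("event payload")})
	if err != nil {
		panic(err)
	}

	// Append fails with a typed error when a limit is hit:
	// fes.ErrEventStoreOverflow if there is no room for another key, or
	// mem.ErrEventLimitExceeded if the key holds too many events.
	if err := backend.Append(event); err != nil {
		fmt.Println("append rejected:", err)
	}
	fmt.Println("keys stored:", backend.Len())
}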
diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go
index cf0768e2..17babcc8 100644
--- a/pkg/fes/backend/nats/client.go
+++ b/pkg/fes/backend/nats/client.go
@@ -169,8 +169,8 @@ func (es *EventStore) Append(event *fes.Event) error {
 
 // Get returns all events related to a specific aggregate
 func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) {
-	if !fes.ValidateAggregate(&aggregate) {
-		return nil, ErrInvalidAggregate
+	if err := fes.ValidateAggregate(&aggregate); err != nil {
+		return nil, err
 	}
 
 	subject := toSubject(aggregate)
diff --git a/pkg/fes/entity.go b/pkg/fes/entity.go
index 6e5ec480..a229daec 100644
--- a/pkg/fes/entity.go
+++ b/pkg/fes/entity.go
@@ -60,6 +60,9 @@ func NewBaseEntity(thiz Entity, aggregate Aggregate) *BaseEntity {
 	}
 }
 
-func ValidateAggregate(a *Aggregate) bool {
-	return a != nil && len(a.Type) != 0 && len(a.Id) != 0
+func ValidateAggregate(a *Aggregate) error {
+	if a != nil && len(a.Type) != 0 && len(a.Id) != 0 {
+		return nil
+	}
+	return ErrInvalidAggregate.WithAggregate(a)
 }
diff --git a/pkg/fes/types.go b/pkg/fes/types.go
index a2a0f821..7a234997 100644
--- a/pkg/fes/types.go
+++ b/pkg/fes/types.go
@@ -1,6 +1,8 @@
 package fes
 
 import (
+	"fmt"
+
 	"github.com/fission/fission-workflows/pkg/util/pubsub"
 	"github.com/opentracing/opentracing-go"
 	"github.com/sirupsen/logrus"
@@ -85,3 +87,26 @@ func newNotification(entity Entity, event *Event) *Notification {
 		SpanCtx: spanCtx,
 	}
 }
+
+type EventStoreErr struct {
+	S string
+	K *Aggregate
+}
+
+func (err *EventStoreErr) WithAggregate(aggregate *Aggregate) *EventStoreErr {
+	err.K = aggregate
+	return err
+}
+
+func (err *EventStoreErr) Error() string {
+	if err.K == nil {
+		return err.S
+	} else {
+		return fmt.Sprintf("%v: %s", err.K.Format(), err.S)
+	}
+}
+
+var (
+	ErrInvalidAggregate   = &EventStoreErr{S: "invalid aggregate"}
+	ErrEventStoreOverflow = &EventStoreErr{S: "event store out of space"}
+)
diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go
index 954d2683..0b121f67 100644
--- a/pkg/fnenv/fission/runtime.go
+++ b/pkg/fnenv/fission/runtime.go
@@ -108,7 +108,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
 	// Determine status of the task invocation
 	if resp.StatusCode >= 400 {
 		msg, _ := typedvalues.Format(&output)
-		ctxLog.Warnf("[%s] Failed %v: %v", resp.StatusCode, msg)
+		ctxLog.Warnf("[%s] Failed %v: %v", fnRef.ID, resp.StatusCode, msg)
 		return &types.TaskInvocationStatus{
 			Status: types.TaskInvocationStatus_FAILED,
 			Error: &types.Error{
diff --git a/pkg/fnenv/native/builtin/if.go b/pkg/fnenv/native/builtin/if.go
index 557016e6..54b423a9 100644
--- a/pkg/fnenv/native/builtin/if.go
+++ b/pkg/fnenv/native/builtin/if.go
@@ -77,7 +77,7 @@ func (fn *FunctionIf) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue,
 	}
 
 	// Output consequent or alternative based on condition
-	logrus.Infof("If-task has evaluated to '%b''", condition)
+	logrus.Infof("If-task has evaluated to '%v'", condition)
 	if condition {
 		return consequent, nil
 	}
diff --git a/pkg/fnenv/native/builtin/switch.go b/pkg/fnenv/native/builtin/switch.go
index 75011f6b..39624a78 100644
--- a/pkg/fnenv/native/builtin/switch.go
+++ b/pkg/fnenv/native/builtin/switch.go
@@ -98,7 +98,6 @@ func (fn *FunctionSwitch) getCases(inputs map[string]*types.TypedValue) (map[str
 	for _, c := range ir {
 		m, ok := c.(map[string]interface{})
 		if !ok {
-			logrus.Warnf("Invalid case provided: %t", m)
 			return nil, nil, errors.New("invalid case provided")
 		}
 		tva, err := typedvalues.Parse(m[SwitchCaseValue])
diff --git a/pkg/types/typedvalues/httpconv/httpconv.go b/pkg/types/typedvalues/httpconv/httpconv.go
index 35c30984..89cf4c10 100644
--- a/pkg/types/typedvalues/httpconv/httpconv.go
+++ b/pkg/types/typedvalues/httpconv/httpconv.go
@@ -235,7 +235,7 @@ func FormatMethod(inputs map[string]*types.TypedValue) string {
 		if err == nil {
 			return contentType
 		}
-		logrus.Error("Invalid method in inputs: %+v", tv)
+		logrus.Errorf("Invalid method in inputs: %+v", tv)
 	}
 	return methodDefault
 }
@@ -354,7 +354,7 @@ func DetermineContentTypeInputs(inputs map[string]*types.TypedValue) string {
 		if err == nil {
 			return contentType
 		}
-		logrus.Error("Invalid content type in inputs: %+v", ctTv)
+		logrus.Errorf("Invalid content type in inputs: %+v", ctTv)
 	}
 
 	// Otherwise, check for label on body input
diff --git a/pkg/types/typedvalues/typedvalues.go b/pkg/types/typedvalues/typedvalues.go
index 443b9e07..023fda2d 100644
--- a/pkg/types/typedvalues/typedvalues.go
+++ b/pkg/types/typedvalues/typedvalues.go
@@ -92,7 +92,6 @@ func (pf *ComposedParserFormatter) Parse(ctx Parser, i interface{}) (*types.Type
 				return nil, err
 			}
 		}
-		logrus.Debugf("Parser success: %t", tv)
 		return tv, nil
 	}
 	logrus.Debugf("No parsers for %t", i)
diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go
index a6c30b3d..d3d5bf13 100644
--- a/test/integration/nats/nats_test.go
+++ b/test/integration/nats/nats_test.go
@@ -15,7 +15,7 @@ import (
 )
 
 var (
-	backend fes.Backend
+	backend *nats.EventStore
 )
 
 // Tests the event store implementation with a live NATS cluster.
@@ -66,6 +66,16 @@ func TestMain(m *testing.M) {
 		if err != nil {
 			return fmt.Errorf("failed to connect to cluster: %v", err)
 		}
+
+		err = backend.Watch(fes.Aggregate{Type: "invocation"})
+		if err != nil {
+			panic(err)
+		}
+		err = backend.Watch(fes.Aggregate{Type: "workflow"})
+		if err != nil {
+			panic(err)
+		}
+
 		return nil
 	}); err != nil {
 		log.Fatalf("Could not connect to docker: %s", err)
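
Since ValidateAggregate and the store errors now return typed *fes.EventStoreErr values carrying the offending aggregate, callers can inspect failures. A hypothetical caller-side sketch (appendOrWarn and its use of the fes.Backend interface are illustrative, not part of this patch):

package fesutil

import (
	"github.com/fission/fission-workflows/pkg/fes"
	"github.com/sirupsen/logrus"
)

// appendOrWarn appends an event to a fes backend and, on failure, unwraps
// the typed *fes.EventStoreErr to log which aggregate was rejected.
func appendOrWarn(backend fes.Backend, event *fes.Event) {
	err := backend.Append(event)
	if err == nil {
		return
	}
	if esErr, ok := err.(*fes.EventStoreErr); ok && esErr.K != nil {
		// EventStoreErr.Error() already prefixes the formatted aggregate,
		// so the key is also recoverable directly from esErr.K.
		logrus.Warnf("event store rejected %s: %s", esErr.K.Format(), esErr.S)
		return
	}
	logrus.Warnf("append failed: %v", err)
}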