Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fission integration tests #121

Merged
merged 6 commits into from
Jun 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ install:
- test/e2e/install-clients.sh
# Needed for some integration tests
- glide -h >/dev/null 2>&1 || go get github.com/Masterminds/glide
- nats-streaming-server -h >/dev/null 2>&1 || go get github.com/nats-io/nats-streaming-server # TODO remove the need for this

before_script:
- cd ${TRAVIS_BUILD_DIR}
Expand Down
86 changes: 74 additions & 12 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ testImport:
version: 1.1.4
subpackages:
- assert
- package: gopkg.in/ory-am/dockertest.v3
version: v3.3.1
6 changes: 2 additions & 4 deletions pkg/fes/backend/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ func (b *Backend) Append(event *fes.Event) error {
return b.Publish(event)
}

func (b *Backend) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(aggregate) {
func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&key) {
return nil, ErrInvalidAggregate
}

key := *aggregate
b.lock.RLock()
defer b.lock.RUnlock()
events, ok := b.contents[key]
Expand Down
4 changes: 2 additions & 2 deletions pkg/fes/backend/mem/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ func TestBackend_GetMultiple(t *testing.T) {
assert.NoError(t, err)
}

getEvents, err := mem.Get(&key)
getEvents, err := mem.Get(key)
assert.NoError(t, err)
assert.EqualValues(t, events, getEvents)
}

func TestBackend_GetNonexistent(t *testing.T) {
mem := NewBackend()
key := fes.NewAggregate("type", "id")
getEvents, err := mem.Get(&key)
getEvents, err := mem.Get(key)
assert.NoError(t, err)
assert.EqualValues(t, []*fes.Event{}, getEvents)
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/fes/backend/nats/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nats

import (
"errors"
"fmt"
"strings"
"time"
Expand All @@ -20,6 +21,8 @@ const (
)

var (
ErrInvalidAggregate = errors.New("invalid aggregate")

subsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "fes",
Subsystem: "nats",
Expand Down Expand Up @@ -140,9 +143,9 @@ func (es *EventStore) Close() error {
// Append publishes (and persists) an event on the NATS message queue
func (es *EventStore) Append(event *fes.Event) error {
// TODO make generic / configurable whether to fold event into parent's Subject
subject := toSubject(event.Aggregate)
subject := toSubject(*event.Aggregate)
if event.Parent != nil {
subject = toSubject(event.Parent)
subject = toSubject(*event.Parent)
}
data, err := proto.Marshal(event)
if err != nil {
Expand All @@ -165,16 +168,21 @@ func (es *EventStore) Append(event *fes.Event) error {
}

// Get returns all events related to a specific aggregate
func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) {
func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&aggregate) {
return nil, ErrInvalidAggregate
}
subject := toSubject(aggregate)

// TODO check if subject exists in NATS (MsgSeqRange takes a long time otherwise)

msgs, err := es.conn.MsgSeqRange(subject, firstMsg, mostRecentMsg)
if err != nil {
return nil, err
}
var results []*fes.Event
for _, msg := range msgs {
event, err := toEvent(msg)
for k := range msgs {
event, err := toEvent(msgs[k])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -210,7 +218,7 @@ func toAggregate(subject string) *fes.Aggregate {
}
}

func toSubject(a *fes.Aggregate) string {
func toSubject(a fes.Aggregate) string {
return fmt.Sprintf("%s.%s", a.Type, a.Id)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/fes/backend/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]*
defer leftSub.Close()
select {
case seqEnd = <-rightBound:
case <-time.After(time.Duration(10) * time.Second):
case <-time.After(time.Duration(5) * time.Second):
return nil, fmt.Errorf("timed out while finding boundary for Subject '%s'", subject)
}
}
Expand Down Expand Up @@ -124,7 +124,9 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]*
case err := <-errC:
return result, err
case msg := <-elementC:
result = append(result, msg)
if msg != nil {
result = append(result, msg)
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/fes/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (c *FallbackCache) List() []Aggregate {
for _, aggregate := range esAggregates {
entity, err := c.cache.GetAggregate(aggregate)
if err != nil || entity == nil {
events, err := c.client.Get(&aggregate)
events, err := c.client.Get(aggregate)
if err != nil {
logrus.WithField("err", err).Error("failed to get missed entity from event store")
continue
Expand Down Expand Up @@ -309,7 +309,7 @@ func (c *FallbackCache) Get(entity Aggregator) error {

func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Aggregator) error {
// Look up relevant events in event store
events, err := c.client.Get(&aggregate)
events, err := c.client.Get(aggregate)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/fes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type EventAppender interface {
// Backend is a persistent store for events
type Backend interface {
EventAppender
Get(aggregate *Aggregate) ([]*Event, error)

// Get fetches all events that belong to a specific aggregate
Get(aggregate Aggregate) ([]*Event, error)
List(matcher StringMatcher) ([]Aggregate, error)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/fes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func NewAggregate(entityType string, entityID string) Aggregate {

func NewEvent(aggregate Aggregate, data []byte) *Event {
return &Event{
Id: aggregate.Id,
Type: aggregate.Type,
Aggregate: &aggregate,
Data: data,
Expand Down
9 changes: 5 additions & 4 deletions pkg/fnenv/fission/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var log = logrus.WithField("component", "fnenv.fission")
type FunctionEnv struct {
executor *executor.Client
routerURL string
timedExecService *TimedExecPool
timedExecService *timedExecPool
}

const (
Expand All @@ -40,8 +40,9 @@ const (

func NewFunctionEnv(executor *executor.Client, routerURL string) *FunctionEnv {
return &FunctionEnv{
executor: executor,
routerURL: routerURL,
executor: executor,
routerURL: routerURL,
timedExecService: newTimedExecPool(),
}
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca
}

// Notify signals the Fission runtime that a function request is expected at a specific time.
func (fe *FunctionEnv) Notify(taskID string, fn types.FnRef, expectedAt time.Time) error {
func (fe *FunctionEnv) Notify(fn types.FnRef, expectedAt time.Time) error {
reqURL, err := fe.getFnURL(fn)
if err != nil {
return err
Expand Down
Loading