diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index c51ff7fb..49f5edaa 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -45,8 +45,8 @@ func (b *Backend) Append(event *fes.Event) error { return b.Publish(event) } -func (b *Backend) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { - if !fes.ValidateAggregate(aggregate) { +func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&aggregate) { return nil, ErrInvalidAggregate } diff --git a/pkg/fes/backend/mem/mem_test.go b/pkg/fes/backend/mem/mem_test.go index 1850da72..67a09525 100644 --- a/pkg/fes/backend/mem/mem_test.go +++ b/pkg/fes/backend/mem/mem_test.go @@ -52,7 +52,7 @@ func TestBackend_GetMultiple(t *testing.T) { assert.NoError(t, err) } - getEvents, err := mem.Get(&key) + getEvents, err := mem.Get(key) assert.NoError(t, err) assert.EqualValues(t, events, getEvents) } @@ -60,7 +60,7 @@ func TestBackend_GetMultiple(t *testing.T) { func TestBackend_GetNonexistent(t *testing.T) { mem := NewBackend() key := fes.NewAggregate("type", "id") - getEvents, err := mem.Get(&key) + getEvents, err := mem.Get(key) assert.NoError(t, err) assert.EqualValues(t, []*fes.Event{}, getEvents) } diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index ebbb7510..5f400f2d 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -1,13 +1,14 @@ package nats import ( + "errors" "fmt" "strings" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/proto" - nats "github.com/nats-io/go-nats" + "github.com/nats-io/go-nats" "github.com/nats-io/go-nats-streaming" "github.com/sirupsen/logrus" ) @@ -16,6 +17,10 @@ const ( defaultClient = "fes" ) +var ( + ErrInvalidAggregate = errors.New("invalid aggregate") +) + type EventStore struct { pubsub.Publisher conn *WildcardConn @@ -24,14 +29,9 @@ type EventStore struct { } type Config struct { - //Cluster: clusterId, - //Client: "someClient", - //Url: fmt.Sprintf("nats://%s:%d", address, port), Cluster string Client string - - // Example: nats://localhost:9300 - Url string + Url string } func NewEventStore(conn *WildcardConn, cfg Config) *EventStore { @@ -101,9 +101,9 @@ func (es *EventStore) Close() error { func (es *EventStore) Append(event *fes.Event) error { // TODO make generic / configurable whether to fold event into parent's Subject - subject := toSubject(event.Aggregate) + subject := toSubject(*event.Aggregate) if event.Parent != nil { - subject = toSubject(event.Parent) + subject = toSubject(*event.Parent) } data, err := proto.Marshal(event) if err != nil { @@ -121,16 +121,21 @@ func (es *EventStore) Append(event *fes.Event) error { return es.conn.Publish(subject, data) } -func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { +func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&aggregate) { + return nil, ErrInvalidAggregate + } subject := toSubject(aggregate) + // TODO check if subject exists in NATS (MsgSeqRange takes a long time otherwise) + msgs, err := es.conn.MsgSeqRange(subject, firstMsg, mostRecentMsg) if err != nil { return nil, err } var results []*fes.Event - for _, msg := range msgs { - event, err := toEvent(msg) + for k := range msgs { + event, err := toEvent(msgs[k]) if err != nil { return nil, err } @@ -165,7 +170,7 @@ func toAggregate(subject string) *fes.Aggregate { } } -func toSubject(a *fes.Aggregate) string { +func toSubject(a fes.Aggregate) string { return fmt.Sprintf("%s.%s", a.Type, a.Id) } diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index 15aa2f5c..6065e767 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -74,7 +74,7 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* defer leftSub.Close() select { case seqEnd = <-rightBound: - case <-time.After(time.Duration(10) * time.Second): + case <-time.After(time.Duration(5) * time.Second): return nil, fmt.Errorf("timed out while finding boundary for Subject '%s'", subject) } } @@ -120,7 +120,9 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* case err := <-errC: return result, err case msg := <-elementC: - result = append(result, msg) + if msg != nil { + result = append(result, msg) + } } } } diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index e2104337..5d46db0e 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -233,7 +233,7 @@ func (c *FallbackCache) List() []Aggregate { for _, aggregate := range esAggregates { entity, err := c.cache.GetAggregate(aggregate) if err != nil || entity == nil { - events, err := c.client.Get(&aggregate) + events, err := c.client.Get(aggregate) if err != nil { logrus.WithField("err", err).Error("failed to get missed entity from event store") continue @@ -285,7 +285,7 @@ func (c *FallbackCache) Get(entity Aggregator) error { func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Aggregator) error { // Look up relevant events in event store - events, err := c.client.Get(&aggregate) + events, err := c.client.Get(aggregate) if err != nil { return err } diff --git a/pkg/fes/types.go b/pkg/fes/types.go index f299d93d..27edd4f1 100644 --- a/pkg/fes/types.go +++ b/pkg/fes/types.go @@ -30,7 +30,9 @@ type EventAppender interface { // Backend is a persistent store for events type Backend interface { EventAppender - Get(aggregate *Aggregate) ([]*Event, error) + + // Get fetches all events that belong to a specific aggregate + Get(aggregate Aggregate) ([]*Event, error) List(matcher StringMatcher) ([]Aggregate, error) } diff --git a/pkg/fes/util.go b/pkg/fes/util.go index a8aee033..aab5bea6 100644 --- a/pkg/fes/util.go +++ b/pkg/fes/util.go @@ -44,7 +44,6 @@ func NewAggregate(entityType string, entityId string) Aggregate { func NewEvent(aggregate Aggregate, data []byte) *Event { return &Event{ - Id: aggregate.Id, Type: aggregate.Type, Aggregate: &aggregate, Data: data, diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 29df75ad..92bb08ca 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -25,7 +25,7 @@ const ( func TestMain(m *testing.M) { if testing.Short() { - fmt.Println("Skipping bundle tests...") + fmt.Println("Skipping bundle integration tests...") return } diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go index 064df4c4..ec12c7a3 100644 --- a/test/integration/fission/fnenv_test.go +++ b/test/integration/fission/fnenv_test.go @@ -24,6 +24,11 @@ var fissionResolver *fission.Resolver var resolver fnenv.Resolver func TestMain(m *testing.M) { + if testing.Short() { + fmt.Println("Skipping Fission integration tests...") + return + } + fissionControllerAddr := os.Getenv("FNENV_FISSION_CONTROLLER") if len(fissionControllerAddr) == 0 { fissionControllerAddr = defaultFissionControllerAddr @@ -34,6 +39,11 @@ func TestMain(m *testing.M) { fissionExecutorAddr = defaultFissionExecutorAddr } + logrus.WithFields(logrus.Fields{ + "controller": fissionControllerAddr, + "executor": fissionExecutorAddr, + }).Infof("Checking if Fission is reachable") + var fissionMissing bool _, err := http.Get(fissionControllerAddr) if err != nil { @@ -46,8 +56,7 @@ func TestMain(m *testing.M) { fissionMissing = true } if fissionMissing { - fmt.Println("Fission not available; skipping Fission integration test") - return + panic("Fission integration test failed: Fission not available or reachable") } // Setup Fission connection diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go new file mode 100644 index 00000000..4f5eac7d --- /dev/null +++ b/test/integration/nats/nats_test.go @@ -0,0 +1,72 @@ +package nats + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fes/backend/nats" + "github.com/fission/fission-workflows/test/integration" + "github.com/stretchr/testify/assert" +) + +var ( + backend fes.Backend +) + +// Tests the event store implementation with a live NATS cluster. +// This test will start and stop a NATS streaming cluster by itself. +// +// Prerequisites: +// - Docker + +func TestMain(m *testing.M) { + if testing.Short() { + fmt.Println("Skipping NATS integration tests...") + return + } + + ctx := context.TODO() + + cfg := integration.RunNatsStreaming(ctx) + + natsBackend, err := nats.Connect(cfg) + if err != nil { + panic(fmt.Sprintf("failed to connect to cluster: %v", err)) + } + backend = natsBackend + + status := m.Run() + os.Exit(status) +} + +func TestNatsBackend_GetNonExistent(t *testing.T) { + key := fes.NewAggregate("nonExistentType", "nonExistentId") + + // check + events, err := backend.Get(key) + assert.Error(t, err) + assert.Empty(t, events) +} + +func TestNatsBackend_Append(t *testing.T) { + key := fes.NewAggregate("someType", "someId") + event := fes.NewEvent(key, nil) + err := backend.Append(event) + assert.NoError(t, err) + + // check + events, err := backend.Get(key) + assert.NoError(t, err) + assert.Len(t, events, 1) + event.Id = "1" + assert.Equal(t, event, events[0]) +} + +func TestNatsBackend_List(t *testing.T) { + subjects, err := backend.List(&fes.ContainsMatcher{}) + assert.NoError(t, err) + assert.NotEmpty(t, subjects) +} diff --git a/test/integration/util.go b/test/integration/util.go index f9dd7231..0ee8d086 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -3,7 +3,6 @@ package integration import ( "context" "fmt" - "io" "net" "os" "os/exec" @@ -23,7 +22,7 @@ import ( // By default the bundle runs with all components are enabled, setting up a NATS cluster as the // backing event store, and internal fnenv and workflow runtime func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { - nats := SetupNatsCluster(ctx) + nats := RunNatsStreaming(ctx) var bundleOpts bundle.Options if len(opts) > 0 { bundleOpts = opts[0] @@ -44,7 +43,7 @@ func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { } // TODO check if there is a nats instance already is running -func SetupNatsCluster(ctx context.Context) fesnats.Config { +func RunNatsStreaming(ctx context.Context) fesnats.Config { id := util.Uid() clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) port, err := findFreePort() @@ -52,17 +51,31 @@ func SetupNatsCluster(ctx context.Context) fesnats.Config { panic(err) } address := "127.0.0.1" - flags := strings.Split(fmt.Sprintf("-cid %s -p %d -a %s", clusterId, port, address), " ") - logrus.Info(flags) - cmd := exec.CommandContext(ctx, "nats-streaming-server", flags...) - stdOut, _ := cmd.StdoutPipe() - stdErr, _ := cmd.StderrPipe() - go io.Copy(os.Stdout, stdOut) - go io.Copy(os.Stdout, stdErr) - err = cmd.Start() - if err != nil { - panic(err) + args := []string{ + "run", + "--rm", + "-i", + "-p", fmt.Sprintf("%d:%d", port, port), + "nats-streaming:0.8.0-beta", + "-cid", clusterId, + "-p", fmt.Sprintf("%d", port), } + + go func() { + fmt.Printf("> docker %s\n", strings.Join(args, " ")) + cmd := exec.CommandContext(ctx, "docker", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + err = cmd.Start() + if err != nil { + panic(err) + } + err = cmd.Wait() + if err != nil { + panic(err) + } + }() cfg := fesnats.Config{ Cluster: clusterId, Client: fmt.Sprintf("client-%s", id),