Skip to content

Commit

Permalink
Moved NATS integration tests to integration package
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Mar 27, 2018
1 parent 83fa17c commit bb06afd
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 39 deletions.
4 changes: 2 additions & 2 deletions pkg/fes/backend/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (b *Backend) Append(event *fes.Event) error {
return b.Publish(event)
}

func (b *Backend) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(aggregate) {
func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&aggregate) {
return nil, ErrInvalidAggregate
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/fes/backend/mem/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ func TestBackend_GetMultiple(t *testing.T) {
assert.NoError(t, err)
}

getEvents, err := mem.Get(&key)
getEvents, err := mem.Get(key)
assert.NoError(t, err)
assert.EqualValues(t, events, getEvents)
}

func TestBackend_GetNonexistent(t *testing.T) {
mem := NewBackend()
key := fes.NewAggregate("type", "id")
getEvents, err := mem.Get(&key)
getEvents, err := mem.Get(key)
assert.NoError(t, err)
assert.EqualValues(t, []*fes.Event{}, getEvents)
}
Expand Down
31 changes: 18 additions & 13 deletions pkg/fes/backend/nats/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package nats

import (
"errors"
"fmt"
"strings"

"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/golang/protobuf/proto"
nats "github.com/nats-io/go-nats"
"github.com/nats-io/go-nats"
"github.com/nats-io/go-nats-streaming"
"github.com/sirupsen/logrus"
)
Expand All @@ -16,6 +17,10 @@ const (
defaultClient = "fes"
)

var (
ErrInvalidAggregate = errors.New("invalid aggregate")
)

type EventStore struct {
pubsub.Publisher
conn *WildcardConn
Expand All @@ -24,14 +29,9 @@ type EventStore struct {
}

type Config struct {
//Cluster: clusterId,
//Client: "someClient",
//Url: fmt.Sprintf("nats://%s:%d", address, port),
Cluster string
Client string

// Example: nats://localhost:9300
Url string
Url string
}

func NewEventStore(conn *WildcardConn, cfg Config) *EventStore {
Expand Down Expand Up @@ -101,9 +101,9 @@ func (es *EventStore) Close() error {

func (es *EventStore) Append(event *fes.Event) error {
// TODO make generic / configurable whether to fold event into parent's Subject
subject := toSubject(event.Aggregate)
subject := toSubject(*event.Aggregate)
if event.Parent != nil {
subject = toSubject(event.Parent)
subject = toSubject(*event.Parent)
}
data, err := proto.Marshal(event)
if err != nil {
Expand All @@ -121,16 +121,21 @@ func (es *EventStore) Append(event *fes.Event) error {
return es.conn.Publish(subject, data)
}

func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) {
func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&aggregate) {
return nil, ErrInvalidAggregate
}
subject := toSubject(aggregate)

// TODO check if subject exists in NATS (MsgSeqRange takes a long time otherwise)

msgs, err := es.conn.MsgSeqRange(subject, firstMsg, mostRecentMsg)
if err != nil {
return nil, err
}
var results []*fes.Event
for _, msg := range msgs {
event, err := toEvent(msg)
for k := range msgs {
event, err := toEvent(msgs[k])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,7 +170,7 @@ func toAggregate(subject string) *fes.Aggregate {
}
}

func toSubject(a *fes.Aggregate) string {
func toSubject(a fes.Aggregate) string {
return fmt.Sprintf("%s.%s", a.Type, a.Id)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/fes/backend/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]*
defer leftSub.Close()
select {
case seqEnd = <-rightBound:
case <-time.After(time.Duration(10) * time.Second):
case <-time.After(time.Duration(5) * time.Second):
return nil, fmt.Errorf("timed out while finding boundary for Subject '%s'", subject)
}
}
Expand Down Expand Up @@ -120,7 +120,9 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]*
case err := <-errC:
return result, err
case msg := <-elementC:
result = append(result, msg)
if msg != nil {
result = append(result, msg)
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/fes/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (c *FallbackCache) List() []Aggregate {
for _, aggregate := range esAggregates {
entity, err := c.cache.GetAggregate(aggregate)
if err != nil || entity == nil {
events, err := c.client.Get(&aggregate)
events, err := c.client.Get(aggregate)
if err != nil {
logrus.WithField("err", err).Error("failed to get missed entity from event store")
continue
Expand Down Expand Up @@ -285,7 +285,7 @@ func (c *FallbackCache) Get(entity Aggregator) error {

func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Aggregator) error {
// Look up relevant events in event store
events, err := c.client.Get(&aggregate)
events, err := c.client.Get(aggregate)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/fes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type EventAppender interface {
// Backend is a persistent store for events
type Backend interface {
EventAppender
Get(aggregate *Aggregate) ([]*Event, error)

// Get fetches all events that belong to a specific aggregate
Get(aggregate Aggregate) ([]*Event, error)
List(matcher StringMatcher) ([]Aggregate, error)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/fes/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func NewAggregate(entityType string, entityId string) Aggregate {

func NewEvent(aggregate Aggregate, data []byte) *Event {
return &Event{
Id: aggregate.Id,
Type: aggregate.Type,
Aggregate: &aggregate,
Data: data,
Expand Down
2 changes: 1 addition & 1 deletion test/integration/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (

func TestMain(m *testing.M) {
if testing.Short() {
fmt.Println("Skipping bundle tests...")
fmt.Println("Skipping bundle integration tests...")
return
}

Expand Down
13 changes: 11 additions & 2 deletions test/integration/fission/fnenv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ var fissionResolver *fission.Resolver
var resolver fnenv.Resolver

func TestMain(m *testing.M) {
if testing.Short() {
fmt.Println("Skipping Fission integration tests...")
return
}

fissionControllerAddr := os.Getenv("FNENV_FISSION_CONTROLLER")
if len(fissionControllerAddr) == 0 {
fissionControllerAddr = defaultFissionControllerAddr
Expand All @@ -34,6 +39,11 @@ func TestMain(m *testing.M) {
fissionExecutorAddr = defaultFissionExecutorAddr
}

logrus.WithFields(logrus.Fields{
"controller": fissionControllerAddr,
"executor": fissionExecutorAddr,
}).Infof("Checking if Fission is reachable")

var fissionMissing bool
_, err := http.Get(fissionControllerAddr)
if err != nil {
Expand All @@ -46,8 +56,7 @@ func TestMain(m *testing.M) {
fissionMissing = true
}
if fissionMissing {
fmt.Println("Fission not available; skipping Fission integration test")
return
panic("Fission integration test failed: Fission not available or reachable")
}

// Setup Fission connection
Expand Down
72 changes: 72 additions & 0 deletions test/integration/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package nats

import (
"context"
"fmt"
"os"
"testing"

"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/test/integration"
"github.com/stretchr/testify/assert"
)

var (
backend fes.Backend
)

// Tests the event store implementation with a live NATS cluster.
// This test will start and stop a NATS streaming cluster by itself.
//
// Prerequisites:
// - Docker

func TestMain(m *testing.M) {
if testing.Short() {
fmt.Println("Skipping NATS integration tests...")
return
}

ctx := context.TODO()

cfg := integration.RunNatsStreaming(ctx)

natsBackend, err := nats.Connect(cfg)
if err != nil {
panic(fmt.Sprintf("failed to connect to cluster: %v", err))
}
backend = natsBackend

status := m.Run()
os.Exit(status)
}

func TestNatsBackend_GetNonExistent(t *testing.T) {
key := fes.NewAggregate("nonExistentType", "nonExistentId")

// check
events, err := backend.Get(key)
assert.Error(t, err)
assert.Empty(t, events)
}

func TestNatsBackend_Append(t *testing.T) {
key := fes.NewAggregate("someType", "someId")
event := fes.NewEvent(key, nil)
err := backend.Append(event)
assert.NoError(t, err)

// check
events, err := backend.Get(key)
assert.NoError(t, err)
assert.Len(t, events, 1)
event.Id = "1"
assert.Equal(t, event, events[0])
}

func TestNatsBackend_List(t *testing.T) {
subjects, err := backend.List(&fes.ContainsMatcher{})
assert.NoError(t, err)
assert.NotEmpty(t, subjects)
}
39 changes: 26 additions & 13 deletions test/integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integration
import (
"context"
"fmt"
"io"
"net"
"os"
"os/exec"
Expand All @@ -23,7 +22,7 @@ import (
// By default the bundle runs with all components are enabled, setting up a NATS cluster as the
// backing event store, and internal fnenv and workflow runtime
func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options {
nats := SetupNatsCluster(ctx)
nats := RunNatsStreaming(ctx)
var bundleOpts bundle.Options
if len(opts) > 0 {
bundleOpts = opts[0]
Expand All @@ -44,25 +43,39 @@ func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options {
}

// TODO check if there is a nats instance already is running
func SetupNatsCluster(ctx context.Context) fesnats.Config {
func RunNatsStreaming(ctx context.Context) fesnats.Config {
id := util.Uid()
clusterId := fmt.Sprintf("fission-workflows-tests-%s", id)
port, err := findFreePort()
if err != nil {
panic(err)
}
address := "127.0.0.1"
flags := strings.Split(fmt.Sprintf("-cid %s -p %d -a %s", clusterId, port, address), " ")
logrus.Info(flags)
cmd := exec.CommandContext(ctx, "nats-streaming-server", flags...)
stdOut, _ := cmd.StdoutPipe()
stdErr, _ := cmd.StderrPipe()
go io.Copy(os.Stdout, stdOut)
go io.Copy(os.Stdout, stdErr)
err = cmd.Start()
if err != nil {
panic(err)
args := []string{
"run",
"--rm",
"-i",
"-p", fmt.Sprintf("%d:%d", port, port),
"nats-streaming:0.8.0-beta",
"-cid", clusterId,
"-p", fmt.Sprintf("%d", port),
}

go func() {
fmt.Printf("> docker %s\n", strings.Join(args, " "))
cmd := exec.CommandContext(ctx, "docker", args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin
err = cmd.Start()
if err != nil {
panic(err)
}
err = cmd.Wait()
if err != nil {
panic(err)
}
}()
cfg := fesnats.Config{
Cluster: clusterId,
Client: fmt.Sprintf("client-%s", id),
Expand Down

0 comments on commit bb06afd

Please sign in to comment.