From 0c8aaa2e2a8a7b89386ee6adcefda2e1cc4d0e08 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Tue, 27 Feb 2018 20:08:09 +0100 Subject: [PATCH 1/6] Initial setup of Fission integration tests --- pkg/fnenv/fission/runtime.go | 2 +- pkg/fnenv/fission/timed.go | 14 ++-- pkg/types/helpers.go | 8 +++ pkg/types/validate/validate.go | 5 ++ test/integration/fission/fnenv_test.go | 92 ++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 8 deletions(-) create mode 100644 test/integration/fission/fnenv_test.go diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index a8bbb0b6..776f4b0e 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -29,7 +29,7 @@ var log = logrus.WithField("component", "fnenv.fission") type FunctionEnv struct { executor *executor.Client routerURL string - timedExecService *TimedExecPool + timedExecService *timedExecPool } const ( diff --git a/pkg/fnenv/fission/timed.go b/pkg/fnenv/fission/timed.go index e14e7bc3..1de90d99 100644 --- a/pkg/fnenv/fission/timed.go +++ b/pkg/fnenv/fission/timed.go @@ -6,22 +6,22 @@ import ( "time" ) -// TimedExecPool provides a data structure for scheduling executions based on a timestamp. -type TimedExecPool struct { +// timedExecPool provides a data structure for scheduling executions based on a timestamp. +type timedExecPool struct { fnQueue *timedFnQueue cancel chan struct{} fnsLock *sync.Mutex } -func NewTimedExecPool() *TimedExecPool { - return &TimedExecPool{ +func newTimedExecPool() *timedExecPool { + return &timedExecPool{ fnQueue: &timedFnQueue{}, cancel: make(chan struct{}), fnsLock: &sync.Mutex{}, } } -func (ds *TimedExecPool) Submit(fn func(), execAt time.Time) { +func (ds *timedExecPool) Submit(fn func(), execAt time.Time) { ds.fnsLock.Lock() defer ds.fnsLock.Unlock() ds.fnQueue.Push(timedFn{ @@ -32,13 +32,13 @@ func (ds *TimedExecPool) Submit(fn func(), execAt time.Time) { ds.eval() } -func (ds *TimedExecPool) Eval() { +func (ds *timedExecPool) Eval() { ds.fnsLock.Lock() defer ds.fnsLock.Unlock() ds.eval() } -func (ds *TimedExecPool) eval() { +func (ds *timedExecPool) eval() { // Get head t := ds.fnQueue.Peek() if t == nil { diff --git a/pkg/types/helpers.go b/pkg/types/helpers.go index 0f5cff04..217213af 100644 --- a/pkg/types/helpers.go +++ b/pkg/types/helpers.go @@ -217,3 +217,11 @@ type NamedTypedValue struct { } type Inputs map[string]*TypedValue + +func NewTaskInvocationSpec(invocationId string, taskId string, fnRef FnRef) *TaskInvocationSpec { + return &TaskInvocationSpec{ + FnRef: &fnRef, + TaskId: taskId, + InvocationId: invocationId, + } +} diff --git a/pkg/types/validate/validate.go b/pkg/types/validate/validate.go index 93047654..6b784040 100644 --- a/pkg/types/validate/validate.go +++ b/pkg/types/validate/validate.go @@ -29,6 +29,7 @@ var ( ErrNoParentTaskDependency = errors.New("dynamic task does not contain parent dependency") ErrMultipleParentTaskDependency = errors.New("dynamic task contains multiple parent tasks") ErrNoWorkflowInvocation = errors.New("workflow invocation id is required") + ErrNoTaskInvocation = errors.New("task invocation id is required") ErrNoFnRef = errors.New("function reference is required") ErrNoWorkflow = errors.New("workflow id is required") ErrNoID = errors.New("id is required") @@ -259,6 +260,10 @@ func TaskInvocationSpec(spec *types.TaskInvocationSpec) error { errs.append(ErrNoWorkflowInvocation) } + if len(spec.TaskId) == 0 { + errs.append(ErrNoTaskInvocation) + } + if spec.FnRef == nil { errs.append(ErrNoFnRef) } diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go new file mode 100644 index 00000000..064df4c4 --- /dev/null +++ b/test/integration/fission/fnenv_test.go @@ -0,0 +1,92 @@ +package fission + +import ( + "fmt" + "net/http" + "os" + "testing" + "time" + + "github.com/fission/fission-workflows/pkg/fnenv" + "github.com/fission/fission-workflows/pkg/fnenv/fission" + "github.com/fission/fission-workflows/pkg/types" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +const ( + defaultFissionControllerAddr = "http://controller.fission" + defaultFissionExecutorAddr = "http://executor.fission" +) + +var fissionRuntime *fission.FunctionEnv +var fissionResolver *fission.Resolver +var resolver fnenv.Resolver + +func TestMain(m *testing.M) { + fissionControllerAddr := os.Getenv("FNENV_FISSION_CONTROLLER") + if len(fissionControllerAddr) == 0 { + fissionControllerAddr = defaultFissionControllerAddr + } + + fissionExecutorAddr := os.Getenv("FNENV_FISSION_Executor") + if len(fissionExecutorAddr) == 0 { + fissionExecutorAddr = defaultFissionExecutorAddr + } + + var fissionMissing bool + _, err := http.Get(fissionControllerAddr) + if err != nil { + logrus.Warnf("Fission Controller not available: %v", err) + fissionMissing = true + } + _, err = http.Get(fissionExecutorAddr) + if err != nil { + logrus.Warnf("Fission Executor not available: %v", err) + fissionMissing = true + } + if fissionMissing { + fmt.Println("Fission not available; skipping Fission integration test") + return + } + + // Setup Fission connection + fissionRuntime = fission.SetupRuntime(fissionExecutorAddr) + fissionResolver = fission.SetupResolver(fissionControllerAddr) + resolver = fnenv.NewMetaResolver(map[string]fnenv.RuntimeResolver{ + "fission": fissionResolver, + }) + + m.Run() +} + +func TestFissionResolveFunction(t *testing.T) { + // TODO add actual function + fnId, err := fissionResolver.Resolve("hello") + assert.NoError(t, err) + assert.NotEmpty(t, fnId) +} + +func TestFissionExecuteFunction(t *testing.T) { + // TODO add actual function + fnRef, err := resolver.Resolve("hello") + assert.NoError(t, err) + assert.NotEmpty(t, fnRef) + + taskSpec := types.NewTaskInvocationSpec("wfi-1", "sometask", fnRef) + status, err := fissionRuntime.Invoke(taskSpec) + assert.NoError(t, err) + assert.True(t, status.Finished()) + assert.Nil(t, status.Error) + assert.NotNil(t, status.Output) +} + +func TestFissionNotify(t *testing.T) { + // TODO add actual function + fnRef, err := resolver.Resolve("hello") + assert.NoError(t, err) + assert.NotEmpty(t, fnRef) + + err = fissionRuntime.Notify("someTask", fnRef, time.Now()) + assert.NoError(t, err) +} From b8e830f382fe5b00ecf5f966bf8a822c7b24456a Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Tue, 13 Mar 2018 15:39:30 +0100 Subject: [PATCH 2/6] Moved NATS integration tests to integration package --- pkg/fes/backend/mem/mem.go | 6 +-- pkg/fes/backend/mem/mem_test.go | 4 +- pkg/fes/backend/nats/client.go | 20 ++++--- pkg/fes/backend/nats/nats.go | 6 ++- pkg/fes/caches.go | 4 +- pkg/fes/types.go | 4 +- pkg/fes/util.go | 1 - test/integration/bundle/bundle_test.go | 2 +- test/integration/fission/fnenv_test.go | 13 ++++- test/integration/nats/nats_test.go | 72 ++++++++++++++++++++++++++ test/integration/util.go | 38 +++++++++----- 11 files changed, 138 insertions(+), 32 deletions(-) create mode 100644 test/integration/nats/nats_test.go diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index 87333a19..a69cec81 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -54,12 +54,12 @@ func (b *Backend) Append(event *fes.Event) error { return b.Publish(event) } -func (b *Backend) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { - if !fes.ValidateAggregate(aggregate) { +func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&aggregate) { return nil, ErrInvalidAggregate } - key := *aggregate + key := aggregate b.lock.RLock() defer b.lock.RUnlock() events, ok := b.contents[key] diff --git a/pkg/fes/backend/mem/mem_test.go b/pkg/fes/backend/mem/mem_test.go index e988daa3..b7fa4d9b 100644 --- a/pkg/fes/backend/mem/mem_test.go +++ b/pkg/fes/backend/mem/mem_test.go @@ -52,7 +52,7 @@ func TestBackend_GetMultiple(t *testing.T) { assert.NoError(t, err) } - getEvents, err := mem.Get(&key) + getEvents, err := mem.Get(key) assert.NoError(t, err) assert.EqualValues(t, events, getEvents) } @@ -60,7 +60,7 @@ func TestBackend_GetMultiple(t *testing.T) { func TestBackend_GetNonexistent(t *testing.T) { mem := NewBackend() key := fes.NewAggregate("type", "id") - getEvents, err := mem.Get(&key) + getEvents, err := mem.Get(key) assert.NoError(t, err) assert.EqualValues(t, []*fes.Event{}, getEvents) } diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index d7cbfebd..cdf94427 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -1,6 +1,7 @@ package nats import ( + "errors" "fmt" "strings" "time" @@ -20,6 +21,8 @@ const ( ) var ( + ErrInvalidAggregate = errors.New("invalid aggregate") + subsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "fes", Subsystem: "nats", @@ -140,9 +143,9 @@ func (es *EventStore) Close() error { // Append publishes (and persists) an event on the NATS message queue func (es *EventStore) Append(event *fes.Event) error { // TODO make generic / configurable whether to fold event into parent's Subject - subject := toSubject(event.Aggregate) + subject := toSubject(*event.Aggregate) if event.Parent != nil { - subject = toSubject(event.Parent) + subject = toSubject(*event.Parent) } data, err := proto.Marshal(event) if err != nil { @@ -165,16 +168,21 @@ func (es *EventStore) Append(event *fes.Event) error { } // Get returns all events related to a specific aggregate -func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { +func (es *EventStore) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&aggregate) { + return nil, ErrInvalidAggregate + } subject := toSubject(aggregate) + // TODO check if subject exists in NATS (MsgSeqRange takes a long time otherwise) + msgs, err := es.conn.MsgSeqRange(subject, firstMsg, mostRecentMsg) if err != nil { return nil, err } var results []*fes.Event - for _, msg := range msgs { - event, err := toEvent(msg) + for k := range msgs { + event, err := toEvent(msgs[k]) if err != nil { return nil, err } @@ -210,7 +218,7 @@ func toAggregate(subject string) *fes.Aggregate { } } -func toSubject(a *fes.Aggregate) string { +func toSubject(a fes.Aggregate) string { return fmt.Sprintf("%s.%s", a.Type, a.Id) } diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index dd936e0e..f8554938 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -74,7 +74,7 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* defer leftSub.Close() select { case seqEnd = <-rightBound: - case <-time.After(time.Duration(10) * time.Second): + case <-time.After(time.Duration(5) * time.Second): return nil, fmt.Errorf("timed out while finding boundary for Subject '%s'", subject) } } @@ -124,7 +124,9 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* case err := <-errC: return result, err case msg := <-elementC: - result = append(result, msg) + if msg != nil { + result = append(result, msg) + } } } } diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index d0772aec..4aa92dae 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -259,7 +259,7 @@ func (c *FallbackCache) List() []Aggregate { for _, aggregate := range esAggregates { entity, err := c.cache.GetAggregate(aggregate) if err != nil || entity == nil { - events, err := c.client.Get(&aggregate) + events, err := c.client.Get(aggregate) if err != nil { logrus.WithField("err", err).Error("failed to get missed entity from event store") continue @@ -309,7 +309,7 @@ func (c *FallbackCache) Get(entity Aggregator) error { func (c *FallbackCache) getFromEventStore(aggregate Aggregate, target Aggregator) error { // Look up relevant events in event store - events, err := c.client.Get(&aggregate) + events, err := c.client.Get(aggregate) if err != nil { return err } diff --git a/pkg/fes/types.go b/pkg/fes/types.go index f299d93d..27edd4f1 100644 --- a/pkg/fes/types.go +++ b/pkg/fes/types.go @@ -30,7 +30,9 @@ type EventAppender interface { // Backend is a persistent store for events type Backend interface { EventAppender - Get(aggregate *Aggregate) ([]*Event, error) + + // Get fetches all events that belong to a specific aggregate + Get(aggregate Aggregate) ([]*Event, error) List(matcher StringMatcher) ([]Aggregate, error) } diff --git a/pkg/fes/util.go b/pkg/fes/util.go index 66767ead..eb7fb0cb 100644 --- a/pkg/fes/util.go +++ b/pkg/fes/util.go @@ -44,7 +44,6 @@ func NewAggregate(entityType string, entityID string) Aggregate { func NewEvent(aggregate Aggregate, data []byte) *Event { return &Event{ - Id: aggregate.Id, Type: aggregate.Type, Aggregate: &aggregate, Data: data, diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index fca83bac..4c0c7433 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -25,7 +25,7 @@ const ( func TestMain(m *testing.M) { if testing.Short() { - fmt.Println("Skipping bundle tests...") + fmt.Println("Skipping bundle integration tests...") return } diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go index 064df4c4..ec12c7a3 100644 --- a/test/integration/fission/fnenv_test.go +++ b/test/integration/fission/fnenv_test.go @@ -24,6 +24,11 @@ var fissionResolver *fission.Resolver var resolver fnenv.Resolver func TestMain(m *testing.M) { + if testing.Short() { + fmt.Println("Skipping Fission integration tests...") + return + } + fissionControllerAddr := os.Getenv("FNENV_FISSION_CONTROLLER") if len(fissionControllerAddr) == 0 { fissionControllerAddr = defaultFissionControllerAddr @@ -34,6 +39,11 @@ func TestMain(m *testing.M) { fissionExecutorAddr = defaultFissionExecutorAddr } + logrus.WithFields(logrus.Fields{ + "controller": fissionControllerAddr, + "executor": fissionExecutorAddr, + }).Infof("Checking if Fission is reachable") + var fissionMissing bool _, err := http.Get(fissionControllerAddr) if err != nil { @@ -46,8 +56,7 @@ func TestMain(m *testing.M) { fissionMissing = true } if fissionMissing { - fmt.Println("Fission not available; skipping Fission integration test") - return + panic("Fission integration test failed: Fission not available or reachable") } // Setup Fission connection diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go new file mode 100644 index 00000000..4f5eac7d --- /dev/null +++ b/test/integration/nats/nats_test.go @@ -0,0 +1,72 @@ +package nats + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fes/backend/nats" + "github.com/fission/fission-workflows/test/integration" + "github.com/stretchr/testify/assert" +) + +var ( + backend fes.Backend +) + +// Tests the event store implementation with a live NATS cluster. +// This test will start and stop a NATS streaming cluster by itself. +// +// Prerequisites: +// - Docker + +func TestMain(m *testing.M) { + if testing.Short() { + fmt.Println("Skipping NATS integration tests...") + return + } + + ctx := context.TODO() + + cfg := integration.RunNatsStreaming(ctx) + + natsBackend, err := nats.Connect(cfg) + if err != nil { + panic(fmt.Sprintf("failed to connect to cluster: %v", err)) + } + backend = natsBackend + + status := m.Run() + os.Exit(status) +} + +func TestNatsBackend_GetNonExistent(t *testing.T) { + key := fes.NewAggregate("nonExistentType", "nonExistentId") + + // check + events, err := backend.Get(key) + assert.Error(t, err) + assert.Empty(t, events) +} + +func TestNatsBackend_Append(t *testing.T) { + key := fes.NewAggregate("someType", "someId") + event := fes.NewEvent(key, nil) + err := backend.Append(event) + assert.NoError(t, err) + + // check + events, err := backend.Get(key) + assert.NoError(t, err) + assert.Len(t, events, 1) + event.Id = "1" + assert.Equal(t, event, events[0]) +} + +func TestNatsBackend_List(t *testing.T) { + subjects, err := backend.List(&fes.ContainsMatcher{}) + assert.NoError(t, err) + assert.NotEmpty(t, subjects) +} diff --git a/test/integration/util.go b/test/integration/util.go index 89c9a149..0a0360da 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -3,7 +3,6 @@ package integration import ( "context" "fmt" - "io" "net" "os" "os/exec" @@ -23,7 +22,7 @@ import ( // By default the bundle runs with all components are enabled, setting up a NATS cluster as the // backing event store, and internal fnenv and workflow runtime func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { - nats := SetupNatsCluster(ctx) + nats := RunNatsStreaming(ctx) var bundleOpts bundle.Options if len(opts) > 0 { bundleOpts = opts[0] @@ -44,7 +43,7 @@ func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { } // TODO check if there is a nats instance already is running -func SetupNatsCluster(ctx context.Context) fesnats.Config { +func RunNatsStreaming(ctx context.Context) fesnats.Config { id := util.UID() clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) port, err := findFreePort() @@ -52,16 +51,31 @@ func SetupNatsCluster(ctx context.Context) fesnats.Config { panic(err) } address := "127.0.0.1" - flags := strings.Split(fmt.Sprintf("-cid %s -p %d -a %s", clusterId, port, address), " ") - cmd := exec.CommandContext(ctx, "nats-streaming-server", flags...) - stdOut, _ := cmd.StdoutPipe() - stdErr, _ := cmd.StderrPipe() - go io.Copy(os.Stdout, stdOut) - go io.Copy(os.Stdout, stdErr) - err = cmd.Start() - if err != nil { - panic(err) + args := []string{ + "run", + "--rm", + "-i", + "-p", fmt.Sprintf("%d:%d", port, port), + "nats-streaming:0.8.0-beta", + "-cid", clusterId, + "-p", fmt.Sprintf("%d", port), } + + go func() { + fmt.Printf("> docker %s\n", strings.Join(args, " ")) + cmd := exec.CommandContext(ctx, "docker", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + err = cmd.Start() + if err != nil { + panic(err) + } + err = cmd.Wait() + if err != nil { + panic(err) + } + }() cfg := fesnats.Config{ Cluster: clusterId, Client: fmt.Sprintf("client-%s", id), From d91f397d1f86690a6b71877e93538ab9b34a1fae Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 19 Mar 2018 11:36:02 +0100 Subject: [PATCH 3/6] Use dockertest to manage lifecycle of the nats-streaming server --- glide.lock | 89 ++++++++++++++++++--- glide.yaml | 2 + pkg/fes/backend/mem/mem.go | 6 +- test/integration/fission/fnenv_test.go | 12 +-- test/integration/nats/nats_test.go | 61 +++++++++++--- test/integration/util.go | 105 ------------------------- 6 files changed, 139 insertions(+), 136 deletions(-) diff --git a/glide.lock b/glide.lock index e017f6e9..ed3f403c 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: ba75f2d0e6fb6979dd369812eac5b97d27a8cd3e03bcd4ff4b21509895366313 -updated: 2018-06-07T13:10:54.09849+02:00 +hash: 0aa3850135ea276827ed1beef1133960ffef5183930220ad59461636af062fc5 +updated: 2018-06-09T01:00:51.241533+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -14,7 +14,7 @@ imports: - autorest/azure - autorest/date - name: github.com/beorn7/perks - version: 3a771d992973f24aa725d07868b467d1ddfceafb + version: 3ac7bf7a47d159a033b107610db8a1b6575507a4 subpackages: - quantile - name: github.com/davecgh/go-spew @@ -116,11 +116,11 @@ imports: - name: github.com/howeyc/gopass version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo - version: 9d5f1277e9a8ed20c3684bda8fde67c05628518c + version: 9316a62528ac99aaecb4e47eadd6dc8aa6533d58 - name: github.com/json-iterator/go version: 13f86432b882000a51c6e610c620974462691a97 - name: github.com/matttproud/golang_protobuf_extensions - version: c12348ce28de40eed0136aa2b644d0ee0650e56c + version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a subpackages: - pbutil - name: github.com/nats-io/go-nats @@ -147,17 +147,17 @@ imports: - prometheus - prometheus/promhttp - name: github.com/prometheus/client_model - version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c + version: fa8ad6fec33561be4280a8f0514318c79d7f6cb6 subpackages: - go - name: github.com/prometheus/common - version: 61f87aac8082fa8c3c5655c7608d7478d46ac2ad + version: 13ba4ddd0caa9c28ca7b7bffe1dfa9ed8d5ef207 subpackages: - expfmt - internal/bitbucket.org/ww/goautoneg - model - name: github.com/prometheus/procfs - version: e645f4e5aaa8506fc71d6edbc5c4ff02c04c46f2 + version: 65c1f6f8f0fc1e2185eb9863a3bc751496404259 subpackages: - xfs - name: github.com/robertkrimen/otto @@ -181,7 +181,7 @@ imports: - name: github.com/urfave/cli version: 0bdeddeeb0f650497d603c4ad7b20cfe685682f6 - name: golang.org/x/crypto - version: d172538b2cfce0c13cee31e647d0367aa8cd2486 + version: 81e90905daefcd6fd217b62423c0908922eadb30 subpackages: - ssh/terminal - name: golang.org/x/net @@ -211,6 +211,7 @@ imports: version: c488ab1dd8481ef762f96a79a9577c27825be697 subpackages: - unix + - windows - name: golang.org/x/text version: 88f656faf3f37f690df1a32515b479415e1a6769 subpackages: @@ -295,7 +296,7 @@ imports: subpackages: - base64vlq - name: gopkg.in/yaml.v2 - version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 + version: 670d4cfef0544295bc27a114dbac37980d83185a - name: k8s.io/api version: 8b7507fac302640dd5f1efbf9643199952cc58db subpackages: @@ -446,6 +447,72 @@ imports: - util/jsonpath - util/retry testImports: +- name: github.com/Azure/go-ansiterm + version: d6e3b3328b783f23731bc4d058875b0371ff8109 + subpackages: + - winterm +- name: github.com/cenkalti/backoff + version: f756bc9a37f808627c8c1b26d2d6ea40c468440b +- name: github.com/containerd/continuity + version: d3c23511c1bf5851696cba83143d9cbcd666869b + subpackages: + - pathdriver +- name: github.com/docker/docker + version: 5e11f66cb6de472d11647e8b1a744afc941859ad + subpackages: + - api/types + - api/types/blkiodev + - api/types/container + - api/types/filters + - api/types/mount + - api/types/network + - api/types/registry + - api/types/strslice + - api/types/swarm + - api/types/swarm/runtime + - api/types/versions + - opts + - pkg/archive + - pkg/fileutils + - pkg/homedir + - pkg/idtools + - pkg/ioutils + - pkg/jsonmessage + - pkg/longpath + - pkg/mount + - pkg/pools + - pkg/stdcopy + - pkg/system + - pkg/term + - pkg/term/windows +- name: github.com/docker/go-connections + version: 7395e3f8aa162843a74ed6d48e79627d9792ac55 + subpackages: + - nat +- name: github.com/docker/go-units + version: 47565b4f722fb6ceae66b95f853feed578a4a51c +- name: github.com/docker/libnetwork + version: 19279f0492417475b6bfbd0aa529f73e8f178fb5 + subpackages: + - ipamutils +- name: github.com/fsouza/go-dockerclient + version: 51bd33c0c7792b2566054672a4915b406d988cc6 +- name: github.com/Microsoft/go-winio + version: ab35fc04b6365e8fcb18e6e9e41ea4a02b10b175 +- name: github.com/Nvveen/Gotty + version: cd527374f1e5bff4938207604a14f2e38a9cf512 +- name: github.com/opencontainers/go-digest + version: c9281466c8b2f606084ac71339773efd177436e7 +- name: github.com/opencontainers/image-spec + version: e562b04403929d582d449ae5386ff79dd7961a11 + subpackages: + - specs-go + - specs-go/v1 +- name: github.com/opencontainers/runc + version: dd56ece8236d6d9e5bed4ea0c31fe53c7b873ff4 + subpackages: + - libcontainer/system + - libcontainer/user - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: @@ -454,3 +521,5 @@ testImports: version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 subpackages: - assert +- name: gopkg.in/ory-am/dockertest.v3 + version: 15c8e8835bba04e0d7c2b57958ffe294d5e643dc diff --git a/glide.yaml b/glide.yaml index bfad3119..ac9fd17c 100644 --- a/glide.yaml +++ b/glide.yaml @@ -70,3 +70,5 @@ testImport: version: 1.1.4 subpackages: - assert +- package: gopkg.in/ory-am/dockertest.v3 + version: v3.1.6 diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index a69cec81..a296a72a 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -54,12 +54,10 @@ func (b *Backend) Append(event *fes.Event) error { return b.Publish(event) } -func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { - if !fes.ValidateAggregate(&aggregate) { +func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&key) { return nil, ErrInvalidAggregate } - - key := aggregate b.lock.RLock() defer b.lock.RUnlock() events, ok := b.contents[key] diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go index ec12c7a3..328040fb 100644 --- a/test/integration/fission/fnenv_test.go +++ b/test/integration/fission/fnenv_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/fnenv" - "github.com/fission/fission-workflows/pkg/fnenv/fission" + fissionenv "github.com/fission/fission-workflows/pkg/fnenv/fission" "github.com/fission/fission-workflows/pkg/types" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -19,8 +19,8 @@ const ( defaultFissionExecutorAddr = "http://executor.fission" ) -var fissionRuntime *fission.FunctionEnv -var fissionResolver *fission.Resolver +var fissionRuntime *fissionenv.FunctionEnv +var fissionResolver *fissionenv.Resolver var resolver fnenv.Resolver func TestMain(m *testing.M) { @@ -34,7 +34,7 @@ func TestMain(m *testing.M) { fissionControllerAddr = defaultFissionControllerAddr } - fissionExecutorAddr := os.Getenv("FNENV_FISSION_Executor") + fissionExecutorAddr := os.Getenv("FNENV_FISSION_EXECUTOR") if len(fissionExecutorAddr) == 0 { fissionExecutorAddr = defaultFissionExecutorAddr } @@ -60,8 +60,8 @@ func TestMain(m *testing.M) { } // Setup Fission connection - fissionRuntime = fission.SetupRuntime(fissionExecutorAddr) - fissionResolver = fission.SetupResolver(fissionControllerAddr) + fissionRuntime = fissionenv.SetupRuntime(fissionExecutorAddr) + fissionResolver = fissionenv.SetupResolver(fissionControllerAddr) resolver = fnenv.NewMetaResolver(map[string]fnenv.RuntimeResolver{ "fission": fissionResolver, }) diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 4f5eac7d..0e65d4a5 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -1,15 +1,17 @@ package nats import ( - "context" "fmt" "os" "testing" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fes/backend/nats" - "github.com/fission/fission-workflows/test/integration" + fesnats "github.com/fission/fission-workflows/pkg/fes/backend/nats" + "github.com/fission/fission-workflows/pkg/util" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "gopkg.in/ory-am/dockertest.v3" ) var ( @@ -18,27 +20,64 @@ var ( // Tests the event store implementation with a live NATS cluster. // This test will start and stop a NATS streaming cluster by itself. -// -// Prerequisites: -// - Docker func TestMain(m *testing.M) { if testing.Short() { fmt.Println("Skipping NATS integration tests...") return } + // uses a sensible default on windows (tcp/http) and linux/osx (socket) + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } - ctx := context.TODO() - - cfg := integration.RunNatsStreaming(ctx) + // pulls an image, creates a container based on it and runs it + id := util.Uid() + clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - natsBackend, err := nats.Connect(cfg) + Repository: "nats-streaming", + Tag: "0.8.0-beta", + Cmd: []string{"-cid", clusterId, "-p", fmt.Sprintf("%d", 4222)}, + ExposedPorts: []string{"4222"}, + }) if err != nil { - panic(fmt.Sprintf("failed to connect to cluster: %v", err)) + log.Fatalf("Could not start resource: %s", err) + } + + cleanup := func() { + if err := pool.Purge(resource); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } } - backend = natsBackend + defer cleanup() + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + if err := pool.Retry(func() error { + cfg := fesnats.Config{ + Cluster: clusterId, + Client: fmt.Sprintf("client-%s", id), + Url: fmt.Sprintf("nats://%s:%s", "0.0.0.0", resource.GetPort("4222/tcp")), + } + + var err error + backend, err = nats.Connect(cfg) + if err != nil { + return fmt.Errorf("failed to connect to cluster: %v", err) + } + return nil // TODO add ping + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + fmt.Println(backend) + fmt.Println("Setup done; running tests") status := m.Run() + fmt.Println("Cleaning up test message queue") + + // You can't defer this because os.Exit doesn't care for defer + cleanup() os.Exit(status) } diff --git a/test/integration/util.go b/test/integration/util.go index 0a0360da..adc36e61 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -2,19 +2,8 @@ package integration import ( "context" - "fmt" - "net" - "os" - "os/exec" - "strings" - "time" "github.com/fission/fission-workflows/cmd/fission-workflows-bundle/bundle" - fesnats "github.com/fission/fission-workflows/pkg/fes/backend/nats" - "github.com/fission/fission-workflows/pkg/util" - "github.com/nats-io/go-nats" - "github.com/nats-io/go-nats-streaming" - "github.com/sirupsen/logrus" ) // SetupBundle sets up and runs the workflows-bundle. @@ -22,7 +11,6 @@ import ( // By default the bundle runs with all components are enabled, setting up a NATS cluster as the // backing event store, and internal fnenv and workflow runtime func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { - nats := RunNatsStreaming(ctx) var bundleOpts bundle.Options if len(opts) > 0 { bundleOpts = opts[0] @@ -35,101 +23,8 @@ func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { InvocationAPI: true, WorkflowAPI: true, AdminAPI: true, - Nats: &nats, } } go bundle.Run(ctx, &bundleOpts) return bundleOpts } - -// TODO check if there is a nats instance already is running -func RunNatsStreaming(ctx context.Context) fesnats.Config { - id := util.UID() - clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) - port, err := findFreePort() - if err != nil { - panic(err) - } - address := "127.0.0.1" - args := []string{ - "run", - "--rm", - "-i", - "-p", fmt.Sprintf("%d:%d", port, port), - "nats-streaming:0.8.0-beta", - "-cid", clusterId, - "-p", fmt.Sprintf("%d", port), - } - - go func() { - fmt.Printf("> docker %s\n", strings.Join(args, " ")) - cmd := exec.CommandContext(ctx, "docker", args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - err = cmd.Start() - if err != nil { - panic(err) - } - err = cmd.Wait() - if err != nil { - panic(err) - } - }() - cfg := fesnats.Config{ - Cluster: clusterId, - Client: fmt.Sprintf("client-%s", id), - URL: fmt.Sprintf("nats://%s:%d", address, port), - } - - logrus.WithField("config", cfg).Info("Setting up NATS server") - - // wait for a bit to set it up - awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) - defer cancel() - err = waitForNats(awaitCtx, cfg.URL, cfg.Cluster) - if err != nil { - logrus.Error(err) - } - logrus.WithField("config", cfg).Info("NATS Server running") - - return cfg -} - -func findFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return 0, err - } - - listener, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - defer listener.Close() - tcpAddr := listener.Addr().(*net.TCPAddr) - return tcpAddr.Port, nil -} - -// Wait for NATS to come online, ignoring ErrNoServer as it could mean that NATS is still being setup -func waitForNats(ctx context.Context, url string, cluster string) error { - conn, err := stan.Connect(cluster, "setupEventStore-alive-test", stan.NatsURL(url), - stan.ConnectWait(time.Duration(10)*time.Second)) - if err == nats.ErrNoServers { - logrus.WithFields(logrus.Fields{ - "cluster": cluster, - "url": url, - }).Warnf("retrying due to err: %v", err) - select { - case <-time.After(time.Duration(1) * time.Second): - return waitForNats(ctx, url, cluster) - case <-ctx.Done(): - return ctx.Err() - } - } - if err != nil { - return err - } - defer conn.Close() - return nil -} From d630b1293bfa99bb4ffe72994dc7724fe352a1a4 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 18 Jun 2018 16:11:31 +0200 Subject: [PATCH 4/6] Improve readablity of validation errors --- pkg/types/validate/validate.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/types/validate/validate.go b/pkg/types/validate/validate.go index 6b784040..2e7e501d 100644 --- a/pkg/types/validate/validate.go +++ b/pkg/types/validate/validate.go @@ -76,11 +76,11 @@ func (ie Error) Error() string { vt = "Value" } prefix := fmt.Sprintf("%s is invalid", vt) - rs := ie.Reasons() - if len(rs) == 1 { - return prefix + ": " + rs[0].Error() + var rs []string + for k, reason := range ie.Reasons() { + rs = append(rs, fmt.Sprintf("(%d) %v", k, reason)) } - return prefix + " (multiple errors)" + return prefix + ": " + strings.Join(rs, "; ") } func (ie Error) getOrNil() error { From c6c8f32a5402fb06a946f56d5e7fea5d44334c6c Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 18 Jun 2018 16:37:58 +0200 Subject: [PATCH 5/6] Upgrade dockertest dependency --- glide.lock | 71 ++++++++++++++++++++++++------------------------------ glide.yaml | 2 +- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/glide.lock b/glide.lock index ed3f403c..2760ab13 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 0aa3850135ea276827ed1beef1133960ffef5183930220ad59461636af062fc5 -updated: 2018-06-09T01:00:51.241533+02:00 +hash: 17bbcf54feef7c5516e253cd617a4f360562db75cb922cd91a7452cecda5adbb +updated: 2018-06-18T16:36:56.952921+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -329,7 +329,7 @@ imports: - storage/v1alpha1 - storage/v1beta1 - name: k8s.io/apiextensions-apiserver - version: 8e7f43002fec5394a8d96ebca781aa9d4b37aaef + version: b13a681559816a9c14f93086bbeeed1c7baf2bcb subpackages: - pkg/apis/apiextensions - pkg/apis/apiextensions/v1beta1 @@ -337,7 +337,7 @@ imports: - pkg/client/clientset/clientset/scheme - pkg/client/clientset/clientset/typed/apiextensions/v1beta1 - name: k8s.io/apimachinery - version: 17529ec7eadb8de8e7dc835201455f53571f655a + version: f6313580a4d36c7c74a3d845dda6e116642c4f90 subpackages: - pkg/api/errors - pkg/api/meta @@ -457,46 +457,12 @@ testImports: version: d3c23511c1bf5851696cba83143d9cbcd666869b subpackages: - pathdriver -- name: github.com/docker/docker - version: 5e11f66cb6de472d11647e8b1a744afc941859ad - subpackages: - - api/types - - api/types/blkiodev - - api/types/container - - api/types/filters - - api/types/mount - - api/types/network - - api/types/registry - - api/types/strslice - - api/types/swarm - - api/types/swarm/runtime - - api/types/versions - - opts - - pkg/archive - - pkg/fileutils - - pkg/homedir - - pkg/idtools - - pkg/ioutils - - pkg/jsonmessage - - pkg/longpath - - pkg/mount - - pkg/pools - - pkg/stdcopy - - pkg/system - - pkg/term - - pkg/term/windows - name: github.com/docker/go-connections version: 7395e3f8aa162843a74ed6d48e79627d9792ac55 subpackages: - nat - name: github.com/docker/go-units version: 47565b4f722fb6ceae66b95f853feed578a4a51c -- name: github.com/docker/libnetwork - version: 19279f0492417475b6bfbd0aa529f73e8f178fb5 - subpackages: - - ipamutils -- name: github.com/fsouza/go-dockerclient - version: 51bd33c0c7792b2566054672a4915b406d988cc6 - name: github.com/Microsoft/go-winio version: ab35fc04b6365e8fcb18e6e9e41ea4a02b10b175 - name: github.com/Nvveen/Gotty @@ -513,6 +479,33 @@ testImports: subpackages: - libcontainer/system - libcontainer/user +- name: github.com/ory/dockertest + version: 9bca068bf5e4af2484b9c2e8cfeb3d098d5327d7 + subpackages: + - docker + - docker/opts + - docker/pkg/archive + - docker/pkg/fileutils + - docker/pkg/homedir + - docker/pkg/idtools + - docker/pkg/ioutils + - docker/pkg/jsonmessage + - docker/pkg/longpath + - docker/pkg/mount + - docker/pkg/pools + - docker/pkg/stdcopy + - docker/pkg/system + - docker/pkg/term + - docker/pkg/term/windows + - docker/types + - docker/types/blkiodev + - docker/types/container + - docker/types/filters + - docker/types/mount + - docker/types/network + - docker/types/registry + - docker/types/strslice + - docker/types/versions - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: @@ -522,4 +515,4 @@ testImports: subpackages: - assert - name: gopkg.in/ory-am/dockertest.v3 - version: 15c8e8835bba04e0d7c2b57958ffe294d5e643dc + version: 9bca068bf5e4af2484b9c2e8cfeb3d098d5327d7 diff --git a/glide.yaml b/glide.yaml index ac9fd17c..c172eae8 100644 --- a/glide.yaml +++ b/glide.yaml @@ -71,4 +71,4 @@ testImport: subpackages: - assert - package: gopkg.in/ory-am/dockertest.v3 - version: v3.1.6 + version: v3.3.1 From f7a46659eb2179c0f5de992cc9ac24291fbac71d Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 29 Mar 2018 10:47:45 +0200 Subject: [PATCH 6/6] Add fission integration tests --- .travis.yml | 1 - pkg/fnenv/fission/runtime.go | 7 +- pkg/fnenv/fission/timed.go | 2 +- pkg/fnenv/fnenv.go | 2 +- test/integration/bundle/bundle_test.go | 4 +- test/integration/fission/fn.js | 8 + test/integration/fission/fnenv_test.go | 101 ---------- test/integration/fission/runtime_test.go | 179 ++++++++++++++++++ .../specs/env-fission-runtime-test.yaml | 17 ++ .../specs/fission-deployment-config.yaml | 7 + .../specs/function-fission-runtime-test.yaml | 50 +++++ test/integration/nats/nats_test.go | 13 +- test/runtests.sh | 4 +- 13 files changed, 276 insertions(+), 119 deletions(-) create mode 100644 test/integration/fission/fn.js delete mode 100644 test/integration/fission/fnenv_test.go create mode 100644 test/integration/fission/runtime_test.go create mode 100644 test/integration/fission/specs/env-fission-runtime-test.yaml create mode 100644 test/integration/fission/specs/fission-deployment-config.yaml create mode 100644 test/integration/fission/specs/function-fission-runtime-test.yaml diff --git a/.travis.yml b/.travis.yml index cfd3e56c..e3df8844 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,6 @@ install: - test/e2e/install-clients.sh # Needed for some integration tests - glide -h >/dev/null 2>&1 || go get github.com/Masterminds/glide -- nats-streaming-server -h >/dev/null 2>&1 || go get github.com/nats-io/nats-streaming-server # TODO remove the need for this before_script: - cd ${TRAVIS_BUILD_DIR} diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 776f4b0e..14135e93 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -40,8 +40,9 @@ const ( func NewFunctionEnv(executor *executor.Client, routerURL string) *FunctionEnv { return &FunctionEnv{ - executor: executor, - routerURL: routerURL, + executor: executor, + routerURL: routerURL, + timedExecService: newTimedExecPool(), } } @@ -110,7 +111,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } // Notify signals the Fission runtime that a function request is expected at a specific time. -func (fe *FunctionEnv) Notify(taskID string, fn types.FnRef, expectedAt time.Time) error { +func (fe *FunctionEnv) Notify(fn types.FnRef, expectedAt time.Time) error { reqURL, err := fe.getFnURL(fn) if err != nil { return err diff --git a/pkg/fnenv/fission/timed.go b/pkg/fnenv/fission/timed.go index 1de90d99..9544e113 100644 --- a/pkg/fnenv/fission/timed.go +++ b/pkg/fnenv/fission/timed.go @@ -24,7 +24,7 @@ func newTimedExecPool() *timedExecPool { func (ds *timedExecPool) Submit(fn func(), execAt time.Time) { ds.fnsLock.Lock() defer ds.fnsLock.Unlock() - ds.fnQueue.Push(timedFn{ + ds.fnQueue.Push(&timedFn{ execAt: execAt, fn: fn, }) diff --git a/pkg/fnenv/fnenv.go b/pkg/fnenv/fnenv.go index c2e57233..3cde806b 100644 --- a/pkg/fnenv/fnenv.go +++ b/pkg/fnenv/fnenv.go @@ -88,7 +88,7 @@ type Notifier interface { // a signal that the function invocation will come (almost) immediately. fnId is an optional // identifier for the signal, which the implementation can use this to identify signals. // By default, if fnId is empty, it is not possible to later update the notification. - Notify(taskID string, fn types.FnRef, expectedAt time.Time) error + Notify(fn types.FnRef, expectedAt time.Time) error } // Resolver resolves a reference to a function to a deterministic, unique function id. diff --git a/test/integration/bundle/bundle_test.go b/test/integration/bundle/bundle_test.go index 4c0c7433..15f0e893 100644 --- a/test/integration/bundle/bundle_test.go +++ b/test/integration/bundle/bundle_test.go @@ -3,7 +3,6 @@ package bundle import ( "context" - "fmt" "os" "strings" "testing" @@ -15,6 +14,7 @@ import ( "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/fission/fission-workflows/test/integration" "github.com/golang/protobuf/ptypes/empty" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -25,7 +25,7 @@ const ( func TestMain(m *testing.M) { if testing.Short() { - fmt.Println("Skipping bundle integration tests...") + log.Info("Short test; skipping bundle integration tests.") return } diff --git a/test/integration/fission/fn.js b/test/integration/fission/fn.js new file mode 100644 index 00000000..c8912832 --- /dev/null +++ b/test/integration/fission/fn.js @@ -0,0 +1,8 @@ +module.exports = async (context) => { + const body = JSON.stringify(context.request.body); + const headers = JSON.stringify(context.request.headers); + return { + status: 200, + body: `body:${body}\nheaders:${headers}` + }; +} \ No newline at end of file diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go deleted file mode 100644 index 328040fb..00000000 --- a/test/integration/fission/fnenv_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package fission - -import ( - "fmt" - "net/http" - "os" - "testing" - "time" - - "github.com/fission/fission-workflows/pkg/fnenv" - fissionenv "github.com/fission/fission-workflows/pkg/fnenv/fission" - "github.com/fission/fission-workflows/pkg/types" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" -) - -const ( - defaultFissionControllerAddr = "http://controller.fission" - defaultFissionExecutorAddr = "http://executor.fission" -) - -var fissionRuntime *fissionenv.FunctionEnv -var fissionResolver *fissionenv.Resolver -var resolver fnenv.Resolver - -func TestMain(m *testing.M) { - if testing.Short() { - fmt.Println("Skipping Fission integration tests...") - return - } - - fissionControllerAddr := os.Getenv("FNENV_FISSION_CONTROLLER") - if len(fissionControllerAddr) == 0 { - fissionControllerAddr = defaultFissionControllerAddr - } - - fissionExecutorAddr := os.Getenv("FNENV_FISSION_EXECUTOR") - if len(fissionExecutorAddr) == 0 { - fissionExecutorAddr = defaultFissionExecutorAddr - } - - logrus.WithFields(logrus.Fields{ - "controller": fissionControllerAddr, - "executor": fissionExecutorAddr, - }).Infof("Checking if Fission is reachable") - - var fissionMissing bool - _, err := http.Get(fissionControllerAddr) - if err != nil { - logrus.Warnf("Fission Controller not available: %v", err) - fissionMissing = true - } - _, err = http.Get(fissionExecutorAddr) - if err != nil { - logrus.Warnf("Fission Executor not available: %v", err) - fissionMissing = true - } - if fissionMissing { - panic("Fission integration test failed: Fission not available or reachable") - } - - // Setup Fission connection - fissionRuntime = fissionenv.SetupRuntime(fissionExecutorAddr) - fissionResolver = fissionenv.SetupResolver(fissionControllerAddr) - resolver = fnenv.NewMetaResolver(map[string]fnenv.RuntimeResolver{ - "fission": fissionResolver, - }) - - m.Run() -} - -func TestFissionResolveFunction(t *testing.T) { - // TODO add actual function - fnId, err := fissionResolver.Resolve("hello") - assert.NoError(t, err) - assert.NotEmpty(t, fnId) -} - -func TestFissionExecuteFunction(t *testing.T) { - // TODO add actual function - fnRef, err := resolver.Resolve("hello") - assert.NoError(t, err) - assert.NotEmpty(t, fnRef) - - taskSpec := types.NewTaskInvocationSpec("wfi-1", "sometask", fnRef) - status, err := fissionRuntime.Invoke(taskSpec) - assert.NoError(t, err) - assert.True(t, status.Finished()) - assert.Nil(t, status.Error) - assert.NotNil(t, status.Output) -} - -func TestFissionNotify(t *testing.T) { - // TODO add actual function - fnRef, err := resolver.Resolve("hello") - assert.NoError(t, err) - assert.NotEmpty(t, fnRef) - - err = fissionRuntime.Notify("someTask", fnRef, time.Now()) - assert.NoError(t, err) -} diff --git a/test/integration/fission/runtime_test.go b/test/integration/fission/runtime_test.go new file mode 100644 index 00000000..180edd38 --- /dev/null +++ b/test/integration/fission/runtime_test.go @@ -0,0 +1,179 @@ +package fission + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/fission/fission-workflows/pkg/fnenv/fission" + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues" + controllerclient "github.com/fission/fission/controller/client" + executorclient "github.com/fission/fission/executor/client" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +// Future: discover executor of fission deployment +const ( + executorLocalPort = 9031 + routerLocalPort = 9032 + controllerLocalPort = 9033 +) + +var executor = executorclient.MakeClient(localhost(executorLocalPort)) +var controller = controllerclient.MakeClient(localhost(controllerLocalPort)) +var testFnName = "fission-runtime-test" + +// Currently we assume that fission is present (along with the CLI) and kubectl. +func TestMain(m *testing.M) { + var status int + if testing.Short() { + log.Info("Short test; skipping Fission integration tests") + return + } + + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer func() { + log.Info("Awaiting completion of goroutines...") + cancelFn() + time.Sleep(5 * time.Second) + os.Exit(status) + }() + + // Test if Fission is present + if err := exec.CommandContext(ctx, "fission", "fn", "list").Run(); err != nil { + log.Panicf("Fission is not present: %v", err) + } + + err := portForwardComponents(ctx, "fission") + if err != nil { + log.Panicf("Failed to forward components: %v", err) + } + time.Sleep(time.Minute) + + // Test if cluster is accessible + if err := exec.CommandContext(ctx, "curl", localhost(routerLocalPort)).Run(); err != nil { + log.Panicf("Fission cluster (%v) is not accessible: %v", localhost(controllerLocalPort), err) + } + + // Setup test environment and function (use fission CLI because client is not exposed correctly) + if err := exec.Command("fission", "spec", "apply").Run(); err != nil { + log.Panicf("Failed to create test resources in fission: %v", err) + } + log.Info("Fission integration test resources setup.") + + status = m.Run() + + // Clean up test function and env + cancelFn() + if err := exec.Command("fission", "spec", "destroy").Run(); err != nil { + log.Fatalf("Failed to destroy test resources in fission: %v", err) + } + log.Info("Cleaned up fission test resources.") +} + +func TestFnenvResolve(t *testing.T) { + resolver := fission.NewResolver(controller) + resolved, err := resolver.Resolve(testFnName) + assert.NoError(t, err) + assert.Equal(t, testFnName, resolved) +} + +func TestFnenvNotify(t *testing.T) { + fnref := types.NewFnRef(fission.Name, testFnName) + fnenv := fission.NewFunctionEnv(executor, localhost(routerLocalPort)) + err := fnenv.Notify(fnref, time.Now().Add(100*time.Millisecond)) + assert.NoError(t, err) +} + +func TestFnenvInvoke(t *testing.T) { + fnref := types.NewFnRef(fission.Name, testFnName) + fnenv := fission.NewFunctionEnv(executor, localhost(routerLocalPort)) + body := "stubBodyVal" + headerVal := "stub-header-val" + headerKey := "stub-header-key" + + result, err := fnenv.Invoke(&types.TaskInvocationSpec{ + TaskId: "fooTask", + InvocationId: "fooInvocation", + Inputs: types.Inputs{ + "default": typedvalues.MustParse(body), + "headers": typedvalues.MustParse(map[string]interface{}{ + headerKey: headerVal, + }), + }, + FnRef: &fnref, + }) + output := typedvalues.MustFormat(result.Output) + assert.NoError(t, err) + assert.True(t, result.Finished()) + assert.NotEmpty(t, output) + assert.Contains(t, output, body) + assert.Contains(t, output, headerVal) + assert.Contains(t, output, headerKey) +} + +func portForwardComponents(ctx context.Context, ns string) error { + buf := bytes.NewBuffer(nil) + cmd := exec.CommandContext(ctx, "kubectl", "-n", ns, "get", "pods", "-o", "name") + cmd.Stdout = buf + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return err + } + pods := strings.Split(buf.String(), "\n") + for _, pod := range pods { + if kindSplit := strings.Index(pod, "/"); kindSplit >= 0 { + pod = pod[kindSplit+1:] + } + + var portMap string + var err error + switch { + case strings.HasPrefix(pod, "controller"): + portMap = fmt.Sprintf("%d:80", controllerLocalPort) + case strings.HasPrefix(pod, "executor"): + portMap = fmt.Sprintf("%d:80", executorLocalPort) + case strings.HasPrefix(pod, "router"): + portMap = fmt.Sprintf("%d:80", routerLocalPort) + } + if len(portMap) != 0 { + err = portForward(ctx, ns, pod, portMap) + fmt.Printf("Setup proxy from %v to %v\n", portMap, pod) + if err != nil { + return err + } + } + } + return nil +} + +func portForward(ctx context.Context, ns string, pod string, portMap string) error { + cmd := exec.CommandContext(ctx, "kubectl", "-n", ns, "port-forward", pod, portMap) + log.Info("exec: ", strings.Join(cmd.Args, " ")) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + err := cmd.Start() + if err != nil { + return err + } + go func() { + state, err := cmd.Process.Wait() + if err != nil { + log.Error(err) + } + log.Infof("portForward: %v.%v - %v", pod, ns, state) + + }() + return nil +} + +func localhost(port int) string { + return fmt.Sprintf("http://localhost:%d", port) +} diff --git a/test/integration/fission/specs/env-fission-runtime-test.yaml b/test/integration/fission/specs/env-fission-runtime-test.yaml new file mode 100644 index 00000000..c08fc092 --- /dev/null +++ b/test/integration/fission/specs/env-fission-runtime-test.yaml @@ -0,0 +1,17 @@ +apiVersion: fission.io/v1 +kind: Environment +metadata: + creationTimestamp: null + name: fission-runtime-test + namespace: default +spec: + TerminationGracePeriod: 360 + builder: {} + poolsize: 3 + resources: {} + runtime: + functionendpointport: 0 + image: fission/node-env + loadendpointpath: "" + loadendpointport: 0 + version: 1 diff --git a/test/integration/fission/specs/fission-deployment-config.yaml b/test/integration/fission/specs/fission-deployment-config.yaml new file mode 100644 index 00000000..ad84f523 --- /dev/null +++ b/test/integration/fission/specs/fission-deployment-config.yaml @@ -0,0 +1,7 @@ +# This file is generated by the 'fission spec init' command. +# See the README in this directory for background and usage information. +# Do not edit the UID below: that will break 'fission spec apply' +apiVersion: fission.io/v1 +kind: DeploymentConfig +name: fission +uid: 3aacd049-9a2c-45d1-b9db-16ad4d6635aa diff --git a/test/integration/fission/specs/function-fission-runtime-test.yaml b/test/integration/fission/specs/function-fission-runtime-test.yaml new file mode 100644 index 00000000..102f3cd5 --- /dev/null +++ b/test/integration/fission/specs/function-fission-runtime-test.yaml @@ -0,0 +1,50 @@ +include: +- fn.js +kind: ArchiveUploadSpec +name: fn-js + +--- +apiVersion: fission.io/v1 +kind: Package +metadata: + creationTimestamp: null + name: fn-js-faqo + namespace: default +spec: + deployment: + checksum: {} + type: url + url: archive://fn-js + environment: + name: fission-runtime-test + namespace: default + source: + checksum: {} +status: + buildstatus: none + +--- +apiVersion: fission.io/v1 +kind: Function +metadata: + creationTimestamp: null + name: fission-runtime-test + namespace: default +spec: + InvokeStrategy: + ExecutionStrategy: + ExecutorType: poolmgr + MaxScale: 1 + MinScale: 0 + TargetCPUPercent: 80 + StrategyType: execution + configmaps: null + environment: + name: fission-runtime-test + namespace: default + package: + packageref: + name: fn-js-faqo + namespace: default + resources: {} + secrets: null diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 0e65d4a5..36748f71 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -23,7 +23,7 @@ var ( func TestMain(m *testing.M) { if testing.Short() { - fmt.Println("Skipping NATS integration tests...") + log.Info("Short test; skipping NATS integration tests.") return } // uses a sensible default on windows (tcp/http) and linux/osx (socket) @@ -33,7 +33,7 @@ func TestMain(m *testing.M) { } // pulls an image, creates a container based on it and runs it - id := util.Uid() + id := util.UID() clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) resource, err := pool.RunWithOptions(&dockertest.RunOptions{ @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { cfg := fesnats.Config{ Cluster: clusterId, Client: fmt.Sprintf("client-%s", id), - Url: fmt.Sprintf("nats://%s:%s", "0.0.0.0", resource.GetPort("4222/tcp")), + URL: fmt.Sprintf("nats://%s:%s", "0.0.0.0", resource.GetPort("4222/tcp")), } var err error @@ -66,15 +66,14 @@ func TestMain(m *testing.M) { if err != nil { return fmt.Errorf("failed to connect to cluster: %v", err) } - return nil // TODO add ping + return nil }); err != nil { log.Fatalf("Could not connect to docker: %s", err) } - fmt.Println(backend) - fmt.Println("Setup done; running tests") + log.Info("Setup done; running tests") status := m.Run() - fmt.Println("Cleaning up test message queue") + log.Info("Cleaning up test message queue") // You can't defer this because os.Exit doesn't care for defer cleanup() diff --git a/test/runtests.sh b/test/runtests.sh index a23aec96..ff03faf2 100755 --- a/test/runtests.sh +++ b/test/runtests.sh @@ -3,9 +3,7 @@ set -e # Install test dependencies if requested -if [ -z "$1" ]; then - go test -v -i $(go list ./... | grep -v '/vendor/') -fi +go test -v -i $(go list ./... | grep -v '/vendor/') # Run unit and integration tests, exclude dependencies go test -race -v $(go list ./... | grep -v '/vendor/') "$@"