Skip to content

Commit

Permalink
Use dockertest to manage lifecycle of the nats-streaming server
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Mar 27, 2018
1 parent bb06afd commit 7cde032
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 134 deletions.
6 changes: 6 additions & 0 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
wfctr "github.com/fission/fission-workflows/pkg/controller/workflow"
"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/mem"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
Expand Down Expand Up @@ -76,6 +77,11 @@ func Run(ctx context.Context, opts *Options) error {
natsEs := setupNatsEventStoreClient(opts.Nats.Url, opts.Nats.Cluster, opts.Nats.Client)
es = natsEs
esPub = natsEs
} else {
log.Infof("Using event store: MEM")
memEs := mem.NewBackend()
es = memEs
esPub = memEs
}

// Caches
Expand Down
83 changes: 74 additions & 9 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ testImport:
version: 1.1.4
subpackages:
- assert
- package: gopkg.in/ory-am/dockertest.v3
version: v3.1.6
5 changes: 2 additions & 3 deletions pkg/fes/backend/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ func (b *Backend) Append(event *fes.Event) error {
return b.Publish(event)
}

func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&aggregate) {
func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) {
if !fes.ValidateAggregate(&key) {
return nil, ErrInvalidAggregate
}

key := *aggregate
b.lock.RLock()
defer b.lock.RUnlock()
events, ok := b.contents[key]
Expand Down
12 changes: 6 additions & 6 deletions test/integration/fission/fnenv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/fission/fission-workflows/pkg/fnenv"
"github.com/fission/fission-workflows/pkg/fnenv/fission"
fissionenv "github.com/fission/fission-workflows/pkg/fnenv/fission"
"github.com/fission/fission-workflows/pkg/types"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand All @@ -19,8 +19,8 @@ const (
defaultFissionExecutorAddr = "http://executor.fission"
)

var fissionRuntime *fission.FunctionEnv
var fissionResolver *fission.Resolver
var fissionRuntime *fissionenv.FunctionEnv
var fissionResolver *fissionenv.Resolver
var resolver fnenv.Resolver

func TestMain(m *testing.M) {
Expand All @@ -34,7 +34,7 @@ func TestMain(m *testing.M) {
fissionControllerAddr = defaultFissionControllerAddr
}

fissionExecutorAddr := os.Getenv("FNENV_FISSION_Executor")
fissionExecutorAddr := os.Getenv("FNENV_FISSION_EXECUTOR")
if len(fissionExecutorAddr) == 0 {
fissionExecutorAddr = defaultFissionExecutorAddr
}
Expand All @@ -60,8 +60,8 @@ func TestMain(m *testing.M) {
}

// Setup Fission connection
fissionRuntime = fission.SetupRuntime(fissionExecutorAddr)
fissionResolver = fission.SetupResolver(fissionControllerAddr)
fissionRuntime = fissionenv.SetupRuntime(fissionExecutorAddr)
fissionResolver = fissionenv.SetupResolver(fissionControllerAddr)
resolver = fnenv.NewMetaResolver(map[string]fnenv.RuntimeResolver{
"fission": fissionResolver,
})
Expand Down
61 changes: 50 additions & 11 deletions test/integration/nats/nats_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package nats

import (
"context"
"fmt"
"os"
"testing"

"github.com/fission/fission-workflows/pkg/fes"
"github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/test/integration"
fesnats "github.com/fission/fission-workflows/pkg/fes/backend/nats"
"github.com/fission/fission-workflows/pkg/util"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"gopkg.in/ory-am/dockertest.v3"
)

var (
Expand All @@ -18,27 +20,64 @@ var (

// Tests the event store implementation with a live NATS cluster.
// This test will start and stop a NATS streaming cluster by itself.
//
// Prerequisites:
// - Docker

func TestMain(m *testing.M) {
if testing.Short() {
fmt.Println("Skipping NATS integration tests...")
return
}
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

ctx := context.TODO()

cfg := integration.RunNatsStreaming(ctx)
// pulls an image, creates a container based on it and runs it
id := util.Uid()
clusterId := fmt.Sprintf("fission-workflows-tests-%s", id)
resource, err := pool.RunWithOptions(&dockertest.RunOptions{

natsBackend, err := nats.Connect(cfg)
Repository: "nats-streaming",
Tag: "0.8.0-beta",
Cmd: []string{"-cid", clusterId, "-p", fmt.Sprintf("%d", 4222)},
ExposedPorts: []string{"4222"},
})
if err != nil {
panic(fmt.Sprintf("failed to connect to cluster: %v", err))
log.Fatalf("Could not start resource: %s", err)
}

cleanup := func() {
if err := pool.Purge(resource); err != nil {
log.Fatalf("Could not purge resource: %s", err)
}
}
backend = natsBackend
defer cleanup()

// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
if err := pool.Retry(func() error {
cfg := fesnats.Config{
Cluster: clusterId,
Client: fmt.Sprintf("client-%s", id),
Url: fmt.Sprintf("nats://%s:%s", "0.0.0.0", resource.GetPort("4222/tcp")),
}

var err error
backend, err = nats.Connect(cfg)
if err != nil {
return fmt.Errorf("failed to connect to cluster: %v", err)
}
return nil // TODO add ping
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

fmt.Println(backend)
fmt.Println("Setup done; running tests")
status := m.Run()
fmt.Println("Cleaning up test message queue")

// You can't defer this because os.Exit doesn't care for defer
cleanup()
os.Exit(status)
}

Expand Down
Loading

0 comments on commit 7cde032

Please sign in to comment.