diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 32a3a4e6..8a03671e 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -15,6 +15,7 @@ import ( wfictr "github.com/fission/fission-workflows/pkg/controller/invocation" wfctr "github.com/fission/fission-workflows/pkg/controller/workflow" "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fes/backend/mem" "github.com/fission/fission-workflows/pkg/fes/backend/nats" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/fnenv/fission" @@ -76,6 +77,11 @@ func Run(ctx context.Context, opts *Options) error { natsEs := setupNatsEventStoreClient(opts.Nats.Url, opts.Nats.Cluster, opts.Nats.Client) es = natsEs esPub = natsEs + } else { + log.Infof("Using event store: MEM") + memEs := mem.NewBackend() + es = memEs + esPub = memEs } // Caches diff --git a/glide.lock b/glide.lock index 3b6dd06d..67f986ad 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 907dcf84743a73a0e26ed9087066d90036e224ec6e1492cc5ca7bcbd21f519d9 -updated: 2018-03-22T14:07:15.634944+01:00 +hash: 89b65ff6ecb95b6b15472ff1dca88f581c2f64e2cd645cbd4c933318d685c49a +updated: 2018-03-26T18:56:28.891843+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -7,14 +7,14 @@ imports: - compute/metadata - internal - name: github.com/Azure/go-autorest - version: d4e6b95c12a08b4de2d48b45d5b4d594e5d32fab + version: 58f6f26e200fa5dfb40c9cd1c83f3e2c860d779d subpackages: - autorest - autorest/adal - autorest/azure - autorest/date - name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 + version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: - spew - name: github.com/dgrijalva/jwt-go @@ -35,7 +35,7 @@ imports: - name: github.com/emicklei/go-restful-swagger12 version: dcef7f55730566d41eae5db10e7d6981829720f6 - name: github.com/fission/fission - version: 9a3768316bda67294804a377a1adf90dbdde252a + version: 704a8094e6724f8c26a667909c48c62ba6ee095f subpackages: - cache - controller/client @@ -181,6 +181,7 @@ imports: version: c488ab1dd8481ef762f96a79a9577c27825be697 subpackages: - unix + - windows - name: golang.org/x/text version: 88f656faf3f37f690df1a32515b479415e1a6769 subpackages: @@ -270,7 +271,7 @@ imports: - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/apiextensions-apiserver - version: d2a827762e85b433a8f34b819aef90fd68fc69d9 + version: 12b7cb8ddad258fb5be9de9fa17d5139e02fe61b subpackages: - pkg/apis/apiextensions - pkg/apis/apiextensions/v1beta1 @@ -278,7 +279,7 @@ imports: - pkg/client/clientset/clientset/scheme - pkg/client/clientset/clientset/typed/apiextensions/v1beta1 - name: k8s.io/apimachinery - version: 208a6980b14bbb263f29482482eabdbcfff9f7bb + version: 1fd2e63a9a370677308a42f24fd40c86438afddf subpackages: - pkg/api/equality - pkg/api/errors @@ -389,9 +390,7 @@ imports: - pkg/apis/storage - pkg/apis/storage/v1 - pkg/apis/storage/v1beta1 - - pkg/labels - pkg/util - - pkg/util/intstr - pkg/util/parsers - pkg/version - plugin/pkg/client/auth @@ -418,6 +417,70 @@ imports: - util/integer - util/jsonpath testImports: +- name: github.com/Azure/go-ansiterm + version: d6e3b3328b783f23731bc4d058875b0371ff8109 + subpackages: + - winterm +- name: github.com/cenkalti/backoff + version: 2ea60e5f094469f9e65adb9cd103795b73ae743e +- name: github.com/containerd/continuity + version: 3e8f2ea4b190484acb976a5b378d373429639a1a + subpackages: + - pathdriver +- name: github.com/docker/docker + version: 7cfd3f4229c82ba61fa13a8818b8ecf58a2dcdbf + subpackages: + - api/types + - api/types/blkiodev + - api/types/container + - api/types/filters + - api/types/mount + - api/types/network + - api/types/registry + - api/types/strslice + - api/types/swarm + - api/types/swarm/runtime + - api/types/versions + - opts + - pkg/archive + - pkg/fileutils + - pkg/homedir + - pkg/idtools + - pkg/ioutils + - pkg/jsonmessage + - pkg/longpath + - pkg/mount + - pkg/pools + - pkg/stdcopy + - pkg/system + - pkg/term + - pkg/term/windows +- name: github.com/docker/go-connections + version: 7395e3f8aa162843a74ed6d48e79627d9792ac55 + subpackages: + - nat +- name: github.com/docker/go-units + version: 47565b4f722fb6ceae66b95f853feed578a4a51c +- name: github.com/fsouza/go-dockerclient + version: ad1213cc21543085ce0c97f4291994e02b95e459 +- name: github.com/Microsoft/go-winio + version: 7da180ee92d8bd8bb8c37fc560e673e6557c392f +- name: github.com/Nvveen/Gotty + version: cd527374f1e5bff4938207604a14f2e38a9cf512 +- name: github.com/opencontainers/go-digest + version: 279bed98673dd5bef374d3b6e4b09e2af76183bf +- name: github.com/opencontainers/image-spec + version: 8e82844449a093ef49d824afad963a5e0a76b63b + subpackages: + - specs-go + - specs-go/v1 +- name: github.com/opencontainers/runc + version: ec9bf5058614525576e60e210a4e645bbd4d8335 + subpackages: + - libcontainer/system + - libcontainer/user +- name: github.com/pkg/errors + version: a22138067af1c4942683050411a841ade67fe1eb - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: @@ -426,3 +489,5 @@ testImports: version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 subpackages: - assert +- name: gopkg.in/ory-am/dockertest.v3 + version: 15c8e8835bba04e0d7c2b57958ffe294d5e643dc diff --git a/glide.yaml b/glide.yaml index d7b96625..54a2defa 100644 --- a/glide.yaml +++ b/glide.yaml @@ -66,3 +66,5 @@ testImport: version: 1.1.4 subpackages: - assert +- package: gopkg.in/ory-am/dockertest.v3 + version: v3.1.6 diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index 49f5edaa..98b53e4d 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -45,12 +45,11 @@ func (b *Backend) Append(event *fes.Event) error { return b.Publish(event) } -func (b *Backend) Get(aggregate fes.Aggregate) ([]*fes.Event, error) { - if !fes.ValidateAggregate(&aggregate) { +func (b *Backend) Get(key fes.Aggregate) ([]*fes.Event, error) { + if !fes.ValidateAggregate(&key) { return nil, ErrInvalidAggregate } - key := *aggregate b.lock.RLock() defer b.lock.RUnlock() events, ok := b.contents[key] diff --git a/test/integration/fission/fnenv_test.go b/test/integration/fission/fnenv_test.go index ec12c7a3..328040fb 100644 --- a/test/integration/fission/fnenv_test.go +++ b/test/integration/fission/fnenv_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/fnenv" - "github.com/fission/fission-workflows/pkg/fnenv/fission" + fissionenv "github.com/fission/fission-workflows/pkg/fnenv/fission" "github.com/fission/fission-workflows/pkg/types" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -19,8 +19,8 @@ const ( defaultFissionExecutorAddr = "http://executor.fission" ) -var fissionRuntime *fission.FunctionEnv -var fissionResolver *fission.Resolver +var fissionRuntime *fissionenv.FunctionEnv +var fissionResolver *fissionenv.Resolver var resolver fnenv.Resolver func TestMain(m *testing.M) { @@ -34,7 +34,7 @@ func TestMain(m *testing.M) { fissionControllerAddr = defaultFissionControllerAddr } - fissionExecutorAddr := os.Getenv("FNENV_FISSION_Executor") + fissionExecutorAddr := os.Getenv("FNENV_FISSION_EXECUTOR") if len(fissionExecutorAddr) == 0 { fissionExecutorAddr = defaultFissionExecutorAddr } @@ -60,8 +60,8 @@ func TestMain(m *testing.M) { } // Setup Fission connection - fissionRuntime = fission.SetupRuntime(fissionExecutorAddr) - fissionResolver = fission.SetupResolver(fissionControllerAddr) + fissionRuntime = fissionenv.SetupRuntime(fissionExecutorAddr) + fissionResolver = fissionenv.SetupResolver(fissionControllerAddr) resolver = fnenv.NewMetaResolver(map[string]fnenv.RuntimeResolver{ "fission": fissionResolver, }) diff --git a/test/integration/nats/nats_test.go b/test/integration/nats/nats_test.go index 4f5eac7d..0e65d4a5 100644 --- a/test/integration/nats/nats_test.go +++ b/test/integration/nats/nats_test.go @@ -1,15 +1,17 @@ package nats import ( - "context" "fmt" "os" "testing" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fes/backend/nats" - "github.com/fission/fission-workflows/test/integration" + fesnats "github.com/fission/fission-workflows/pkg/fes/backend/nats" + "github.com/fission/fission-workflows/pkg/util" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "gopkg.in/ory-am/dockertest.v3" ) var ( @@ -18,27 +20,64 @@ var ( // Tests the event store implementation with a live NATS cluster. // This test will start and stop a NATS streaming cluster by itself. -// -// Prerequisites: -// - Docker func TestMain(m *testing.M) { if testing.Short() { fmt.Println("Skipping NATS integration tests...") return } + // uses a sensible default on windows (tcp/http) and linux/osx (socket) + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } - ctx := context.TODO() - - cfg := integration.RunNatsStreaming(ctx) + // pulls an image, creates a container based on it and runs it + id := util.Uid() + clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ - natsBackend, err := nats.Connect(cfg) + Repository: "nats-streaming", + Tag: "0.8.0-beta", + Cmd: []string{"-cid", clusterId, "-p", fmt.Sprintf("%d", 4222)}, + ExposedPorts: []string{"4222"}, + }) if err != nil { - panic(fmt.Sprintf("failed to connect to cluster: %v", err)) + log.Fatalf("Could not start resource: %s", err) + } + + cleanup := func() { + if err := pool.Purge(resource); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } } - backend = natsBackend + defer cleanup() + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + if err := pool.Retry(func() error { + cfg := fesnats.Config{ + Cluster: clusterId, + Client: fmt.Sprintf("client-%s", id), + Url: fmt.Sprintf("nats://%s:%s", "0.0.0.0", resource.GetPort("4222/tcp")), + } + + var err error + backend, err = nats.Connect(cfg) + if err != nil { + return fmt.Errorf("failed to connect to cluster: %v", err) + } + return nil // TODO add ping + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + fmt.Println(backend) + fmt.Println("Setup done; running tests") status := m.Run() + fmt.Println("Cleaning up test message queue") + + // You can't defer this because os.Exit doesn't care for defer + cleanup() os.Exit(status) } diff --git a/test/integration/util.go b/test/integration/util.go index 0ee8d086..e3d02b45 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -2,19 +2,8 @@ package integration import ( "context" - "fmt" - "net" - "os" - "os/exec" - "strings" - "time" "github.com/fission/fission-workflows/cmd/fission-workflows-bundle/bundle" - fesnats "github.com/fission/fission-workflows/pkg/fes/backend/nats" - "github.com/fission/fission-workflows/pkg/util" - "github.com/nats-io/go-nats" - "github.com/nats-io/go-nats-streaming" - "github.com/sirupsen/logrus" ) // SetupBundle sets up and runs the workflows-bundle. @@ -22,7 +11,6 @@ import ( // By default the bundle runs with all components are enabled, setting up a NATS cluster as the // backing event store, and internal fnenv and workflow runtime func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { - nats := RunNatsStreaming(ctx) var bundleOpts bundle.Options if len(opts) > 0 { bundleOpts = opts[0] @@ -35,101 +23,8 @@ func SetupBundle(ctx context.Context, opts ...bundle.Options) bundle.Options { ApiWorkflowInvocation: true, ApiWorkflow: true, ApiAdmin: true, - Nats: &nats, } } go bundle.Run(ctx, &bundleOpts) return bundleOpts } - -// TODO check if there is a nats instance already is running -func RunNatsStreaming(ctx context.Context) fesnats.Config { - id := util.Uid() - clusterId := fmt.Sprintf("fission-workflows-tests-%s", id) - port, err := findFreePort() - if err != nil { - panic(err) - } - address := "127.0.0.1" - args := []string{ - "run", - "--rm", - "-i", - "-p", fmt.Sprintf("%d:%d", port, port), - "nats-streaming:0.8.0-beta", - "-cid", clusterId, - "-p", fmt.Sprintf("%d", port), - } - - go func() { - fmt.Printf("> docker %s\n", strings.Join(args, " ")) - cmd := exec.CommandContext(ctx, "docker", args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - err = cmd.Start() - if err != nil { - panic(err) - } - err = cmd.Wait() - if err != nil { - panic(err) - } - }() - cfg := fesnats.Config{ - Cluster: clusterId, - Client: fmt.Sprintf("client-%s", id), - Url: fmt.Sprintf("nats://%s:%d", address, port), - } - - logrus.WithField("config", cfg).Info("Setting up NATS server") - - // wait for a bit to set it up - awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) - defer cancel() - err = waitForNats(awaitCtx, cfg.Url, cfg.Cluster) - if err != nil { - logrus.Error(err) - } - logrus.WithField("config", cfg).Info("NATS Server running") - - return cfg -} - -func findFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return 0, err - } - - listener, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - defer listener.Close() - tcpAddr := listener.Addr().(*net.TCPAddr) - return tcpAddr.Port, nil -} - -// Wait for NATS to come online, ignoring ErrNoServer as it could mean that NATS is still being setup -func waitForNats(ctx context.Context, url string, cluster string) error { - conn, err := stan.Connect(cluster, "setupEventStore-alive-test", stan.NatsURL(url), - stan.ConnectWait(time.Duration(10)*time.Second)) - if err == nats.ErrNoServers { - logrus.WithFields(logrus.Fields{ - "cluster": cluster, - "url": url, - }).Warnf("retrying due to err: %v", err) - select { - case <-time.After(time.Duration(1) * time.Second): - return waitForNats(ctx, url, cluster) - case <-ctx.Done(): - return ctx.Err() - } - } - if err != nil { - return err - } - defer conn.Close() - return nil -}