Skip to content

Commit

Permalink
Added integration test for workflow-engine app
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Aug 17, 2017
1 parent eb4929f commit 0240b2d
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 30 deletions.
3 changes: 3 additions & 0 deletions cmd/workflow-engine/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//cmd/workflow-engine/app:go_default_library",
"//pkg/client/fission:go_default_library",
"//vendor/github.com/fission/fission/controller/client:go_default_library",
"//vendor/github.com/fission/fission/poolmgr/client:go_default_library",
"//vendor/github.com/nats-io/go-nats:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/github.com/urfave/cli:go_default_library",
Expand Down
2 changes: 0 additions & 2 deletions cmd/workflow-engine/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ go_library(
"//pkg/eventstore/nats:go_default_library",
"//pkg/projector/project/invocation:go_default_library",
"//pkg/scheduler:go_default_library",
"//vendor/github.com/fission/fission/controller/client:go_default_library",
"//vendor/github.com/fission/fission/poolmgr/client:go_default_library",
"//vendor/github.com/gorilla/handlers:go_default_library",
"//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime:go_default_library",
"//vendor/github.com/nats-io/go-nats-streaming:go_default_library",
Expand Down
63 changes: 35 additions & 28 deletions cmd/workflow-engine/app/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
inats "github.com/fission/fission-workflow/pkg/eventstore/nats"
ip "github.com/fission/fission-workflow/pkg/projector/project/invocation"
"github.com/fission/fission-workflow/pkg/scheduler"
"github.com/fission/fission/controller/client"
poolmgr "github.com/fission/fission/poolmgr/client"
"github.com/gorilla/handlers"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/nats-io/go-nats-streaming"
Expand All @@ -33,7 +31,12 @@ const (
)

type Options struct {
EventStore *EventStoreOptions
FunctionRuntimeEnv function.Runtime
FunctionRegistry function.Resolver
EventStore *EventStoreOptions
FissionProxyAddress string
GrpcApiServerAddress string
HttpApiServerAddress string
}

type EventStoreOptions struct {
Expand All @@ -44,13 +47,13 @@ type EventStoreOptions struct {

// TODO scratch, should be cleaned up
// Blocking
func Run(ctx context.Context, options *Options) error {
func Run(ctx context.Context, opts *Options) error {
// (shared) gRPC server
lis, err := net.Listen("tcp", GRPC_ADDRESS)
lis, err := net.Listen("tcp", opts.GrpcApiServerAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(opts.GrpcApiServerAddress, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
Expand All @@ -60,19 +63,14 @@ func Run(ctx context.Context, options *Options) error {
defer lis.Close()

// EventStore
stanConn, err := stan.Connect("fissionMQTrigger", "test-client", stan.NatsURL(options.EventStore.Url))
stanConn, err := stan.Connect(opts.EventStore.Cluster, "test-client", stan.NatsURL(opts.EventStore.Url))
if err != nil {
panic(err)
}
natsClient := inats.New(inats.NewConn(stanConn))
cache := cache.NewMapCache()

// Fission client
poolmgrClient := poolmgr.MakeClient("http://192.168.99.100:32101")
controllerClient := client.MakeClient("http://192.168.99.100:31313")
fissionApi := fission.NewFunctionEnv(poolmgrClient, controllerClient)

workflowParser := workflow.NewParser(controllerClient)
workflowParser := workflow.NewParser(opts.FunctionRegistry)
workflowValidator := workflow.NewValidator()
invocationProjector := ip.NewInvocationProjector(natsClient, cache)
err = invocationProjector.Watch("invocation.>")
Expand All @@ -82,7 +80,7 @@ func Run(ctx context.Context, options *Options) error {
// Setup API
workflowApi := workflow.NewApi(natsClient, workflowParser)
invocationApi := invocation.NewApi(natsClient, invocationProjector)
functionApi := function.NewFissionFunctionApi(fissionApi, natsClient)
functionApi := function.NewFissionFunctionApi(opts.FunctionRuntimeEnv, natsClient)
err = workflowApi.Projector.Watch("workflows.>")
if err != nil {
log.Warnf("Failed to watch for workflows, because '%v'.", err)
Expand All @@ -99,16 +97,16 @@ func Run(ctx context.Context, options *Options) error {

// API Gateway server
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err = apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
err = apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
err = apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
err = apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
err = apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
err = apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
Expand All @@ -118,23 +116,32 @@ func Run(ctx context.Context, options *Options) error {
fissionProxyServer := fission.NewFissionProxyServer(invocationServer)
fissionProxyServer.RegisterServer(proxyMux)

go http.ListenAndServe(FISSION_PROXY_ADDRESS, handlers.LoggingHandler(os.Stdout, proxyMux))
log.Info("Serving HTTP Fission Proxy at: ", FISSION_PROXY_ADDRESS)
proxySrv := http.Server{Addr: opts.FissionProxyAddress}
proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
go proxySrv.ListenAndServe()
log.Info("Serving HTTP Fission Proxy at: ", opts.FissionProxyAddress)

go http.ListenAndServe(API_GATEWAY_ADDRESS, handlers.LoggingHandler(os.Stdout, mux))
log.Info("Serving HTTP API gateway at: ", API_GATEWAY_ADDRESS)
apiSrv := http.Server{Addr: opts.HttpApiServerAddress}
apiSrv.Handler = handlers.LoggingHandler(os.Stdout, mux)
go apiSrv.ListenAndServe()
log.Info("Serving HTTP API gateway at: ", opts.HttpApiServerAddress)

// Controller
s := &scheduler.WorkflowScheduler{}
ctr := controller.NewController(invocationProjector, workflowApi.Projector, s, functionApi, invocationApi)
defer ctr.Close()
go ctr.Run()
go ctr.Run(ctx)

// Serve gRPC server
log.Info("Serving gRPC services at: ", lis.Addr())
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}

return nil
go grpcServer.Serve(lis)

<-ctx.Done()
shutdownCtx := context.Background()
log.Debug("Shutting down servers...")
grpcServer.GracefulStop() // Close
apiSrv.Shutdown(shutdownCtx)
proxySrv.Shutdown(shutdownCtx)
log.Debug("Servers shutdown successfully.")
return shutdownCtx.Err()
}
Loading

0 comments on commit 0240b2d

Please sign in to comment.