Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update input mapping #15

Merged
merged 3 commits into from
Aug 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Docs/notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Unstructured Notes

- A task can only be executed at most once!
- The identifier of a task must be unique within the workflow!
3 changes: 3 additions & 0 deletions cmd/workflow-engine/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//cmd/workflow-engine/app:go_default_library",
"//pkg/client/fission:go_default_library",
"//vendor/github.com/fission/fission/controller/client:go_default_library",
"//vendor/github.com/fission/fission/poolmgr/client:go_default_library",
"//vendor/github.com/nats-io/go-nats:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/github.com/urfave/cli:go_default_library",
Expand Down
2 changes: 0 additions & 2 deletions cmd/workflow-engine/app/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ go_library(
"//pkg/eventstore/nats:go_default_library",
"//pkg/projector/project/invocation:go_default_library",
"//pkg/scheduler:go_default_library",
"//vendor/github.com/fission/fission/controller/client:go_default_library",
"//vendor/github.com/fission/fission/poolmgr/client:go_default_library",
"//vendor/github.com/gorilla/handlers:go_default_library",
"//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime:go_default_library",
"//vendor/github.com/nats-io/go-nats-streaming:go_default_library",
Expand Down
63 changes: 35 additions & 28 deletions cmd/workflow-engine/app/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
inats "github.com/fission/fission-workflow/pkg/eventstore/nats"
ip "github.com/fission/fission-workflow/pkg/projector/project/invocation"
"github.com/fission/fission-workflow/pkg/scheduler"
"github.com/fission/fission/controller/client"
poolmgr "github.com/fission/fission/poolmgr/client"
"github.com/gorilla/handlers"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/nats-io/go-nats-streaming"
Expand All @@ -33,7 +31,12 @@ const (
)

type Options struct {
EventStore *EventStoreOptions
FunctionRuntimeEnv function.Runtime
FunctionRegistry function.Resolver
EventStore *EventStoreOptions
FissionProxyAddress string
GrpcApiServerAddress string
HttpApiServerAddress string
}

type EventStoreOptions struct {
Expand All @@ -44,13 +47,13 @@ type EventStoreOptions struct {

// TODO scratch, should be cleaned up
// Blocking
func Run(ctx context.Context, options *Options) error {
func Run(ctx context.Context, opts *Options) error {
// (shared) gRPC server
lis, err := net.Listen("tcp", GRPC_ADDRESS)
lis, err := net.Listen("tcp", opts.GrpcApiServerAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure())
conn, err := grpc.Dial(opts.GrpcApiServerAddress, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
Expand All @@ -60,19 +63,14 @@ func Run(ctx context.Context, options *Options) error {
defer lis.Close()

// EventStore
stanConn, err := stan.Connect("fissionMQTrigger", "test-client", stan.NatsURL(options.EventStore.Url))
stanConn, err := stan.Connect(opts.EventStore.Cluster, "test-client", stan.NatsURL(opts.EventStore.Url))
if err != nil {
panic(err)
}
natsClient := inats.New(inats.NewConn(stanConn))
cache := cache.NewMapCache()

// Fission client
poolmgrClient := poolmgr.MakeClient("http://192.168.99.100:32101")
controllerClient := client.MakeClient("http://192.168.99.100:31313")
fissionApi := fission.NewFunctionEnv(poolmgrClient, controllerClient)

workflowParser := workflow.NewParser(controllerClient)
workflowParser := workflow.NewParser(opts.FunctionRegistry)
workflowValidator := workflow.NewValidator()
invocationProjector := ip.NewInvocationProjector(natsClient, cache)
err = invocationProjector.Watch("invocation.>")
Expand All @@ -82,7 +80,7 @@ func Run(ctx context.Context, options *Options) error {
// Setup API
workflowApi := workflow.NewApi(natsClient, workflowParser)
invocationApi := invocation.NewApi(natsClient, invocationProjector)
functionApi := function.NewFissionFunctionApi(fissionApi, natsClient)
functionApi := function.NewFissionFunctionApi(opts.FunctionRuntimeEnv, natsClient)
err = workflowApi.Projector.Watch("workflows.>")
if err != nil {
log.Warnf("Failed to watch for workflows, because '%v'.", err)
Expand All @@ -99,16 +97,16 @@ func Run(ctx context.Context, options *Options) error {

// API Gateway server
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err = apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
err = apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
err = apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
err = apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
err = apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, GRPC_ADDRESS, opts)
err = apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, opts.GrpcApiServerAddress, grpcOpts)
if err != nil {
panic(err)
}
Expand All @@ -118,23 +116,32 @@ func Run(ctx context.Context, options *Options) error {
fissionProxyServer := fission.NewFissionProxyServer(invocationServer)
fissionProxyServer.RegisterServer(proxyMux)

go http.ListenAndServe(FISSION_PROXY_ADDRESS, handlers.LoggingHandler(os.Stdout, proxyMux))
log.Info("Serving HTTP Fission Proxy at: ", FISSION_PROXY_ADDRESS)
proxySrv := http.Server{Addr: opts.FissionProxyAddress}
proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
go proxySrv.ListenAndServe()
log.Info("Serving HTTP Fission Proxy at: ", opts.FissionProxyAddress)

go http.ListenAndServe(API_GATEWAY_ADDRESS, handlers.LoggingHandler(os.Stdout, mux))
log.Info("Serving HTTP API gateway at: ", API_GATEWAY_ADDRESS)
apiSrv := http.Server{Addr: opts.HttpApiServerAddress}
apiSrv.Handler = handlers.LoggingHandler(os.Stdout, mux)
go apiSrv.ListenAndServe()
log.Info("Serving HTTP API gateway at: ", opts.HttpApiServerAddress)

// Controller
s := &scheduler.WorkflowScheduler{}
ctr := controller.NewController(invocationProjector, workflowApi.Projector, s, functionApi, invocationApi)
defer ctr.Close()
go ctr.Run()
go ctr.Run(ctx)

// Serve gRPC server
log.Info("Serving gRPC services at: ", lis.Addr())
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}

return nil
go grpcServer.Serve(lis)

<-ctx.Done()
shutdownCtx := context.Background()
log.Debug("Shutting down servers...")
grpcServer.GracefulStop() // Close
apiSrv.Shutdown(shutdownCtx)
proxySrv.Shutdown(shutdownCtx)
log.Debug("Servers shutdown successfully.")
return shutdownCtx.Err()
}
Loading