From 9e031db51d0ddd1e7e6bb77cdfd0033340f300ef Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 12 Feb 2018 01:16:05 +0100 Subject: [PATCH] wip --- .travis.yml | 2 + cmd/fission-workflows-bundle/bundle/bundle.go | 37 ++-- .../bundle/bundle_test.go | 10 +- glide.lock | 12 +- glide.yaml | 5 +- pkg/api/function/api.go | 5 +- pkg/api/function/runtime.go | 41 ---- pkg/api/workflow/parse/resolver.go | 9 +- pkg/apiserver/invocation.go | 2 +- pkg/controller/invocation.go | 17 +- pkg/fnenv/fission/envproxy.go | 182 +++++++++++------- pkg/fnenv/fission/request.go | 34 ++-- pkg/fnenv/fission/request_test.go | 15 +- pkg/fnenv/fnenv.go | 74 +++++++ pkg/fnenv/mock/mock.go | 148 ++++++++++++++ pkg/fnenv/native/builtin/compose.go | 4 +- pkg/fnenv/native/builtin/compose_test.go | 2 +- pkg/fnenv/native/builtin/if.go | 13 +- pkg/fnenv/native/builtin/if_test.go | 20 +- pkg/fnenv/native/builtin/noop.go | 4 +- pkg/fnenv/native/builtin/noop_test.go | 2 +- pkg/fnenv/native/builtin/repeat.go | 12 +- pkg/fnenv/native/builtin/scope.go | 4 +- pkg/fnenv/native/builtin/sleep.go | 11 +- pkg/fnenv/native/builtin/sleep_test.go | 4 +- pkg/fnenv/native/builtin/util.go | 2 +- pkg/fnenv/native/{runtime.go => native.go} | 3 +- pkg/fnenv/test/mock.go | 131 ------------- pkg/scheduler/scheduler.go | 2 +- pkg/types/types.go | 12 +- 30 files changed, 456 insertions(+), 363 deletions(-) delete mode 100644 pkg/api/function/runtime.go create mode 100644 pkg/fnenv/fnenv.go create mode 100644 pkg/fnenv/mock/mock.go rename pkg/fnenv/native/{runtime.go => native.go} (99%) delete mode 100644 pkg/fnenv/test/mock.go diff --git a/.travis.yml b/.travis.yml index 1b61b32f..f9b4a916 100644 --- a/.travis.yml +++ b/.travis.yml @@ -38,6 +38,8 @@ install: before_script: # Build - cd ${TRAVIS_BUILD_DIR} +- hack/verify-govet.sh +- hack/verify-gofmt.sh - glide install - build/build-linux.sh diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 47b73961..be85d171 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -18,6 +18,7 @@ import ( "github.com/fission/fission-workflows/pkg/controller/expr" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/fes/eventstore/nats" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/fnenv/fission" "github.com/fission/fission-workflows/pkg/fnenv/native" "github.com/fission/fission-workflows/pkg/fnenv/native/builtin" @@ -38,9 +39,9 @@ import ( ) const ( - GRPC_ADDRESS = ":5555" - API_GATEWAY_ADDRESS = ":8080" - FISSION_PROXY_ADDRESS = ":8888" + gRPCAddress = ":5555" + apiGatewayAddress = ":8080" + fissionProxyAddress = ":8888" ) type Options struct { @@ -96,8 +97,8 @@ func Run(ctx context.Context, opts *Options) error { wfCache := getWorkflowCache(ctx, esPub) // Resolvers and runtimes - resolvers := map[string]function.Resolver{} - runtimes := map[string]function.Runtime{} + resolvers := map[string]fnenv.Resolver{} + runtimes := map[string]fnenv.Runtime{} if opts.InternalRuntime { log.WithField("config", nil).Infof("Using Function Runtime: Internal") runtimes["internal"] = setupInternalFunctionRuntime() @@ -131,7 +132,7 @@ func Run(ctx context.Context, opts *Options) error { // Http servers if opts.Fission != nil { - proxySrv := &http.Server{Addr: FISSION_PROXY_ADDRESS} + proxySrv := &http.Server{Addr: fissionProxyAddress} defer proxySrv.Shutdown(ctx) runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers) } @@ -149,7 +150,7 @@ func Run(ctx context.Context, opts *Options) error { } if opts.ApiAdmin || opts.ApiWorkflow || opts.ApiWorkflowInvocation { - lis, err := net.Listen("tcp", GRPC_ADDRESS) + lis, err := net.Listen("tcp", gRPCAddress) if err != nil { log.Fatalf("failed to listen: %v", err) } @@ -159,17 +160,17 @@ func Run(ctx context.Context, opts *Options) error { } if opts.ApiHttp { - apiSrv := &http.Server{Addr: API_GATEWAY_ADDRESS} + apiSrv := &http.Server{Addr: apiGatewayAddress} defer apiSrv.Shutdown(ctx) var admin, wf, wfi string if opts.ApiAdmin { - admin = GRPC_ADDRESS + admin = gRPCAddress } if opts.ApiWorkflow { - wf = GRPC_ADDRESS + wf = gRPCAddress } if opts.ApiWorkflowInvocation { - wfi = GRPC_ADDRESS + wfi = gRPCAddress } runHttpGateway(ctx, apiSrv, admin, wf, wfi) } @@ -273,23 +274,23 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) func runAdminApiServer(s *grpc.Server) { adminServer := &apiserver.GrpcAdminApiServer{} apiserver.RegisterAdminAPIServer(s, adminServer) - log.Infof("Serving admin gRPC API at %s.", GRPC_ADDRESS) + log.Infof("Serving admin gRPC API at %s.", gRPCAddress) } -func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]function.Resolver, wfCache fes.CacheReader) { +func runWorkflowApiServer(s *grpc.Server, es fes.EventStore, resolvers map[string]fnenv.Resolver, wfCache fes.CacheReader) { workflowParser := parse.NewResolver(resolvers) workflowValidator := parse.NewValidator() workflowApi := workflow.NewApi(es, workflowParser) workflowServer := apiserver.NewGrpcWorkflowApiServer(workflowApi, workflowValidator, wfCache) apiserver.RegisterWorkflowAPIServer(s, workflowServer) - log.Infof("Serving workflow gRPC API at %s.", GRPC_ADDRESS) + log.Infof("Serving workflow gRPC API at %s.", gRPCAddress) } func runWorkflowInvocationApiServer(s *grpc.Server, es fes.EventStore, wfiCache fes.CacheReader) { invocationApi := invocation.NewApi(es) invocationServer := apiserver.NewGrpcInvocationApiServer(invocationApi, wfiCache) apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer) - log.Infof("Serving workflow invocation gRPC API at %s.", GRPC_ADDRESS) + log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress) } func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string, wfApiAddr string, wfiApiAddr string) { @@ -326,7 +327,7 @@ func runHttpGateway(ctx context.Context, gwSrv *http.Server, adminApiAddr string } func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCache fes.CacheReader, - wfCache fes.CacheReader, resolvers map[string]function.Resolver) { + wfCache fes.CacheReader, resolvers map[string]fnenv.Resolver) { workflowParser := parse.NewResolver(resolvers) workflowValidator := parse.NewValidator() @@ -344,7 +345,7 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.EventStore, wfiCac } func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.EventStore, - fnRuntimes map[string]function.Runtime) *controller.InvocationController { + fnRuntimes map[string]fnenv.Runtime) *controller.InvocationController { functionApi := function.NewApi(fnRuntimes, es) invocationApi := invocation.NewApi(es) s := &scheduler.WorkflowScheduler{} @@ -352,7 +353,7 @@ func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.Cach return controller.NewInvocationController(invocationCache, wfCache, s, functionApi, invocationApi, ep) } -func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]function.Resolver) *controller.WorkflowController { +func setupWorkflowController(wfCache fes.CacheReader, es fes.EventStore, fnResolvers map[string]fnenv.Resolver) *controller.WorkflowController { workflowApi := workflow.NewApi(es, parse.NewResolver(fnResolvers)) return controller.NewWorkflowController(wfCache, workflowApi) } diff --git a/cmd/fission-workflows-bundle/bundle/bundle_test.go b/cmd/fission-workflows-bundle/bundle/bundle_test.go index 441688c3..2408e57d 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle_test.go +++ b/cmd/fission-workflows-bundle/bundle/bundle_test.go @@ -42,7 +42,7 @@ func TestMain(m *testing.M) { // Tests the submission of a workflow func TestWorkflowCreate(t *testing.T) { ctx := context.Background() - conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure()) + conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { panic(err) } @@ -80,7 +80,7 @@ func TestWorkflowCreate(t *testing.T) { func TestWorkflowInvocation(t *testing.T) { ctx := context.Background() - conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure()) + conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { panic(err) } @@ -145,7 +145,7 @@ func TestWorkflowInvocation(t *testing.T) { for ti := range tick.C { invoc, err := wi.Get(ctx, &apiserver.WorkflowInvocationIdentifier{Id: wiId}) assert.NoError(t, err) - if invoc.Status.Status.Finished() || ti.After(deadline) { + if invoc.Status.Finished() || ti.After(deadline) { invocation = invoc tick.Stop() break @@ -154,12 +154,12 @@ func TestWorkflowInvocation(t *testing.T) { assert.Equal(t, wiSpec, invocation.Spec) assert.Equal(t, etv, invocation.Status.Output) - assert.True(t, invocation.Status.Status.Successful()) + assert.True(t, invocation.Status.Successful()) } func TestDynamicWorkflowInvocation(t *testing.T) { ctx := context.Background() - conn, err := grpc.Dial(GRPC_ADDRESS, grpc.WithInsecure()) + conn, err := grpc.Dial(gRPCAddress, grpc.WithInsecure()) if err != nil { panic(err) } diff --git a/glide.lock b/glide.lock index 65948a03..331a04e0 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f128a23dd034612f9bd76d5616b3f7ee2867b7e5d3fca2ed9032159ae1086439 -updated: 2018-02-06T13:25:49.40329+01:00 +hash: 85c7264693e84b8352542e0018e4e3d609fdb4681b5435f38d8db7db1ed97def +updated: 2018-02-12T11:02:55.971444+01:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -182,8 +182,12 @@ imports: - internal - jws - jwt +- name: golang.org/x/sync + version: fd80eb99c8f653c847d294a001bdf2a3a6f768f5 + subpackages: + - syncmap - name: golang.org/x/sys - version: b699b7032584f0953262cb2788a0ca19bb494703 + version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 subpackages: - unix - name: golang.org/x/text @@ -243,7 +247,7 @@ imports: - name: gopkg.in/sourcemap.v1 version: 9753370bfb4cbdcab8e6ecd5551ffe8684e3e5b5 - name: gopkg.in/yaml.v2 - version: bef53efd0c76e49e6de55ead051f886bea7e9420 + version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 - name: k8s.io/apiextensions-apiserver version: fcd622fe88a4a6efcb5aea9e94ee87324ac1b036 subpackages: diff --git a/glide.yaml b/glide.yaml index 53a01964..e84910b9 100644 --- a/glide.yaml +++ b/glide.yaml @@ -40,7 +40,7 @@ import: - package: gopkg.in/sourcemap.v1 version: v2.0.0 - package: gopkg.in/yaml.v2 - # Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces. +# Due to the old version of the kubernetes client we need to fix the openapi versions to avoid incompatible interfaces. - package: github.com/go-openapi/errors version: 49fe8b3a0e0d32a617d8d50c67f856ad6e45b28b - package: github.com/go-openapi/analysis @@ -49,6 +49,9 @@ import: version: 315567415dfd74b651f7a62cabfc82a57ed7b9ad - package: github.com/go-openapi/validate version: 2c997e169d705fe1e4235c9c1d62a7e704a84290 +- package: golang.org/x/sync + subpackages: + - syncmap testImport: - package: github.com/stretchr/testify version: ^1.1.4 diff --git a/pkg/api/function/api.go b/pkg/api/function/api.go index 2c6f0331..6099969a 100644 --- a/pkg/api/function/api.go +++ b/pkg/api/function/api.go @@ -2,6 +2,7 @@ package function import ( "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/events" @@ -12,11 +13,11 @@ import ( // Api that servers mainly as a function.Runtime wrapper that deals with the higher-level logic workflow-related logic. type Api struct { - runtime map[string]Runtime // TODO support AsyncRuntime + runtime map[string]fnenv.Runtime // TODO support AsyncRuntime es fes.EventStore } -func NewApi(runtime map[string]Runtime, esClient fes.EventStore) *Api { +func NewApi(runtime map[string]fnenv.Runtime, esClient fes.EventStore) *Api { return &Api{ runtime: runtime, es: esClient, diff --git a/pkg/api/function/runtime.go b/pkg/api/function/runtime.go deleted file mode 100644 index f5357504..00000000 --- a/pkg/api/function/runtime.go +++ /dev/null @@ -1,41 +0,0 @@ -package function - -import ( - "github.com/fission/fission-workflows/pkg/types" -) - -// Runtime is the minimal interface that a function runtime environment needs to conform with to handle tasks. -type Runtime interface { - - // Invoke executes the task in a blocking way. - // - // spec contains the complete configuration needed for the execution. - // It returns the TaskInvocationStatus with a completed (FINISHED, FAILED, ABORTED) status. - // An error is returned only when error occurs outside of the runtime's control. - Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocationStatus, error) -} - -// AsyncRuntime is a more extended interface that a runtime can optionally support. It allows for asynchronous -// invocations, allowing with progress tracking and invocation cancellations. -type AsyncRuntime interface { - - // InvokeAsync invokes a function in the runtime based on the spec and returns an identifier to allow the caller - // to reference the invocation. - InvokeAsync(spec *types.TaskInvocationSpec) (string, error) - - // Cancel cancels a function invocation using the function invocation ID. - Cancel(fnInvocationId string) error - - // Status fetches the status of a invocation. - // - // The interface user is responsible for determining wether the status indicates that a invocation has completed. - Status(fnInvocationId string) (*types.TaskInvocationStatus, error) -} - -// Resolver is the component that resolves a functionRef to a runtime-specific function UID. -type Resolver interface { - // Resolve an ambiguous function name to a unique identifier of a function - // - // If the fnName does not exist an error will be displayed - Resolve(fnName string) (string, error) // TODO refactor to FunctionRef (Fission Env v2) -} diff --git a/pkg/api/workflow/parse/resolver.go b/pkg/api/workflow/parse/resolver.go index 39ec14da..38a87c61 100644 --- a/pkg/api/workflow/parse/resolver.go +++ b/pkg/api/workflow/parse/resolver.go @@ -3,10 +3,9 @@ package parse import ( "fmt" "strings" - "sync" - "github.com/fission/fission-workflows/pkg/api/function" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" "github.com/sirupsen/logrus" @@ -23,10 +22,10 @@ import ( // for scheduling (overhead vs. load) // type Resolver struct { - clients map[string]function.Resolver + clients map[string]fnenv.Resolver } -func NewResolver(client map[string]function.Resolver) *Resolver { +func NewResolver(client map[string]fnenv.Resolver) *Resolver { return &Resolver{client} } @@ -138,7 +137,7 @@ func (ps *Resolver) resolveTask(task *types.Task) (*types.TaskTypeDef, error) { case result := <-resolved: return result, nil default: - return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v'", t, ps.clients) + return nil, fmt.Errorf("failed to resolve function '%s' using clients '%v': %v", t, ps.clients, lastErr) } } diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go index 46249443..283f10f3 100644 --- a/pkg/apiserver/invocation.go +++ b/pkg/apiserver/invocation.go @@ -51,7 +51,7 @@ func (gi *grpcInvocationApiServer) InvokeSync(ctx context.Context, spec *types.W if err != nil { logrus.Warnf("Failed to get workflow invocation from cache: %v", err) } - if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Status.Finished() { + if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Finished() { result = wi.WorkflowInvocation break } diff --git a/pkg/controller/invocation.go b/pkg/controller/invocation.go index 654c769e..b90fad18 100644 --- a/pkg/controller/invocation.go +++ b/pkg/controller/invocation.go @@ -2,12 +2,11 @@ package controller import ( "context" - "fmt" - - "time" - + "reflect" "sync" + "sync/atomic" + "time" "github.com/fission/fission-workflows/pkg/api/function" "github.com/fission/fission-workflows/pkg/api/invocation" @@ -21,8 +20,6 @@ import ( "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" - "reflect" - "sync/atomic" ) var wfiLog = log.WithField("component", "controller-wi") @@ -48,7 +45,7 @@ type ControlState struct { ErrorCount uint32 RecentError error QueueSize uint32 - lock sync.Mutex + lock *sync.Mutex } func (cs ControlState) AddError(err error) uint32 { @@ -203,7 +200,7 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) { } // Check if we actually need to evaluate - if invoc.Status.Status.Finished() { + if invoc.Status.Finished() { // TODO remove finished wfi from active cache return } @@ -224,7 +221,7 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) { } // Check if the workflow invocation is in the right state - if invoc.Status.Status.Finished() { + if invoc.Status.Finished() { wfiLog.Infof("No need to evaluate finished invocation %v", invoc.Metadata.Id) return } @@ -263,7 +260,7 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) { finished := true for id := range tasks { t, ok := invoc.Status.Tasks[id] - if !ok || !t.Status.Status.Finished() { + if !ok || !t.Status.Finished() { finished = false break } diff --git a/pkg/fnenv/fission/envproxy.go b/pkg/fnenv/fission/envproxy.go index c067ef82..830e40f1 100644 --- a/pkg/fnenv/fission/envproxy.go +++ b/pkg/fnenv/fission/envproxy.go @@ -1,19 +1,16 @@ package fission import ( - "net/http" - - "context" - - "io/ioutil" - - "os" - "bytes" - + "context" "encoding/json" - + "errors" "fmt" + "io/ioutil" + "net/http" + "os" + "path" + "strings" "github.com/fission/fission" "github.com/fission/fission-workflows/pkg/apiserver" @@ -21,8 +18,7 @@ import ( "github.com/fission/fission/router" "github.com/gogo/protobuf/jsonpb" "github.com/sirupsen/logrus" - "path" - "strings" + "golang.org/x/sync/syncmap" ) // Proxy between Fission and Workflow to ensure that workflowInvocations comply with Fission function interface. This @@ -30,52 +26,57 @@ import ( type Proxy struct { invocationServer apiserver.WorkflowInvocationAPIServer workflowServer apiserver.WorkflowAPIServer - fissionIds map[string]bool + fissionIds syncmap.Map // map[string]bool } +// NewFissionProxyServer creates a proxy server to adheres to the Fission Environment specification. func NewFissionProxyServer(wfiSrv apiserver.WorkflowInvocationAPIServer, wfSrv apiserver.WorkflowAPIServer) *Proxy { return &Proxy{ invocationServer: wfiSrv, workflowServer: wfSrv, - fissionIds: map[string]bool{}, + fissionIds: syncmap.Map{}, } } +// RegisterServer adds the endpoints needed for the Fission Environment interface to the mux server. func (fp *Proxy) RegisterServer(mux *http.ServeMux) { mux.HandleFunc("/", fp.handleRequest) mux.HandleFunc("/v2/specialize", fp.handleSpecialize) + mux.HandleFunc("/healthz", fp.handleSpecialize) } +func (fp *Proxy) handleHealthCheck(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +// TODO revise !!! func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logrus.Infof("Handling incoming request '%s'... ", r.URL) meta := router.HeadersToMetadata(router.HEADERS_FISSION_FUNCTION_PREFIX, r.Header) - fnId := string(meta.UID) + fnID := string(meta.UID) if len(meta.UID) == 0 { logrus.WithField("meta", meta).Error("Fission function name is missing") http.Error(w, "Fission function name is missing", 400) return } - // Map fission function name to workflow id - _, ok := fp.fissionIds[fnId] - + // Check if the workflow engine contains the workflow associated with the Fission function UID + _, ok := fp.fissionIds.Load(fnID) if !ok { // Fallback 1 : check if it is in the event store somewhere - wf, err := fp.workflowServer.Get(ctx, &apiserver.WorkflowIdentifier{Id: fnId}) - if err != nil || wf == nil { - logrus.WithField("fnId", fnId). - WithField("err", err). + if fp.hasWorkflow(ctx, fnID) { + logrus.WithField("fnID", fnID). WithField("map", fp.fissionIds). Error("Unknown fission function name") logrus.Warn(fp.fissionIds) - http.Error(w, "Unknown fission function name", 400) + http.Error(w, "Unknown fission function name; not specialized", 400) return } - fp.fissionIds[fnId] = true + fp.fissionIds.Store(fnID, true) } // Map request to workflow inputs @@ -86,27 +87,26 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { http.Error(w, "Failed to parse inputs", 400) return } - wfSpec := &types.WorkflowInvocationSpec{ - WorkflowId: fnId, + WorkflowId: fnID, Inputs: inputs, } // Temporary: in case of query header 'X-Async' being present, make request async if len(r.Header.Get("X-Async")) > 0 { - invocationId, err := fp.invocationServer.Invoke(ctx, wfSpec) - if err != nil { - logrus.Errorf("Failed to invoke: %v", err) - http.Error(w, err.Error(), 500) + invocationID, invokeErr := fp.invocationServer.Invoke(ctx, wfSpec) + if invokeErr != nil { + logrus.Errorf("Failed to invoke: %v", invokeErr) + http.Error(w, invokeErr.Error(), 500) return } w.WriteHeader(200) - w.Write([]byte(invocationId.Id)) + w.Write([]byte(invocationID.Id)) return } // Otherwise, the request synchronous like other Fission functions - invocation, err := fp.invocationServer.InvokeSync(ctx, wfSpec) + wi, err := fp.invocationServer.InvokeSync(ctx, wfSpec) if err != nil { logrus.Errorf("Failed to invoke: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) @@ -114,19 +114,19 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { } // In case of an error, create an error response corresponding to Fission function errors - if !invocation.Status.Status.Successful() { - logrus.Errorf("Invocation not successful, was '%v'", invocation.Status.Status.String()) - http.Error(w, invocation.Status.Status.String(), 500) + if !wi.Status.Successful() { + logrus.Errorf("Invocation not successful, was '%v'", wi.Status.Status.String()) + http.Error(w, wi.Status.Status.String(), 500) return } // Otherwise, create a response corresponding to Fission function responses. var resp []byte - if invocation.Status.Output != nil { - resp = invocation.Status.Output.Value - w.Header().Add("Content-Type", inferContentType(invocation.Status.Output, defaultContentType)) + if wi.Status.Output != nil { + resp = wi.Status.Output.Value + w.Header().Add("Content-Type", inferContentType(wi.Status.Output, defaultContentType)) } else { - logrus.Infof("Invocation '%v' has no output.", fnId) + logrus.Infof("Invocation '%v' has no output.", fnID) } w.WriteHeader(http.StatusOK) w.Write(resp) @@ -135,8 +135,7 @@ func (fp *Proxy) handleRequest(w http.ResponseWriter, r *http.Request) { func (fp *Proxy) handleSpecialize(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logrus.Info("Specializing...") - body := r.Body - bs, err := ioutil.ReadAll(body) + bs, err := ioutil.ReadAll(r.Body) if err != nil { logrus.Errorf("Failed to read body: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -146,6 +145,7 @@ func (fp *Proxy) handleSpecialize(w http.ResponseWriter, r *http.Request) { logrus.Info("Received specialization body:", string(bs)) + // Parse the function load request flr := &fission.FunctionLoadRequest{} err = json.Unmarshal(bs, flr) if err != nil { @@ -155,15 +155,8 @@ func (fp *Proxy) handleSpecialize(w http.ResponseWriter, r *http.Request) { return } - metadata := flr.FunctionMetadata - if metadata == nil { - logrus.Errorf("No workflow metadata provided: %v", metadata) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("No workflow metadata provided.")) - return - } - - wfId, err := fp.specializePackage(ctx, flr) + // Attempt to specialize with the provided function load request + wfIDs, err := fp.Specialize(ctx, flr) if err != nil { logrus.Errorf("failed to specialize: %v", err) if os.IsNotExist(err) { @@ -174,55 +167,90 @@ func (fp *Proxy) handleSpecialize(w http.ResponseWriter, r *http.Request) { w.Write([]byte(err.Error())) return } - logrus.Infof("Specialized successfully wf '%s'", wfId) + logrus.Infof("Specialized successfully wf '%s'", wfIDs) w.WriteHeader(http.StatusOK) - w.Write([]byte(wfId)) + // TODO change this to a more structured output once specialization protocol has been formalized + w.Write([]byte(strings.Join(wfIDs, ";"))) +} + +// Specialize creates workflows provided by a Fission Load Request. +// +// The Fission package can either exist out of a single workflow file, or out of a directory filled with +// solely workflow definitions. +func (fp *Proxy) Specialize(ctx context.Context, flr *fission.FunctionLoadRequest) ([]string, error) { + if flr == nil { + return nil, errors.New("no function load request provided") + } + + // Check if request contains the expected metadata + metadata := flr.FunctionMetadata + if metadata == nil { + logrus.Errorf("No workflow metadata provided: %v", metadata) + return nil, errors.New("no metadata provided in function load request") + } + + // Search provided package for workflow definitions + wfPaths, err := fp.findWorkflowFiles(ctx, flr) + if err != nil { + return nil, err + } + var wfIDs []string + + // Create workflows for each detected workflow definition + for _, wfPath := range wfPaths { + wfID, err := fp.createWorkflowFromFile(ctx, flr, wfPath) + if err != nil { + return nil, fmt.Errorf("failed to specialize package: %v", err) + } + wfIDs = append(wfIDs, wfID) + } + + return wfIDs, nil } -func (fp *Proxy) specializePackage(ctx context.Context, flr *fission.FunctionLoadRequest) (string, error) { +// findWorkflowFiles searches for all of the workflow definitions in a Fission package +func (fp *Proxy) findWorkflowFiles(ctx context.Context, flr *fission.FunctionLoadRequest) ([]string, error) { fi, err := os.Stat(flr.FilePath) if err != nil { - return "", fmt.Errorf("no file present at '%v", flr.FilePath) + return nil, fmt.Errorf("no file present at '%v", flr.FilePath) } if fi.IsDir() { // User provided a package contents, err := ioutil.ReadDir(flr.FilePath) if err != nil { - return "", fmt.Errorf("failed to read package: '%v", flr.FilePath) + return nil, fmt.Errorf("failed to read package: '%v", flr.FilePath) } if len(contents) == 0 { - return "", fmt.Errorf("package is empty") + return nil, fmt.Errorf("package is empty") } - var wfIds []string + var paths []string for _, n := range contents { + // Note: this assumes that all files in package are workflow definitions! file := path.Join(flr.FilePath, n.Name()) - wfId, err := fp.specialize(ctx, flr, file) - if err != nil { - return "", fmt.Errorf("failed to specialize package: %v", err) - } - wfIds = append(wfIds, wfId) + paths = append(paths, file) } // TODO maybe change to []string to provide all generated ids - return strings.Join(wfIds, ";"), nil - } else { - // Provided workflow is a file - return fp.specialize(ctx, flr, flr.FilePath) + return paths, nil } + // Provided workflow is a file + return []string{flr.FilePath}, nil } -func (fp *Proxy) specialize(ctx context.Context, flr *fission.FunctionLoadRequest, filePath string) (string, error) { +// createWorkflowFromFile creates a workflow given a path to the file containing the workflow definition +func (fp *Proxy) createWorkflowFromFile(ctx context.Context, flr *fission.FunctionLoadRequest, + path string) (string, error) { - rdr, err := os.Open(filePath) + rdr, err := os.Open(path) if err != nil { - return "", fmt.Errorf("failed to open file '%v': %v", filePath, err) + return "", fmt.Errorf("failed to open file '%v': %v", path, err) } raw, err := ioutil.ReadAll(rdr) if err != nil { - return "", fmt.Errorf("failed to read file '%v': %v", filePath, err) + return "", fmt.Errorf("failed to read file '%v': %v", path, err) } logrus.Infof("received definition: %s", string(raw)) @@ -241,10 +269,18 @@ func (fp *Proxy) specialize(ctx context.Context, flr *fission.FunctionLoadReques if err != nil { return "", fmt.Errorf("failed to store workflow internally: %v", err) } - wfId := resp.Id + wfID := resp.Id // Cache the ID so we don't have to check whether the workflow engine already has it. - fp.fissionIds[wfSpec.Id] = true + fp.fissionIds.Store(wfSpec.Id, true) + + return wfID, nil +} - return wfId, nil +func (fp *Proxy) hasWorkflow(ctx context.Context, fnID string) bool { + wf, err := fp.workflowServer.Get(ctx, &apiserver.WorkflowIdentifier{Id: fnID}) + if err != nil { + logrus.Errorf("Failed to get workflow: %v; assuming it is non-existent", err) + } + return wf != nil } diff --git a/pkg/fnenv/fission/request.go b/pkg/fnenv/fission/request.go index 31c7a0fa..4029731a 100644 --- a/pkg/fnenv/fission/request.go +++ b/pkg/fnenv/fission/request.go @@ -18,9 +18,9 @@ import ( ) const ( - InputBody = "body" // or 'default' - InputHttpMethod = "method" - InputContentType = "content_type" // to force the content type + inputBody = "body" // or 'default' + inputHTTPMethod = "method" + inputContentType = "content_type" // to force the content type defaultContentType = "text/plain" headerContentType = "Content-Type" @@ -32,7 +32,7 @@ func formatRequest(r *http.Request, source map[string]*types.TypedValue) error { formatHeaders(r, source) // TODO move error handling here // Map HTTP method inputs to request, or use default - formatHttpMethod(r, source) + formatMethod(r, source) // Map query inputs to request formatQuery(r.URL, source) // TODO move error handling here @@ -82,19 +82,19 @@ func parseRequest(r *http.Request, target map[string]*types.TypedValue) error { } // parseBody maps the body from a request to the "main" key in the target map -func parseBody(b io.ReadCloser, contentType string) (*types.TypedValue, error) { - body, err := ioutil.ReadAll(b) +func parseBody(body io.Reader, contentType string) (*types.TypedValue, error) { + bs, err := ioutil.ReadAll(body) if err != nil { return nil, errors.New("failed to read body") } - var i interface{} = body + var i interface{} = bs // TODO fix this, remove the hardcoded JSON transform if strings.Contains(contentType, "application/json") || strings.Contains(contentType, "text/json") { - err = json.Unmarshal(body, &i) + err = json.Unmarshal(bs, &i) if err != nil { - logrus.WithField("body", len(body)).Infof("Input is not json: %v", err) - i = body + logrus.WithField("body", len(bs)).Infof("Input is not json: %v", err) + i = bs } } @@ -156,8 +156,8 @@ func flattenMultimap(mm map[string][]string) map[string]interface{} { // formatting logic -func formatHttpMethod(target *http.Request, inputs map[string]*types.TypedValue) { - _, tv := getFirstDefinedTypedValue(inputs, InputHttpMethod) +func formatMethod(target *http.Request, inputs map[string]*types.TypedValue) { + _, tv := getFirstDefinedTypedValue(inputs, inputHTTPMethod) httpMethod := toString(tv) if httpMethod != "" { target.Method = httpMethod @@ -165,7 +165,7 @@ func formatHttpMethod(target *http.Request, inputs map[string]*types.TypedValue) } // TODO support multivalued query params at some point -func formatQuery(targetUrl *url.URL, inputs map[string]*types.TypedValue) { +func formatQuery(targetURL *url.URL, inputs map[string]*types.TypedValue) { queryInput := inputs[types.INPUT_QUERY] if queryInput == nil { return @@ -178,11 +178,11 @@ func formatQuery(targetUrl *url.URL, inputs map[string]*types.TypedValue) { switch i.(type) { case map[string]interface{}: - origQuery := targetUrl.Query() + origQuery := targetURL.Query() for k, v := range i.(map[string]interface{}) { origQuery.Add(k, fmt.Sprintf("%v", v)) } - targetUrl.RawQuery = origQuery.Encode() + targetURL.RawQuery = origQuery.Encode() default: logrus.Warnf("Ignoring invalid type of query input (expected map[string]interface{}, was %v)", reflect.TypeOf(i)) @@ -217,7 +217,7 @@ func formatHeaders(target *http.Request, inputs map[string]*types.TypedValue) { func formatBody(inputs map[string]*types.TypedValue) io.ReadCloser { var input []byte - _, mainInput := getFirstDefinedTypedValue(inputs, types.INPUT_MAIN, InputBody) + _, mainInput := getFirstDefinedTypedValue(inputs, types.INPUT_MAIN, inputBody) if mainInput != nil { // TODO ensure that it is a byte-representation 1-1 of actual value not the representation in TypedValue input = mainInput.Value @@ -228,7 +228,7 @@ func formatBody(inputs map[string]*types.TypedValue) io.ReadCloser { func formatContentType(inputs map[string]*types.TypedValue, defaultContentType string) string { // Check if content type is forced - _, tv := getFirstDefinedTypedValue(inputs, InputContentType) // TODO lookup in headers? + _, tv := getFirstDefinedTypedValue(inputs, inputContentType) // TODO lookup in headers? contentType := toString(tv) if contentType != "" { return contentType diff --git a/pkg/fnenv/fission/request_test.go b/pkg/fnenv/fission/request_test.go index 30e79eb9..305d69d4 100644 --- a/pkg/fnenv/fission/request_test.go +++ b/pkg/fnenv/fission/request_test.go @@ -2,15 +2,16 @@ package fission import ( "fmt" - "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/types/typedvalues" - "github.com/stretchr/testify/assert" "io" "io/ioutil" "net/http" "net/url" "strings" "testing" + + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/stretchr/testify/assert" ) func TestFormatRequest(t *testing.T) { @@ -22,12 +23,12 @@ func TestFormatRequest(t *testing.T) { "Header-Key": "headerVal", } method := http.MethodPost - reqUrl, err := url.Parse("http://bar.example") + reqURL, err := url.Parse("http://bar.example") if err != nil { panic(err) } target := &http.Request{ - URL: reqUrl, + URL: reqURL, // TODO verify that existing headers, query params, etc stay in tact. } source := map[string]*types.TypedValue{ @@ -131,12 +132,12 @@ func TestParseRequestMinimal(t *testing.T) { assert.Equal(t, nil, query["nonExistent"]) } -func createRequest(method string, rawUrl string, headers map[string]string, bodyReader io.Reader) *http.Request { +func createRequest(method string, rawURL string, headers map[string]string, bodyReader io.Reader) *http.Request { mheaders := http.Header{} for k, v := range headers { mheaders[k] = []string{v} } - requrl, _ := url.Parse(rawUrl) + requrl, _ := url.Parse(rawURL) body := ioutil.NopCloser(bodyReader) return &http.Request{ Method: method, diff --git a/pkg/fnenv/fnenv.go b/pkg/fnenv/fnenv.go new file mode 100644 index 00000000..054387ff --- /dev/null +++ b/pkg/fnenv/fnenv.go @@ -0,0 +1,74 @@ +// Package fnenv provides interfaces to consistently communicate with 'function runtime environments' (fnenvs). +// +// A fnenv is a component responsible for (part of) the execution of the tasks/functions, it commonly consists +// out of at least the following implemented interfaces: +// - Resolver: resolves function references in workflow definitions to deterministic function IDs of the fnenv. +// - Runtime: executes a function in the fnenv given the task spec and returns the output. +// +// The fnenv package avoids a single, huge interface, which would make new implementations constrained and expensive, +// by splitting up the functionality into small (optional) interfaces. There is no required combination of interfaces +// that a fnenv needs to implement, although a Resolver and Runtime are considered the basic interfaces. +// +// A fnenv could implement additional interfaces which would allow the workflow engine to improve the execution. +// For example, by implementing the Notifier interface, the workflow engine will notify the fnenv ahead of time of the +// incoming function request. +package fnenv + +import ( + "time" + + "github.com/fission/fission-workflows/pkg/types" +) + +// Runtime is the minimal interface that a function runtime environment needs to conform with to handle tasks. +type Runtime interface { + + // Invoke executes the task in a blocking way. + // + // spec contains the complete configuration needed for the execution. + // It returns the TaskInvocationStatus with a completed (FINISHED, FAILED, ABORTED) status. + // An error is returned only when error occurs outside of the runtime's control. + Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocationStatus, error) +} + +// AsyncRuntime is a more extended interface that a runtime can optionally support. It allows for asynchronous +// invocations, allowing with progress tracking and invocation cancellations. +type AsyncRuntime interface { + + // InvokeAsync invokes a function in the runtime based on the spec and returns an identifier to allow the caller + // to reference the invocation. + InvokeAsync(spec *types.TaskInvocationSpec) (fnInvocationID string, err error) + + // Cancel cancels a function invocation using the function invocation ID. + Cancel(fnInvocationID string) error + + // Status fetches the status of a invocation. + // + // The interface user is responsible for determining whether the status indicates that a invocation has completed. + Status(fnInvocationID string) (*types.TaskInvocationStatus, error) +} + +// Resolver is the component that resolves a reference to a function to a deterministic, runtime-specific function UID. +type Resolver interface { + + // Resolve an ambiguous function name to a unique identifier of a function + // + // If the fnName does not exist an error will be displayed + // TODO refactor to FunctionRef (Fission Env v2) + Resolve(fnRef string) (string, error) +} + +// Notifier allows signalling of an incoming function invocation. +// +// This allows implementations to prepare for those invocations; performing the necessary +// resource provisioning or setup. +type Notifier interface { + + // Notify signals that a function invocation is expected at a specific point in time. + // + // expectedAt time should be in the future. Any time in the past is interpreted as + // a signal that the function invocation will come (almost) immediately. fnId is an optional + // identifier for the signal, which the implementation can use this to identify signals. + // By default, if fnId is empty, it is not possible to later update the notification. + Notify(taskID string, fn types.TaskTypeDef, expectedAt time.Time) error +} diff --git a/pkg/fnenv/mock/mock.go b/pkg/fnenv/mock/mock.go new file mode 100644 index 00000000..b804bcdd --- /dev/null +++ b/pkg/fnenv/mock/mock.go @@ -0,0 +1,148 @@ +// Package mock contains a minimal, mocked implementation of a fnenv for test purposes +package mock + +import ( + "fmt" + "time" + + "github.com/fission/fission-workflows/pkg/types" + "github.com/fission/fission-workflows/pkg/util" + "github.com/golang/protobuf/ptypes" + "github.com/sirupsen/logrus" +) + +// Func is the type for mocked functions used in the mock.Runtime +type Func func(spec *types.TaskInvocationSpec) (*types.TypedValue, error) + +// Runtime mocks the implementation of the various runtime. +// +// Mock functions can be added to Functions, and should have the mocked function ID as the key. +// For AsyncRuntime the results are stored and retrieved from the AsyncResults. The result is added +// automatically/instantly using the function, but can be avoided by enabling ManualExecution. +// +// Note it does not mock the resolver, which is mocked by the mock.Resolver +type Runtime struct { + Functions map[string]Func + AsyncResults map[string]*types.TaskInvocation + ManualExecution bool +} + +func (mk *Runtime) InvokeAsync(spec *types.TaskInvocationSpec) (string, error) { + fnName := spec.GetType().GetResolved() + + if _, ok := mk.Functions[fnName]; !ok { + return "", fmt.Errorf("could not invoke unknown function '%s'", fnName) + } + + invocationID := util.Uid() + mk.AsyncResults[invocationID] = &types.TaskInvocation{ + Metadata: &types.ObjectMetadata{ + Id: invocationID, + CreatedAt: ptypes.TimestampNow(), + }, + Spec: spec, + Status: &types.TaskInvocationStatus{ + Status: types.TaskInvocationStatus_IN_PROGRESS, + UpdatedAt: ptypes.TimestampNow(), + }, + } + + if !mk.ManualExecution { + err := mk.MockComplete(invocationID) + if err != nil { + panic(err) + } + } + + return invocationID, nil +} + +func (mk *Runtime) MockComplete(fnInvocationID string) error { + invocation, ok := mk.AsyncResults[fnInvocationID] + if !ok { + return fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationID) + } + + fnName := invocation.Spec.GetType().GetResolved() + fn, ok := mk.Functions[fnName] + if !ok { + return fmt.Errorf("could not invoke unknown function '%s'", fnName) + } + + result, err := fn(invocation.Spec) + if err != nil { + logrus.Infof("Function '%s' invocation resulted in an error: %v", fnName, err) + mk.AsyncResults[fnInvocationID].Status = &types.TaskInvocationStatus{ + Output: nil, + UpdatedAt: ptypes.TimestampNow(), + Status: types.TaskInvocationStatus_FAILED, + } + } else { + mk.AsyncResults[fnInvocationID].Status = &types.TaskInvocationStatus{ + Output: result, + UpdatedAt: ptypes.TimestampNow(), + Status: types.TaskInvocationStatus_SUCCEEDED, + } + } + + return nil +} + +func (mk *Runtime) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocationStatus, error) { + logrus.Info("Starting invocation...") + invocationID, err := mk.InvokeAsync(spec) + if err != nil { + return nil, err + } + err = mk.MockComplete(invocationID) + if err != nil { + return nil, err + } + + logrus.Infof("...completing function execution for '%v'", invocationID) + return mk.Status(invocationID) +} + +func (mk *Runtime) Cancel(fnInvocationID string) error { + invocation, ok := mk.AsyncResults[fnInvocationID] + if !ok { + return fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationID) + } + + invocation.Status = &types.TaskInvocationStatus{ + Output: nil, + UpdatedAt: ptypes.TimestampNow(), + Status: types.TaskInvocationStatus_ABORTED, + } + + return nil +} + +func (mk *Runtime) Status(fnInvocationID string) (*types.TaskInvocationStatus, error) { + invocation, ok := mk.AsyncResults[fnInvocationID] + if !ok { + return nil, fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationID) + } + + return invocation.Status, nil +} + +func (mk *Runtime) Notify(taskID string, fn types.TaskTypeDef, expectedAt time.Time) error { + return nil +} + +// Resolver is a mocked implementation of a resolver. +// +// Use FnNameIDs to setup a mapping to mock resolving function references to IDs. +type Resolver struct { + FnNameIDs map[string]string +} + +func (mf *Resolver) Resolve(fnName string) (string, error) { + fnID, ok := mf.FnNameIDs[fnName] + if !ok { + return "", fmt.Errorf("could not resolve function '%s' using resolve-map '%v'", fnName, mf.FnNameIDs) + } + + return fnID, nil +} diff --git a/pkg/fnenv/native/builtin/compose.go b/pkg/fnenv/native/builtin/compose.go index 943023cf..2491cd1a 100644 --- a/pkg/fnenv/native/builtin/compose.go +++ b/pkg/fnenv/native/builtin/compose.go @@ -7,7 +7,7 @@ import ( ) const ( - COMPOSE_INPUT = types.INPUT_MAIN + ComposeInput = types.INPUT_MAIN ) type FunctionCompose struct{} @@ -19,7 +19,7 @@ func (fn *FunctionCompose) Invoke(spec *types.TaskInvocationSpec) (*types.TypedV case 0: output = nil case 1: - defaultInput, ok := spec.GetInputs()[COMPOSE_INPUT] + defaultInput, ok := spec.GetInputs()[ComposeInput] if ok { output = defaultInput break diff --git a/pkg/fnenv/native/builtin/compose_test.go b/pkg/fnenv/native/builtin/compose_test.go index 0aa26ef3..19f705b7 100644 --- a/pkg/fnenv/native/builtin/compose_test.go +++ b/pkg/fnenv/native/builtin/compose_test.go @@ -12,7 +12,7 @@ func TestFunctionComposePassInput(t *testing.T) { &FunctionCompose{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - COMPOSE_INPUT: parseUnsafe(expected), + ComposeInput: parseUnsafe(expected), }, }, expected) diff --git a/pkg/fnenv/native/builtin/if.go b/pkg/fnenv/native/builtin/if.go index 232d9b34..a3a19cfd 100644 --- a/pkg/fnenv/native/builtin/if.go +++ b/pkg/fnenv/native/builtin/if.go @@ -9,9 +9,10 @@ import ( ) const ( - IF_INPUT_CONDITION = "if" - IF_INPUT_CONSEQUENT = "then" - IF_INPUT_ALTERNATIVE = "else" + // Condition + IfInputCondition = "if" + IfInputThen = "then" + IfInputElse = "else" ) type FunctionIf struct{} @@ -19,14 +20,14 @@ type FunctionIf struct{} func (fn *FunctionIf) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) { // Verify and get condition - expr, err := verifyInput(spec.GetInputs(), IF_INPUT_CONDITION, typedvalues.FormatType(typedvalues.FORMAT_JSON, typedvalues.TYPE_BOOL)) + expr, err := verifyInput(spec.GetInputs(), IfInputCondition, typedvalues.FormatType(typedvalues.FORMAT_JSON, typedvalues.TYPE_BOOL)) if err != nil { return nil, err } // Get consequent alternative, if one of those does not exist, that is fine. - consequent := spec.GetInputs()[IF_INPUT_CONSEQUENT] - alternative := spec.GetInputs()[IF_INPUT_ALTERNATIVE] + consequent := spec.GetInputs()[IfInputThen] + alternative := spec.GetInputs()[IfInputElse] // Parse condition to a bool i, err := typedvalues.Format(expr) diff --git a/pkg/fnenv/native/builtin/if_test.go b/pkg/fnenv/native/builtin/if_test.go index 71d97f0e..6a5cad64 100644 --- a/pkg/fnenv/native/builtin/if_test.go +++ b/pkg/fnenv/native/builtin/if_test.go @@ -14,8 +14,8 @@ func TestFunctionIfConsequentFlow(t *testing.T) { &FunctionIf{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - IF_INPUT_CONDITION: parseUnsafe(true), - IF_INPUT_CONSEQUENT: parseUnsafe(expectedTask), + IfInputCondition: parseUnsafe(true), + IfInputThen: parseUnsafe(expectedTask), }, }, expectedTask) @@ -32,9 +32,9 @@ func TestFunctionIfAlternativeFlow(t *testing.T) { &FunctionIf{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - IF_INPUT_CONDITION: parseUnsafe(false), - IF_INPUT_CONSEQUENT: parseUnsafe(task), - IF_INPUT_ALTERNATIVE: parseUnsafe(alternativeTask), + IfInputCondition: parseUnsafe(false), + IfInputThen: parseUnsafe(task), + IfInputElse: parseUnsafe(alternativeTask), }, }, alternativeTask) @@ -45,9 +45,9 @@ func TestFunctionIfLiteral(t *testing.T) { &FunctionIf{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - IF_INPUT_CONDITION: parseUnsafe(true), - IF_INPUT_CONSEQUENT: parseUnsafe("foo"), - IF_INPUT_ALTERNATIVE: parseUnsafe("bar"), + IfInputCondition: parseUnsafe(true), + IfInputThen: parseUnsafe("foo"), + IfInputElse: parseUnsafe("bar"), }, }, "foo") @@ -58,8 +58,8 @@ func TestFunctionIfMissingAlternative(t *testing.T) { &FunctionIf{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - IF_INPUT_CONDITION: parseUnsafe(false), - IF_INPUT_CONSEQUENT: parseUnsafe("then"), + IfInputCondition: parseUnsafe(false), + IfInputThen: parseUnsafe("then"), }, }, nil) diff --git a/pkg/fnenv/native/builtin/noop.go b/pkg/fnenv/native/builtin/noop.go index c850bae6..dcc401ef 100644 --- a/pkg/fnenv/native/builtin/noop.go +++ b/pkg/fnenv/native/builtin/noop.go @@ -6,7 +6,7 @@ import ( ) const ( - NOOP_INPUT = types.INPUT_MAIN + NoopInput = types.INPUT_MAIN ) type FunctionNoop struct{} @@ -18,7 +18,7 @@ func (fn *FunctionNoop) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValu case 0: output = nil default: - defaultInput, ok := spec.GetInputs()[NOOP_INPUT] + defaultInput, ok := spec.GetInputs()[NoopInput] if ok { output = defaultInput break diff --git a/pkg/fnenv/native/builtin/noop_test.go b/pkg/fnenv/native/builtin/noop_test.go index b67b0af3..e177a988 100644 --- a/pkg/fnenv/native/builtin/noop_test.go +++ b/pkg/fnenv/native/builtin/noop_test.go @@ -12,7 +12,7 @@ func TestFunctionNoopPassInput(t *testing.T) { &FunctionNoop{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - NOOP_INPUT: parseUnsafe(expected), + NoopInput: parseUnsafe(expected), }, }, expected) diff --git a/pkg/fnenv/native/builtin/repeat.go b/pkg/fnenv/native/builtin/repeat.go index 307a6aa3..398fe08a 100644 --- a/pkg/fnenv/native/builtin/repeat.go +++ b/pkg/fnenv/native/builtin/repeat.go @@ -8,22 +8,22 @@ import ( ) const ( - REPEAT_INPUT_TIMES = "times" - REPEAT_INPUT_DO = "do" + RepeatInputTimes = "times" + RepeatInputDo = "do" ) type FunctionRepeat struct{} func (fn *FunctionRepeat) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) { - n, ok := spec.GetInputs()[REPEAT_INPUT_TIMES] + n, ok := spec.GetInputs()[RepeatInputTimes] if !ok { - return nil, fmt.Errorf("repeat needs '%s'", REPEAT_INPUT_TIMES) + return nil, fmt.Errorf("repeat needs '%s'", RepeatInputTimes) } - do, ok := spec.GetInputs()[REPEAT_INPUT_DO] + do, ok := spec.GetInputs()[RepeatInputDo] if !ok { - return nil, fmt.Errorf("repeat needs '%s'", REPEAT_INPUT_DO) + return nil, fmt.Errorf("repeat needs '%s'", RepeatInputDo) } // Parse condition to a int diff --git a/pkg/fnenv/native/builtin/scope.go b/pkg/fnenv/native/builtin/scope.go index 3b2c9660..52bb4a85 100644 --- a/pkg/fnenv/native/builtin/scope.go +++ b/pkg/fnenv/native/builtin/scope.go @@ -8,13 +8,13 @@ import ( ) const ( - SCOPE_INPUT = types.INPUT_MAIN + ScopeInput = types.INPUT_MAIN ) type FunctionScope struct{} func (fn *FunctionScope) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) { - scope, ok := spec.GetInputs()[NOOP_INPUT] + scope, ok := spec.GetInputs()[NoopInput] if !ok { return nil, errors.New("missing scope input") } diff --git a/pkg/fnenv/native/builtin/sleep.go b/pkg/fnenv/native/builtin/sleep.go index 6df7e8af..94c4c774 100644 --- a/pkg/fnenv/native/builtin/sleep.go +++ b/pkg/fnenv/native/builtin/sleep.go @@ -1,24 +1,23 @@ package builtin import ( - "time" - "fmt" + "time" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" ) const ( - SLEEP_INPUT_MS = types.INPUT_MAIN - SLEEP_INPUT_MS_DEFAULT = time.Duration(1) * time.Second + SleepInput = types.INPUT_MAIN + SleepInputDefault = time.Duration(1) * time.Second ) type FunctionSleep struct{} func (f *FunctionSleep) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) { - duration := SLEEP_INPUT_MS_DEFAULT - input, ok := spec.Inputs[SLEEP_INPUT_MS] + duration := SleepInputDefault + input, ok := spec.Inputs[SleepInput] if ok { i, err := typedvalues.Format(input) if err != nil { diff --git a/pkg/fnenv/native/builtin/sleep_test.go b/pkg/fnenv/native/builtin/sleep_test.go index ff76c7a7..0d0f5a19 100644 --- a/pkg/fnenv/native/builtin/sleep_test.go +++ b/pkg/fnenv/native/builtin/sleep_test.go @@ -15,7 +15,7 @@ func TestSleepFunctionString(t *testing.T) { &FunctionSleep{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - SLEEP_INPUT_MS: parseUnsafe("1000ms"), + SleepInput: parseUnsafe("1000ms"), }, }, nil) @@ -29,7 +29,7 @@ func TestSleepFunctionInt(t *testing.T) { &FunctionSleep{}, &types.TaskInvocationSpec{ Inputs: map[string]*types.TypedValue{ - SLEEP_INPUT_MS: parseUnsafe(1000), + SleepInput: parseUnsafe(1000), }, }, nil) diff --git a/pkg/fnenv/native/builtin/util.go b/pkg/fnenv/native/builtin/util.go index 50106b98..7dddce47 100644 --- a/pkg/fnenv/native/builtin/util.go +++ b/pkg/fnenv/native/builtin/util.go @@ -24,7 +24,7 @@ func verifyInput(inputs map[string]*types.TypedValue, key string, expType string i, ok := inputs[key] if !ok { - return nil, fmt.Errorf("Input '%s' is not set.", key) + return nil, fmt.Errorf("input '%s' is not set", key) } //if !typedvalues.(i.Type, expType) { diff --git a/pkg/fnenv/native/runtime.go b/pkg/fnenv/native/native.go similarity index 99% rename from pkg/fnenv/native/runtime.go rename to pkg/fnenv/native/native.go index 78ab9639..5d0e0490 100644 --- a/pkg/fnenv/native/runtime.go +++ b/pkg/fnenv/native/native.go @@ -1,3 +1,4 @@ +// Note: package is called 'native' because 'internal' is not an allowed package name. package native import ( @@ -8,8 +9,6 @@ import ( log "github.com/sirupsen/logrus" ) -// Note: package is called 'native' because 'internal' is not an allowed package name. - // An InternalFunction is a function that will be executed in the same process as the invoker. type InternalFunction interface { Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) diff --git a/pkg/fnenv/test/mock.go b/pkg/fnenv/test/mock.go deleted file mode 100644 index 05383f16..00000000 --- a/pkg/fnenv/test/mock.go +++ /dev/null @@ -1,131 +0,0 @@ -package test - -import ( - "fmt" - - "github.com/fission/fission-workflows/pkg/types" - "github.com/fission/fission-workflows/pkg/util" - "github.com/golang/protobuf/ptypes" - "github.com/sirupsen/logrus" -) - -type MockFunc func(spec *types.TaskInvocationSpec) (*types.TypedValue, error) - -type MockRuntimeEnv struct { - Functions map[string]MockFunc - Results map[string]*types.TaskInvocation - ManualExecution bool -} - -func (mk *MockRuntimeEnv) InvokeAsync(spec *types.TaskInvocationSpec) (string, error) { - fnName := spec.GetType().GetResolved() - - if _, ok := mk.Functions[fnName]; !ok { - return "", fmt.Errorf("could not invoke unknown function '%s'", fnName) - } - - invocationId := util.Uid() - mk.Results[invocationId] = &types.TaskInvocation{ - Metadata: &types.ObjectMetadata{ - Id: invocationId, - CreatedAt: ptypes.TimestampNow(), - }, - Spec: spec, - Status: &types.TaskInvocationStatus{ - Status: types.TaskInvocationStatus_IN_PROGRESS, - UpdatedAt: ptypes.TimestampNow(), - }, - } - - if !mk.ManualExecution { - err := mk.MockComplete(invocationId) - if err != nil { - panic(err) - } - } - - return invocationId, nil -} - -func (mk *MockRuntimeEnv) MockComplete(fnInvocationId string) error { - invocation, ok := mk.Results[fnInvocationId] - if !ok { - return fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationId) - } - - fnName := invocation.Spec.GetType().GetResolved() - fn, ok := mk.Functions[fnName] - if !ok { - return fmt.Errorf("could not invoke unknown function '%s'", fnName) - } - - result, err := fn(invocation.Spec) - if err != nil { - logrus.Infof("Function '%s' invocation resulted in an error: %v", fnName, err) - mk.Results[fnInvocationId].Status = &types.TaskInvocationStatus{ - Output: nil, - UpdatedAt: ptypes.TimestampNow(), - Status: types.TaskInvocationStatus_FAILED, - } - } else { - mk.Results[fnInvocationId].Status = &types.TaskInvocationStatus{ - Output: result, - UpdatedAt: ptypes.TimestampNow(), - Status: types.TaskInvocationStatus_SUCCEEDED, - } - } - - return nil -} - -func (mk *MockRuntimeEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocationStatus, error) { - logrus.Info("Starting invocation...") - invocationId, err := mk.InvokeAsync(spec) - if err != nil { - return nil, err - } - err = mk.MockComplete(invocationId) - if err != nil { - return nil, err - } - - logrus.Infof("...completing function execution for '%v'", invocationId) - return mk.Status(invocationId) -} - -func (mk *MockRuntimeEnv) Cancel(fnInvocationId string) error { - invocation, ok := mk.Results[fnInvocationId] - if !ok { - return fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationId) - } - - invocation.Status = &types.TaskInvocationStatus{ - Output: nil, - UpdatedAt: ptypes.TimestampNow(), - Status: types.TaskInvocationStatus_ABORTED, - } - - return nil -} - -func (mk *MockRuntimeEnv) Status(fnInvocationId string) (*types.TaskInvocationStatus, error) { - invocation, ok := mk.Results[fnInvocationId] - if !ok { - return nil, fmt.Errorf("could not invoke unknown invocation '%s'", fnInvocationId) - } - - return invocation.Status, nil -} - -type MockFunctionResolver struct { - FnNameIds map[string]string -} - -func (mf *MockFunctionResolver) Resolve(fnName string) (string, error) { - fnId, ok := mf.FnNameIds[fnName] - if !ok { - return "", fmt.Errorf("could not resolve function '%s' using resolve-map '%v'", fnName, mf.FnNameIds) - } - - return fnId, nil -} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index af7de856..4bdeb8ae 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -68,7 +68,7 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro ctxLog.Warnf("Unknown task dependency: %v", depName) } - if ok && t.Invocation != nil && t.Invocation.Status.Status.Finished() { + if ok && t.Invocation != nil && t.Invocation.Status.Finished() { completedDeps = completedDeps + 1 } } diff --git a/pkg/types/types.go b/pkg/types/types.go index d846559b..e4d987b2 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -31,22 +31,22 @@ var taskFinalStates = []TaskInvocationStatus_Status{ TaskInvocationStatus_SUCCEEDED, } -func (wi WorkflowInvocationStatus_Status) Finished() bool { +func (wi WorkflowInvocationStatus) Finished() bool { for _, event := range invocationFinalStates { - if event == wi { + if event == wi.Status { return true } } return false } -func (wi WorkflowInvocationStatus_Status) Successful() bool { - return wi == WorkflowInvocationStatus_SUCCEEDED +func (wi WorkflowInvocationStatus) Successful() bool { + return wi.Status == WorkflowInvocationStatus_SUCCEEDED } -func (ti TaskInvocationStatus_Status) Finished() bool { +func (ti TaskInvocationStatus) Finished() bool { for _, event := range taskFinalStates { - if event == ti { + if event == ti.Status { return true } }