diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index da157ce0..048963a6 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -2,9 +2,11 @@ package bundle import ( "context" + "fmt" "net" "net/http" "os" + "time" "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/apiserver" @@ -63,7 +65,10 @@ type FissionOptions struct { // Run serves enabled components in a blocking way func Run(ctx context.Context, opts *Options) error { - log.WithField("version", version.VersionInfo()).Info("Starting bundle...") + log.WithFields(log.Fields{ + "version": fmt.Sprintf("%+v", version.VersionInfo()), + "config": fmt.Sprintf("%+v", opts), + }).Info("Starting bundle...") var es fes.Backend var esPub pubsub.Publisher @@ -72,9 +77,10 @@ func Run(ctx context.Context, opts *Options) error { grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), ) - defer grpcServer.GracefulStop() - // Event Stores + // + // Event Store + // if opts.Nats != nil { log.WithFields(log.Fields{ "url": "!redacted!", @@ -95,7 +101,9 @@ func Run(ctx context.Context, opts *Options) error { wfiCache := getWorkflowInvocationCache(ctx, esPub) wfCache := getWorkflowCache(ctx, esPub) - // Resolvers and runtimes + // + // Function Runtimes + // invocationAPI := api.NewInvocationAPI(es) resolvers := map[string]fnenv.RuntimeResolver{} runtimes := map[string]fnenv.Runtime{} @@ -109,8 +117,10 @@ func Run(ctx context.Context, opts *Options) error { } if opts.InternalRuntime { log.Infof("Using Task Runtime: Internal") - runtimes["internal"] = setupInternalFunctionRuntime() - resolvers["internal"] = setupInternalFunctionRuntime() + internalRuntime := setupInternalFunctionRuntime() + runtimes["internal"] = internalRuntime + resolvers["internal"] = internalRuntime + log.Infof("Internal runtime functions: 
%v", internalRuntime.Installed()) } if opts.Fission != nil { log.WithFields(log.Fields{ @@ -122,7 +132,9 @@ func Run(ctx context.Context, opts *Options) error { resolvers["fission"] = setupFissionFunctionResolver(opts.Fission.ControllerAddr) } - // Controller + // + // Controllers + // if opts.InvocationController || opts.WorkflowController { var ctrls []controller.Controller if opts.WorkflowController { @@ -135,26 +147,51 @@ func Run(ctx context.Context, opts *Options) error { ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes, resolvers)) } - log.Info("Running controllers.") - runController(ctx, ctrls...) + ctrl := controller.NewMetaController(ctrls...) + go ctrl.Run(ctx) + defer func() { + err := ctrl.Close() + if err != nil { + log.Errorf("Failed to stop controllers: %v", err) + } else { + log.Info("Stopped controllers") + } + }() + } else { + log.Info("No controllers specified to run.") } - // Http servers + // + // Fission integration + // if opts.Fission != nil { proxyMux := http.NewServeMux() runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers) - proxySrv := &http.Server{Addr: fissionProxyAddress} - proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) + fissionProxySrv := &http.Server{Addr: fissionProxyAddress} + fissionProxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) if opts.Metrics { setupMetricsEndpoint(proxyMux) } - go proxySrv.ListenAndServe() - defer proxySrv.Shutdown(ctx) - log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr) + go func() { + err := fissionProxySrv.ListenAndServe() + log.WithField("err", err).Info("Fission Proxy server stopped") + }() + defer func() { + err := fissionProxySrv.Shutdown(ctx) + if err != nil { + log.Errorf("Failed to stop Fission Proxy server: %v", err) + } else { + log.Info("Stopped Fission Proxy server") + } + }() + log.Info("Serving HTTP Fission Proxy at: ", fissionProxySrv.Addr) } + // + // gRPC API + // if opts.AdminAPI { 
serveAdminAPI(grpcServer) } @@ -168,18 +205,27 @@ func Run(ctx context.Context, opts *Options) error { } if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI { - log.Info("Instrumenting gRPC server with Prometheus metrics") - grpc_prometheus.Register(grpcServer) + if opts.Metrics { + log.Debug("Instrumenting gRPC server with Prometheus metrics") + grpc_prometheus.Register(grpcServer) + } lis, err := net.Listen("tcp", gRPCAddress) if err != nil { log.Fatalf("failed to listen: %v", err) } - defer lis.Close() - log.Info("Serving gRPC services at: ", lis.Addr()) go grpcServer.Serve(lis) + defer func() { + grpcServer.GracefulStop() + lis.Close() + log.Info("Stopped gRPC server") + }() + log.Info("Serving gRPC services at: ", lis.Addr()) } + // + // HTTP API + // if opts.HTTPGateway || opts.Metrics { grpcMux := grpcruntime.NewServeMux() httpMux := http.NewServeMux() @@ -199,28 +245,34 @@ func Run(ctx context.Context, opts *Options) error { serveHTTPGateway(ctx, grpcMux, admin, wf, wfi) } - // Metrics if opts.Metrics { setupMetricsEndpoint(httpMux) log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress) } - apiSrv := &http.Server{Addr: apiGatewayAddress} + httpApiSrv := &http.Server{Addr: apiGatewayAddress} httpMux.Handle("/", grpcMux) - apiSrv.Handler = httpMux + httpApiSrv.Handler = handlers.LoggingHandler(os.Stdout, httpMux) go func() { - err := apiSrv.ListenAndServe() - log.WithField("err", err).Info("HTTP Gateway exited") + err := httpApiSrv.ListenAndServe() + log.WithField("err", err).Info("HTTP Gateway stopped") + }() + defer func() { + err := httpApiSrv.Shutdown(ctx) + if err != nil { + log.Errorf("Failed to stop HTTP API server: %v", err) + } else { + log.Info("Stopped HTTP API server") + } }() - defer apiSrv.Shutdown(ctx) - log.Info("Serving HTTP API gateway at: ", apiSrv.Addr) + log.Info("Serving HTTP API gateway at: ", httpApiSrv.Addr) } - log.Info("Bundle set up.") + log.Info("Setup completed.") <-ctx.Done() - log.Info("Shutting 
down...") - // TODO properly shutdown components + log.WithField("reason", ctx.Err()).Info("Shutting down...") + time.Sleep(5 * time.Second) return nil } @@ -391,11 +443,6 @@ func setupWorkflowController(wfCache fes.CacheReader, es fes.Backend, return wfctr.NewController(wfCache, workflowAPI) } -func runController(ctx context.Context, ctrls ...controller.Controller) { - ctrl := controller.NewMetaController(ctrls...) - go ctrl.Run(ctx) -} - func setupMetricsEndpoint(apiMux *http.ServeMux) { apiMux.Handle("/metrics", promhttp.Handler()) } diff --git a/pkg/api/task.go b/pkg/api/task.go index 4609bdf4..d6f6e8b7 100644 --- a/pkg/api/task.go +++ b/pkg/api/task.go @@ -43,7 +43,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e return nil, err } - aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId) taskID := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?) fn := &types.TaskInvocation{ Metadata: &types.ObjectMetadata{ @@ -58,6 +57,7 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e return nil, err } + aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId) err = ap.es.Append(&fes.Event{ Type: events.Task_TASK_STARTED.String(), Parent: aggregate, diff --git a/pkg/controller/actions.go b/pkg/controller/actions.go index 9c48f92a..9306cc0f 100644 --- a/pkg/controller/actions.go +++ b/pkg/controller/actions.go @@ -38,7 +38,6 @@ type ActionRemoveFromEvalCache struct { } func (a *ActionRemoveFromEvalCache) Apply() error { - log.Infof("Removing '%s' from EvalCache", a.ID) a.EvalCache.Del(a.ID) return nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ecdddd80..a698d556 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -17,8 +17,7 @@ const ( ) var ( - log = logrus.New().WithFields(logrus.Fields{"component": "controller"}) - metaLog = log.WithField("controller", 
"controller-meta") + metaLog = logrus.New().WithFields(logrus.Fields{"component": "controller"}) // Controller-related metrics EvalJobs = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -88,16 +87,14 @@ func NewMetaController(ctrls ...Controller) *MetaController { } func (mc *MetaController) Init(ctx context.Context) error { - metaLog.Info("Running MetaController init.") for _, ctrl := range mc.ctrls { err := ctrl.Init(ctx) if err != nil { return err } - metaLog.Infof("'%s' controller init done.", reflect.TypeOf(ctrl)) + metaLog.Debugf("Controller '%s' initialized.", reflect.TypeOf(ctrl)) } - metaLog.Info("Finished MetaController init.") return nil } @@ -151,21 +148,17 @@ func (mc *MetaController) Notify(msg *fes.Notification) error { } func (mc *MetaController) Close() error { - metaLog.Info("Closing metacontroller and its controllers...") - var err error + var merr error for _, ctrl := range mc.ctrls { if closer, ok := ctrl.(io.Closer); ok { - err = closer.Close() + err := closer.Close() + if err != nil { + merr = err + metaLog.Errorf("Failed to stop controller %s: %v", reflect.TypeOf(ctrl), err) + } else { + metaLog.Debugf("Controller '%s' stopped.", reflect.TypeOf(ctrl)) + } } } - metaLog.Info("Closed MetaController") - return err -} - -func (mc *MetaController) Halt() { - mc.suspended = false -} - -func (mc *MetaController) Resume() { - mc.suspended = true + return merr } diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 08b68de8..f24ad307 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -10,6 +10,7 @@ import ( "github.com/fission/fission-workflows/pkg/scheduler" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" + "github.com/fission/fission-workflows/pkg/util" "github.com/imdario/mergo" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -75,83 +76,96 @@ func (a *ActionInvokeTask) Eval(cec 
controller.EvalContext) controller.Action { panic("not implemented") } -func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) { - // Resolve the inputs - scope, err := expr.NewScope(a.Wf, a.Wfi) - if err != nil { - return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) - } - a.StateStore.Set(a.Wfi.ID(), scope) - - // Inherit scope if this invocation is part of a dynamic decision - if len(a.Wfi.Spec.ParentId) != 0 { - parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId) - if ok { - err := mergo.Merge(scope, parentScope) - if err != nil { - logrus.Errorf("Failed to inherit parent scope: %v", err) - } - } - } - inputs := map[string]*types.TypedValue{} - for _, input := range typedvalues.Prioritize(a.Task.Inputs) { - resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val) - if err != nil { - wfiLog.WithFields(logrus.Fields{ - "val": input.Key, - "key": input.Val, - }).Errorf("Failed to resolve input: %v", err) - return nil, err - } - - inputs[input.Key] = resolvedInput - wfiLog.WithFields(logrus.Fields{ - "key": input.Key, - }).Infof("Resolved input: %v -> %v", typedvalues.MustFormat(input.Val), typedvalues.MustFormat(resolvedInput)) - - // Update the scope with the resolved type - scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput) - } - return inputs, nil +func (a *ActionInvokeTask) logger() logrus.FieldLogger { + return logrus.WithFields(logrus.Fields{ + "invocation": a.Wfi.ID(), + "workflow": a.Wf.ID(), + "task": a.Task.Id, + }) } func (a *ActionInvokeTask) Apply() error { - wfiLog.Infof("Running task: %v", a.Task.Id) - // Find Task (static or dynamic) + log := a.logger() + + // Find task task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id) if !ok { return fmt.Errorf("task '%v' could not be found", a.Wfi.ID()) } - wfiLog.Infof("Invoking function '%s' for Task '%s'", task.Spec.FunctionRef, a.Task.Id) // Check if function has been resolved if task.Status.FnRef == nil { return 
fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef) } - // Resolve inputs + // Pre-execution: Resolve expression inputs exprEvalStart := time.Now() inputs, err := a.resolveInputs() exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) if err != nil { + log.Error(err) return err } - // Invoke - fnSpec := &types.TaskInvocationSpec{ + // Invoke task + spec := &types.TaskInvocationSpec{ FnRef: task.Status.FnRef, TaskId: a.Task.Id, InvocationId: a.Wfi.ID(), Inputs: inputs, } - - _, err = a.API.Invoke(fnSpec) + log.Infof("Executing function: %v", spec.GetFnRef().Format()) + if logrus.GetLevel() == logrus.DebugLevel { + i, err := typedvalues.FormatTypedValueMap(typedvalues.DefaultParserFormatter, spec.GetInputs()) + if err != nil { + log.Errorf("Failed to format inputs for debugging: %v", err) + } else { + log.Debugf("Using inputs: %v", i) + } + } + _, err = a.API.Invoke(spec) if err != nil { - wfiLog.WithFields(logrus.Fields{ - "id": a.Wfi.Metadata.Id, - }).Errorf("Failed to execute task: %v", err) + log.Errorf("Failed to execute task: %v", err) return err } return nil } + +func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) { + log := a.logger() + + // Setup the scope for the expressions + scope, err := expr.NewScope(a.Wf, a.Wfi) + if err != nil { + return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) + } + a.StateStore.Set(a.Wfi.ID(), scope) + + // Inherit scope if this invocation is part of a dynamic invocation + if len(a.Wfi.Spec.ParentId) != 0 { + parentScope, ok := a.StateStore.Get(a.Wfi.Spec.ParentId) + if ok { + err := mergo.Merge(scope, parentScope) + if err != nil { + log.Errorf("Failed to inherit parent scope: %v", err) + } + } + } + + // Resolve each of the inputs (based on priority) + inputs := map[string]*types.TypedValue{} + for _, input := range typedvalues.Prioritize(a.Task.Inputs) { + resolvedInput, err := expr.Resolve(scope, a.Task.Id, input.Val) + 
if err != nil { + return nil, fmt.Errorf("failed to resolve input field %v: %v", input.Key, err) + } + inputs[input.Key] = resolvedInput + log.Infof("Resolved field %v: %v -> %v", input.Key, typedvalues.MustFormat(input.Val), + util.Truncate(typedvalues.MustFormat(resolvedInput), 100)) + + // Update the scope with the resolved type + scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput) + } + return inputs, nil +} diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index ccd734e5..90f729a7 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -27,7 +27,7 @@ const ( ) var ( - wfiLog = log.WithField("component", "controller-wi") + wfiLog = log.WithField("component", "controller.invocation") // workflow-related metrics invocationStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ @@ -112,7 +112,7 @@ func (cr *Controller) Init(sctx context.Context) error { case notification := <-cr.sub.Ch: cr.handleMsg(notification) case <-ctx.Done(): - wfiLog.WithField("ctx.err", ctx.Err()).Debug("Notification listener closed.") + wfiLog.Debug("Notification listener stopped.") return } } @@ -127,6 +127,7 @@ func (cr *Controller) Init(sctx context.Context) error { go cr.Evaluate(eval) // TODO limit number of goroutines controller.EvalQueueSize.WithLabelValues("invocation").Dec() case <-ctx.Done(): + wfiLog.Debug("Evaluation queue listener stopped.") return } } @@ -326,11 +327,12 @@ func (cr *Controller) Evaluate(invocationID string) { } func (cr *Controller) Close() error { - wfiLog.Info("Closing invocation controller...") if invokePub, ok := cr.invokeCache.(pubsub.Publisher); ok { err := invokePub.Unsubscribe(cr.sub) if err != nil { - return err + wfiLog.Errorf("Failed to unsubscribe from invocation cache: %v", err) + } else { + wfiLog.Info("Unsubscribed from invocation cache") } } diff --git a/pkg/controller/rules.go b/pkg/controller/rules.go index 
fdc6acab..c479ae93 100644 --- a/pkg/controller/rules.go +++ b/pkg/controller/rules.go @@ -35,8 +35,6 @@ func (tf *RuleTimedOut) Eval(ec EvalContext) Action { } duration := time.Now().UnixNano() - initialStatus.Timestamp.UnixNano() if duration > tf.Timeout.Nanoseconds() { - log.Infof("cancelling due to timeout; %v exceeds max timeout %v", - duration, int64(tf.Timeout.Seconds())) return evalIfNotNil(tf.OnTimedOut, ec) } return evalIfNotNil(tf.OnWithinTime, ec) diff --git a/pkg/controller/workflow/controller.go b/pkg/controller/workflow/controller.go index 88c8937b..6050fc6b 100644 --- a/pkg/controller/workflow/controller.go +++ b/pkg/controller/workflow/controller.go @@ -27,7 +27,7 @@ const ( // TODO add hard limits (cache size, max concurrent invocation) var ( - wfLog = logrus.WithField("component", "controller.wf") + wfLog = logrus.WithField("component", "controller.workflow") workflowProcessDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "workflows", @@ -90,7 +90,7 @@ func (c *Controller) Init(sctx context.Context) error { case notification := <-c.sub.Ch: c.handleMsg(notification) case <-ctx.Done(): - wfLog.WithField("ctx.err", ctx.Err()).Debug("Notification listener closed.") + wfLog.Debug("Notification listener closed.") return } } @@ -105,6 +105,7 @@ func (c *Controller) Init(sctx context.Context) error { controller.EvalQueueSize.WithLabelValues(Name).Dec() go c.Evaluate(eval) // TODO limit number of goroutines case <-ctx.Done(): + wfLog.Debug("Evaluation queue listener stopped.") return } } @@ -198,11 +199,12 @@ func (c *Controller) Evaluate(workflowID string) { } func (c *Controller) Close() error { - wfLog.Info("Closing workflow controller...") if invokePub, ok := c.wfCache.(pubsub.Publisher); ok { err := invokePub.Unsubscribe(c.sub) if err != nil { - return err + wfLog.Errorf("Failed to unsubscribe from workflow cache: %v", err) + } else { + wfLog.Info("Unsubscribed from workflow cache") } } diff --git a/pkg/fes/caches.go 
b/pkg/fes/caches.go index 4aa92dae..53ff7007 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -162,7 +162,7 @@ func NewSubscribedCache(ctx context.Context, cache CacheReaderWriter, target fun for { select { case <-ctx.Done(): - logrus.Info("SubscribedCache worker was canceled.") + logrus.Debug("SubscribedCache: listener stopped.") return case msg := <-sub.Ch: // Discard invalid messages diff --git a/pkg/fnenv/native/builtin/noop.go b/pkg/fnenv/native/builtin/noop.go index df4f184e..a3936b61 100644 --- a/pkg/fnenv/native/builtin/noop.go +++ b/pkg/fnenv/native/builtin/noop.go @@ -50,6 +50,10 @@ func (fn *FunctionNoop) Invoke(spec *types.TaskInvocationSpec) (*types.TypedValu break } } - logrus.Infof("[internal://%s] %v", Noop, typedvalues.MustFormat(output)) + logrus.WithFields(logrus.Fields{ + "invocation": spec.InvocationId, + "task": spec.TaskId, + }).Infof("[internal://%s] %v", Noop, + typedvalues.MustFormat(output)) return output, nil } diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index 211b5307..aed4d962 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -34,7 +34,6 @@ func NewFunctionEnv(fns map[string]InternalFunction) *FunctionEnv { env := &FunctionEnv{ fns: fns, } - log.WithField("fns", env.fns).Debugf("Internal function runtime installed.") return env } @@ -96,3 +95,12 @@ func (fe *FunctionEnv) Resolve(fnName string) (string, error) { func (fe *FunctionEnv) RegisterFn(name string, fn InternalFunction) { fe.fns[name] = fn } + +// Installed lists all installed functions in the internal function runtime. 
+func (fe *FunctionEnv) Installed() []string {
+	fns := make([]string, 0, len(fe.fns)) // len 0, cap n: append must not land after n empty strings
+	for fn := range fe.fns {
+		fns = append(fns, fn)
+	}
+	return fns
+}
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 2917fa85..8cd8dfc6 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -37,7 +37,8 @@ type WorkflowScheduler struct {
 
 func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, error) {
 	ctxLog := log.WithFields(logrus.Fields{
-		"wfi": request.Invocation.Metadata.Id,
+		"invocation": request.Invocation.ID(),
+		"workflow":   request.Workflow.ID(),
 	})
 	timeStart := time.Now()
 	defer func() {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 6ce7096d..d885a64d 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"encoding/json"
+	"fmt"
 	"sync"
 	"time"
 
@@ -50,3 +51,26 @@ func MustConvertStructsToMap(i interface{}) map[string]interface{} {
 		return result
 	}
 }
+
+// Truncate formats val with %v and caps the result at maxLen bytes.
+//
+// Values that already fit are returned unchanged. Longer values are cut and
+// suffixed with a marker stating the original length, so that the result is
+// exactly maxLen bytes. If maxLen leaves no room for the marker, the plain
+// prefix of maxLen bytes is returned; a negative maxLen is treated as 0.
+// Cutting is byte-based and may split a multi-byte UTF-8 sequence.
+func Truncate(val interface{}, maxLen int) string {
+	s := fmt.Sprintf("%v", val)
+	if len(s) <= maxLen {
+		return s
+	}
+	if maxLen < 0 {
+		maxLen = 0
+	}
+	affix := fmt.Sprintf("...(truncated, %d bytes total)", len(s))
+	if keep := maxLen - len(affix); keep > 0 {
+		return s[:keep] + affix
+	}
+	// No room for the marker: fall back to a bare prefix.
+	return s[:maxLen]
+}