diff --git a/pkg/controller/invocation.go b/pkg/controller/invocation.go index a9f6dac0..d9066d03 100644 --- a/pkg/controller/invocation.go +++ b/pkg/controller/invocation.go @@ -21,9 +21,11 @@ import ( "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" "github.com/sirupsen/logrus" + "reflect" + "sync/atomic" ) -var wfiLog = log.WithField(LogKeyController, "wf") +var wfiLog = log.WithField("component", "controller-wi") type InvocationController struct { invokeCache fes.CacheReader @@ -39,14 +41,32 @@ type InvocationController struct { // Queued keeps track of which invocations still have actions in the workQueue states map[string]ControlState - evalMutex sync.Mutex // TODO add active cache } type ControlState struct { - errorCount int - recentError error - inQueue int + ErrorCount uint32 + RecentError error + QueueSize uint32 + lock sync.Mutex +} + +func (cs ControlState) AddError(err error) uint32 { + cs.RecentError = err + return atomic.AddUint32(&cs.ErrorCount, 1) +} + +func (cs ControlState) ResetError() { + cs.RecentError = nil + cs.ErrorCount = 0 +} + +func (cs ControlState) IncrementQueueSize() uint32 { + return atomic.AddUint32(&cs.QueueSize, 1) +} + +func (cs ControlState) DecrementQueueSize() uint32 { + return atomic.AddUint32(&cs.QueueSize, ^uint32(0)) // TODO avoid overflow } func NewInvocationController(invokeCache fes.CacheReader, wfCache fes.CacheReader, @@ -105,28 +125,20 @@ func (cr *InvocationController) Init(sctx context.Context) error { for { select { case action := <-cr.workQueue: - state := cr.states[action.Id()] - - if state.inQueue > 0 { - state.inQueue -= 1 - } - cr.states[action.Id()] = state - go func() { // TODO limit goroutine pool size & atomic integer + go func() { // TODO limit goroutine pool size err := action.Apply() - state := cr.states[action.Id()] if err != nil { - wfiLog.WithField("action", action).Errorf("WorkflowInvocation action failed: %v", err) - state.recentError = err - state.errorCount += 1 + wfiLog.WithField("action", action).Errorf("action failed: %v", err) + cr.states[action.Id()].AddError(err) } else { - state.errorCount = 0 + cr.states[action.Id()].ResetError() } - cr.states[action.Id()] = state + cr.states[action.Id()].DecrementQueueSize() }() case <-ctx.Done(): - wfiLog.WithField("ctx.err", ctx.Err()).Debug("WorkflowInvocation workQueue closed.") + wfiLog.WithField("ctx.err", ctx.Err()).Debug("workQueue closed.") return } } @@ -139,7 +151,7 @@ func (cr *InvocationController) HandleNotification(msg *fes.Notification) error wfiLog.WithFields(logrus.Fields{ "notification": msg.EventType, "labels": msg.Labels(), - }).Info("controller event trigger!") + }).Info("Handling notification!") switch msg.EventType { case events.Invocation_INVOCATION_CREATED.String(): @@ -174,14 +186,6 @@ func (cr *InvocationController) HandleTick() error { return err } - // Check if we actually need to evaluate - if wi.Status.Status.Finished() { - // TODO remove finished wfi from active cache - continue - } - - // TODO check if workflow invocation is in a back-off - cr.evaluate(wi.WorkflowInvocation) } return nil @@ -189,23 +193,32 @@ func (cr *InvocationController) HandleTick() error { // TODO return error func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) { - // TODO lock on invocation-level - cr.evalMutex.Lock() - defer cr.evalMutex.Unlock() - state := cr.states[invoc.Metadata.Id] + state.lock.Lock() + defer state.lock.Unlock() + + // Check if there are still open actions for this invocation + if state.QueueSize > 0 { + return + } - // Check if there are still open actions - if state.inQueue > 0 { + // Check if we actually need to evaluate + if invoc.Status.Status.Finished() { + // TODO remove finished wfi from active cache return } + // TODO check if workflow invocation is in a back-off + // Check if the graph has been failing too often - if state.errorCount > MaxErrorCount { - wfiLog.Infof("canceling invocation %v due to error count", invoc.Metadata.Id) - err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit? - if err != nil { - wfiLog.Errorf("failed to cancel timed out invocation: %v", err) + if state.ErrorCount > MaxErrorCount { + wfiLog.Infof("canceling due to error count %v exceeds max error count %v", state.ErrorCount, MaxErrorCount) + ok := cr.submit(&abortAction{ + api: cr.invocationApi, + invocationId: invoc.Metadata.Id, + }) + if !ok { + wfiLog.Error("failed to cancel timed out invocation.") } return } @@ -217,11 +230,15 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) { } // For now: kill after 10 min - if (time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds) > int64(InvocationTimeout.Seconds()) { - wfiLog.Infof("canceling timeout invocation %v", invoc.Metadata.Id) - err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit? - if err != nil { - wfiLog.Errorf("failed to cancel timed out invocation: %v", err) + duration := time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds + if duration > int64(InvocationTimeout.Seconds()) { + wfiLog.Infof("cancelling due to timeout; %v exceeds max timeout %v", duration, int64(InvocationTimeout.Seconds())) + ok := cr.submit(&abortAction{ + api: cr.invocationApi, + invocationId: invoc.Metadata.Id, + }) + if !ok { + wfiLog.Error("failed to cancel timed out invocation.") } return } @@ -313,10 +330,10 @@ func (cr *InvocationController) submit(action Action) (submitted bool) { select { case cr.workQueue <- action: // Ok - state := cr.states[action.Id()] - state.inQueue += 1 - cr.states[action.Id()] = state + cr.states[action.Id()].IncrementQueueSize() submitted = true + wfiLog.WithField("wfi", action.Id()). + Infof("submitted action: '%s'", reflect.TypeOf(action)) default: // Action overflow } @@ -338,7 +355,7 @@ func (a *abortAction) Id() string { } func (a *abortAction) Apply() error { - wfiLog.Infof("aborting: '%v'", a.invocationId) + wfiLog.WithField("wfi", a.Id()).Info("Applying abort action") return a.api.Cancel(a.invocationId) } @@ -356,6 +373,7 @@ func (a *invokeTaskAction) Id() string { } func (a *invokeTaskAction) Apply() error { + actionLog := wfiLog.WithField("wfi", a.Id()) // Find task (static or dynamic) task, ok := a.wfi.Status.DynamicTasks[a.task.Id] if !ok { @@ -364,7 +382,7 @@ func (a *invokeTaskAction) Apply() error { return fmt.Errorf("unknown task '%v'", a.task.Id) } } - wfiLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id) + actionLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id) // Resolve type of the task taskDef, ok := a.wf.Status.ResolvedTasks[task.FunctionRef] @@ -378,7 +396,7 @@ func (a *invokeTaskAction) Apply() error { for inputKey, val := range a.task.Inputs { resolvedInput, err := a.expr.Resolve(queryScope, queryScope.Tasks[a.task.Id], nil, val) if err != nil { - wfiLog.WithFields(logrus.Fields{ + actionLog.WithFields(logrus.Fields{ "val": val, "inputKey": inputKey, }).Errorf("Failed to parse input: %v", err) @@ -386,7 +404,7 @@ func (a *invokeTaskAction) Apply() error { } inputs[inputKey] = resolvedInput - wfiLog.WithFields(logrus.Fields{ + actionLog.WithFields(logrus.Fields{ "val": val, "key": inputKey, "resolved": resolvedInput, @@ -402,7 +420,7 @@ func (a *invokeTaskAction) Apply() error { _, err := a.api.Invoke(a.wfi.Metadata.Id, fnSpec) if err != nil { - wfiLog.WithFields(logrus.Fields{ + actionLog.WithFields(logrus.Fields{ "id": a.wfi.Metadata.Id, "err": err, }).Errorf("Failed to execute task") diff --git a/pkg/controller/meta.go b/pkg/controller/meta.go index 4d0933ea..bf3241ab 100644 --- a/pkg/controller/meta.go +++ b/pkg/controller/meta.go @@ -18,7 +18,7 @@ const ( ) var ( - metaLog = log.WithField(LogKeyController, "meta") + metaLog = log.WithField("controller", "controller-meta") ) // MetaController is a 'controller for controllers', allowing for composition with controllers. It allows users to diff --git a/pkg/controller/types.go b/pkg/controller/types.go index 4a71e234..855c3077 100644 --- a/pkg/controller/types.go +++ b/pkg/controller/types.go @@ -12,12 +12,10 @@ const ( WorkQueueSize = 50 InvocationTimeout = time.Duration(10) * time.Minute MaxErrorCount = 3 - LogKeyController = "ctrl" ) var log = logrus.New().WithFields(logrus.Fields{ "component": "controller", - LogKeyController: "?", }) type Controller interface { diff --git a/pkg/controller/workflow.go b/pkg/controller/workflow.go index 44fc1950..ff8fd0d7 100644 --- a/pkg/controller/workflow.go +++ b/pkg/controller/workflow.go @@ -17,7 +17,7 @@ import ( "fmt" ) -var wfLog = log.WithField(LogKeyController, "wf") +var wfLog = log.WithField("controller", "controller-wf") // WorkflowController is the controller concerned with the lifecycle of workflows. It handles responsibilities, such as // parsing of workflows. diff --git a/pkg/fnenv/fission/httputil.go b/pkg/fnenv/fission/httputil.go index 7682cd36..aed0d439 100644 --- a/pkg/fnenv/fission/httputil.go +++ b/pkg/fnenv/fission/httputil.go @@ -10,12 +10,11 @@ import ( "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/typedvalues" - "github.com/sirupsen/logrus" ) func ParseRequest(r *http.Request, target map[string]*types.TypedValue) error { contentType := r.Header.Get("Content-Type") - logrus.WithField("url", r.URL).WithField("content-type", contentType).Info("Request content-type") + log.WithField("url", r.URL).WithField("content-type", contentType).Info("Request content-type") // Map Inputs to function parameters body, err := ioutil.ReadAll(r.Body) defer r.Body.Close() @@ -27,18 +26,17 @@ func ParseRequest(r *http.Request, target map[string]*types.TypedValue) error { if len(body) > 0 { err = json.Unmarshal(body, &i) if err != nil { - logrus.WithField("body", len(body)).Infof("Input is not json: %v", err) + log.WithField("body", len(body)).Debugf("Input is not json: %v", err) i = body } } parsedInput, err := typedvalues.Parse(i) if err != nil { - logrus.Errorf("Failed to parse body: %v", err) return errors.New("failed to parse body") } - logrus.WithField(types.INPUT_MAIN, parsedInput).Info("Parsed body") + log.WithField(types.INPUT_MAIN, parsedInput).Info("Parsed body") target[types.INPUT_MAIN] = parsedInput return nil } diff --git a/pkg/fnenv/fission/resolver.go b/pkg/fnenv/fission/resolver.go index ed8b76b9..224836ec 100644 --- a/pkg/fnenv/fission/resolver.go +++ b/pkg/fnenv/fission/resolver.go @@ -2,11 +2,12 @@ package fission import ( "github.com/fission/fission/controller/client" - "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// fission.Resolver implements the Resolver interface and is responsible for resolving function references to +// deterministic Fission function UIDs. + type Resolver struct { controller *client.Client } @@ -16,7 +17,7 @@ func NewResolver(controller *client.Client) *Resolver { } func (re *Resolver) Resolve(fnName string) (string, error) { - logrus.WithField("name", fnName).Info("Resolving function ") + log.Infof("Resolving function: %s", fnName) fn, err := re.controller.FunctionGet(&metav1.ObjectMeta{ Name: fnName, Namespace: metav1.NamespaceDefault, @@ -26,7 +27,7 @@ func (re *Resolver) Resolve(fnName string) (string, error) { } id := string(fn.Metadata.UID) - logrus.WithField("name", fnName).WithField("uid", id).Info("Resolved fission function") + log.Infof("Resolved fission function %s to %s", fnName, id) return id, nil } diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 61fcaf5e..de118f37 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -20,6 +20,8 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" ) +var log = logrus.WithField("component", "fnenv-fission") + // FunctionEnv adapts the Fission platform to the function execution runtime. type FunctionEnv struct { executor *executor.Client @@ -39,12 +41,11 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca UID: k8stypes.UID(spec.GetType().GetResolved()), Namespace: metav1.NamespaceDefault, } - logrus.WithFields(logrus.Fields{ - "metadata": meta, - }).Info("Invoking Fission function.") + ctxLog := log.WithField("fn", meta.Name) + ctxLog.Infof("Invoking Fission function: '%v'.", meta.Name, meta.UID) serviceUrl, err := fe.executor.GetServiceForFunction(meta) if err != nil { - logrus.WithFields(logrus.Fields{ + log.WithFields(logrus.Fields{ "err": err, "meta": meta, }).Error("Fission function failed!") @@ -67,7 +68,6 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } r := bytes.NewReader(input) - logrus.Infof("[request][body]: %v", string(input)) // TODO map other parameters as well (to params) req, err := http.NewRequest(http.MethodPost, url, r) @@ -79,23 +79,24 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca router.MetadataToHeaders(router.HEADERS_FISSION_FUNCTION_PREFIX, meta, req) reqContentType := ToContentType(mainInput) - logrus.Infof("[request][Content-Type]: %v", reqContentType) req.Header.Set("Content-Type", reqContentType) + ctxLog.Infof("[request][Content-Type]: %v", reqContentType) + ctxLog.Debugf("[request][body]: %v", string(input)) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, fmt.Errorf("error for url '%s': %v", serviceUrl, err) } - logrus.Infof("[%s][Content-Type]: %v ", meta.Name, resp.Header.Get("Content-Type")) output := ToTypedValue(resp) - logrus.Infof("[%s][output]: %v", meta.Name, output) - logrus.Infof("[%s][status]: %v", meta.Name, resp.StatusCode) + ctxLog.Infof("[response][status]: %v", meta.Name, resp.StatusCode) + ctxLog.Infof("[response][Content-Type]: %v ", meta.Name, resp.Header.Get("Content-Type")) + ctxLog.Debugf("[%s][output]: %v", meta.Name, output) // Determine status of the task invocation if resp.StatusCode >= 400 { msg, _ := typedvalues.Format(output) - logrus.Warn("[%s] Failed %v: %v", resp.StatusCode, msg) + ctxLog.Warn("[%s] Failed %v: %v", resp.StatusCode, msg) return &types.TaskInvocationStatus{ Status: types.TaskInvocationStatus_FAILED, Error: &types.Error{ @@ -142,10 +143,10 @@ func ToTypedValue(resp *http.Response) *types.TypedValue { var i interface{} = body if strings.Contains(contentType, "application/json") || strings.Contains(contentType, "text/json") { - logrus.Info("Assuming JSON") + log.Info("Assuming JSON") err := json.Unmarshal(body, &i) if err != nil { - logrus.Warnf("Expected JSON response could not be parsed: %v", err) + log.Warnf("Expected JSON response could not be parsed: %v", err) } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 466247b0..27b30dad 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -7,24 +7,26 @@ import ( "github.com/fission/fission-workflows/pkg/types" "github.com/golang/protobuf/ptypes" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) + +var log = logrus.WithField("component", "scheduler") + type WorkflowScheduler struct { } func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, error) { + ctxLog := log.WithFields(logrus.Fields{ + "wfi": request.Invocation.Metadata.Id, + }) + schedule := &Schedule{ InvocationId: request.Invocation.Metadata.Id, CreatedAt: ptypes.TimestampNow(), Actions: []*Action{}, } - ctxLog := log.WithFields(log.Fields{ - "workflow": request.Workflow.Metadata.Id, - "invoke": request.Invocation.Metadata.Id, - }) - ctxLog.Info("Scheduler evaluating...") cwf := types.CalculateTaskDependencyGraph(request.Workflow, request.Invocation) @@ -64,7 +66,7 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro for depName := range task.Requires { t, ok := cwf[depName] if !ok { - log.Warnf("Unknown task dependency: %v", depName) + ctxLog.Warnf("Unknown task dependency: %v", depName) } if ok && t.Invocation != nil && t.Invocation.Status.Status.Finished() { @@ -72,7 +74,7 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro } } - log.WithFields(log.Fields{ + ctxLog.WithFields(logrus.Fields{ "completedDeps": completedDeps, "task": id, "max": int(math.Max(float64(task.Await), float64(len(task.Requires)-1))),