Skip to content

Commit

Permalink
Provide context to loggers
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Feb 6, 2018
1 parent 82e47f3 commit 8040d18
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 79 deletions.
122 changes: 70 additions & 52 deletions pkg/controller/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
"reflect"
"sync/atomic"
)

var wfiLog = log.WithField(LogKeyController, "wf")
var wfiLog = log.WithField("component", "controller-wi")

type InvocationController struct {
invokeCache fes.CacheReader
Expand All @@ -39,14 +41,32 @@ type InvocationController struct {

// Queued keeps track of which invocations still have actions in the workQueue
states map[string]ControlState
evalMutex sync.Mutex
// TODO add active cache
}

type ControlState struct {
errorCount int
recentError error
inQueue int
ErrorCount uint32
RecentError error
QueueSize uint32
lock sync.Mutex
}

// AddError stores err in RecentError and atomically adds 1 to ErrorCount,
// returning the incremented count.
//
// NOTE(review): the receiver is a value (cs ControlState), so both writes land
// on this call's private copy of the struct; the ControlState held by the
// caller (for example an element of a map[string]ControlState) is not
// modified, and the returned count is derived from the copy only. The copy
// also duplicates the struct's lock field (a sync.Mutex), which `go vet`
// reports as copylocks. Making the update stick requires a pointer receiver
// AND storing *ControlState at the call sites, since map elements are not
// addressable — change both together.
func (cs ControlState) AddError(err error) uint32 {
cs.RecentError = err
return atomic.AddUint32(&cs.ErrorCount, 1)
}

// ResetError sets RecentError to nil and ErrorCount to 0.
//
// NOTE(review): because the receiver is a value, these assignments only change
// this call's copy of the struct, so the caller's ControlState keeps whatever
// error state it had. ErrorCount is also cleared with a plain store here,
// whereas AddError updates it through sync/atomic; if the receiver becomes a
// pointer, use atomic.StoreUint32 so the two accesses do not race.
func (cs ControlState) ResetError() {
cs.RecentError = nil
cs.ErrorCount = 0
}

// IncrementQueueSize atomically adds 1 to QueueSize and returns the new value.
//
// NOTE(review): the receiver is a value, so the increment is applied to this
// call's copy of the struct and the caller's QueueSize stays unchanged; the
// return value is therefore the caller's current QueueSize plus one, not a
// running total. A pointer receiver (with *ControlState stored by callers) is
// needed for the counter to accumulate.
func (cs ControlState) IncrementQueueSize() uint32 {
return atomic.AddUint32(&cs.QueueSize, 1)
}

// DecrementQueueSize atomically subtracts 1 from QueueSize and returns the new
// value. Adding ^uint32(0) (all bits set) is the sync/atomic idiom for
// subtracting 1 from an unsigned counter.
//
// NOTE(review): there is no guard for QueueSize == 0; in that case the result
// wraps around to 4294967295 (see the TODO below). In addition, the receiver
// is a value, so the subtraction is applied to this call's copy of the struct
// and the caller's QueueSize is left unchanged; a pointer receiver (with
// *ControlState stored by callers) is needed for the decrement to persist.
func (cs ControlState) DecrementQueueSize() uint32 {
return atomic.AddUint32(&cs.QueueSize, ^uint32(0)) // TODO avoid overflow
}

func NewInvocationController(invokeCache fes.CacheReader, wfCache fes.CacheReader,
Expand Down Expand Up @@ -105,28 +125,20 @@ func (cr *InvocationController) Init(sctx context.Context) error {
for {
select {
case action := <-cr.workQueue:
state := cr.states[action.Id()]

if state.inQueue > 0 {
state.inQueue -= 1
}
cr.states[action.Id()] = state

go func() { // TODO limit goroutine pool size & atomic integer
go func() { // TODO limit goroutine pool size
err := action.Apply()
state := cr.states[action.Id()]
if err != nil {
wfiLog.WithField("action", action).Errorf("WorkflowInvocation action failed: %v", err)
state.recentError = err
state.errorCount += 1
wfiLog.WithField("action", action).Errorf("action failed: %v", err)
cr.states[action.Id()].AddError(err)
} else {
state.errorCount = 0
cr.states[action.Id()].ResetError()
}
cr.states[action.Id()] = state
cr.states[action.Id()].DecrementQueueSize()
}()

case <-ctx.Done():
wfiLog.WithField("ctx.err", ctx.Err()).Debug("WorkflowInvocation workQueue closed.")
wfiLog.WithField("ctx.err", ctx.Err()).Debug("workQueue closed.")
return
}
}
Expand All @@ -139,7 +151,7 @@ func (cr *InvocationController) HandleNotification(msg *fes.Notification) error
wfiLog.WithFields(logrus.Fields{
"notification": msg.EventType,
"labels": msg.Labels(),
}).Info("controller event trigger!")
}).Info("Handling notification!")

switch msg.EventType {
case events.Invocation_INVOCATION_CREATED.String():
Expand Down Expand Up @@ -174,38 +186,39 @@ func (cr *InvocationController) HandleTick() error {
return err
}

// Check if we actually need to evaluate
if wi.Status.Status.Finished() {
// TODO remove finished wfi from active cache
continue
}

// TODO check if workflow invocation is in a back-off

cr.evaluate(wi.WorkflowInvocation)
}
return nil
}

// TODO return error
func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) {
// TODO lock on invocation-level
cr.evalMutex.Lock()
defer cr.evalMutex.Unlock()

state := cr.states[invoc.Metadata.Id]
state.lock.Lock()
defer state.lock.Unlock()

// Check if there are still open actions for this invocation
if state.QueueSize > 0 {
return
}

// Check if there are still open actions
if state.inQueue > 0 {
// Check if we actually need to evaluate
if invoc.Status.Status.Finished() {
// TODO remove finished wfi from active cache
return
}

// TODO check if workflow invocation is in a back-off

// Check if the graph has been failing too often
if state.errorCount > MaxErrorCount {
wfiLog.Infof("canceling invocation %v due to error count", invoc.Metadata.Id)
err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit?
if err != nil {
wfiLog.Errorf("failed to cancel timed out invocation: %v", err)
if state.ErrorCount > MaxErrorCount {
wfiLog.Infof("canceling due to error count %v exceeds max error count %v", state.ErrorCount, MaxErrorCount)
ok := cr.submit(&abortAction{
api: cr.invocationApi,
invocationId: invoc.Metadata.Id,
})
if !ok {
wfiLog.Error("failed to cancel timed out invocation.")
}
return
}
Expand All @@ -217,11 +230,15 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) {
}

// For now: kill after 10 min
if (time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds) > int64(InvocationTimeout.Seconds()) {
wfiLog.Infof("canceling timeout invocation %v", invoc.Metadata.Id)
err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit?
if err != nil {
wfiLog.Errorf("failed to cancel timed out invocation: %v", err)
duration := time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds
if duration > int64(InvocationTimeout.Seconds()) {
wfiLog.Infof("cancelling due to timeout; %v exceeds max timeout %v", duration, int64(InvocationTimeout.Seconds()))
ok := cr.submit(&abortAction{
api: cr.invocationApi,
invocationId: invoc.Metadata.Id,
})
if !ok {
wfiLog.Error("failed to cancel timed out invocation.")
}
return
}
Expand Down Expand Up @@ -313,10 +330,10 @@ func (cr *InvocationController) submit(action Action) (submitted bool) {
select {
case cr.workQueue <- action:
// Ok
state := cr.states[action.Id()]
state.inQueue += 1
cr.states[action.Id()] = state
cr.states[action.Id()].IncrementQueueSize()
submitted = true
wfiLog.WithField("wfi", action.Id()).
Infof("submitted action: '%s'", reflect.TypeOf(action))
default:
// Action overflow
}
Expand All @@ -338,7 +355,7 @@ func (a *abortAction) Id() string {
}

func (a *abortAction) Apply() error {
wfiLog.Infof("aborting: '%v'", a.invocationId)
wfiLog.WithField("wfi", a.Id()).Info("Applying abort action")
return a.api.Cancel(a.invocationId)
}

Expand All @@ -356,6 +373,7 @@ func (a *invokeTaskAction) Id() string {
}

func (a *invokeTaskAction) Apply() error {
actionLog := wfiLog.WithField("wfi", a.Id())
// Find task (static or dynamic)
task, ok := a.wfi.Status.DynamicTasks[a.task.Id]
if !ok {
Expand All @@ -364,7 +382,7 @@ func (a *invokeTaskAction) Apply() error {
return fmt.Errorf("unknown task '%v'", a.task.Id)
}
}
wfiLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id)
actionLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id)

// Resolve type of the task
taskDef, ok := a.wf.Status.ResolvedTasks[task.FunctionRef]
Expand All @@ -378,15 +396,15 @@ func (a *invokeTaskAction) Apply() error {
for inputKey, val := range a.task.Inputs {
resolvedInput, err := a.expr.Resolve(queryScope, queryScope.Tasks[a.task.Id], nil, val)
if err != nil {
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"val": val,
"inputKey": inputKey,
}).Errorf("Failed to parse input: %v", err)
return err
}

inputs[inputKey] = resolvedInput
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"val": val,
"key": inputKey,
"resolved": resolvedInput,
Expand All @@ -402,7 +420,7 @@ func (a *invokeTaskAction) Apply() error {

_, err := a.api.Invoke(a.wfi.Metadata.Id, fnSpec)
if err != nil {
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"id": a.wfi.Metadata.Id,
"err": err,
}).Errorf("Failed to execute task")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

var (
metaLog = log.WithField(LogKeyController, "meta")
metaLog = log.WithField("controller", "controller-meta")
)

// MetaController is a 'controller for controllers', allowing for composition with controllers. It allows users to
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ const (
WorkQueueSize = 50
InvocationTimeout = time.Duration(10) * time.Minute
MaxErrorCount = 3
LogKeyController = "ctrl"
)

var log = logrus.New().WithFields(logrus.Fields{
"component": "controller",
LogKeyController: "?",
})

type Controller interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"fmt"
)

var wfLog = log.WithField(LogKeyController, "wf")
var wfLog = log.WithField("controller", "controller-wf")

// WorkflowController is the controller concerned with the lifecycle of workflows. It handles responsibilities, such as
// parsing of workflows.
Expand Down
9 changes: 4 additions & 5 deletions pkg/fnenv/fission/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package fission

import (
"github.com/fission/fission/controller/client"
"github.com/sirupsen/logrus"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Resolver implements the resolver interface to allow functions to be resolved through Fission
// Resolver implements the Resolver interface and is responsible for resolving function references to
// deterministic Fission function UIDs.
type Resolver struct {
controller *client.Client
}
Expand All @@ -17,7 +16,7 @@ func NewResolver(controller *client.Client) *Resolver {
}

func (re *Resolver) Resolve(fnName string) (string, error) {
logrus.WithField("name", fnName).Info("Resolving function ")
log.Infof("Resolving function: %s", fnName)
fn, err := re.controller.FunctionGet(&metav1.ObjectMeta{
Name: fnName,
Namespace: metav1.NamespaceDefault,
Expand All @@ -27,7 +26,7 @@ func (re *Resolver) Resolve(fnName string) (string, error) {
}
id := string(fn.Metadata.UID)

logrus.WithField("name", fnName).WithField("uid", id).Info("Resolved fission function")
log.Infof("Resolved fission function %s to %s", fnName, id)

return id, nil
}
20 changes: 10 additions & 10 deletions pkg/fnenv/fission/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"github.com/fission/fission/router"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"bytes"
)

var log = logrus.WithField("component", "fnenv-fission")

// FunctionEnv adapts the Fission platform to the function execution runtime. This allows the workflow engine
// to invoke Fission functions.
type FunctionEnv struct {
Expand All @@ -39,17 +42,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca
UID: k8stypes.UID(spec.GetType().GetResolved()),
Namespace: metav1.NamespaceDefault,
}
logrus.WithFields(logrus.Fields{
"name": meta.Name,
"uid": meta.UID,
"ns": meta.Namespace,
}).Info("Invoking Fission function.")

// Get reqUrl
// TODO use router instead once we can route to a specific function uid
ctxLog := log.WithField("fn", meta.Name)
ctxLog.Infof("Invoking Fission function: '%v'.", meta.Name, meta.UID)
serviceUrl, err := fe.executor.GetServiceForFunction(meta)
if err != nil {
logrus.WithFields(logrus.Fields{
log.WithFields(logrus.Fields{
"err": err,
"meta": meta,
}).Error("Fission function could not be found!")
Expand Down Expand Up @@ -86,14 +86,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca
return nil, fmt.Errorf("failed to parse output: %v", err)
}

logrus.Infof("[%s][Content-Type]: %v ", meta.Name, resp.Header.Get("Content-Type"))
logrus.Infof("[%s][output]: %v", meta.Name, output.Short())
logrus.Infof("[%s][status]: %v", meta.Name, resp.StatusCode)
ctxLog.Infof("[response][status]: %v", meta.Name, resp.StatusCode)
ctxLog.Infof("[response][Content-Type]: %v ", meta.Name, resp.Header.Get("Content-Type"))
ctxLog.Debugf("[%s][output]: %v", meta.Name, output)

// Determine status of the task invocation
if resp.StatusCode >= 400 {
msg, _ := typedvalues.Format(output)
logrus.Warn("[%s] Failed %v: %v", resp.StatusCode, msg)
ctxLog.Warn("[%s] Failed %v: %v", resp.StatusCode, msg)
return &types.TaskInvocationStatus{
Status: types.TaskInvocationStatus_FAILED,
Error: &types.Error{
Expand Down
Loading

0 comments on commit 8040d18

Please sign in to comment.