Skip to content

Commit

Permalink
Provide context to loggers
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Nov 29, 2017
1 parent 487ee72 commit 920ed5a
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 85 deletions.
122 changes: 70 additions & 52 deletions pkg/controller/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/fission/fission-workflows/pkg/util/pubsub"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
"reflect"
"sync/atomic"
)

var wfiLog = log.WithField(LogKeyController, "wf")
var wfiLog = log.WithField("component", "controller-wi")

type InvocationController struct {
invokeCache fes.CacheReader
Expand All @@ -39,14 +41,32 @@ type InvocationController struct {

// states keeps track, per invocation, of the control state (including how many actions are still in the workQueue)
states map[string]ControlState
evalMutex sync.Mutex
// TODO add active cache
}

type ControlState struct {
errorCount int
recentError error
inQueue int
ErrorCount uint32
RecentError error
QueueSize uint32
lock sync.Mutex
}

// AddError records err as the most recent error and returns the error count
// incremented by one.
//
// NOTE(review): the receiver is a value (ControlState, not *ControlState), so
// both the RecentError assignment and the atomic increment operate on a copy
// made for this call; the ControlState the method was called on is left
// unchanged. The copy also duplicates the struct's lock (sync.Mutex) field.
// This most likely needs a pointer receiver - confirm together with how
// callers store ControlState values (pointer methods cannot be called
// directly on map elements).
func (cs ControlState) AddError(err error) uint32 {
// Plain (non-atomic) write; only the counter below goes through sync/atomic.
cs.RecentError = err
return atomic.AddUint32(&cs.ErrorCount, 1)
}

// ResetError clears the recorded error and sets the error count back to zero.
//
// NOTE(review): the receiver is a value, so both assignments modify a copy
// and the ControlState the method was called on keeps its previous error
// state. Additionally, ErrorCount is written here without sync/atomic while
// AddError updates it with atomic.AddUint32 - confirm whether a pointer
// receiver and atomic.StoreUint32 are intended.
func (cs ControlState) ResetError() {
cs.RecentError = nil
cs.ErrorCount = 0
}

// IncrementQueueSize atomically adds one to QueueSize and returns the new
// value.
//
// NOTE(review): the receiver is a value, so the increment is applied to the
// QueueSize of a copy made for this call; the ControlState the method was
// called on is not updated. Likely needs a pointer receiver - confirm.
func (cs ControlState) IncrementQueueSize() uint32 {
return atomic.AddUint32(&cs.QueueSize, 1)
}

// DecrementQueueSize atomically subtracts one from QueueSize and returns the
// new value.
//
// Adding ^uint32(0) (all bits set) is the sync/atomic way of decrementing an
// unsigned counter. There is no guard for a zero counter: decrementing when
// QueueSize is 0 wraps around to the maximum uint32 value, which is what the
// TODO below refers to.
//
// NOTE(review): the receiver is a value, so the decrement is applied to the
// QueueSize of a copy made for this call; the ControlState the method was
// called on is not updated. Likely needs a pointer receiver - confirm.
func (cs ControlState) DecrementQueueSize() uint32 {
return atomic.AddUint32(&cs.QueueSize, ^uint32(0)) // TODO avoid overflow
}

func NewInvocationController(invokeCache fes.CacheReader, wfCache fes.CacheReader,
Expand Down Expand Up @@ -105,28 +125,20 @@ func (cr *InvocationController) Init(sctx context.Context) error {
for {
select {
case action := <-cr.workQueue:
state := cr.states[action.Id()]

if state.inQueue > 0 {
state.inQueue -= 1
}
cr.states[action.Id()] = state

go func() { // TODO limit goroutine pool size & atomic integer
go func() { // TODO limit goroutine pool size
err := action.Apply()
state := cr.states[action.Id()]
if err != nil {
wfiLog.WithField("action", action).Errorf("WorkflowInvocation action failed: %v", err)
state.recentError = err
state.errorCount += 1
wfiLog.WithField("action", action).Errorf("action failed: %v", err)
cr.states[action.Id()].AddError(err)
} else {
state.errorCount = 0
cr.states[action.Id()].ResetError()
}
cr.states[action.Id()] = state
cr.states[action.Id()].DecrementQueueSize()
}()

case <-ctx.Done():
wfiLog.WithField("ctx.err", ctx.Err()).Debug("WorkflowInvocation workQueue closed.")
wfiLog.WithField("ctx.err", ctx.Err()).Debug("workQueue closed.")
return
}
}
Expand All @@ -139,7 +151,7 @@ func (cr *InvocationController) HandleNotification(msg *fes.Notification) error
wfiLog.WithFields(logrus.Fields{
"notification": msg.EventType,
"labels": msg.Labels(),
}).Info("controller event trigger!")
}).Info("Handling notification!")

switch msg.EventType {
case events.Invocation_INVOCATION_CREATED.String():
Expand Down Expand Up @@ -174,38 +186,39 @@ func (cr *InvocationController) HandleTick() error {
return err
}

// Check if we actually need to evaluate
if wi.Status.Status.Finished() {
// TODO remove finished wfi from active cache
continue
}

// TODO check if workflow invocation is in a back-off

cr.evaluate(wi.WorkflowInvocation)
}
return nil
}

// TODO return error
func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) {
// TODO lock on invocation-level
cr.evalMutex.Lock()
defer cr.evalMutex.Unlock()

state := cr.states[invoc.Metadata.Id]
state.lock.Lock()
defer state.lock.Unlock()

// Check if there are still open actions for this invocation
if state.QueueSize > 0 {
return
}

// Check if there are still open actions
if state.inQueue > 0 {
// Check if we actually need to evaluate
if invoc.Status.Status.Finished() {
// TODO remove finished wfi from active cache
return
}

// TODO check if workflow invocation is in a back-off

// Check if the graph has been failing too often
if state.errorCount > MaxErrorCount {
wfiLog.Infof("canceling invocation %v due to error count", invoc.Metadata.Id)
err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit?
if err != nil {
wfiLog.Errorf("failed to cancel timed out invocation: %v", err)
if state.ErrorCount > MaxErrorCount {
wfiLog.Infof("canceling due to error count %v exceeds max error count %v", state.ErrorCount, MaxErrorCount)
ok := cr.submit(&abortAction{
api: cr.invocationApi,
invocationId: invoc.Metadata.Id,
})
if !ok {
wfiLog.Error("failed to cancel timed out invocation.")
}
return
}
Expand All @@ -217,11 +230,15 @@ func (cr *InvocationController) evaluate(invoc *types.WorkflowInvocation) {
}

// For now: kill after 10 min
if (time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds) > int64(InvocationTimeout.Seconds()) {
wfiLog.Infof("canceling timeout invocation %v", invoc.Metadata.Id)
err := cr.invocationApi.Cancel(invoc.Metadata.Id) // TODO just submit?
if err != nil {
wfiLog.Errorf("failed to cancel timed out invocation: %v", err)
duration := time.Now().Unix() - invoc.Metadata.CreatedAt.Seconds
if duration > int64(InvocationTimeout.Seconds()) {
wfiLog.Infof("cancelling due to timeout; %v exceeds max timeout %v", duration, int64(InvocationTimeout.Seconds()))
ok := cr.submit(&abortAction{
api: cr.invocationApi,
invocationId: invoc.Metadata.Id,
})
if !ok {
wfiLog.Error("failed to cancel timed out invocation.")
}
return
}
Expand Down Expand Up @@ -313,10 +330,10 @@ func (cr *InvocationController) submit(action Action) (submitted bool) {
select {
case cr.workQueue <- action:
// Ok
state := cr.states[action.Id()]
state.inQueue += 1
cr.states[action.Id()] = state
cr.states[action.Id()].IncrementQueueSize()
submitted = true
wfiLog.WithField("wfi", action.Id()).
Infof("submitted action: '%s'", reflect.TypeOf(action))
default:
// Action overflow
}
Expand All @@ -338,7 +355,7 @@ func (a *abortAction) Id() string {
}

func (a *abortAction) Apply() error {
wfiLog.Infof("aborting: '%v'", a.invocationId)
wfiLog.WithField("wfi", a.Id()).Info("Applying abort action")
return a.api.Cancel(a.invocationId)
}

Expand All @@ -356,6 +373,7 @@ func (a *invokeTaskAction) Id() string {
}

func (a *invokeTaskAction) Apply() error {
actionLog := wfiLog.WithField("wfi", a.Id())
// Find task (static or dynamic)
task, ok := a.wfi.Status.DynamicTasks[a.task.Id]
if !ok {
Expand All @@ -364,7 +382,7 @@ func (a *invokeTaskAction) Apply() error {
return fmt.Errorf("unknown task '%v'", a.task.Id)
}
}
wfiLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id)
actionLog.Infof("Invoking function '%s' for task '%s'", task.FunctionRef, a.task.Id)

// Resolve type of the task
taskDef, ok := a.wf.Status.ResolvedTasks[task.FunctionRef]
Expand All @@ -378,15 +396,15 @@ func (a *invokeTaskAction) Apply() error {
for inputKey, val := range a.task.Inputs {
resolvedInput, err := a.expr.Resolve(queryScope, queryScope.Tasks[a.task.Id], nil, val)
if err != nil {
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"val": val,
"inputKey": inputKey,
}).Errorf("Failed to parse input: %v", err)
return err
}

inputs[inputKey] = resolvedInput
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"val": val,
"key": inputKey,
"resolved": resolvedInput,
Expand All @@ -402,7 +420,7 @@ func (a *invokeTaskAction) Apply() error {

_, err := a.api.Invoke(a.wfi.Metadata.Id, fnSpec)
if err != nil {
wfiLog.WithFields(logrus.Fields{
actionLog.WithFields(logrus.Fields{
"id": a.wfi.Metadata.Id,
"err": err,
}).Errorf("Failed to execute task")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

var (
metaLog = log.WithField(LogKeyController, "meta")
metaLog = log.WithField("controller", "controller-meta")
)

// MetaController is a 'controller for controllers', allowing for composition with controllers. It allows users to
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ const (
WorkQueueSize = 50
InvocationTimeout = time.Duration(10) * time.Minute
MaxErrorCount = 3
LogKeyController = "ctrl"
)

var log = logrus.New().WithFields(logrus.Fields{
"component": "controller",
LogKeyController: "?",
})

type Controller interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"fmt"
)

var wfLog = log.WithField(LogKeyController, "wf")
var wfLog = log.WithField("controller", "controller-wf")

// WorkflowController is the controller concerned with the lifecycle of workflows. It handles responsibilities, such as
// parsing of workflows.
Expand Down
8 changes: 3 additions & 5 deletions pkg/fnenv/fission/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import (

"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/sirupsen/logrus"
)

func ParseRequest(r *http.Request, target map[string]*types.TypedValue) error {
contentType := r.Header.Get("Content-Type")
logrus.WithField("url", r.URL).WithField("content-type", contentType).Info("Request content-type")
log.WithField("url", r.URL).WithField("content-type", contentType).Info("Request content-type")
// Map Inputs to function parameters
body, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand All @@ -27,18 +26,17 @@ func ParseRequest(r *http.Request, target map[string]*types.TypedValue) error {
if len(body) > 0 {
err = json.Unmarshal(body, &i)
if err != nil {
logrus.WithField("body", len(body)).Infof("Input is not json: %v", err)
log.WithField("body", len(body)).Debugf("Input is not json: %v", err)
i = body
}
}

parsedInput, err := typedvalues.Parse(i)
if err != nil {
logrus.Errorf("Failed to parse body: %v", err)
return errors.New("failed to parse body")
}

logrus.WithField(types.INPUT_MAIN, parsedInput).Info("Parsed body")
log.WithField(types.INPUT_MAIN, parsedInput).Info("Parsed body")
target[types.INPUT_MAIN] = parsedInput
return nil
}
9 changes: 5 additions & 4 deletions pkg/fnenv/fission/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package fission

import (
"github.com/fission/fission/controller/client"
"github.com/sirupsen/logrus"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Resolver implements the Resolver interface and is responsible for resolving function references to
// deterministic Fission function UIDs.
type Resolver struct {
// controller is the Fission controller client; Resolve calls its FunctionGet to look up a function by name.
controller *client.Client
}
Expand All @@ -16,7 +17,7 @@ func NewResolver(controller *client.Client) *Resolver {
}

func (re *Resolver) Resolve(fnName string) (string, error) {
logrus.WithField("name", fnName).Info("Resolving function ")
log.Infof("Resolving function: %s", fnName)
fn, err := re.controller.FunctionGet(&metav1.ObjectMeta{
Name: fnName,
Namespace: metav1.NamespaceDefault,
Expand All @@ -26,7 +27,7 @@ func (re *Resolver) Resolve(fnName string) (string, error) {
}
id := string(fn.Metadata.UID)

logrus.WithField("name", fnName).WithField("uid", id).Info("Resolved fission function")
log.Infof("Resolved fission function %s to %s", fnName, id)

return id, nil
}
Loading

0 comments on commit 920ed5a

Please sign in to comment.