Skip to content

Commit

Permalink
Reduced and clarified logging
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed May 15, 2018
1 parent acb60a6 commit c2264a9
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 14 deletions.
5 changes: 4 additions & 1 deletion pkg/api/function/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er
}
if err != nil {
// TODO improve error handling here (retries? internal or task related error?)
logrus.WithField("task", spec.InvocationId).Infof("ParseTask failed: %v", err)
logrus.WithField("fn", spec.FnRef).
WithField("wi", spec.InvocationId).
WithField("task", spec.TaskId).
Infof("Failed to invoke task: %v", err)
esErr := ap.es.Append(&fes.Event{
Type: events.Task_TASK_FAILED.String(),
Parent: aggregate,
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package invocation

import (
"errors"
"fmt"

"github.com/fission/fission-workflows/pkg/api/function"
Expand Down Expand Up @@ -45,11 +46,19 @@ type ActionFail struct {
func (a *ActionFail) Eval(cec controller.EvalContext) controller.Action {
ec := EnsureInvocationContext(cec)
a.InvocationId = ec.Invocation().Id()
if a.Err == nil {
if s, ok := ec.EvalState().Last(); ok {
a.Err = s.Error
}
}
if a.Err == nil {
a.Err = errors.New("unknown error has occurred")
}
return a
}

func (a *ActionFail) Apply() error {
wfiLog.Info("Applying action: fail")
wfiLog.Infof("Applying action: fail (%v)", a.Err)
return a.Api.Fail(a.InvocationId, a.Err)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/controller/invocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ func defaultPolicy(ctr *Controller) controller.Rule {
&controller.RuleExceededErrorCount{
OnExceeded: &ActionFail{
Api: ctr.invocationApi,
Err: errors.New("error count exceeded"),
},
MaxErrorCount: 0,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (el *RuleExceededErrorCount) Eval(ec EvalContext) Action {
}

if errorCount > el.MaxErrorCount {
logrus.Infof("Error count exceeded, evaluating %T.", el.OnExceeded)
logrus.Infof("Error count exceeded, evaluating %T", el.OnExceeded)
return evalIfNotNil(el.OnExceeded, ec)
}
return evalIfNotNil(el.OnNotExceeded, ec)
Expand Down
11 changes: 7 additions & 4 deletions pkg/fnenv/common/httpconv/httpconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func ParseBody(data io.Reader, contentType string) (types.TypedValue, error) {
case contentTypeTask:
fallthrough
case contentTypeWorkflow:
// TODO support json
// TODO support json-encoded workflow/task
var m proto.Message
err := proto.Unmarshal(bs, m)
if err != nil {
Expand Down Expand Up @@ -143,8 +143,7 @@ func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr *

if outputErr != nil {
// TODO provide different http codes based on error
w.Write([]byte(outputErr.Error()))
http.Error(w, outputErr.Message, http.StatusInternalServerError)
http.Error(w, outputErr.Error(), http.StatusInternalServerError)
return
}

Expand All @@ -160,7 +159,6 @@ func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr *
bs, err := FormatBody(*output, contentType)
if err != nil {
FormatResponse(w, nil, &types.Error{
Code: "500",
Message: fmt.Sprintf("Failed to format response body: %v", err),
})
}
Expand Down Expand Up @@ -318,6 +316,11 @@ func DetermineContentType(value *types.TypedValue) string {

// Otherwise, check for primitive types of the main input
switch typedvalues.ValueType(value.Type) {
// TODO task and workflow
case typedvalues.TypeMap:
fallthrough
case typedvalues.TypeList:
return contentTypeJson
case typedvalues.TypeNumber:
fallthrough
case typedvalues.TypeExpression:
Expand Down
3 changes: 3 additions & 0 deletions pkg/fnenv/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca
return &types.TaskInvocationStatus{
UpdatedAt: ptypes.TimestampNow(),
Status: types.TaskInvocationStatus_FAILED,
Error: &types.Error{
Message: err.Error(),
},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/fnenv/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ps *MetaResolver) Resolve(targetFn string) (types.FnRef, error) {
"err": err,
"runtime": cName,
"fn": targetFn,
}).Info("Function not found.")
}).Debug("Function not found.")
lastErr = err
} else {
resolved <- def
Expand Down
8 changes: 7 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro
continue
}
if t.Invocation.Status.Status == types.TaskInvocationStatus_FAILED {

msg := fmt.Sprintf("Task '%v' failed", t.Invocation.Id())
if err := t.Invocation.GetStatus().GetError(); err != nil {
msg = err.Message
}

AbortActionAny, _ := ptypes.MarshalAny(&AbortAction{
Reason: fmt.Sprintf("taskContainer '%s' failed!", t.Invocation),
Reason: msg,
})

abortAction := &Action{
Expand Down
1 change: 0 additions & 1 deletion pkg/types/aggregates/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error {
ivErr := &types.Error{}
err := proto.Unmarshal(event.Data, ivErr)
if err != nil {
ivErr.Code = "error"
ivErr.Message = err.Error()
log.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/types/aggregates/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (wf *Workflow) ApplyEvent(event *fes.Event) error {
wfErr := &types.Error{}
err := proto.Unmarshal(event.Data, wfErr)
if err != nil {
wfErr.Code = "error"
wfErr.Message = err.Error()
log.Errorf("failed to unmarshal event: '%v' (%v)", event, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/types/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var taskFinalStates = []TaskInvocationStatus_Status{
// TypedValue
//

// Prints a short description of the value
// Prints a short description of the Value
func (tv TypedValue) Short() string {
var val string
if len(tv.Value) > typedValueShortMaxLen {
Expand Down
1 change: 0 additions & 1 deletion test/integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func SetupNatsCluster(ctx context.Context) fesnats.Config {
}
address := "127.0.0.1"
flags := strings.Split(fmt.Sprintf("-cid %s -p %d -a %s", clusterId, port, address), " ")
logrus.Info(flags)
cmd := exec.CommandContext(ctx, "nats-streaming-server", flags...)
stdOut, _ := cmd.StdoutPipe()
stdErr, _ := cmd.StderrPipe()
Expand Down

0 comments on commit c2264a9

Please sign in to comment.