diff --git a/pkg/api/function/api.go b/pkg/api/function/api.go index 9bbf4593..39d695b4 100644 --- a/pkg/api/function/api.go +++ b/pkg/api/function/api.go @@ -71,7 +71,10 @@ func (ap *Api) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, er } if err != nil { // TODO improve error handling here (retries? internal or task related error?) - logrus.WithField("task", spec.InvocationId).Infof("ParseTask failed: %v", err) + logrus.WithField("fn", spec.FnRef). + WithField("wi", spec.InvocationId). + WithField("task", spec.TaskId). + Infof("Failed to invoke task: %v", err) esErr := ap.es.Append(&fes.Event{ Type: events.Task_TASK_FAILED.String(), Parent: aggregate, diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index d3c233b2..787c8a8e 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -1,6 +1,7 @@ package invocation import ( + "errors" "fmt" "github.com/fission/fission-workflows/pkg/api/function" @@ -45,11 +46,19 @@ type ActionFail struct { func (a *ActionFail) Eval(cec controller.EvalContext) controller.Action { ec := EnsureInvocationContext(cec) a.InvocationId = ec.Invocation().Id() + if a.Err == nil { + if s, ok := ec.EvalState().Last(); ok { + a.Err = s.Error + } + } + if a.Err == nil { + a.Err = errors.New("unknown error has occurred") + } return a } func (a *ActionFail) Apply() error { - wfiLog.Info("Applying action: fail") + wfiLog.Infof("Applying action: fail (%v)", a.Err) return a.Api.Fail(a.InvocationId, a.Err) } diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 8c0215e1..9ae246b5 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -310,7 +310,6 @@ func defaultPolicy(ctr *Controller) controller.Rule { &controller.RuleExceededErrorCount{ OnExceeded: &ActionFail{ Api: ctr.invocationApi, - Err: errors.New("error count exceeded"), }, MaxErrorCount: 0, }, diff --git a/pkg/controller/rules.go b/pkg/controller/rules.go index 5ec502ae..717c2ba7 100644 --- a/pkg/controller/rules.go +++ b/pkg/controller/rules.go @@ -63,7 +63,7 @@ func (el *RuleExceededErrorCount) Eval(ec EvalContext) Action { } if errorCount > el.MaxErrorCount { - logrus.Infof("Error count exceeded, evaluating %T.", el.OnExceeded) + logrus.Infof("Error count exceeded, evaluating %T", el.OnExceeded) return evalIfNotNil(el.OnExceeded, ec) } return evalIfNotNil(el.OnNotExceeded, ec) diff --git a/pkg/fnenv/common/httpconv/httpconv.go b/pkg/fnenv/common/httpconv/httpconv.go index 2de8c2f2..e1b9efe0 100644 --- a/pkg/fnenv/common/httpconv/httpconv.go +++ b/pkg/fnenv/common/httpconv/httpconv.go @@ -91,7 +91,7 @@ func ParseBody(data io.Reader, contentType string) (types.TypedValue, error) { case contentTypeTask: fallthrough case contentTypeWorkflow: - // TODO support json + // TODO support json-encoded workflow/task var m proto.Message err := proto.Unmarshal(bs, m) if err != nil { @@ -143,8 +143,7 @@ func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr * if outputErr != nil { // TODO provide different http codes based on error - w.Write([]byte(outputErr.Error())) - http.Error(w, outputErr.Message, http.StatusInternalServerError) + http.Error(w, outputErr.Error(), http.StatusInternalServerError) return } @@ -160,7 +159,6 @@ func FormatResponse(w http.ResponseWriter, output *types.TypedValue, outputErr * bs, err := FormatBody(*output, contentType) if err != nil { FormatResponse(w, nil, &types.Error{ - Code: "500", Message: fmt.Sprintf("Failed to format response body: %v", err), }) } @@ -318,6 +316,11 @@ func DetermineContentType(value *types.TypedValue) string { // Otherwise, check for primitive types of the main input switch typedvalues.ValueType(value.Type) { + // TODO task and workflow + case typedvalues.TypeMap: + fallthrough + case typedvalues.TypeList: + return contentTypeJson case typedvalues.TypeNumber: fallthrough case typedvalues.TypeExpression: diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index 1880c6d4..1ff0709d 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -64,6 +64,9 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca return &types.TaskInvocationStatus{ UpdatedAt: ptypes.TimestampNow(), Status: types.TaskInvocationStatus_FAILED, + Error: &types.Error{ + Message: err.Error(), + }, }, nil } diff --git a/pkg/fnenv/resolver.go b/pkg/fnenv/resolver.go index 808ba95c..2662b6e9 100644 --- a/pkg/fnenv/resolver.go +++ b/pkg/fnenv/resolver.go @@ -65,7 +65,7 @@ func (ps *MetaResolver) Resolve(targetFn string) (types.FnRef, error) { "err": err, "runtime": cName, "fn": targetFn, - }).Info("Function not found.") + }).Debug("Function not found.") lastErr = err } else { resolved <- def diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0b6b5cf0..1a5421ab 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -36,8 +36,14 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro continue } if t.Invocation.Status.Status == types.TaskInvocationStatus_FAILED { + + msg := fmt.Sprintf("Task '%v' failed", t.Invocation.Id()) + if err := t.Invocation.GetStatus().GetError(); err != nil { + msg = err.Message + } + AbortActionAny, _ := ptypes.MarshalAny(&AbortAction{ - Reason: fmt.Sprintf("taskContainer '%s' failed!", t.Invocation), + Reason: msg, }) abortAction := &Action{ diff --git a/pkg/types/aggregates/invocation.go b/pkg/types/aggregates/invocation.go index d8556392..2a7a10a0 100644 --- a/pkg/types/aggregates/invocation.go +++ b/pkg/types/aggregates/invocation.go @@ -75,7 +75,6 @@ func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error { ivErr := &types.Error{} err := proto.Unmarshal(event.Data, ivErr) if err != nil { - ivErr.Code = "error" ivErr.Message = err.Error() log.Errorf("failed to unmarshal event: '%v' (%v)", event, err) } diff --git a/pkg/types/aggregates/workflow.go b/pkg/types/aggregates/workflow.go index 918547dc..3e4b73fa 100644 --- a/pkg/types/aggregates/workflow.go +++ b/pkg/types/aggregates/workflow.go @@ -44,7 +44,6 @@ func (wf *Workflow) ApplyEvent(event *fes.Event) error { wfErr := &types.Error{} err := proto.Unmarshal(event.Data, wfErr) if err != nil { - wfErr.Code = "error" wfErr.Message = err.Error() log.Errorf("failed to unmarshal event: '%v' (%v)", event, err) } diff --git a/pkg/types/extensions.go b/pkg/types/extensions.go index d5b2b8b5..2ab9a593 100644 --- a/pkg/types/extensions.go +++ b/pkg/types/extensions.go @@ -38,7 +38,7 @@ var taskFinalStates = []TaskInvocationStatus_Status{ // TypedValue // -// Prints a short description of the value +// Prints a short description of the Value func (tv TypedValue) Short() string { var val string if len(tv.Value) > typedValueShortMaxLen { diff --git a/test/integration/util.go b/test/integration/util.go index f9dd7231..34bd029e 100644 --- a/test/integration/util.go +++ b/test/integration/util.go @@ -53,7 +53,6 @@ func SetupNatsCluster(ctx context.Context) fesnats.Config { } address := "127.0.0.1" flags := strings.Split(fmt.Sprintf("-cid %s -p %d -a %s", clusterId, port, address), " ") - logrus.Info(flags) cmd := exec.CommandContext(ctx, "nats-streaming-server", flags...) stdOut, _ := cmd.StdoutPipe() stdErr, _ := cmd.StderrPipe()