From 8ff82ed278452fac681070026fdc6f15ff775d83 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Tue, 9 Oct 2018 14:49:08 +0200 Subject: [PATCH] Improved opentracing support - it adds a couple of additional spans to trace: controller evaluations, fission function calls, workflow calls. - it adds logs to spans, which include (in debug mode) all inputs, outputs and expressions. - it adds tags to the spans, including the function references, workflow ids, task ids, and statuses, to enable searching for specific tasks. - Adds documentation on how to use opentracing for local development. --- Docs/development.md | 20 +++++ cmd/fission-workflows-bundle/bundle/bundle.go | 14 +++- pkg/controller/actions.go | 9 +++ pkg/controller/evaluation.go | 28 +++---- pkg/controller/evaluation_test.go | 2 +- pkg/controller/invocation/actions.go | 74 ++++++++++++++++--- pkg/controller/invocation/controller.go | 32 +++++--- pkg/fnenv/fission/runtime.go | 8 ++ pkg/fnenv/native/native.go | 2 +- pkg/fnenv/workflows/workflows.go | 27 ++++++- pkg/parse/parser.go | 2 + 11 files changed, 175 insertions(+), 43 deletions(-) diff --git a/Docs/development.md b/Docs/development.md index 57055720..28517747 100644 --- a/Docs/development.md +++ b/Docs/development.md @@ -10,6 +10,26 @@ using [telepresence](https://telepresence.io): telepresence --method=vpn-tcp --namespace fission --swap-deployment workflows:workflows --expose 5555 --expose 8080 ``` +### Local OpenTracing + +The locally running instance does not have access to the in-cluster Jaeger deployment. To view the invocations, the +easiest option is to run a development all-in-one Jaeger deployment locally: + +```bash +docker run -d --rm --name jaeger \ + -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \ + -p 5775:5775/udp \ + -p 6831:6831/udp \ + -p 6832:6832/udp \ + -p 5778:5778 \ + -p 16686:16686 \ + -p 14268:14268 \ + -p 9411:9411 \ + jaegertracing/all-in-one:1.6 +``` + +You can then navigate to `http://localhost:16686` to access the Jaeger UI. + ## Testing To run local unit and integration tests: diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 4d788f32..c91a9b96 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -155,14 +155,24 @@ func Run(ctx context.Context, opts *Options) error { var es fes.Backend var esPub pubsub.Publisher + var otOpts = []grpc_opentracing.Option{ + grpc_opentracing.SpanDecorator(func(span opentracing.Span, method string, req, resp interface{}, + grpcError error) { + span.SetTag("level", log.GetLevel().String()) + }), + } + if opts.Debug { + otOpts = append(otOpts, grpc_opentracing.LogPayloads()) + } + grpcServer := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_prometheus.StreamServerInterceptor, - grpc_opentracing.OpenTracingStreamServerInterceptor(tracer), + grpc_opentracing.OpenTracingStreamServerInterceptor(tracer, otOpts...), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_prometheus.UnaryServerInterceptor, - grpc_opentracing.OpenTracingServerInterceptor(tracer), + grpc_opentracing.OpenTracingServerInterceptor(tracer, otOpts...), )), ) diff --git a/pkg/controller/actions.go b/pkg/controller/actions.go index 2da02547..2dfd6f24 100644 --- a/pkg/controller/actions.go +++ b/pkg/controller/actions.go @@ -1,6 +1,7 @@ package controller import ( + "fmt" "sync" "sync/atomic" "time" @@ -98,3 +99,11 @@ func (a *MultiAction) Apply() error { } return err.(error) } + +func (a *MultiAction) String() string { + var results []string + for _, action := range a.Actions { + results = append(results, fmt.Sprintf("%+v", action)) + } + return fmt.Sprintf("%v", results) +} diff --git a/pkg/controller/evaluation.go b/pkg/controller/evaluation.go index 67d68b67..c37a4f94 100644 --- a/pkg/controller/evaluation.go +++ b/pkg/controller/evaluation.go @@ -8,8 +8,6 @@ import ( "time" "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - "github.com/opentracing/opentracing-go/log" "github.com/sirupsen/logrus" ) @@ -22,9 +20,9 @@ type EvalStore struct { mp sync.Map } -func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState { - s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx)) - return s.(*EvalState) +func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) (*EvalState, bool) { + s, ok := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx)) + return s.(*EvalState), ok } func (e *EvalStore) Load(id string) (*EvalState, bool) { @@ -90,16 +88,15 @@ func NewEvalState(id string, spanCtx opentracing.SpanContext) *EvalState { log: EvalLog{}, id: id, evalLock: make(chan struct{}, 1), - span: opentracing.StartSpan("EvalState", opentracing.FollowsFrom(spanCtx)), + span: opentracing.StartSpan("/controller/eval", opentracing.FollowsFrom(spanCtx)), } - e.span.SetTag(string(ext.Component), "controller.workflow") - e.span.SetTag("workflow.id", id) + e.span.SetTag("id", id) e.Free() return e } -func (e *EvalState) Span() opentracing.SpanContext { - return e.span.Context() +func (e *EvalState) Span() opentracing.Span { + return e.span } func (e *EvalState) IsFinished() bool { @@ -122,10 +119,6 @@ func (e *EvalState) Finish(success bool, msg ...string) { e.finished = true } -func (e *EvalState) Log(fields ...log.Field) { - e.span.LogFields(fields...) -} - func (e *EvalState) Close() error { e.Finish(false, "controller closed") return nil @@ -195,6 +188,13 @@ func (e *EvalState) Logs() EvalLog { func (e *EvalState) Record(record EvalRecord) { e.dataLock.Lock() e.log.Record(record) + var eval interface{} + if record.Error != nil { + eval = fmt.Sprintf("error: %v", record.Error) + } else { + eval = fmt.Sprintf("action: %T - %+v", record.Action, record.Action) + } + e.Span().LogKV("evaluation", eval) e.dataLock.Unlock() } diff --git a/pkg/controller/evaluation_test.go b/pkg/controller/evaluation_test.go index 7bfc1d04..63a76982 100644 --- a/pkg/controller/evaluation_test.go +++ b/pkg/controller/evaluation_test.go @@ -129,7 +129,7 @@ func TestEvalCache_GetOrCreate(t *testing.T) { assert.False(t, ok) assert.Empty(t, es) - es = ec.LoadOrStore(id, nil) + es, _ = ec.LoadOrStore(id, nil) assert.Equal(t, id, es.ID()) es, ok = ec.Load(id) diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 0adc00f4..0cf4f6e3 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -86,29 +86,64 @@ func (a *ActionInvokeTask) logger() logrus.FieldLogger { }) } +func (a *ActionInvokeTask) String() string { + return fmt.Sprintf("task/run(%s)", a.Task.GetId()) +} + func (a *ActionInvokeTask) Apply() error { log := a.logger() - span := opentracing.StartSpan("pkg/controller/invocation/actions/InvokeTask", - opentracing.ChildOf(a.ec.Span())) + span := opentracing.StartSpan(fmt.Sprintf("/task/%s", a.Task.GetId()), opentracing.ChildOf(a.ec.Span().Context())) + span.SetTag("task", a.Task.GetId()) + defer span.Finish() // Find task task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id) if !ok { - return fmt.Errorf("task '%v' could not be found", a.Wfi.ID()) + err := fmt.Errorf("task '%v' could not be found", a.Wfi.ID()) + span.LogKV("error", err) + return err + } + + span.SetTag("fnref", task.GetStatus().GetFnRef()) + if logrus.GetLevel() == logrus.DebugLevel { + var err error + var inputs interface{} + inputs, err = typedvalues.UnwrapMapTypedValue(task.GetSpec().GetInputs()) + if err != nil { + inputs = fmt.Sprintf("error: %v", err) + } + span.LogKV("inputs", inputs) } // Check if function has been resolved if task.Status.FnRef == nil { - return fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef) + err := fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef) + span.LogKV("error", err) + return err } // Pre-execution: Resolve expression inputs - exprEvalStart := time.Now() - inputs, err := a.resolveInputs(a.Task.Inputs) - exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) - if err != nil { - log.Error(err) - return err + var inputs map[string]*typedvalues.TypedValue + if len(a.Task.Inputs) > 0 { + var err error + exprEvalStart := time.Now() + inputs, err = a.resolveInputs(a.Task.Inputs) + exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) + if err != nil { + log.Error(err) + span.LogKV("error", err) + return err + } + + if logrus.GetLevel() == logrus.DebugLevel { + var err error + var resolvedInputs interface{} + resolvedInputs, err = typedvalues.UnwrapMapTypedValue(inputs) + if err != nil { + resolvedInputs = fmt.Sprintf("error: %v", err) + } + span.LogKV("resolved_inputs", resolvedInputs) + } } // Invoke task @@ -127,13 +162,28 @@ func (a *ActionInvokeTask) Apply() error { log.Debugf("Using inputs: %v", i) } } + ctx := opentracing.ContextWithSpan(context.Background(), span) - _, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer)) + updated, err := a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer)) if err != nil { log.Errorf("Failed to execute task: %v", err) + span.LogKV("error", err) return err } - span.Finish() + span.SetTag("status", updated.GetStatus().GetStatus().String()) + if !updated.GetStatus().Successful() { + span.LogKV("error", updated.GetStatus().GetError().String()) + } + if logrus.GetLevel() == logrus.DebugLevel { + var err error + var output interface{} + output, err = typedvalues.Unwrap(updated.GetStatus().GetOutput()) + if err != nil { + output = fmt.Sprintf("error: %v", err) + } + span.LogKV("output", output) + } + return nil } diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index 975bb417..906be4e0 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -138,22 +138,22 @@ func (cr *Controller) Notify(update *fes.Notification) error { return err } + if es, ok := cr.evalStore.Load(entity.ID()); ok { + es.Span().LogKV("event", fmt.Sprintf("%s - %v", update.EventType, update.Labels())) + } switch update.EventType { case events.EventInvocationCompleted: - fallthrough + cr.finishAndDeleteEvalState(entity.ID(), true, "completion reason: "+events.EventInvocationCompleted) case events.EventInvocationCanceled: - fallthrough + cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationCanceled) case events.EventInvocationFailed: - // TODO mark to clean up later instead - cr.stateStore.Delete(entity.ID()) - cr.evalStore.Delete(entity.ID()) - log.Debugf("Removed entity %v from eval state", entity.ID()) + cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationFailed) case events.EventTaskFailed: fallthrough case events.EventTaskSucceeded: fallthrough case events.EventInvocationCreated: - es := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx) + es, _ := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx) cr.workQueue.Add(es) default: log.Debugf("Controller ignored event type: %v", update.EventType) @@ -215,9 +215,10 @@ func (cr *Controller) checkModelCaches() error { } if !wi.Status.Finished() { - span := opentracing.GlobalTracer().StartSpan("recoverFromModelCache") + // TODO grab the span context from the model / events + span := opentracing.GlobalTracer().StartSpan("/controller/recoverFromModelCache") controller.EvalRecovered.WithLabelValues(Name, "cache").Inc() - es := cr.evalStore.LoadOrStore(wi.ID(), span.Context()) + es, _ := cr.evalStore.LoadOrStore(wi.ID(), span.Context()) cr.workQueue.Add(es) span.Finish() } @@ -295,7 +296,7 @@ func (cr *Controller) Evaluate(invocationID string) { controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start))) if wfi.GetStatus().Finished() { - cr.evalStore.Delete(wfi.ID()) + cr.finishAndDeleteEvalState(wfi.ID(), true, "") t, _ := ptypes.Timestamp(wfi.GetMetadata().GetCreatedAt()) invocationDuration.Observe(float64(time.Now().Sub(t))) } @@ -349,6 +350,17 @@ func (cr *Controller) processNextItem(ctx context.Context, pool *gopool.GoPool) return true } +func (cr *Controller) finishAndDeleteEvalState(evalStateID string, success bool, msg string) { + es, ok := cr.evalStore.Load(evalStateID) + if !ok { + return + } + es.Finish(success, msg) + cr.evalStore.Delete(evalStateID) + cr.stateStore.Delete(evalStateID) + log.Debugf("Removed entity %v from eval state", evalStateID) +} + func defaultPolicy(ctr *Controller) controller.Rule { return &controller.RuleEvalUntilAction{ Rules: []controller.Rule{ diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 920cfc68..73e881b1 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -60,10 +60,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo if err := validate.TaskInvocationSpec(spec); err != nil { return nil, err } + span, _ := opentracing.StartSpanFromContext(cfg.Ctx, "/fnenv/fission") + defer span.Finish() fnRef := *spec.FnRef + span.SetTag("fnref", fnRef.Format()) // Construct request and add body url := fe.createRouterURL(fnRef) + span.SetTag("url", url) req, err := http.NewRequest(defaultHTTPMethod, url, nil) if err != nil { panic(fmt.Errorf("failed to create request for '%v': %v", url, err)) @@ -96,11 +100,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo } fmt.Println(string(bs)) fmt.Println("--- HTTP Request end ---") + span.LogKV("HTTP request", string(bs)) } + span.LogKV("http", fmt.Sprintf("%s %v", req.Method, req.URL)) resp, err := http.DefaultClient.Do(req.WithContext(cfg.Ctx)) if err != nil { return nil, fmt.Errorf("error for reqUrl '%v': %v", url, err) } + span.LogKV("status code", resp.Status) fnenv.FnActive.WithLabelValues(Name).Dec() @@ -113,6 +120,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo } fmt.Println(string(bs)) fmt.Println("--- HTTP Response end ---") + span.LogKV("HTTP response", string(bs)) } // Parse output diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index fae926a5..9d841153 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -60,7 +60,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo if !ok { return nil, fmt.Errorf("could not resolve internal function '%s'", fnID) } - span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("fnenv/internal/fn/%s", fnID)) + span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("/fnenv/internal/%s", fnID)) defer span.Finish() fnenv.FnActive.WithLabelValues(Name).Inc() out, err := fn.Invoke(spec) diff --git a/pkg/fnenv/workflows/workflows.go b/pkg/fnenv/workflows/workflows.go index 8986fa01..ce58d704 100644 --- a/pkg/fnenv/workflows/workflows.go +++ b/pkg/fnenv/workflows/workflows.go @@ -26,6 +26,7 @@ import ( "github.com/fission/fission-workflows/pkg/types/validate" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" + "github.com/opentracing/opentracing-go" "github.com/sirupsen/logrus" ) @@ -85,12 +86,29 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn return nil, err } + span, ctx := opentracing.StartSpanFromContext(cfg.Ctx, "/fnenv/workflows") + defer span.Finish() + span.SetTag("workflow", spec.GetWorkflowId()) + span.SetTag("parent", spec.GetParentId()) + span.SetTag("internal", len(spec.GetParentId()) != 0) + // Check if the workflow required by the invocation exists if rt.workflows != nil { - _, err := rt.workflows.GetWorkflow(spec.GetWorkflowId()) + wf, err := rt.workflows.GetWorkflow(spec.GetWorkflowId()) if err != nil { return nil, err } + span.SetTag("workflow.name", wf.GetMetadata().GetName()) + } + + if logrus.GetLevel() == logrus.DebugLevel { + var inputs interface{} + var err error + inputs, err = typedvalues.UnwrapMapTypedValue(spec.GetInputs()) + if err != nil { + inputs = fmt.Errorf("error: %v", err) + } + span.LogKV("inputs", inputs) } timeStart := time.Now() @@ -99,14 +117,16 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn defer fnenv.FnActive.WithLabelValues(Name).Dec() defer fnenv.FnCount.WithLabelValues(Name).Inc() - wfiID, err := rt.api.Invoke(spec, api.WithContext(cfg.Ctx)) + wfiID, err := rt.api.Invoke(spec, api.WithContext(ctx)) if err != nil { logrus.WithField("fnenv", Name).Errorf("Failed to invoke workflow: %v", err) + span.LogKV("error", fmt.Errorf("failed to invoke workflow: %v", err)) return nil, err } logrus.WithField("fnenv", Name).Infof("Invoked workflow: %s", wfiID) + span.SetTag("invocation", wfiID) - timedCtx, cancelFn := context.WithTimeout(cfg.Ctx, rt.timeout) + timedCtx, cancelFn := context.WithTimeout(ctx, rt.timeout) defer cancelFn() if pub, ok := rt.invocations.CacheReader.(pubsub.Publisher); ok { sub := pub.Subscribe(pubsub.SubscriptionOptions{ @@ -138,6 +158,7 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn } else { logrus.Errorf("Failed to cancel invocation: %v", err) } + span.LogKV("error", err) return nil, err case <-sub.Ch: } diff --git a/pkg/parse/parser.go b/pkg/parse/parser.go index 54d85b9d..6a709519 100644 --- a/pkg/parse/parser.go +++ b/pkg/parse/parser.go @@ -7,6 +7,7 @@ import ( "github.com/fission/fission-workflows/pkg/parse/protobuf" "github.com/fission/fission-workflows/pkg/parse/yaml" "github.com/fission/fission-workflows/pkg/types" + "github.com/sirupsen/logrus" ) var ( @@ -62,6 +63,7 @@ func (mp *MetaParser) ParseWith(r io.Reader, parsers ...string) (*types.Workflow } wf, err := p.Parse(r) if err != nil { + logrus.WithField("parser", name).Warnf("parser failed: %v", name, err) continue }