Skip to content

Commit

Permalink
Merge pull request #222 from fission/opentracing-2
Browse files Browse the repository at this point in the history
Improved opentracing support
  • Loading branch information
erwinvaneyk authored Oct 12, 2018
2 parents d65f316 + 8ff82ed commit 82d9a77
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 43 deletions.
20 changes: 20 additions & 0 deletions Docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ using [telepresence](https://telepresence.io):
telepresence --method=vpn-tcp --namespace fission --swap-deployment workflows:workflows --expose 5555 --expose 8080
```

### Local OpenTracing

The locally running instance does not have access to the in-cluster Jaeger deployment. To view the invocations, the
easiest option is to run a development all-in-one Jaeger deployment locally:

```bash
docker run -d --rm --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
jaegertracing/all-in-one:1.6
```

You can then navigate to `http://localhost:16686` to access the Jaeger UI.

## Testing

To run local unit and integration tests:
Expand Down
14 changes: 12 additions & 2 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,24 @@ func Run(ctx context.Context, opts *Options) error {
var es fes.Backend
var esPub pubsub.Publisher

var otOpts = []grpc_opentracing.Option{
grpc_opentracing.SpanDecorator(func(span opentracing.Span, method string, req, resp interface{},
grpcError error) {
span.SetTag("level", log.GetLevel().String())
}),
}
if opts.Debug {
otOpts = append(otOpts, grpc_opentracing.LogPayloads())
}

grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_prometheus.StreamServerInterceptor,
grpc_opentracing.OpenTracingStreamServerInterceptor(tracer),
grpc_opentracing.OpenTracingStreamServerInterceptor(tracer, otOpts...),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_prometheus.UnaryServerInterceptor,
grpc_opentracing.OpenTracingServerInterceptor(tracer),
grpc_opentracing.OpenTracingServerInterceptor(tracer, otOpts...),
)),
)

Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -98,3 +99,11 @@ func (a *MultiAction) Apply() error {
}
return err.(error)
}

func (a *MultiAction) String() string {
var results []string
for _, action := range a.Actions {
results = append(results, fmt.Sprintf("%+v", action))
}
return fmt.Sprintf("%v", results)
}
28 changes: 14 additions & 14 deletions pkg/controller/evaluation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
)

Expand All @@ -22,9 +20,9 @@ type EvalStore struct {
mp sync.Map
}

func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) *EvalState {
s, _ := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState)
func (e *EvalStore) LoadOrStore(id string, spanCtx opentracing.SpanContext) (*EvalState, bool) {
s, ok := e.mp.LoadOrStore(id, NewEvalState(id, spanCtx))
return s.(*EvalState), ok
}

func (e *EvalStore) Load(id string) (*EvalState, bool) {
Expand Down Expand Up @@ -90,16 +88,15 @@ func NewEvalState(id string, spanCtx opentracing.SpanContext) *EvalState {
log: EvalLog{},
id: id,
evalLock: make(chan struct{}, 1),
span: opentracing.StartSpan("EvalState", opentracing.FollowsFrom(spanCtx)),
span: opentracing.StartSpan("/controller/eval", opentracing.FollowsFrom(spanCtx)),
}
e.span.SetTag(string(ext.Component), "controller.workflow")
e.span.SetTag("workflow.id", id)
e.span.SetTag("id", id)
e.Free()
return e
}

func (e *EvalState) Span() opentracing.SpanContext {
return e.span.Context()
func (e *EvalState) Span() opentracing.Span {
return e.span
}

func (e *EvalState) IsFinished() bool {
Expand All @@ -122,10 +119,6 @@ func (e *EvalState) Finish(success bool, msg ...string) {
e.finished = true
}

func (e *EvalState) Log(fields ...log.Field) {
e.span.LogFields(fields...)
}

func (e *EvalState) Close() error {
e.Finish(false, "controller closed")
return nil
Expand Down Expand Up @@ -195,6 +188,13 @@ func (e *EvalState) Logs() EvalLog {
func (e *EvalState) Record(record EvalRecord) {
e.dataLock.Lock()
e.log.Record(record)
var eval interface{}
if record.Error != nil {
eval = fmt.Sprintf("error: %v", record.Error)
} else {
eval = fmt.Sprintf("action: %T - %+v", record.Action, record.Action)
}
e.Span().LogKV("evaluation", eval)
e.dataLock.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/evaluation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestEvalCache_GetOrCreate(t *testing.T) {
assert.False(t, ok)
assert.Empty(t, es)

es = ec.LoadOrStore(id, nil)
es, _ = ec.LoadOrStore(id, nil)
assert.Equal(t, id, es.ID())

es, ok = ec.Load(id)
Expand Down
74 changes: 62 additions & 12 deletions pkg/controller/invocation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,64 @@ func (a *ActionInvokeTask) logger() logrus.FieldLogger {
})
}

func (a *ActionInvokeTask) String() string {
return fmt.Sprintf("task/run(%s)", a.Task.GetId())
}

func (a *ActionInvokeTask) Apply() error {
log := a.logger()
span := opentracing.StartSpan("pkg/controller/invocation/actions/InvokeTask",
opentracing.ChildOf(a.ec.Span()))
span := opentracing.StartSpan(fmt.Sprintf("/task/%s", a.Task.GetId()), opentracing.ChildOf(a.ec.Span().Context()))
span.SetTag("task", a.Task.GetId())
defer span.Finish()

// Find task
task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id)
if !ok {
return fmt.Errorf("task '%v' could not be found", a.Wfi.ID())
err := fmt.Errorf("task '%v' could not be found", a.Wfi.ID())
span.LogKV("error", err)
return err
}

span.SetTag("fnref", task.GetStatus().GetFnRef())
if logrus.GetLevel() == logrus.DebugLevel {
var err error
var inputs interface{}
inputs, err = typedvalues.UnwrapMapTypedValue(task.GetSpec().GetInputs())
if err != nil {
inputs = fmt.Sprintf("error: %v", err)
}
span.LogKV("inputs", inputs)
}

// Check if function has been resolved
if task.Status.FnRef == nil {
return fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef)
err := fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef)
span.LogKV("error", err)
return err
}

// Pre-execution: Resolve expression inputs
exprEvalStart := time.Now()
inputs, err := a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
return err
var inputs map[string]*typedvalues.TypedValue
if len(a.Task.Inputs) > 0 {
var err error
exprEvalStart := time.Now()
inputs, err = a.resolveInputs(a.Task.Inputs)
exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart)))
if err != nil {
log.Error(err)
span.LogKV("error", err)
return err
}

if logrus.GetLevel() == logrus.DebugLevel {
var err error
var resolvedInputs interface{}
resolvedInputs, err = typedvalues.UnwrapMapTypedValue(inputs)
if err != nil {
resolvedInputs = fmt.Sprintf("error: %v", err)
}
span.LogKV("resolved_inputs", resolvedInputs)
}
}

// Invoke task
Expand All @@ -127,13 +162,28 @@ func (a *ActionInvokeTask) Apply() error {
log.Debugf("Using inputs: %v", i)
}
}

ctx := opentracing.ContextWithSpan(context.Background(), span)
_, err = a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
updated, err := a.API.Invoke(spec, api.WithContext(ctx), api.PostTransformer(a.postTransformer))
if err != nil {
log.Errorf("Failed to execute task: %v", err)
span.LogKV("error", err)
return err
}
span.Finish()
span.SetTag("status", updated.GetStatus().GetStatus().String())
if !updated.GetStatus().Successful() {
span.LogKV("error", updated.GetStatus().GetError().String())
}
if logrus.GetLevel() == logrus.DebugLevel {
var err error
var output interface{}
output, err = typedvalues.Unwrap(updated.GetStatus().GetOutput())
if err != nil {
output = fmt.Sprintf("error: %v", err)
}
span.LogKV("output", output)
}

return nil
}

Expand Down
32 changes: 22 additions & 10 deletions pkg/controller/invocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,22 @@ func (cr *Controller) Notify(update *fes.Notification) error {
return err
}

if es, ok := cr.evalStore.Load(entity.ID()); ok {
es.Span().LogKV("event", fmt.Sprintf("%s - %v", update.EventType, update.Labels()))
}
switch update.EventType {
case events.EventInvocationCompleted:
fallthrough
cr.finishAndDeleteEvalState(entity.ID(), true, "completion reason: "+events.EventInvocationCompleted)
case events.EventInvocationCanceled:
fallthrough
cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationCanceled)
case events.EventInvocationFailed:
// TODO mark to clean up later instead
cr.stateStore.Delete(entity.ID())
cr.evalStore.Delete(entity.ID())
log.Debugf("Removed entity %v from eval state", entity.ID())
cr.finishAndDeleteEvalState(entity.ID(), false, "completion reason: "+events.EventInvocationFailed)
case events.EventTaskFailed:
fallthrough
case events.EventTaskSucceeded:
fallthrough
case events.EventInvocationCreated:
es := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx)
es, _ := cr.evalStore.LoadOrStore(entity.ID(), update.SpanCtx)
cr.workQueue.Add(es)
default:
log.Debugf("Controller ignored event type: %v", update.EventType)
Expand Down Expand Up @@ -215,9 +215,10 @@ func (cr *Controller) checkModelCaches() error {
}

if !wi.Status.Finished() {
span := opentracing.GlobalTracer().StartSpan("recoverFromModelCache")
// TODO grab the span context from the model / events
span := opentracing.GlobalTracer().StartSpan("/controller/recoverFromModelCache")
controller.EvalRecovered.WithLabelValues(Name, "cache").Inc()
es := cr.evalStore.LoadOrStore(wi.ID(), span.Context())
es, _ := cr.evalStore.LoadOrStore(wi.ID(), span.Context())
cr.workQueue.Add(es)
span.Finish()
}
Expand Down Expand Up @@ -295,7 +296,7 @@ func (cr *Controller) Evaluate(invocationID string) {

controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start)))
if wfi.GetStatus().Finished() {
cr.evalStore.Delete(wfi.ID())
cr.finishAndDeleteEvalState(wfi.ID(), true, "")
t, _ := ptypes.Timestamp(wfi.GetMetadata().GetCreatedAt())
invocationDuration.Observe(float64(time.Now().Sub(t)))
}
Expand Down Expand Up @@ -349,6 +350,17 @@ func (cr *Controller) processNextItem(ctx context.Context, pool *gopool.GoPool)
return true
}

func (cr *Controller) finishAndDeleteEvalState(evalStateID string, success bool, msg string) {
es, ok := cr.evalStore.Load(evalStateID)
if !ok {
return
}
es.Finish(success, msg)
cr.evalStore.Delete(evalStateID)
cr.stateStore.Delete(evalStateID)
log.Debugf("Removed entity %v from eval state", evalStateID)
}

func defaultPolicy(ctr *Controller) controller.Rule {
return &controller.RuleEvalUntilAction{
Rules: []controller.Rule{
Expand Down
8 changes: 8 additions & 0 deletions pkg/fnenv/fission/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
if err := validate.TaskInvocationSpec(spec); err != nil {
return nil, err
}
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, "/fnenv/fission")
defer span.Finish()
fnRef := *spec.FnRef
span.SetTag("fnref", fnRef.Format())

// Construct request and add body
url := fe.createRouterURL(fnRef)
span.SetTag("url", url)
req, err := http.NewRequest(defaultHTTPMethod, url, nil)
if err != nil {
panic(fmt.Errorf("failed to create request for '%v': %v", url, err))
Expand Down Expand Up @@ -96,11 +100,14 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
}
fmt.Println(string(bs))
fmt.Println("--- HTTP Request end ---")
span.LogKV("HTTP request", string(bs))
}
span.LogKV("http", fmt.Sprintf("%s %v", req.Method, req.URL))
resp, err := http.DefaultClient.Do(req.WithContext(cfg.Ctx))
if err != nil {
return nil, fmt.Errorf("error for reqUrl '%v': %v", url, err)
}
span.LogKV("status code", resp.Status)

fnenv.FnActive.WithLabelValues(Name).Dec()

Expand All @@ -113,6 +120,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
}
fmt.Println(string(bs))
fmt.Println("--- HTTP Response end ---")
span.LogKV("HTTP response", string(bs))
}

// Parse output
Expand Down
2 changes: 1 addition & 1 deletion pkg/fnenv/native/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec, opts ...fnenv.Invo
if !ok {
return nil, fmt.Errorf("could not resolve internal function '%s'", fnID)
}
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("fnenv/internal/fn/%s", fnID))
span, _ := opentracing.StartSpanFromContext(cfg.Ctx, fmt.Sprintf("/fnenv/internal/%s", fnID))
defer span.Finish()
fnenv.FnActive.WithLabelValues(Name).Inc()
out, err := fn.Invoke(spec)
Expand Down
Loading

0 comments on commit 82d9a77

Please sign in to comment.