From 7ab7935611672fc7340b8c356520a83d050806f1 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Sun, 24 Apr 2022 20:56:24 -0400 Subject: [PATCH] tracing: introduce a StructuredEvent listener This change adds an option `WithEventListeners(...)` that allows for specifying an EventListener during span creation. An EventListener will be notified on every StructuredEvent recorded by the span and its children. The event listeners will be removed once the span is Finish()'ed and will no longer receive StructuredEvent notifications. Remote spans will notify the registerd event listeners when the remote span recording is imported into the span with the event listeners. The motivation for this change was to allow higher level aggregators to watch every event without relying on periodically pulling their root span's Recording. This way the aggregator can be sure not to miss "important" events that may be rotated out of the underlying ring buffer due to memory constraints. Informs: #80395 Release note: None --- pkg/util/tracing/bench_test.go | 77 +++++++++++++++++++++++--------- pkg/util/tracing/crdbspan.go | 45 +++++++++++++++++++ pkg/util/tracing/span.go | 20 ++++++++- pkg/util/tracing/span_options.go | 39 ++++++++++++++++ pkg/util/tracing/span_test.go | 66 +++++++++++++++++++++++++++ pkg/util/tracing/tracer.go | 7 ++- 6 files changed, 229 insertions(+), 25 deletions(-) diff --git a/pkg/util/tracing/bench_test.go b/pkg/util/tracing/bench_test.go index 567273eecfb6..0db8d10b772e 100644 --- a/pkg/util/tracing/bench_test.go +++ b/pkg/util/tracing/bench_test.go @@ -31,18 +31,26 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { staticLogTags := logtags.Buffer{} staticLogTags.Add("foo", "bar") + mockListener := []EventListener{&mockEventListener{}} for _, tc := range []struct { - name string - defaultMode TracingMode - parent bool - opts []SpanOption + name string + defaultMode TracingMode + parent bool + withEventListener bool + opts []SpanOption }{ - {"none", TracingModeOnDemand, false, nil}, - {"real", TracingModeActiveSpansRegistry, false, nil}, - {"real,logtag", TracingModeActiveSpansRegistry, false, []SpanOption{WithLogTags(&staticLogTags)}}, - {"real,autoparent", TracingModeActiveSpansRegistry, true, nil}, - {"real,manualparent", TracingModeActiveSpansRegistry, true, []SpanOption{WithDetachedRecording()}}, + {name: "none", defaultMode: TracingModeOnDemand}, + {name: "real", defaultMode: TracingModeActiveSpansRegistry}, + {name: "real,logtag", defaultMode: TracingModeActiveSpansRegistry, + opts: []SpanOption{WithLogTags(&staticLogTags)}}, + {name: "real,autoparent", defaultMode: TracingModeActiveSpansRegistry, parent: true}, + {name: "real,manualparent", defaultMode: TracingModeActiveSpansRegistry, parent: true, + opts: []SpanOption{WithDetachedRecording()}}, + {name: "real,autoparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry, + parent: true, withEventListener: true}, + {name: "real,manualparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry, parent: true, + withEventListener: true, opts: []SpanOption{WithDetachedRecording()}}, } { b.Run(fmt.Sprintf("opts=%s", tc.name), func(b *testing.B) { tr := NewTracerWithOpt(ctx, @@ -53,7 +61,11 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { var parent *Span var numOpts = len(tc.opts) if tc.parent { - parent = tr.StartSpan("one-off") + if tc.withEventListener { + parent = tr.StartSpan("one-off", WithEventListeners(mockListener)) + } else { + parent = tr.StartSpan("one-off") + } defer parent.Finish() numOpts++ } @@ -106,19 +118,40 @@ func BenchmarkSpan_GetRecording(b *testing.B) { func BenchmarkRecordingWithStructuredEvent(b *testing.B) { skip.UnderDeadlock(b, "span reuse triggers false-positives in the deadlock detector") - tr := NewTracerWithOpt(context.Background(), - WithTracingMode(TracingModeActiveSpansRegistry), - WithSpanReusePercent(100)) - ev := &types.Int32Value{Value: 5} - b.ReportAllocs() - for i := 0; i < b.N; i++ { - root := tr.StartSpan("foo", WithRecording(RecordingStructured)) - root.RecordStructured(ev) - child := tr.StartSpan("bar", WithParent(root)) - child.RecordStructured(ev) - child.Finish() - _ = root.FinishAndGetRecording(RecordingStructured) + mockListener := []EventListener{&mockEventListener{}} + + for _, tc := range []struct { + name string + withEventListener bool + }{ + {name: "with-event-listener", withEventListener: true}, + {name: "without-event-listener"}, + } { + b.Run(tc.name, func(b *testing.B) { + tr := NewTracerWithOpt(context.Background(), + WithTracingMode(TracingModeActiveSpansRegistry), + WithSpanReusePercent(100)) + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + var root *Span + if tc.withEventListener { + root = tr.StartSpan("foo", WithRecording(RecordingStructured), + WithEventListeners(mockListener)) + } else { + root = tr.StartSpan("foo", WithRecording(RecordingStructured)) + } + + root.RecordStructured(ev) + + // The child span will also inherit the root span's event listener. + child := tr.StartSpan("bar", WithParent(root)) + child.RecordStructured(ev) + child.Finish() + _ = root.FinishAndGetRecording(RecordingStructured) + } + }) } } diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index e8f97768ee89..f4d8857ca919 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -124,6 +124,10 @@ type crdbSpanMu struct { // lazyTags are tags whose values are only string-ified on demand. Each lazy // tag is expected to implement either fmt.Stringer or LazyTag. lazyTags []lazyTag + + // eventListeners is a list of registered EventListener's that are notified + // whenever a Structured event is recorded by the span and its children. + eventListeners []EventListener } type lazyTag struct { @@ -143,6 +147,10 @@ type recordingState struct { // finished while this parent span's recording was not verbose. structured sizeLimitedBuffer + // notifyParentOnStructuredEvent is true if the span's parent has asked to be + // notified of every StructuredEvent recording on this span. + notifyParentOnStructuredEvent bool + // dropped is true if the span has capped out it's memory limits for // logs and structured events, and has had to drop some. It's used to // annotate recordings with the _dropped tag, when applicable. @@ -225,6 +233,7 @@ func (s *crdbSpan) finish() bool { return false } s.mu.finished = true + s.mu.eventListeners = nil if s.recordingType() != RecordingOff { duration := timeutil.Since(s.startTime) @@ -465,6 +474,14 @@ func (s *crdbSpan) recordFinishedChildren(childRecording Recording) { return } + // Notify the event listeners registered with s of the StructuredEvents on the + // children being added to s. + for _, span := range childRecording { + for _, record := range span.StructuredRecords { + s.notifyEventListeners(record.Payload) + } + } + s.mu.Lock() defer s.mu.Unlock() s.recordFinishedChildrenLocked(childRecording) @@ -547,6 +564,21 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) { return nil, false } +// notifyEventListeners recursively notifies all the EventListeners registered +// with this span and any ancestor spans in the Recording, of a StructuredEvent. +func (s *crdbSpan) notifyEventListeners(item Structured) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mu.recording.notifyParentOnStructuredEvent { + parent := s.mu.parent.Span.i.crdb + parent.notifyEventListeners(item) + } + + for _, listener := range s.mu.eventListeners { + listener.Notify(item) + } +} + // record includes a log message in s' recording. func (s *crdbSpan) record(msg redact.RedactableString) { if s.recordingType() != RecordingVerbose { @@ -595,6 +627,10 @@ func (s *crdbSpan) recordStructured(item Structured) { Payload: p, } s.recordInternal(sr, &s.mu.recording.structured) + + // If there are any listener's registered with this span, notify them of the + // Structured event being recorded. + s.notifyEventListeners(item) } // memorySizable is implemented by log records and structured events for @@ -882,6 +918,8 @@ func (s *crdbSpan) parentFinished() { s.mu.Lock() defer s.mu.Unlock() s.mu.parent.release() + // Reset `notifyParentOnStructuredEvent` since s no longer has a parent. + s.mu.recording.notifyParentOnStructuredEvent = false } // visitOpenChildren calls the visitor for every open child. The receiver's lock @@ -918,6 +956,13 @@ func (s *crdbSpan) withLock(f func()) { f() } +// wantEventNotificationsLocked returns true if the span was created +// WithEventListeners(...) or the span has been configured to notify its parent +// span on a StructuredEvent recording. +func (s *crdbSpan) wantEventNotificationsLocked() bool { + return len(s.mu.eventListeners) != 0 || s.mu.recording.notifyParentOnStructuredEvent +} + // setGoroutineID updates the span's goroutine ID. func (s *crdbSpan) setGoroutineID(gid int64) { s.mu.Lock() diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index dbd7f3a19345..781663db408a 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -476,6 +476,19 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) { return sp.i.GetLazyTag(key) } +// EventListener is an object that can be registered to listen for Structured +// events recorded by the span and its children. +type EventListener interface { + // Notify is invoked on every Structured event recorded by the span and its + // children, recursively. + // + // Note that this method should not run for a long time as it will hold up the + // span recording Structured events during traced operations. + // + // Notify will not be called concurrently on the same span. + Notify(event Structured) +} + // TraceID retrieves a span's trace ID. func (sp *Span) TraceID() tracingpb.TraceID { if sp.detectUseAfterFinish() { @@ -546,6 +559,7 @@ func (sp *Span) reset( goroutineID uint64, startTime time.Time, logTags *logtags.Buffer, + eventListeners []EventListener, kind oteltrace.SpanKind, otelSpan oteltrace.Span, netTr trace.Trace, @@ -625,6 +639,9 @@ func (sp *Span) reset( if c.mu.recording.logs.Len() != 0 { panic("unexpected logs in span being reset") } + if len(c.mu.eventListeners) != 0 { + panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners)) + } h := sp.helper c.mu.crdbSpanMu = crdbSpanMu{ @@ -635,7 +652,8 @@ func (sp *Span) reset( logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */), structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]), }, - tags: h.tagsAlloc[:0], + tags: h.tagsAlloc[:0], + eventListeners: eventListeners, } if kind != oteltrace.SpanKindUnspecified { diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go index 23d069bce29a..997c27d83e4d 100644 --- a/pkg/util/tracing/span_options.go +++ b/pkg/util/tracing/span_options.go @@ -75,6 +75,7 @@ type spanOptions struct { ForceRealSpan bool // see WithForceRealSpan SpanKind oteltrace.SpanKind // see WithSpanKind Sterile bool // see WithSterile + EventListeners []EventListener // see WithEventListeners // recordingTypeExplicit is set if the WithRecording() option was used. In // that case, spanOptions.recordingType() returns recordingTypeOpt below. If @@ -438,3 +439,41 @@ func (w withSterileOption) apply(opts spanOptions) spanOptions { opts.Sterile = true return opts } + +type eventListenersOption []EventListener + +var _ SpanOption = eventListenersOption{} + +func (ev eventListenersOption) apply(opts spanOptions) spanOptions { + // Applying an EventListener span option implies the span has at least + // `RecordingStructured` recording type. If the span explicitly specifies a + // `RecordingVerbose` recording type via the `WithRecording(...)` option, that + // will be respected instead. + if !opts.recordingTypeExplicit { + opts.recordingTypeExplicit = true + opts.recordingTypeOpt = RecordingStructured + } + eventListeners := ([]EventListener)(ev) + opts.EventListeners = eventListeners + return opts +} + +// WithEventListeners registers eventListeners to the span. The listeners are +// notified of Structured events recorded by the span and its children. Once the +// span is finished, the listeners are not notified of events any more even from +// surviving child spans. +// +// The listeners will also be notified of StructuredEvents recorded on remote +// spans, when the remote recording is imported by the span or one of its +// children. Note, the listeners will be notified of StructuredEvents in the +// imported remote recording out of order. +// +// WithEventListeners implies a `RecordingStructured` recording type for the +// span. If the recording type has been explicitly set to `RecordingVerbose` via +// the `WithRecording(...) option, that will be respected instead. +// +// The caller should not mutate `eventListeners` after calling +// WithEventListeners. +func WithEventListeners(eventListeners []EventListener) SpanOption { + return (eventListenersOption)(eventListeners) +} diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index aa85234ae7a0..c0dce3fc739b 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -700,3 +700,69 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) { otelCtx := sp.i.otelSpan.SpanContext() require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID()) } + +type mockEventListener struct { + eventsSeen int +} + +func (f *mockEventListener) Notify(_ Structured) { + f.eventsSeen++ +} + +var _ EventListener = &mockEventListener{} + +func TestEventListener(t *testing.T) { + tr := NewTracer() + rootEventListener := &mockEventListener{} + sp := tr.StartSpan("root", WithRecording(RecordingStructured), + WithEventListeners([]EventListener{rootEventListener})) + + // Record a few Structured events. + sp.RecordStructured(&types.Int32Value{Value: 4}) + sp.RecordStructured(&types.Int32Value{Value: 5}) + require.Equal(t, 2, rootEventListener.eventsSeen) + + // Register another event listener on only the child span. + childEventListener := &mockEventListener{} + childSp := tr.StartSpan("child", WithParent(sp), + WithEventListeners([]EventListener{childEventListener})) + + childSp.RecordStructured(&types.Int32Value{Value: 6}) + childSp.RecordStructured(&types.Int32Value{Value: 7}) + require.Equal(t, 4, rootEventListener.eventsSeen) + require.Equal(t, 2, childEventListener.eventsSeen) + + // Record an event on the root span, and make sure we don't see it on the + // listener registered with the child span. + sp.RecordStructured(&types.Int32Value{Value: 8}) + require.Equal(t, 5, rootEventListener.eventsSeen) + require.Equal(t, 2, childEventListener.eventsSeen) + + // Finish the child span, and ensure the Structured events aren't re-seen by + // the listener when the child deposits them with the parent. + childSp.Finish() + require.Equal(t, 5, rootEventListener.eventsSeen) + + // Create a remote child, and the root listener should not be inherited. + remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta())) + remoteSp.RecordStructured(&types.Int32Value{Value: 9}) + require.Equal(t, 5, rootEventListener.eventsSeen) + + // But, when we import the recording in the root span, the root listener + // should see these events. + sp.ImportRemoteRecording(remoteSp.FinishAndGetConfiguredRecording()) + require.Equal(t, 6, rootEventListener.eventsSeen) + + // Create another child. + childSp2 := tr.StartSpan("child2", WithParent(sp)) + childSp2.RecordStructured(&types.Int32Value{Value: 10}) + require.Equal(t, 7, rootEventListener.eventsSeen) + + // Now Finish() the parent before the child and ensure that the root event + // listener does not see events from the child once the parent has been + // Finish()ed. + sp.Finish() + childSp2.RecordStructured(&types.Int32Value{Value: 11}) + require.Equal(t, 7, rootEventListener.eventsSeen) + childSp2.Finish() +} diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index fb6e47a1f977..d4024e8766e4 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -973,6 +973,7 @@ func (t *Tracer) newSpan( goroutineID uint64, startTime time.Time, logTags *logtags.Buffer, + eventListeners []EventListener, kind oteltrace.SpanKind, otelSpan oteltrace.Span, netTr trace.Trace, @@ -984,7 +985,7 @@ func (t *Tracer) newSpan( h := t.spanPool.Get().(*spanAllocHelper) h.span.reset( traceID, spanID, operation, goroutineID, - startTime, logTags, kind, + startTime, logTags, eventListeners, kind, otelSpan, netTr, sterile) return &h.span } @@ -1019,6 +1020,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) { c.mu.openChildren = nil c.mu.recording.finishedChildren = nil c.mu.tags = nil + c.mu.eventListeners = nil c.mu.recording.logs.Discard() c.mu.recording.structured.Discard() c.mu.Unlock() @@ -1182,7 +1184,7 @@ child operation: %s, tracer created at: s := t.newSpan( traceID, spanID, opName, uint64(goid.Get()), - startTime, opts.LogTags, opts.SpanKind, + startTime, opts.LogTags, opts.EventListeners, opts.SpanKind, otelSpan, netTr, opts.Sterile) s.i.crdb.enableRecording(opts.recordingType()) @@ -1221,6 +1223,7 @@ child operation: %s, tracer created at: // parent of the child finish). Note that some methods on opts cannot // be used from this moment on. s.i.crdb.mu.parent = opts.Parent.move() + s.i.crdb.mu.recording.notifyParentOnStructuredEvent = parent.wantEventNotificationsLocked() } else { // The parent has already finished, so make this "child" a root. localRoot = true