From bd66e9ab887a87e35091fe9379dd427bd60c3f44 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Sun, 24 Apr 2022 20:56:24 -0400 Subject: [PATCH] tracing: introduce a StructuredEvent listener This change extends the tracing API to support registering and unregistering an EventListener on a span. The registered EventListener will be notified about every StructuredEvent recorded by the span. EventListeners registered with a span are inherited by the span's local children. The motivation for this change was to allow higher level aggregators to watch every event without relying on periodically pulling their root span's Recording. This way the aggregator can be sure not to miss "important" events that may be rotated out of the underlying ring buffer due to memory constraints. Note, event listeners can only be registered, and inherited by spans with a RecordingStructured or RecordingVerbose recording type. Informs: #80395 Release note: None --- pkg/util/tracing/crdbspan.go | 38 ++++++++++++++++++++ pkg/util/tracing/span.go | 33 ++++++++++++++++- pkg/util/tracing/span_inner.go | 9 +++++ pkg/util/tracing/span_options.go | 10 ++++++ pkg/util/tracing/span_test.go | 62 ++++++++++++++++++++++++++++++++ pkg/util/tracing/tracer.go | 7 ++-- 6 files changed, 156 insertions(+), 3 deletions(-) diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index 72f57e64a79e..2756081df066 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -124,6 +124,12 @@ type crdbSpanMu struct { // lazyTags are tags whose values are only string-ified on demand. Each lazy // tag is expected to implement either fmt.Stringer or LazyTag. lazyTags []lazyTag + + // eventListeners is a list of registered EventListener's that are notified + // whenever a Structured event is recorded by the span. + // + // Child spans will inherit this list of EventListeners. + eventListeners []EventListener } type lazyTag struct { @@ -465,6 +471,14 @@ func (s *crdbSpan) recordFinishedChildren(children []tracingpb.RecordedSpan) { return } + // Notify the event listeners registered with s of the StructuredEvents on the + // children being added to s. + for _, span := range children { + for _, record := range span.StructuredRecords { + s.notifyEventListeners(record.Payload) + } + } + s.mu.Lock() defer s.mu.Unlock() s.recordFinishedChildrenLocked(children) @@ -533,6 +547,26 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) { return nil, false } +// registerEventListenerLocked registers an EventListener to listen for +// Structured events recorded by the span. +func (s *crdbSpan) registerEventListenerLocked(listener EventListener) { + if s.recordingType() == RecordingOff { + return + } + s.mu.eventListeners = append(s.mu.eventListeners, listener) +} + +// notifyEventListeners notifies all the EventListeners registered with this +// span about the recorded Structured item. +func (s *crdbSpan) notifyEventListeners(item Structured) { + s.mu.Lock() + defer s.mu.Unlock() + + for _, listener := range s.mu.eventListeners { + listener.Notify(item) + } +} + // record includes a log message in s' recording. func (s *crdbSpan) record(msg redact.RedactableString) { if s.recordingType() != RecordingVerbose { @@ -581,6 +615,10 @@ func (s *crdbSpan) recordStructured(item Structured) { Payload: p, } s.recordInternal(sr, &s.mu.recording.structured) + + // If there are any listener's registered with this span, notify them of the + // Structured event being recorded. + s.notifyEventListeners(item) } // memorySizable is implemented by log records and structured events for diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index f4924242b994..6c49329c2904 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -474,6 +474,32 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) { return sp.i.GetLazyTag(key) } +// EventListener is an object that can be registered to listen for Structured +// events recorded by the span. +type EventListener interface { + // Notify is invoked on every Structured event recorded by the span. + // + // Note that this method should not run for a long time as it will hold up the + // span recording Structured events during traced operations. + // + // Notify will not be called concurrently on the same span. However, since + // EventListeners are inherited by child spans, Notify can be called + // concurrently on different spans. + Notify(event Structured) +} + +// RegisterEventListener adds an EventListener to the span. The listener is +// notified of Structured events recorded by the span and its children. +// +// The listener will also be notified of StructuredEvents recorded on remote +// spans, when the Recording is imported using sp.ImportRemoteSpans. +func (sp *Span) RegisterEventListener(listener EventListener) { + if sp.detectUseAfterFinish() { + return + } + sp.i.RegisterEventListener(listener) +} + // TraceID retrieves a span's trace ID. func (sp *Span) TraceID() tracingpb.TraceID { if sp.detectUseAfterFinish() { @@ -544,6 +570,7 @@ func (sp *Span) reset( goroutineID uint64, startTime time.Time, logTags *logtags.Buffer, + eventListeners []EventListener, kind oteltrace.SpanKind, otelSpan oteltrace.Span, netTr trace.Trace, @@ -623,6 +650,9 @@ func (sp *Span) reset( if c.mu.recording.logs.Len() != 0 { panic("unexpected logs in span being reset") } + if len(c.mu.eventListeners) != 0 { + panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners)) + } h := sp.helper c.mu.crdbSpanMu = crdbSpanMu{ @@ -633,7 +663,8 @@ func (sp *Span) reset( logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */), structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]), }, - tags: h.tagsAlloc[:0], + tags: h.tagsAlloc[:0], + eventListeners: eventListeners, } if kind != oteltrace.SpanKindUnspecified { diff --git a/pkg/util/tracing/span_inner.go b/pkg/util/tracing/span_inner.go index 3e5b3c1bc7f8..d745fc210e8b 100644 --- a/pkg/util/tracing/span_inner.go +++ b/pkg/util/tracing/span_inner.go @@ -191,6 +191,15 @@ func (s *spanInner) GetLazyTag(key string) (interface{}, bool) { return s.crdb.getLazyTagLocked(key) } +func (s *spanInner) RegisterEventListener(listener EventListener) { + if s.isNoop() { + return + } + s.crdb.mu.Lock() + defer s.crdb.mu.Unlock() + s.crdb.registerEventListenerLocked(listener) +} + func (s *spanInner) RecordStructured(item Structured) { if s.isNoop() { return diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go index 6c58a467c04e..bb8a93a12add 100644 --- a/pkg/util/tracing/span_options.go +++ b/pkg/util/tracing/span_options.go @@ -101,6 +101,16 @@ func (opts *spanOptions) parentSpanID() tracingpb.SpanID { return 0 } +// parentEventListeners returns a copy of the parent span's event listeners. +func (opts *spanOptions) parentEventListeners() []EventListener { + if !opts.Parent.empty() && !opts.Parent.IsNoop() { + opts.Parent.i.crdb.mu.Lock() + defer opts.Parent.i.crdb.mu.Unlock() + return opts.Parent.i.crdb.mu.eventListeners[:] + } + return nil +} + func (opts *spanOptions) recordingType() RecordingType { if opts.recordingTypeExplicit { return opts.recordingTypeOpt diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index f9c18c737385..9afb18f08111 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -700,3 +700,65 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) { otelCtx := sp.i.otelSpan.SpanContext() require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID()) } + +type mockEventListener struct { + eventsSeen int +} + +func (f *mockEventListener) Notify(_ Structured) { + f.eventsSeen++ +} + +var _ EventListener = &mockEventListener{} + +func TestEventListener(t *testing.T) { + ctx := context.Background() + tr := NewTracerWithOpt(ctx, WithTracingMode(TracingModeActiveSpansRegistry)) + sp := tr.StartSpan("root", WithRecording(RecordingStructured)) + + rootEventListener := mockEventListener{} + sp.RegisterEventListener(&rootEventListener) + + // Record a few Structured events. + sp.RecordStructured(&types.Int32Value{Value: 4}) + sp.RecordStructured(&types.Int32Value{Value: 5}) + require.Equal(t, 2, rootEventListener.eventsSeen) + + // Start a child span that will inherit the listener. + childSp := tr.StartSpan("child", WithParent(sp)) + + childSp.RecordStructured(&types.Int32Value{Value: 6}) + childSp.RecordStructured(&types.Int32Value{Value: 7}) + require.Equal(t, 4, rootEventListener.eventsSeen) + + // Register another event listener on only the child span. + childEventListener := mockEventListener{} + childSp.RegisterEventListener(&childEventListener) + + // Record an event on the root span, and make sure we don't see it on the + // listener registered with the child span. + sp.RecordStructured(&types.Int32Value{Value: 8}) + require.Equal(t, 5, rootEventListener.eventsSeen) + require.Equal(t, 0, childEventListener.eventsSeen) + + childSp.RecordStructured(&types.Int32Value{Value: 9}) + require.Equal(t, 6, rootEventListener.eventsSeen) + require.Equal(t, 1, childEventListener.eventsSeen) + + // Finish the child span, and ensure the Structured events aren't re-seen by + // the listener when the child deposits them with the parent. + childSp.Finish() + require.Equal(t, 6, rootEventListener.eventsSeen) + + // Create a remote child, and the root listener should not be inherited. + remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta())) + remoteSp.RecordStructured(&types.Int32Value{Value: 10}) + require.Equal(t, 6, rootEventListener.eventsSeen) + + // But, when we import the recording in the root span, the root listener + // should see these events. + sp.ImportRemoteSpans(remoteSp.FinishAndGetConfiguredRecording()) + require.Equal(t, 7, rootEventListener.eventsSeen) + + sp.Finish() +} diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index fb6e47a1f977..186f70fd8bde 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -973,6 +973,7 @@ func (t *Tracer) newSpan( goroutineID uint64, startTime time.Time, logTags *logtags.Buffer, + eventListeners []EventListener, kind oteltrace.SpanKind, otelSpan oteltrace.Span, netTr trace.Trace, @@ -984,7 +985,7 @@ func (t *Tracer) newSpan( h := t.spanPool.Get().(*spanAllocHelper) h.span.reset( traceID, spanID, operation, goroutineID, - startTime, logTags, kind, + startTime, logTags, eventListeners, kind, otelSpan, netTr, sterile) return &h.span } @@ -1019,6 +1020,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) { c.mu.openChildren = nil c.mu.recording.finishedChildren = nil c.mu.tags = nil + c.mu.eventListeners = nil c.mu.recording.logs.Discard() c.mu.recording.structured.Discard() c.mu.Unlock() @@ -1179,10 +1181,11 @@ child operation: %s, tracer created at: traceID = tracingpb.TraceID(randutil.FastInt63()) } spanID := tracingpb.SpanID(randutil.FastInt63()) + eventListeners := opts.parentEventListeners() s := t.newSpan( traceID, spanID, opName, uint64(goid.Get()), - startTime, opts.LogTags, opts.SpanKind, + startTime, opts.LogTags, eventListeners, opts.SpanKind, otelSpan, netTr, opts.Sterile) s.i.crdb.enableRecording(opts.recordingType())