Skip to content

Commit

Permalink
tracing: introduce a StructuredEvent listener
Browse files Browse the repository at this point in the history
This change extends the tracing API to support registering
and unregistering an EventListener on a span. The registered
EventListener will be notified about every StructuredEvent
recorded by the span. EventListeners registered with a span
are inherited by the span's local children.

The motivation for this change was to allow higher level aggregators
to watch every event without relying on periodically pulling their
root span's Recording. This way the aggregator can be sure not to
miss "important" events that may be rotated out of the underlying
ring buffer due to memory constraints.

Note, event listeners can only be registered, and inherited by spans
with a RecordingStructured or RecordingVerbose recording type.

Informs: #80395

Release note: None
  • Loading branch information
adityamaru committed Apr 25, 2022
1 parent 11a0a9f commit bd66e9a
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 3 deletions.
38 changes: 38 additions & 0 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type crdbSpanMu struct {
// lazyTags are tags whose values are only string-ified on demand. Each lazy
// tag is expected to implement either fmt.Stringer or LazyTag.
lazyTags []lazyTag

// eventListeners is a list of registered EventListener's that are notified
// whenever a Structured event is recorded by the span.
//
// Child spans will inherit this list of EventListeners.
eventListeners []EventListener
}

type lazyTag struct {
Expand Down Expand Up @@ -465,6 +471,14 @@ func (s *crdbSpan) recordFinishedChildren(children []tracingpb.RecordedSpan) {
return
}

// Notify the event listeners registered with s of the StructuredEvents on the
// children being added to s.
for _, span := range children {
for _, record := range span.StructuredRecords {
s.notifyEventListeners(record.Payload)
}
}

s.mu.Lock()
defer s.mu.Unlock()
s.recordFinishedChildrenLocked(children)
Expand Down Expand Up @@ -533,6 +547,26 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) {
return nil, false
}

// registerEventListenerLocked registers an EventListener to listen for
// Structured events recorded by the span.
func (s *crdbSpan) registerEventListenerLocked(listener EventListener) {
if s.recordingType() == RecordingOff {
return
}
s.mu.eventListeners = append(s.mu.eventListeners, listener)
}

// notifyEventListeners notifies all the EventListeners registered with this
// span about the recorded Structured item.
func (s *crdbSpan) notifyEventListeners(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()

for _, listener := range s.mu.eventListeners {
listener.Notify(item)
}
}

// record includes a log message in s' recording.
func (s *crdbSpan) record(msg redact.RedactableString) {
if s.recordingType() != RecordingVerbose {
Expand Down Expand Up @@ -581,6 +615,10 @@ func (s *crdbSpan) recordStructured(item Structured) {
Payload: p,
}
s.recordInternal(sr, &s.mu.recording.structured)

// If there are any listener's registered with this span, notify them of the
// Structured event being recorded.
s.notifyEventListeners(item)
}

// memorySizable is implemented by log records and structured events for
Expand Down
33 changes: 32 additions & 1 deletion pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,32 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) {
return sp.i.GetLazyTag(key)
}

// EventListener is an object that can be registered to listen for Structured
// events recorded by the span.
type EventListener interface {
// Notify is invoked on every Structured event recorded by the span.
//
// Note that this method should not run for a long time as it will hold up the
// span recording Structured events during traced operations.
//
// Notify will not be called concurrently on the same span. However, since
// EventListeners are inherited by child spans, Notify can be called
// concurrently on different spans.
Notify(event Structured)
}

// RegisterEventListener adds an EventListener to the span. The listener is
// notified of Structured events recorded by the span and its children.
//
// The listener will also be notified of StructuredEvents recorded on remote
// spans, when the Recording is imported using sp.ImportRemoteSpans.
func (sp *Span) RegisterEventListener(listener EventListener) {
if sp.detectUseAfterFinish() {
return
}
sp.i.RegisterEventListener(listener)
}

// TraceID retrieves a span's trace ID.
func (sp *Span) TraceID() tracingpb.TraceID {
if sp.detectUseAfterFinish() {
Expand Down Expand Up @@ -544,6 +570,7 @@ func (sp *Span) reset(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []EventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand Down Expand Up @@ -623,6 +650,9 @@ func (sp *Span) reset(
if c.mu.recording.logs.Len() != 0 {
panic("unexpected logs in span being reset")
}
if len(c.mu.eventListeners) != 0 {
panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners))
}

h := sp.helper
c.mu.crdbSpanMu = crdbSpanMu{
Expand All @@ -633,7 +663,8 @@ func (sp *Span) reset(
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
},
tags: h.tagsAlloc[:0],
tags: h.tagsAlloc[:0],
eventListeners: eventListeners,
}

if kind != oteltrace.SpanKindUnspecified {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/tracing/span_inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ func (s *spanInner) GetLazyTag(key string) (interface{}, bool) {
return s.crdb.getLazyTagLocked(key)
}

func (s *spanInner) RegisterEventListener(listener EventListener) {
if s.isNoop() {
return
}
s.crdb.mu.Lock()
defer s.crdb.mu.Unlock()
s.crdb.registerEventListenerLocked(listener)
}

func (s *spanInner) RecordStructured(item Structured) {
if s.isNoop() {
return
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ func (opts *spanOptions) parentSpanID() tracingpb.SpanID {
return 0
}

// parentEventListeners returns a copy of the parent span's event listeners.
func (opts *spanOptions) parentEventListeners() []EventListener {
if !opts.Parent.empty() && !opts.Parent.IsNoop() {
opts.Parent.i.crdb.mu.Lock()
defer opts.Parent.i.crdb.mu.Unlock()
return opts.Parent.i.crdb.mu.eventListeners[:]
}
return nil
}

func (opts *spanOptions) recordingType() RecordingType {
if opts.recordingTypeExplicit {
return opts.recordingTypeOpt
Expand Down
62 changes: 62 additions & 0 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,65 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) {
otelCtx := sp.i.otelSpan.SpanContext()
require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID())
}

type mockEventListener struct {
eventsSeen int
}

func (f *mockEventListener) Notify(_ Structured) {
f.eventsSeen++
}

var _ EventListener = &mockEventListener{}

func TestEventListener(t *testing.T) {
ctx := context.Background()
tr := NewTracerWithOpt(ctx, WithTracingMode(TracingModeActiveSpansRegistry))
sp := tr.StartSpan("root", WithRecording(RecordingStructured))

rootEventListener := mockEventListener{}
sp.RegisterEventListener(&rootEventListener)

// Record a few Structured events.
sp.RecordStructured(&types.Int32Value{Value: 4})
sp.RecordStructured(&types.Int32Value{Value: 5})
require.Equal(t, 2, rootEventListener.eventsSeen)

// Start a child span that will inherit the listener.
childSp := tr.StartSpan("child", WithParent(sp))

childSp.RecordStructured(&types.Int32Value{Value: 6})
childSp.RecordStructured(&types.Int32Value{Value: 7})
require.Equal(t, 4, rootEventListener.eventsSeen)

// Register another event listener on only the child span.
childEventListener := mockEventListener{}
childSp.RegisterEventListener(&childEventListener)

// Record an event on the root span, and make sure we don't see it on the
// listener registered with the child span.
sp.RecordStructured(&types.Int32Value{Value: 8})
require.Equal(t, 5, rootEventListener.eventsSeen)
require.Equal(t, 0, childEventListener.eventsSeen)

childSp.RecordStructured(&types.Int32Value{Value: 9})
require.Equal(t, 6, rootEventListener.eventsSeen)
require.Equal(t, 1, childEventListener.eventsSeen)

// Finish the child span, and ensure the Structured events aren't re-seen by
// the listener when the child deposits them with the parent.
childSp.Finish()
require.Equal(t, 6, rootEventListener.eventsSeen)

// Create a remote child, and the root listener should not be inherited.
remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta()))
remoteSp.RecordStructured(&types.Int32Value{Value: 10})
require.Equal(t, 6, rootEventListener.eventsSeen)

// But, when we import the recording in the root span, the root listener
// should see these events.
sp.ImportRemoteSpans(remoteSp.FinishAndGetConfiguredRecording())
require.Equal(t, 7, rootEventListener.eventsSeen)

sp.Finish()
}
7 changes: 5 additions & 2 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func (t *Tracer) newSpan(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []EventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand All @@ -984,7 +985,7 @@ func (t *Tracer) newSpan(
h := t.spanPool.Get().(*spanAllocHelper)
h.span.reset(
traceID, spanID, operation, goroutineID,
startTime, logTags, kind,
startTime, logTags, eventListeners, kind,
otelSpan, netTr, sterile)
return &h.span
}
Expand Down Expand Up @@ -1019,6 +1020,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) {
c.mu.openChildren = nil
c.mu.recording.finishedChildren = nil
c.mu.tags = nil
c.mu.eventListeners = nil
c.mu.recording.logs.Discard()
c.mu.recording.structured.Discard()
c.mu.Unlock()
Expand Down Expand Up @@ -1179,10 +1181,11 @@ child operation: %s, tracer created at:
traceID = tracingpb.TraceID(randutil.FastInt63())
}
spanID := tracingpb.SpanID(randutil.FastInt63())
eventListeners := opts.parentEventListeners()

s := t.newSpan(
traceID, spanID, opName, uint64(goid.Get()),
startTime, opts.LogTags, opts.SpanKind,
startTime, opts.LogTags, eventListeners, opts.SpanKind,
otelSpan, netTr, opts.Sterile)

s.i.crdb.enableRecording(opts.recordingType())
Expand Down

0 comments on commit bd66e9a

Please sign in to comment.