Skip to content

Commit

Permalink
tracing: introduce a StructuredEvent listener
Browse files Browse the repository at this point in the history
This change adds an option `WithEventListeners(...)` that allows for
specifying an EventListener during span creation. An EventListener will
be notified on every StructuredEvent recorded by the span and its children.
The event listeners will be removed once the span is Finish()'ed and will no
longer receive StructuredEvent notifications. Remote spans will notify
the registered event listeners when the remote span recording is imported into
the span with the event listeners.

The motivation for this change was to allow higher level aggregators
to watch every event without relying on periodically pulling their
root span's Recording. This way the aggregator can be sure not to
miss "important" events that may be rotated out of the underlying
ring buffer due to memory constraints.

Informs: #80395

Release note: None
  • Loading branch information
adityamaru committed May 16, 2022
1 parent bc5f5b7 commit 7ab7935
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 25 deletions.
77 changes: 55 additions & 22 deletions pkg/util/tracing/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,26 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {

staticLogTags := logtags.Buffer{}
staticLogTags.Add("foo", "bar")
mockListener := []EventListener{&mockEventListener{}}

for _, tc := range []struct {
name string
defaultMode TracingMode
parent bool
opts []SpanOption
name string
defaultMode TracingMode
parent bool
withEventListener bool
opts []SpanOption
}{
{"none", TracingModeOnDemand, false, nil},
{"real", TracingModeActiveSpansRegistry, false, nil},
{"real,logtag", TracingModeActiveSpansRegistry, false, []SpanOption{WithLogTags(&staticLogTags)}},
{"real,autoparent", TracingModeActiveSpansRegistry, true, nil},
{"real,manualparent", TracingModeActiveSpansRegistry, true, []SpanOption{WithDetachedRecording()}},
{name: "none", defaultMode: TracingModeOnDemand},
{name: "real", defaultMode: TracingModeActiveSpansRegistry},
{name: "real,logtag", defaultMode: TracingModeActiveSpansRegistry,
opts: []SpanOption{WithLogTags(&staticLogTags)}},
{name: "real,autoparent", defaultMode: TracingModeActiveSpansRegistry, parent: true},
{name: "real,manualparent", defaultMode: TracingModeActiveSpansRegistry, parent: true,
opts: []SpanOption{WithDetachedRecording()}},
{name: "real,autoparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry,
parent: true, withEventListener: true},
{name: "real,manualparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry, parent: true,
withEventListener: true, opts: []SpanOption{WithDetachedRecording()}},
} {
b.Run(fmt.Sprintf("opts=%s", tc.name), func(b *testing.B) {
tr := NewTracerWithOpt(ctx,
Expand All @@ -53,7 +61,11 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
var parent *Span
var numOpts = len(tc.opts)
if tc.parent {
parent = tr.StartSpan("one-off")
if tc.withEventListener {
parent = tr.StartSpan("one-off", WithEventListeners(mockListener))
} else {
parent = tr.StartSpan("one-off")
}
defer parent.Finish()
numOpts++
}
Expand Down Expand Up @@ -106,19 +118,40 @@ func BenchmarkSpan_GetRecording(b *testing.B) {

func BenchmarkRecordingWithStructuredEvent(b *testing.B) {
skip.UnderDeadlock(b, "span reuse triggers false-positives in the deadlock detector")
tr := NewTracerWithOpt(context.Background(),
WithTracingMode(TracingModeActiveSpansRegistry),
WithSpanReusePercent(100))

ev := &types.Int32Value{Value: 5}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
root := tr.StartSpan("foo", WithRecording(RecordingStructured))
root.RecordStructured(ev)
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.FinishAndGetRecording(RecordingStructured)
mockListener := []EventListener{&mockEventListener{}}

for _, tc := range []struct {
name string
withEventListener bool
}{
{name: "with-event-listener", withEventListener: true},
{name: "without-event-listener"},
} {
b.Run(tc.name, func(b *testing.B) {
tr := NewTracerWithOpt(context.Background(),
WithTracingMode(TracingModeActiveSpansRegistry),
WithSpanReusePercent(100))

b.ReportAllocs()
for i := 0; i < b.N; i++ {
var root *Span
if tc.withEventListener {
root = tr.StartSpan("foo", WithRecording(RecordingStructured),
WithEventListeners(mockListener))
} else {
root = tr.StartSpan("foo", WithRecording(RecordingStructured))
}

root.RecordStructured(ev)

// The child span will also inherit the root span's event listener.
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.FinishAndGetRecording(RecordingStructured)
}
})
}
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type crdbSpanMu struct {
// lazyTags are tags whose values are only string-ified on demand. Each lazy
// tag is expected to implement either fmt.Stringer or LazyTag.
lazyTags []lazyTag

// eventListeners is a list of registered EventListeners that are notified
// whenever a Structured event is recorded by the span and its children.
eventListeners []EventListener
}

type lazyTag struct {
Expand All @@ -143,6 +147,10 @@ type recordingState struct {
// finished while this parent span's recording was not verbose.
structured sizeLimitedBuffer

// notifyParentOnStructuredEvent is true if the span's parent has asked to be
// notified of every StructuredEvent recording on this span.
notifyParentOnStructuredEvent bool

// dropped is true if the span has capped out its memory limits for
// logs and structured events, and has had to drop some. It's used to
// annotate recordings with the _dropped tag, when applicable.
Expand Down Expand Up @@ -225,6 +233,7 @@ func (s *crdbSpan) finish() bool {
return false
}
s.mu.finished = true
s.mu.eventListeners = nil

if s.recordingType() != RecordingOff {
duration := timeutil.Since(s.startTime)
Expand Down Expand Up @@ -465,6 +474,14 @@ func (s *crdbSpan) recordFinishedChildren(childRecording Recording) {
return
}

// Notify the event listeners registered with s of the StructuredEvents on the
// children being added to s.
for _, span := range childRecording {
for _, record := range span.StructuredRecords {
s.notifyEventListeners(record.Payload)
}
}

s.mu.Lock()
defer s.mu.Unlock()
s.recordFinishedChildrenLocked(childRecording)
Expand Down Expand Up @@ -547,6 +564,21 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) {
return nil, false
}

// notifyEventListeners delivers a StructuredEvent to every EventListener that
// should observe it. If s has been flagged to notify its parent
// (notifyParentOnStructuredEvent), the event is first forwarded to the parent
// span, which applies the same logic recursively up the ancestor chain. After
// that, every listener registered directly on s is invoked.
//
// s.mu is held for the whole call, including the recursive call into the
// parent and the listener.Notify callbacks, so listeners run synchronously on
// the goroutine that recorded the event.
//
// NOTE(review): the parent's mutex is acquired while s.mu is still held, i.e.
// in child->parent order. Confirm that no other code path locks a parent and
// then one of its children; otherwise the two orders can deadlock.
func (s *crdbSpan) notifyEventListeners(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.recording.notifyParentOnStructuredEvent {
// The parent is notified before s' own listeners. parentFinished() resets
// the flag under s.mu when it releases the parent reference, so presumably
// the parent reference is still valid whenever the flag is observed set
// here — TODO confirm.
parent := s.mu.parent.Span.i.crdb
parent.notifyEventListeners(item)
}

// finish() sets eventListeners to nil, so a finished span notifies nobody.
for _, listener := range s.mu.eventListeners {
listener.Notify(item)
}
}

// record includes a log message in s' recording.
func (s *crdbSpan) record(msg redact.RedactableString) {
if s.recordingType() != RecordingVerbose {
Expand Down Expand Up @@ -595,6 +627,10 @@ func (s *crdbSpan) recordStructured(item Structured) {
Payload: p,
}
s.recordInternal(sr, &s.mu.recording.structured)

// If there are any listeners registered with this span, notify them of the
// Structured event being recorded.
s.notifyEventListeners(item)
}

// memorySizable is implemented by log records and structured events for
Expand Down Expand Up @@ -882,6 +918,8 @@ func (s *crdbSpan) parentFinished() {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.parent.release()
// Reset `notifyParentOnStructuredEvent` since s no longer has a parent.
s.mu.recording.notifyParentOnStructuredEvent = false
}

// visitOpenChildren calls the visitor for every open child. The receiver's lock
Expand Down Expand Up @@ -918,6 +956,13 @@ func (s *crdbSpan) withLock(f func()) {
f()
}

// wantEventNotificationsLocked reports whether a StructuredEvent recorded on s
// has to be delivered to anyone: either s forwards events to its parent, or s
// itself has EventListeners registered (i.e. it was created
// WithEventListeners(...)).
//
// The fields under s.mu are read without acquiring the lock; the caller is
// expected to hold s.mu.
func (s *crdbSpan) wantEventNotificationsLocked() bool {
	if s.mu.recording.notifyParentOnStructuredEvent {
		return true
	}
	return len(s.mu.eventListeners) > 0
}

// setGoroutineID updates the span's goroutine ID.
func (s *crdbSpan) setGoroutineID(gid int64) {
s.mu.Lock()
Expand Down
20 changes: 19 additions & 1 deletion pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,19 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) {
return sp.i.GetLazyTag(key)
}

// EventListener is an object that can be registered to listen for Structured
// events recorded by the span and its children.
//
// Listeners are attached to a span when it is created, through the
// WithEventListeners span option, and are dropped when that span is finished.
type EventListener interface {
// Notify is invoked on every Structured event recorded by the span and its
// children, recursively.
//
// Note that this method should not run for a long time as it will hold up the
// span recording Structured events during traced operations.
//
// Notify will not be called concurrently on the same span.
//
// NOTE(review): Notify is invoked while the notifying span's mutex is held
// (see crdbSpan.notifyEventListeners); implementations presumably must not
// call back into that span — confirm.
Notify(event Structured)
}

// TraceID retrieves a span's trace ID.
func (sp *Span) TraceID() tracingpb.TraceID {
if sp.detectUseAfterFinish() {
Expand Down Expand Up @@ -546,6 +559,7 @@ func (sp *Span) reset(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []EventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand Down Expand Up @@ -625,6 +639,9 @@ func (sp *Span) reset(
if c.mu.recording.logs.Len() != 0 {
panic("unexpected logs in span being reset")
}
if len(c.mu.eventListeners) != 0 {
panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners))
}

h := sp.helper
c.mu.crdbSpanMu = crdbSpanMu{
Expand All @@ -635,7 +652,8 @@ func (sp *Span) reset(
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
},
tags: h.tagsAlloc[:0],
tags: h.tagsAlloc[:0],
eventListeners: eventListeners,
}

if kind != oteltrace.SpanKindUnspecified {
Expand Down
39 changes: 39 additions & 0 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type spanOptions struct {
ForceRealSpan bool // see WithForceRealSpan
SpanKind oteltrace.SpanKind // see WithSpanKind
Sterile bool // see WithSterile
EventListeners []EventListener // see WithEventListeners

// recordingTypeExplicit is set if the WithRecording() option was used. In
// that case, spanOptions.recordingType() returns recordingTypeOpt below. If
Expand Down Expand Up @@ -438,3 +439,41 @@ func (w withSterileOption) apply(opts spanOptions) spanOptions {
opts.Sterile = true
return opts
}

// eventListenersOption is the SpanOption returned by WithEventListeners. It
// carries the listeners that should be attached to the span being created.
type eventListenersOption []EventListener

var _ SpanOption = eventListenersOption{}

// apply records the listeners in opts.EventListeners and returns the updated
// options.
//
// Registering listeners implies that the span records at least Structured
// events, so when no recording type has been set explicitly on opts yet, the
// recording type is explicitly set to `RecordingStructured`. A recording type
// that is already marked explicit on opts (e.g. `RecordingVerbose` requested
// via `WithRecording(...)`) is left untouched.
func (ev eventListenersOption) apply(opts spanOptions) spanOptions {
	opts.EventListeners = []EventListener(ev)
	if opts.recordingTypeExplicit {
		// An explicit recording type has already been chosen; respect it.
		return opts
	}
	opts.recordingTypeExplicit = true
	opts.recordingTypeOpt = RecordingStructured
	return opts
}

// WithEventListeners returns a SpanOption that attaches eventListeners to the
// span being created. Every listener is notified of each Structured event
// recorded by that span and by its children. Notifications stop once the span
// is finished, even for events recorded by child spans that outlive it.
//
// Structured events recorded on remote spans are delivered as well, at the
// time the remote recording is imported by the span or by one of its children.
// Note that the events of an imported remote recording are delivered out of
// order.
//
// Using this option implies a `RecordingStructured` recording type for the
// span. If the recording type has been explicitly set to `RecordingVerbose`
// via the `WithRecording(...)` option, that will be respected instead.
//
// The caller should not mutate `eventListeners` after calling
// WithEventListeners.
func WithEventListeners(eventListeners []EventListener) SpanOption {
	return eventListenersOption(eventListeners)
}
66 changes: 66 additions & 0 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,69 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) {
otelCtx := sp.i.otelSpan.SpanContext()
require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID())
}

type mockEventListener struct {
eventsSeen int
}

func (f *mockEventListener) Notify(_ Structured) {
f.eventsSeen++
}

var _ EventListener = &mockEventListener{}

// TestEventListener exercises the EventListener notifications end to end, using
// mockEventListener counters on a root span and on one of its children. It
// checks, in order, that:
//   - a listener sees events recorded directly on its span;
//   - a root listener also sees events recorded on a local child, while a
//     listener registered only on the child does not see the root's events;
//   - finishing a child does not re-deliver its events to the root listener;
//   - events of a remote child reach the root listener only once the remote
//     recording is imported into the root;
//   - after the root is finished, events from a surviving child are no longer
//     delivered to the root listener.
func TestEventListener(t *testing.T) {
tr := NewTracer()
rootEventListener := &mockEventListener{}
sp := tr.StartSpan("root", WithRecording(RecordingStructured),
WithEventListeners([]EventListener{rootEventListener}))

// Record a few Structured events.
sp.RecordStructured(&types.Int32Value{Value: 4})
sp.RecordStructured(&types.Int32Value{Value: 5})
require.Equal(t, 2, rootEventListener.eventsSeen)

// Register another event listener on only the child span.
childEventListener := &mockEventListener{}
childSp := tr.StartSpan("child", WithParent(sp),
WithEventListeners([]EventListener{childEventListener}))

// Events recorded on the child are seen by both the child's and the root's
// listener.
childSp.RecordStructured(&types.Int32Value{Value: 6})
childSp.RecordStructured(&types.Int32Value{Value: 7})
require.Equal(t, 4, rootEventListener.eventsSeen)
require.Equal(t, 2, childEventListener.eventsSeen)

// Record an event on the root span, and make sure we don't see it on the
// listener registered with the child span.
sp.RecordStructured(&types.Int32Value{Value: 8})
require.Equal(t, 5, rootEventListener.eventsSeen)
require.Equal(t, 2, childEventListener.eventsSeen)

// Finish the child span, and ensure the Structured events aren't re-seen by
// the listener when the child deposits them with the parent.
childSp.Finish()
require.Equal(t, 5, rootEventListener.eventsSeen)

// Create a remote child, and the root listener should not be inherited.
remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta()))
remoteSp.RecordStructured(&types.Int32Value{Value: 9})
require.Equal(t, 5, rootEventListener.eventsSeen)

// But, when we import the recording in the root span, the root listener
// should see these events.
sp.ImportRemoteRecording(remoteSp.FinishAndGetConfiguredRecording())
require.Equal(t, 6, rootEventListener.eventsSeen)

// Create another child.
childSp2 := tr.StartSpan("child2", WithParent(sp))
childSp2.RecordStructured(&types.Int32Value{Value: 10})
require.Equal(t, 7, rootEventListener.eventsSeen)

// Now Finish() the parent before the child and ensure that the root event
// listener does not see events from the child once the parent has been
// Finish()ed.
sp.Finish()
childSp2.RecordStructured(&types.Int32Value{Value: 11})
require.Equal(t, 7, rootEventListener.eventsSeen)
childSp2.Finish()
}
Loading

0 comments on commit 7ab7935

Please sign in to comment.