Skip to content

Commit

Permalink
tracing: introduce a StructuredEvent listener
Browse files Browse the repository at this point in the history
This change extends the tracing API to support registering
and unregistering an EventListener on a span. The registered
EventListener will be notified about every StructuredEvent
recorded by the span. EventListeners registered with a span
are inherited by the span's local children.

The motivation for this change was to allow higher level aggregators
to watch every event without relying on periodically pulling their
root span's Recording. This way the aggregator can be sure not to
miss "important" events that may be rotated out of the underlying
ring buffer due to memory constraints.

Note, event listeners can only be registered, and inherited by spans
with a RecordingStructured or RecordingVerbose recording type.

Informs: cockroachdb#80395

Release note: None
  • Loading branch information
adityamaru committed Apr 25, 2022
1 parent 11a0a9f commit c5c238e
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 3 deletions.
64 changes: 64 additions & 0 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,24 @@ type crdbSpanMu struct {
// lazyTags are tags whose values are only string-ified on demand. Each lazy
// tag is expected to implement either fmt.Stringer or LazyTag.
lazyTags []lazyTag

// eventListeners is a list of registered EventListener's that are notified
// whenever a Structured event is recorded by the span.
//
// Child spans of this span will inherit this list of EventListeners.
eventListeners []eventListener
}

type lazyTag struct {
Key string
Value interface{}
}

type eventListener struct {
Key string
Listener EventListener
}

type recordingState struct {
// recordingType is the recording type of the ongoing recording, if any.
// Its 'load' method may be called without holding the surrounding mutex,
Expand Down Expand Up @@ -533,6 +544,55 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) {
return nil, false
}

// registerEventListenerLocked registers an EventListener to listen for
// Structured events recorded by the span. If an EventListener with the same key
// already exists, then this will be a no-op.
func (s *crdbSpan) registerEventListenerLocked(key string, listener EventListener) {
if s.recordingType() == RecordingOff {
return
}

// TODO(during review): Should we limit the number of listeners a span can register?
for i := range s.mu.eventListeners {
if s.mu.eventListeners[i].Key == key {
return
}
}
s.mu.eventListeners = append(s.mu.eventListeners, eventListener{Key: key, Listener: listener})
}

// unregisterEventListenerLocked unregisters a previously registered
// EventListener.
func (s *crdbSpan) unregisterEventListenerLocked(key string) {
numListeners := len(s.mu.eventListeners)
for i := range s.mu.eventListeners {
if s.mu.eventListeners[i].Key == key {
s.mu.eventListeners[i] = s.mu.eventListeners[numListeners-1]
s.mu.eventListeners[numListeners-1] = eventListener{}
s.mu.eventListeners = s.mu.eventListeners[:numListeners-1]
return
}
}
}

// notifyEventListeners notifies all the EventListeners registered with this
// span about the recorded Structured item.
func (s *crdbSpan) notifyEventListeners(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()

var wg sync.WaitGroup
for _, listener := range s.mu.eventListeners {
wg.Add(1)
go func(l EventListener) {
defer wg.Done()
l.Listen(item)
}(listener.Listener)
}

wg.Wait()
}

// record includes a log message in s' recording.
func (s *crdbSpan) record(msg redact.RedactableString) {
if s.recordingType() != RecordingVerbose {
Expand Down Expand Up @@ -581,6 +641,10 @@ func (s *crdbSpan) recordStructured(item Structured) {
Payload: p,
}
s.recordInternal(sr, &s.mu.recording.structured)

// If there are any listener's registered with this span, notify them of the
// Structured event being recorded.
s.notifyEventListeners(item)
}

// memorySizable is implemented by log records and structured events for
Expand Down
40 changes: 39 additions & 1 deletion pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,37 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) {
return sp.i.GetLazyTag(key)
}

// EventListener is an object that can be registered to listen for Structured
// events recorded by the span.
type EventListener interface {
// Listen is invoked on every Structured event recorded by the span.
//
// Note that this method should not run for a long time as it will hold up the
// span recording Structured events during traced operations.
//
// Listen can be called concurrently with other Listen calls on the same
// EventListener.
Listen(event Structured)
}

// RegisterEventListener adds an EventListener to the span. The listener can
// listen for Structured events recorded by the span.
func (sp *Span) RegisterEventListener(key string, listener EventListener) {
if sp.detectUseAfterFinish() {
return
}
sp.i.RegisterEventListener(key, listener)
}

// UnregisterEventListener unregisters a previously registered EventListener
// from the span.
func (sp *Span) UnregisterEventListener(key string) {
if sp.detectUseAfterFinish() {
return
}
sp.i.UnregisterEventListener(key)
}

// TraceID retrieves a span's trace ID.
func (sp *Span) TraceID() tracingpb.TraceID {
if sp.detectUseAfterFinish() {
Expand Down Expand Up @@ -544,6 +575,7 @@ func (sp *Span) reset(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []eventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand Down Expand Up @@ -623,6 +655,9 @@ func (sp *Span) reset(
if c.mu.recording.logs.Len() != 0 {
panic("unexpected logs in span being reset")
}
if len(c.mu.eventListeners) != 0 {
panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners))
}

h := sp.helper
c.mu.crdbSpanMu = crdbSpanMu{
Expand All @@ -633,9 +668,12 @@ func (sp *Span) reset(
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
},
tags: h.tagsAlloc[:0],
tags: h.tagsAlloc[:0],
eventListeners: h.eventListenerAlloc[:0],
}

c.mu.crdbSpanMu.eventListeners = append(c.mu.crdbSpanMu.eventListeners, eventListeners...)

if kind != oteltrace.SpanKindUnspecified {
c.setTagLocked(spanKindTagKey, attribute.StringValue(kind.String()))
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/tracing/span_inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ func (s *spanInner) GetLazyTag(key string) (interface{}, bool) {
return s.crdb.getLazyTagLocked(key)
}

func (s *spanInner) RegisterEventListener(key string, listener EventListener) {
if s.isNoop() {
return
}
s.crdb.mu.Lock()
defer s.crdb.mu.Unlock()
s.crdb.registerEventListenerLocked(key, listener)
}

func (s *spanInner) UnregisterEventListener(key string) {
if s.isNoop() {
return
}
s.crdb.mu.Lock()
defer s.crdb.mu.Unlock()
s.crdb.unregisterEventListenerLocked(key)
}

func (s *spanInner) RecordStructured(item Structured) {
if s.isNoop() {
return
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ func (opts *spanOptions) parentSpanID() tracingpb.SpanID {
return 0
}

func (opts *spanOptions) parentEventListeners() []eventListener {
if !opts.Parent.empty() && !opts.Parent.IsNoop() {
opts.Parent.i.crdb.mu.Lock()
defer opts.Parent.i.crdb.mu.Unlock()
return opts.Parent.i.crdb.mu.eventListeners
}
return nil
}

func (opts *spanOptions) recordingType() RecordingType {
if opts.recordingTypeExplicit {
return opts.recordingTypeOpt
Expand Down
59 changes: 59 additions & 0 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,62 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) {
otelCtx := sp.i.otelSpan.SpanContext()
require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID())
}

type mockEventListener struct {
eventsSeen int
}

func (f *mockEventListener) Listen(_ Structured) {
f.eventsSeen++
}

var _ EventListener = &mockEventListener{}

func TestEventListener(t *testing.T) {
ctx := context.Background()
tr := NewTracerWithOpt(ctx, WithTracingMode(TracingModeActiveSpansRegistry))
sp := tr.StartSpan("root", WithRecording(RecordingStructured))

rootEventListener := mockEventListener{}
sp.RegisterEventListener("listener-one", &rootEventListener)

// Record a few Structured events.
sp.RecordStructured(&types.Int32Value{Value: 4})
sp.RecordStructured(&types.Int32Value{Value: 5})
require.Equal(t, 2, rootEventListener.eventsSeen)

// Start a child span that will inherit the listener.
childSp := tr.StartSpan("child", WithParent(sp))

childSp.RecordStructured(&types.Int32Value{Value: 6})
childSp.RecordStructured(&types.Int32Value{Value: 7})
require.Equal(t, 4, rootEventListener.eventsSeen)

// Register another event listener on only the child span.
childEventListener := mockEventListener{}
childSp.RegisterEventListener("listener-two", &childEventListener)

// Record an event on the root span, and make sure we don't see it on the
// listener registered with the child span.
sp.RecordStructured(&types.Int32Value{Value: 8})
require.Equal(t, 5, rootEventListener.eventsSeen)
require.Equal(t, 0, childEventListener.eventsSeen)

childSp.RecordStructured(&types.Int32Value{Value: 9})
require.Equal(t, 6, rootEventListener.eventsSeen)
require.Equal(t, 1, childEventListener.eventsSeen)

// Finish the child span, and ensure the Structured events aren't re-seen by
// the listener when the child deposits them with the parent.
childSp.Finish()
require.Equal(t, 6, rootEventListener.eventsSeen)

// Create a remote child, and the root listener should not be inherited.
remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta()))
remoteSp.RecordStructured(&types.Int32Value{Value: 10})

sp.UnregisterEventListener("listener-one")
sp.RecordStructured(&types.Int32Value{Value: 11})
require.Equal(t, 6, rootEventListener.eventsSeen)
sp.Finish()
}
9 changes: 7 additions & 2 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ type spanAllocHelper struct {
tagsAlloc [3]attribute.KeyValue
childrenAlloc [4]childRef
structuredEventsAlloc [3]interface{}
eventListenerAlloc [3]eventListener
}

// newSpan allocates a span using the Tracer's sync.Pool. A span that was
Expand All @@ -973,6 +974,7 @@ func (t *Tracer) newSpan(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []eventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand All @@ -984,7 +986,7 @@ func (t *Tracer) newSpan(
h := t.spanPool.Get().(*spanAllocHelper)
h.span.reset(
traceID, spanID, operation, goroutineID,
startTime, logTags, kind,
startTime, logTags, eventListeners, kind,
otelSpan, netTr, sterile)
return &h.span
}
Expand Down Expand Up @@ -1019,6 +1021,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) {
c.mu.openChildren = nil
c.mu.recording.finishedChildren = nil
c.mu.tags = nil
c.mu.eventListeners = nil
c.mu.recording.logs.Discard()
c.mu.recording.structured.Discard()
c.mu.Unlock()
Expand All @@ -1029,6 +1032,7 @@ func (t *Tracer) releaseSpanToPool(sp *Span) {
h.tagsAlloc = [3]attribute.KeyValue{}
h.childrenAlloc = [4]childRef{}
h.structuredEventsAlloc = [3]interface{}{}
h.eventListenerAlloc = [3]eventListener{}

release := true
if fn := t.testing.ReleaseSpanToPool; fn != nil {
Expand Down Expand Up @@ -1179,10 +1183,11 @@ child operation: %s, tracer created at:
traceID = tracingpb.TraceID(randutil.FastInt63())
}
spanID := tracingpb.SpanID(randutil.FastInt63())
eventListeners := opts.parentEventListeners()

s := t.newSpan(
traceID, spanID, opName, uint64(goid.Get()),
startTime, opts.LogTags, opts.SpanKind,
startTime, opts.LogTags, eventListeners, opts.SpanKind,
otelSpan, netTr, opts.Sterile)

s.i.crdb.enableRecording(opts.recordingType())
Expand Down

0 comments on commit c5c238e

Please sign in to comment.