Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: introduce a StructuredEvent listener #80460

Merged
merged 1 commit into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 55 additions & 22 deletions pkg/util/tracing/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,26 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {

staticLogTags := logtags.Buffer{}
staticLogTags.Add("foo", "bar")
mockListener := []EventListener{&mockEventListener{}}

for _, tc := range []struct {
name string
defaultMode TracingMode
parent bool
opts []SpanOption
name string
defaultMode TracingMode
parent bool
withEventListener bool
opts []SpanOption
}{
{"none", TracingModeOnDemand, false, nil},
{"real", TracingModeActiveSpansRegistry, false, nil},
{"real,logtag", TracingModeActiveSpansRegistry, false, []SpanOption{WithLogTags(&staticLogTags)}},
{"real,autoparent", TracingModeActiveSpansRegistry, true, nil},
{"real,manualparent", TracingModeActiveSpansRegistry, true, []SpanOption{WithDetachedRecording()}},
{name: "none", defaultMode: TracingModeOnDemand},
{name: "real", defaultMode: TracingModeActiveSpansRegistry},
{name: "real,logtag", defaultMode: TracingModeActiveSpansRegistry,
opts: []SpanOption{WithLogTags(&staticLogTags)}},
{name: "real,autoparent", defaultMode: TracingModeActiveSpansRegistry, parent: true},
{name: "real,manualparent", defaultMode: TracingModeActiveSpansRegistry, parent: true,
opts: []SpanOption{WithDetachedRecording()}},
{name: "real,autoparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry,
parent: true, withEventListener: true},
{name: "real,manualparent,withEventListener", defaultMode: TracingModeActiveSpansRegistry, parent: true,
withEventListener: true, opts: []SpanOption{WithDetachedRecording()}},
} {
b.Run(fmt.Sprintf("opts=%s", tc.name), func(b *testing.B) {
tr := NewTracerWithOpt(ctx,
Expand All @@ -53,7 +61,11 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
var parent *Span
var numOpts = len(tc.opts)
if tc.parent {
parent = tr.StartSpan("one-off")
if tc.withEventListener {
parent = tr.StartSpan("one-off", WithEventListeners(mockListener))
} else {
parent = tr.StartSpan("one-off")
}
defer parent.Finish()
numOpts++
}
Expand Down Expand Up @@ -106,19 +118,40 @@ func BenchmarkSpan_GetRecording(b *testing.B) {

func BenchmarkRecordingWithStructuredEvent(b *testing.B) {
skip.UnderDeadlock(b, "span reuse triggers false-positives in the deadlock detector")
tr := NewTracerWithOpt(context.Background(),
WithTracingMode(TracingModeActiveSpansRegistry),
WithSpanReusePercent(100))

ev := &types.Int32Value{Value: 5}
b.ReportAllocs()
for i := 0; i < b.N; i++ {
root := tr.StartSpan("foo", WithRecording(RecordingStructured))
root.RecordStructured(ev)
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.FinishAndGetRecording(RecordingStructured)
mockListener := []EventListener{&mockEventListener{}}

for _, tc := range []struct {
name string
withEventListener bool
}{
{name: "with-event-listener", withEventListener: true},
{name: "without-event-listener"},
} {
b.Run(tc.name, func(b *testing.B) {
tr := NewTracerWithOpt(context.Background(),
WithTracingMode(TracingModeActiveSpansRegistry),
WithSpanReusePercent(100))

b.ReportAllocs()
for i := 0; i < b.N; i++ {
var root *Span
if tc.withEventListener {
root = tr.StartSpan("foo", WithRecording(RecordingStructured),
WithEventListeners(mockListener))
} else {
root = tr.StartSpan("foo", WithRecording(RecordingStructured))
}

root.RecordStructured(ev)

// The child span will also inherit the root span's event listener.
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.FinishAndGetRecording(RecordingStructured)
}
})
}
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type crdbSpanMu struct {
// lazyTags are tags whose values are only string-ified on demand. Each lazy
// tag is expected to implement either fmt.Stringer or LazyTag.
lazyTags []lazyTag

// eventListeners is a list of registered EventListener's that are notified
// whenever a Structured event is recorded by the span and its children.
eventListeners []EventListener
}

type lazyTag struct {
Expand All @@ -143,6 +147,10 @@ type recordingState struct {
// finished while this parent span's recording was not verbose.
structured sizeLimitedBuffer

// notifyParentOnStructuredEvent is true if the span's parent has asked to be
// notified of every StructuredEvent recording on this span.
notifyParentOnStructuredEvent bool

// dropped is true if the span has capped out it's memory limits for
// logs and structured events, and has had to drop some. It's used to
// annotate recordings with the _dropped tag, when applicable.
Expand Down Expand Up @@ -225,6 +233,7 @@ func (s *crdbSpan) finish() bool {
return false
}
s.mu.finished = true
s.mu.eventListeners = nil

if s.recordingType() != RecordingOff {
duration := timeutil.Since(s.startTime)
Expand Down Expand Up @@ -465,6 +474,14 @@ func (s *crdbSpan) recordFinishedChildren(childRecording Recording) {
return
}

// Notify the event listeners registered with s of the StructuredEvents on the
// children being added to s.
for _, span := range childRecording {
for _, record := range span.StructuredRecords {
s.notifyEventListeners(record.Payload)
}
}

s.mu.Lock()
defer s.mu.Unlock()
s.recordFinishedChildrenLocked(childRecording)
Expand Down Expand Up @@ -547,6 +564,21 @@ func (s *crdbSpan) getLazyTagLocked(key string) (interface{}, bool) {
return nil, false
}

// notifyEventListeners recursively notifies all the EventListeners registered
// with this span and any ancestor spans in the Recording, of a StructuredEvent.
func (s *crdbSpan) notifyEventListeners(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.recording.notifyParentOnStructuredEvent {
parent := s.mu.parent.Span.i.crdb
parent.notifyEventListeners(item)
}

for _, listener := range s.mu.eventListeners {
listener.Notify(item)
}
}

// record includes a log message in s' recording.
func (s *crdbSpan) record(msg redact.RedactableString) {
if s.recordingType() != RecordingVerbose {
Expand Down Expand Up @@ -595,6 +627,10 @@ func (s *crdbSpan) recordStructured(item Structured) {
Payload: p,
}
s.recordInternal(sr, &s.mu.recording.structured)

// If there are any listener's registered with this span, notify them of the
// Structured event being recorded.
s.notifyEventListeners(item)
}

// memorySizable is implemented by log records and structured events for
Expand Down Expand Up @@ -882,6 +918,8 @@ func (s *crdbSpan) parentFinished() {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.parent.release()
// Reset `notifyParentOnStructuredEvent` since s no longer has a parent.
s.mu.recording.notifyParentOnStructuredEvent = false
}

// visitOpenChildren calls the visitor for every open child. The receiver's lock
Expand Down Expand Up @@ -918,6 +956,13 @@ func (s *crdbSpan) withLock(f func()) {
f()
}

// wantEventNotificationsLocked returns true if the span was created
// WithEventListeners(...) or the span has been configured to notify its parent
// span on a StructuredEvent recording.
func (s *crdbSpan) wantEventNotificationsLocked() bool {
return len(s.mu.eventListeners) != 0 || s.mu.recording.notifyParentOnStructuredEvent
}

// setGoroutineID updates the span's goroutine ID.
func (s *crdbSpan) setGoroutineID(gid int64) {
s.mu.Lock()
Expand Down
20 changes: 19 additions & 1 deletion pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,19 @@ func (sp *Span) GetLazyTag(key string) (interface{}, bool) {
return sp.i.GetLazyTag(key)
}

// EventListener is an object that can be registered to listen for Structured
// events recorded by the span and its children.
type EventListener interface {
// Notify is invoked on every Structured event recorded by the span and its
// children, recursively.
//
// Note that this method should not run for a long time as it will hold up the
// span recording Structured events during traced operations.
//
// Notify will not be called concurrently on the same span.
Notify(event Structured)
}

// TraceID retrieves a span's trace ID.
func (sp *Span) TraceID() tracingpb.TraceID {
if sp.detectUseAfterFinish() {
Expand Down Expand Up @@ -546,6 +559,7 @@ func (sp *Span) reset(
goroutineID uint64,
startTime time.Time,
logTags *logtags.Buffer,
eventListeners []EventListener,
kind oteltrace.SpanKind,
otelSpan oteltrace.Span,
netTr trace.Trace,
Expand Down Expand Up @@ -625,6 +639,9 @@ func (sp *Span) reset(
if c.mu.recording.logs.Len() != 0 {
panic("unexpected logs in span being reset")
}
if len(c.mu.eventListeners) != 0 {
panic(fmt.Sprintf("unexpected event listeners in span being reset: %v", c.mu.eventListeners))
}

h := sp.helper
c.mu.crdbSpanMu = crdbSpanMu{
Expand All @@ -635,7 +652,8 @@ func (sp *Span) reset(
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
},
tags: h.tagsAlloc[:0],
tags: h.tagsAlloc[:0],
eventListeners: eventListeners,
}

if kind != oteltrace.SpanKindUnspecified {
Expand Down
39 changes: 39 additions & 0 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type spanOptions struct {
ForceRealSpan bool // see WithForceRealSpan
SpanKind oteltrace.SpanKind // see WithSpanKind
Sterile bool // see WithSterile
EventListeners []EventListener // see WithEventListeners

// recordingTypeExplicit is set if the WithRecording() option was used. In
// that case, spanOptions.recordingType() returns recordingTypeOpt below. If
Expand Down Expand Up @@ -438,3 +439,41 @@ func (w withSterileOption) apply(opts spanOptions) spanOptions {
opts.Sterile = true
return opts
}

type eventListenersOption []EventListener

var _ SpanOption = eventListenersOption{}

func (ev eventListenersOption) apply(opts spanOptions) spanOptions {
// Applying an EventListener span option implies the span has at least
// `RecordingStructured` recording type. If the span explicitly specifies a
// `RecordingVerbose` recording type via the `WithRecording(...)` option, that
// will be respected instead.
if !opts.recordingTypeExplicit {
opts.recordingTypeExplicit = true
opts.recordingTypeOpt = RecordingStructured
}
eventListeners := ([]EventListener)(ev)
opts.EventListeners = eventListeners
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm is it better to take a copy here at the cost of an additional alloc?

return opts
}

// WithEventListeners registers eventListeners to the span. The listeners are
// notified of Structured events recorded by the span and its children. Once the
// span is finished, the listeners are not notified of events any more even from
// surviving child spans.
//
// The listeners will also be notified of StructuredEvents recorded on remote
// spans, when the remote recording is imported by the span or one of its
// children. Note, the listeners will be notified of StructuredEvents in the
// imported remote recording out of order.
//
// WithEventListeners implies a `RecordingStructured` recording type for the
// span. If the recording type has been explicitly set to `RecordingVerbose` via
// the `WithRecording(...) option, that will be respected instead.
//
// The caller should not mutate `eventListeners` after calling
// WithEventListeners.
func WithEventListeners(eventListeners []EventListener) SpanOption {
return (eventListenersOption)(eventListeners)
}
66 changes: 66 additions & 0 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,69 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) {
otelCtx := sp.i.otelSpan.SpanContext()
require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID())
}

type mockEventListener struct {
eventsSeen int
}

func (f *mockEventListener) Notify(_ Structured) {
f.eventsSeen++
}

var _ EventListener = &mockEventListener{}

func TestEventListener(t *testing.T) {
tr := NewTracer()
rootEventListener := &mockEventListener{}
sp := tr.StartSpan("root", WithRecording(RecordingStructured),
WithEventListeners([]EventListener{rootEventListener}))

// Record a few Structured events.
sp.RecordStructured(&types.Int32Value{Value: 4})
sp.RecordStructured(&types.Int32Value{Value: 5})
require.Equal(t, 2, rootEventListener.eventsSeen)

// Register another event listener on only the child span.
childEventListener := &mockEventListener{}
childSp := tr.StartSpan("child", WithParent(sp),
WithEventListeners([]EventListener{childEventListener}))

childSp.RecordStructured(&types.Int32Value{Value: 6})
childSp.RecordStructured(&types.Int32Value{Value: 7})
require.Equal(t, 4, rootEventListener.eventsSeen)
require.Equal(t, 2, childEventListener.eventsSeen)

// Record an event on the root span, and make sure we don't see it on the
// listener registered with the child span.
sp.RecordStructured(&types.Int32Value{Value: 8})
require.Equal(t, 5, rootEventListener.eventsSeen)
require.Equal(t, 2, childEventListener.eventsSeen)

// Finish the child span, and ensure the Structured events aren't re-seen by
// the listener when the child deposits them with the parent.
childSp.Finish()
require.Equal(t, 5, rootEventListener.eventsSeen)

// Create a remote child, and the root listener should not be inherited.
remoteSp := tr.StartSpan("remote-child", WithRemoteParentFromSpanMeta(sp.Meta()))
remoteSp.RecordStructured(&types.Int32Value{Value: 9})
require.Equal(t, 5, rootEventListener.eventsSeen)

// But, when we import the recording in the root span, the root listener
// should see these events.
sp.ImportRemoteRecording(remoteSp.FinishAndGetConfiguredRecording())
require.Equal(t, 6, rootEventListener.eventsSeen)

// Create another child.
childSp2 := tr.StartSpan("child2", WithParent(sp))
childSp2.RecordStructured(&types.Int32Value{Value: 10})
require.Equal(t, 7, rootEventListener.eventsSeen)

// Now Finish() the parent before the child and ensure that the root event
// listener does not see events from the child once the parent has been
// Finish()ed.
sp.Finish()
childSp2.RecordStructured(&types.Int32Value{Value: 11})
require.Equal(t, 7, rootEventListener.eventsSeen)
childSp2.Finish()
}
Loading