Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: aggregate OperationMetadata on span Finish() #81079

Merged
merged 1 commit into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/node_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func TestRedactRecordingForTenant(t *testing.T) {
GoroutineID uint64
Finished bool
StructuredRecords []tracingpb.StructuredRecord
ChildrenMetadata map[string]tracingpb.OperationMetadata
}
_ = (*calcifiedRecordedSpan)((*tracingpb.RecordedSpan)(nil))
})
Expand Down
237 changes: 166 additions & 71 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ type recordingState struct {
//
// The spans are not maintained in a particular order.
finishedChildren []tracingpb.RecordedSpan

// childrenMetadata is a mapping from operation to the aggregated metadata of
// that operation.
//
// When a child of this span is Finish()ed, it updates the map with all the
// children in its Recording. childrenMetadata therefore provides a bucketed
// view of the various operations that are being traced as part of a span.
childrenMetadata map[string]tracingpb.OperationMetadata
}

// makeSizeLimitedBuffer creates a sizeLimitedBuffer.
Expand Down Expand Up @@ -377,8 +385,20 @@ func (s *crdbSpan) getRecordingImpl(
}
}

// rollupChildrenMetadata merges every per-operation entry of `from` into `to`.
//
// For each operation name present in `from`, the entry currently stored in
// `to` under that name is Combine()d with the entry from `from`, and the
// result is written back to `to`. An operation that `to` does not contain yet
// is looked up as the zero-value OperationMetadata before being combined.
//
// `from` is only read. `to` is mutated in place, so it must be a non-nil map
// whenever `from` has at least one entry.
func rollupChildrenMetadata(
	to map[string]tracingpb.OperationMetadata, from map[string]tracingpb.OperationMetadata,
) {
	for opName, incoming := range from {
		accumulated := to[opName]
		to[opName] = accumulated.Combine(incoming)
	}
}

// getVerboseRecording returns the Span's recording, including its children.
//
// Each RecordedSpan in the Recording contains the ChildrenMetadata of all the
// children, both finished and open, in the span's subtree.
//
// finishing indicates whether s is in the process of finishing. If it isn't,
// the recording will include an "_unfinished" tag.
func (s *crdbSpan) getVerboseRecording(
Expand All @@ -388,21 +408,53 @@ func (s *crdbSpan) getVerboseRecording(
return nil // noop span
}

s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result := make(tracingpb.Recording, 0, 1+len(s.mu.openChildren)+len(s.mu.recording.finishedChildren))
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

for _, child := range s.mu.openChildren {
if child.collectRecording || includeDetachedChildren {
sp := child.Span.i.crdb
result = append(result, sp.getVerboseRecording(includeDetachedChildren, false /* finishing */)...)
var result tracingpb.Recording
var childrenMetadata map[string]tracingpb.OperationMetadata
{
s.mu.Lock()
// The capacity here is approximate since we don't know how many
// grandchildren there are.
result = make(tracingpb.Recording, 0, 1+len(s.mu.openChildren)+len(s.mu.recording.finishedChildren))
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

childrenMetadata = make(map[string]tracingpb.OperationMetadata)

// Copy over the OperationMetadata collected from s' finished children.
rollupChildrenMetadata(childrenMetadata, s.mu.recording.childrenMetadata)

// We recurse on s' open children to get their verbose recordings, and to
// aggregate OperationMetadata from their children, both finished and open.
for _, openChild := range s.mu.openChildren {
if openChild.collectRecording || includeDetachedChildren {
openChildSp := openChild.Span.i.crdb
openChildRecording := openChildSp.getVerboseRecording(includeDetachedChildren, false /* finishing */)
result = append(result, openChildRecording...)

// Record an entry for openChild's OperationMetadata.
rootOpenChild := openChildRecording[0]
prevMetadata := childrenMetadata[rootOpenChild.Operation]
prevMetadata.Count++
prevMetadata.ContainsUnfinished = !rootOpenChild.Finished
prevMetadata.Duration += rootOpenChild.Duration
childrenMetadata[rootOpenChild.Operation] = prevMetadata

// Copy over the OperationMetadata collected recursively from openChild's
// children.
rollupChildrenMetadata(childrenMetadata, openChildRecording[0].ChildrenMetadata)
}
}
}

s.mu.Unlock()

// Copy over the OperationMetadata collected from s' children into the root of
// the recording.
if len(childrenMetadata) != 0 {
rootSpan := &result[0]
rootSpan.ChildrenMetadata = childrenMetadata
}

// Sort the spans by StartTime, except the first Span (the root of this
// recording) which stays in place.
toSort := sortPool.Get().(*tracingpb.Recording) // avoids allocations in sort.Sort
Expand All @@ -413,59 +465,35 @@ func (s *crdbSpan) getVerboseRecording(
return result
}

// getStructuredRecording returns the structured events in this span and
// in all the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will be
// nil if there are no structured events. If not nil, the Recording will have
// exactly one span corresponding to the receiver, will all events handing from
// getStructuredRecording returns the structured events in this span and in all
// the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will have
// exactly one span corresponding to the receiver, with all events hanging from
// this span (even if the events had been recorded on different spans).
// This span will also have a `childrenMetadata` map that will contain an entry
// for all children in s' Recording.
//
// The caller does not take ownership of the events.
func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) tracingpb.Recording {
s.mu.Lock()
defer s.mu.Unlock()
buffer := make([]*tracingpb.StructuredRecord, 0, 3)
for _, c := range s.mu.recording.finishedChildren {
for i := range c.StructuredRecords {
buffer = append(buffer, &c.StructuredRecords[i])
}
}
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
}
}

if len(buffer) == 0 && s.mu.recording.structured.Len() == 0 {
// Optimize out the allocations below.
return nil
}

res := s.getRecordingNoChildrenLocked(
tracingpb.RecordingStructured,
false, // finishing - since we're only asking for the structured recording, the argument doesn't matter
)
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]
}
for i, e := range buffer {
reservedSpace[i] = *e
}

// Recursively fetch the StructuredEvents for s' and its children, both
// finished and open.
res.StructuredRecords = s.getStructuredEventsRecursivelyLocked(res.StructuredRecords[:0],
includeDetachedChildren)

// Recursively fetch the OperationMetadata for s' children, both finished and
// open.
res.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
s.getChildrenMetadataRecursivelyLocked(res.ChildrenMetadata,
false /* includeRootMetadata */, includeDetachedChildren)

return tracingpb.Recording{res}
}

Expand Down Expand Up @@ -497,6 +525,8 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
return
}

rootChild := &childRecording[0]

// Depending on the type of recording, we either keep all the information
// received, or only the structured events.
switch s.recordingType() {
Expand All @@ -505,7 +535,7 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRecording[0].ParentSpanID = s.spanID
rootChild.ParentSpanID = s.spanID

if len(s.mu.recording.finishedChildren)+len(childRecording) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, childRecording...)
Expand All @@ -516,17 +546,44 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// records by falling through.
fallthrough
case tracingpb.RecordingStructured:
for ci := range childRecording {
child := &childRecording[ci]
for i := range child.StructuredRecords {
s.recordInternalLocked(&child.StructuredRecords[i], &s.mu.recording.structured)
}
if len(childRecording) != 1 {
panic(fmt.Sprintf("RecordingStructured has %d recordings; expected 1", len(childRecording)))
}

for i := range rootChild.StructuredRecords {
s.recordInternalLocked(&rootChild.StructuredRecords[i], &s.mu.recording.structured)
}
case tracingpb.RecordingOff:
break
default:
panic(fmt.Sprintf("unrecognized recording mode: %v", s.recordingType()))
}

// Update s' ChildrenMetadata to capture all the spans in `childRecording`.
//
// As an example where we are done finishing `child`:
//
// parent
// child
// grandchild
//
// `parent` will have:
// {child: 2s, grandchild: 1s}
//
// Record the finished rootChild's metadata.
s.mu.recording.childrenMetadata[rootChild.Operation] =
s.mu.recording.childrenMetadata[rootChild.Operation].Combine(
tracingpb.OperationMetadata{
Count: 1,
Duration: rootChild.Duration,
ContainsUnfinished: false,
})

// Record the metadata of rootChild's children, both finished and open.
//
// GetRecording(...) is responsible for recursively capturing the metadata for
// rootChild's open and finished children.
rollupChildrenMetadata(s.mu.recording.childrenMetadata, rootChild.ChildrenMetadata)
}

func (s *crdbSpan) setTagLocked(key string, value attribute.Value) {
Expand Down Expand Up @@ -698,35 +755,73 @@ func (s *crdbSpan) recordInternalLocked(payload memorySizable, buffer *sizeLimit
buffer.AddLast(payload)
}

// getStructuredEventsRecursively returns the structured events accumulated by
// this span and its finished and still-open children.
func (s *crdbSpan) getStructuredEventsRecursively(
buffer []*tracingpb.StructuredRecord, includeDetachedChildren bool,
) []*tracingpb.StructuredRecord {
s.mu.Lock()
defer s.mu.Unlock()
// getStructuredEventsRecursivelyLocked returns the structured events
// accumulated by s and its finished and still-open children.
func (s *crdbSpan) getStructuredEventsRecursivelyLocked(
buffer []tracingpb.StructuredRecord, includeDetachedChildren bool,
) []tracingpb.StructuredRecord {
buffer = s.getStructuredEventsLocked(buffer)
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
sp.mu.Lock()
buffer = sp.getStructuredEventsRecursivelyLocked(buffer, includeDetachedChildren)
sp.mu.Unlock()
}
}
for _, c := range s.mu.recording.finishedChildren {
for i := range c.StructuredRecords {
buffer = append(buffer, &c.StructuredRecords[i])
buffer = append(buffer, c.StructuredRecords[i])
}
}
return buffer
}

// getChildrenMetadataRecursivelyLocked accumulates, into `childrenMetadata`,
// one OperationMetadata bucket per operation name for the spans below s:
// the metadata s has already aggregated from its Finish()ed children, plus
// the metadata of every eligible open child and, recursively, of that child's
// own subtree.
//
// When `includeRootMetadata` is true, s itself is also counted in the bucket
// for s.operation. The top-level caller passes false so that the receiver is
// not reported as one of its own children; the recursion always passes true.
//
// An open child is visited only if it has collectRecording set or if
// `includeDetachedChildren` is true.
//
// As the name suggests, s.mu is expected to be held by the caller; the mutex
// of each visited open child is acquired here for the duration of the
// recursive call on that child.
func (s *crdbSpan) getChildrenMetadataRecursivelyLocked(
	childrenMetadata map[string]tracingpb.OperationMetadata,
	includeRootMetadata, includeDetachedChildren bool,
) {
	if includeRootMetadata {
		// Fold s into the bucket for its own operation name.
		entry := childrenMetadata[s.operation]
		entry.Count++
		if s.mu.duration != -1 {
			entry.Duration += s.mu.duration
		} else {
			// A duration of -1 is treated as "not finished yet": charge the time
			// elapsed since the span started and flag the bucket accordingly.
			entry.Duration += timeutil.Since(s.startTime)
			entry.ContainsUnfinished = true
		}
		childrenMetadata[s.operation] = entry
	}

	// Metadata already aggregated on s from children that have Finish()ed.
	rollupChildrenMetadata(childrenMetadata, s.mu.recording.childrenMetadata)

	// Children that are still open have not been aggregated yet; descend into
	// each eligible one, counting the child itself along the way.
	for _, child := range s.mu.openChildren {
		if !child.collectRecording && !includeDetachedChildren {
			continue
		}
		childSpan := child.Span.i.crdb
		childSpan.mu.Lock()
		childSpan.getChildrenMetadataRecursivelyLocked(
			childrenMetadata, true /* includeRootMetadata */, includeDetachedChildren)
		childSpan.mu.Unlock()
	}
}

func (s *crdbSpan) getStructuredEventsLocked(
buffer []*tracingpb.StructuredRecord,
) []*tracingpb.StructuredRecord {
buffer []tracingpb.StructuredRecord,
) []tracingpb.StructuredRecord {
numEvents := s.mu.recording.structured.Len()
for i := 0; i < numEvents; i++ {
event := s.mu.recording.structured.Get(i).(*tracingpb.StructuredRecord)
buffer = append(buffer, event)
buffer = append(buffer, *event)
}
return buffer
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,9 @@ func (sp *Span) reset(
openChildren: h.childrenAlloc[:0],
goroutineID: goroutineID,
recording: recordingState{
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
childrenMetadata: make(map[string]tracingpb.OperationMetadata),
},
tags: h.tagsAlloc[:0],
}
Expand Down
Loading