Skip to content

Commit

Permalink
tracing: aggregate OperationMetadata on span Finish()
Browse files Browse the repository at this point in the history
This change adds a `ChildrenMetadata` map to `crdbspan` that is a
mapping from operation to the operations' aggregated metadata. This
map is updated whenever a child of the `crdbSpan` finishes, with metadata
from all spans in the finishing childs' Recording. The map is therefore
a bucketed view of all the operations being traced by a span.

The motivation for this change is to surface more metadata about the
suboperations being traced in a spans' Recording. This could in turn provide
more o11y into why a job is slow/stuck, or where the performance of a
distributed operation is bottlenecked.

As part of a span Finish()ing, the span fetches its Recording with the spans'
configured verbosity. Prior to this change the recording would then be
processed as follows:

*Verbose Recording*

In the case of Verbose recording the spans in the recording are added to the parents'
`finishedChildren` slice provided we have not exceeded the maximum number of
children a parent can track.

*Structured Recording*

In the case of a Structured recording, only the StructuredEvents from the spans in
the recording are copied into the parent.

With this change, in both the Verbose and Structured recording mode, a finishing span
is also responsible for rolling up the OperationMetadata of all the spans in its
recording. This involves updating the parents' `childrenMetadata` mapping with:

1) an entry for the finishing span.
2) an entry for each of the finishing spans' Finish()ed children.
3) an entry for each of the finishing spans' open children, and their children recursively

The logic for 2) and 3) is subsumed in the method responsible for getting
the finishing spans' recording. Notably, GetRecording(...) for both Structured and Verbose
recordings, populate the root of the recording with OperationMetadata of all
finished and open children in the recording.

As an example when we are done finishing `child`:

```
parent
  child
    grandchild
```

We'd expect `parent` to have:
`{child: 2s, grandchild: 1s}`

Given that Finish()ing a child, and importing a remote recording into a span
share the same code path, the above semantics also apply to a remote recording
being imported into a parent span.

Fixes: #80391

Release note: None
  • Loading branch information
adityamaru committed Jun 22, 2022
1 parent 8d34ef1 commit e94c3ec
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/server/node_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestRedactRecordingForTenant(t *testing.T) {
GoroutineID uint64
Finished bool
StructuredRecords []tracingpb.StructuredRecord
ChildrenMetadata map[string]tracingpb.OperationMetadata
}
_ = (*calcifiedRecordedSpan)((*tracingpb.RecordedSpan)(nil))
})
Expand Down
266 changes: 210 additions & 56 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ type recordingState struct {
//
// The spans are not maintained in a particular order.
finishedChildren []tracingpb.RecordedSpan

// childrenMetadata is a mapping from operation to the aggregated metadata of
// that operation.
//
// When a child of this span is Finish()ed, it updates the map with all the
// children in its Recording. childrenMetadata therefore provides a bucketed
// view of the various operations that are being traced as part of a span.
childrenMetadata map[string]tracingpb.OperationMetadata
}

// makeSizeLimitedBuffer creates a sizeLimitedBuffer.
Expand Down Expand Up @@ -379,6 +387,9 @@ func (s *crdbSpan) getRecordingImpl(

// getVerboseRecording returns the Span's recording, including its children.
//
// Each RecordedSpan in the Recording contains the ChildrenMetadata of all the
// children, both finished and open, in the spans' subtree.
//
// finishing indicates whether s is in the process of finishing. If it isn't,
// the recording will include an "_unfinished" tag.
func (s *crdbSpan) getVerboseRecording(
Expand All @@ -395,14 +406,66 @@ func (s *crdbSpan) getVerboseRecording(
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

for _, child := range s.mu.openChildren {
if child.collectRecording || includeDetachedChildren {
sp := child.Span.i.crdb
result = append(result, sp.getVerboseRecording(includeDetachedChildren, false /* finishing */)...)
childrenMetadata := make(map[string]tracingpb.OperationMetadata)

// Copy over the OperationMetadata collected from s' finished children.
for opName, metadata := range s.mu.recording.childrenMetadata {
childrenMetadata[opName] = childrenMetadata[opName].Combine(
tracingpb.OperationMetadata{
Count: metadata.Count,
Duration: metadata.Duration,
ContainsUnfinished: metadata.ContainsUnfinished,
})
}

// We recurse on s' open children to get their verbose recordings, and to
// aggregate OperationMetadata from their children, both finished and open.
for _, openChild := range s.mu.openChildren {
if openChild.collectRecording || includeDetachedChildren {
openChildSp := openChild.Span.i.crdb
openChildRecording := openChildSp.getVerboseRecording(includeDetachedChildren, false /* finishing */)

// Record an entry for openChilds' OperationMetadata.
prevMetadata := childrenMetadata[openChildSp.operation]
openChildSp.mu.Lock()
prevMetadata.Count++
if openChildSp.mu.duration == -1 {
prevMetadata.Duration += timeutil.Since(s.startTime)
prevMetadata.ContainsUnfinished = true
} else {
prevMetadata.Duration += s.mu.duration
}
childrenMetadata[openChildSp.operation] = prevMetadata

// Copy over the OperationMetadata collected recursively from openChilds'
// children.
for opName, metadata := range openChildRecording[0].ChildrenMetadata {
childrenMetadata[opName] = childrenMetadata[opName].Combine(
tracingpb.OperationMetadata{
Count: metadata.Count,
Duration: metadata.Duration,
ContainsUnfinished: metadata.ContainsUnfinished,
})
}
openChildSp.mu.Unlock()
result = append(result, openChildRecording...)
}
}

s.mu.Unlock()

// Copy over the OperationMetadata collected from s' children into the root of
// the recording.
if len(childrenMetadata) != 0 {
rootSpan := &result[0]
if rootSpan.ChildrenMetadata == nil {
rootSpan.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
}
for opName, metadata := range childrenMetadata {
rootSpan.ChildrenMetadata[opName] = rootSpan.ChildrenMetadata[opName].Combine(metadata)
}
}

// Sort the spans by StartTime, except the first Span (the root of this
// recording) which stays in place.
toSort := sortPool.Get().(*tracingpb.Recording) // avoids allocations in sort.Sort
Expand All @@ -413,59 +476,67 @@ func (s *crdbSpan) getVerboseRecording(
return result
}

// getStructuredRecording returns the structured events in this span and
// in all the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will be
// nil if there are no structured events. If not nil, the Recording will have
// exactly one span corresponding to the receiver, will all events handing from
// getStructuredRecording returns the structured events in this span and in all
// the children. The results are returned as a Recording for the caller's
// convenience (and for optimizing memory allocations). The Recording will have
// exactly one span corresponding to the receiver, with all events hanging from
// this span (even if the events had been recorded on different spans).
// This span will also have a `childrenMetadata` map that will contain an entry
// for all children in s' Recording.
//
// The caller does not take ownership of the events.
func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) tracingpb.Recording {
s.mu.Lock()
defer s.mu.Unlock()

// Recursively fetch the StructuredEvents for s' children, both finished and
// open.
buffer := make([]*tracingpb.StructuredRecord, 0, 3)
for _, c := range s.mu.recording.finishedChildren {
for i := range c.StructuredRecords {
buffer = append(buffer, &c.StructuredRecords[i])
}
}
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
}
}
buffer = s.getChildrenStructuredEventsRecursivelyLocked(buffer,
false /* includeRootStructuredEvents */, includeDetachedChildren)

if len(buffer) == 0 && s.mu.recording.structured.Len() == 0 {
// Optimize out the allocations below.
return nil
}
// Recursively fetch the OperationMetadata for s' children, both finished and
// open.
childrenMetadata := make(map[string]tracingpb.OperationMetadata)
s.getChildrenMetadataRecursivelyLocked(childrenMetadata, false /* includeRootMetadata */, includeDetachedChildren)

// Fetch s' recording without including its children.
res := s.getRecordingNoChildrenLocked(
tracingpb.RecordingStructured,
false, // finishing - since we're only asking for the structured recording, the argument doesn't matter
)
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]

if len(buffer) != 0 || s.mu.recording.structured.Len() != 0 {
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]
}
for i, e := range buffer {
reservedSpace[i] = *e
}
}
for i, e := range buffer {
reservedSpace[i] = *e

// If s had any open children we must capture their metadata in res as well.
if len(childrenMetadata) != 0 {
res.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
for opName, metadata := range childrenMetadata {
res.ChildrenMetadata[opName] = res.ChildrenMetadata[opName].Combine(metadata)
}
}

return tracingpb.Recording{res}
}

Expand Down Expand Up @@ -497,6 +568,8 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
return
}

rootChild := &childRecording[0]

// Depending on the type of recording, we either keep all the information
// received, or only the structured events.
switch s.recordingType() {
Expand All @@ -505,7 +578,7 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRecording[0].ParentSpanID = s.spanID
rootChild.ParentSpanID = s.spanID

if len(s.mu.recording.finishedChildren)+len(childRecording) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, childRecording...)
Expand All @@ -516,17 +589,47 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// records by falling through.
fallthrough
case tracingpb.RecordingStructured:
for ci := range childRecording {
child := &childRecording[ci]
for i := range child.StructuredRecords {
s.recordInternalLocked(&child.StructuredRecords[i], &s.mu.recording.structured)
}
if len(childRecording) != 1 {
panic(fmt.Sprintf("RecordingStructured has %d recordings; expected 1", len(childRecording)))
}

for i := range rootChild.StructuredRecords {
s.recordInternalLocked(&rootChild.StructuredRecords[i], &s.mu.recording.structured)
}
case tracingpb.RecordingOff:
break
default:
panic(fmt.Sprintf("unrecognized recording mode: %v", s.recordingType()))
}

// Update s' ChildrenMetadata to capture all the spans in `childRecording`.
//
// As an example where we are done finishing `child`:
//
// parent
// child
// grandchild
//
// `parent` will have:
// {child: 2s, grandchild: 1s}
//
// Record finished rootChilds' metadata.
s.mu.recording.childrenMetadata[rootChild.Operation] =
s.mu.recording.childrenMetadata[rootChild.Operation].Combine(
tracingpb.OperationMetadata{
Count: 1,
Duration: rootChild.Duration,
ContainsUnfinished: false,
})

// Record the metadata of rootChilds' children, both finished and open.
//
// GetRecording(...) is responsible for recursively capturing the metadata for
// rootChilds' open and finished children.
for childOp, metadata := range rootChild.ChildrenMetadata {
s.mu.recording.childrenMetadata[childOp] =
s.mu.recording.childrenMetadata[childOp].Combine(metadata)
}
}

func (s *crdbSpan) setTagLocked(key string, value attribute.Value) {
Expand Down Expand Up @@ -698,18 +801,24 @@ func (s *crdbSpan) recordInternalLocked(payload memorySizable, buffer *sizeLimit
buffer.AddLast(payload)
}

// getStructuredEventsRecursively returns the structured events accumulated by
// this span and its finished and still-open children.
func (s *crdbSpan) getStructuredEventsRecursively(
buffer []*tracingpb.StructuredRecord, includeDetachedChildren bool,
// getChildrenStructuredEventsRecursivelyLocked returns the structured events
// accumulated by s' finished and still-open children.
//
// The method conditionally returns the structured events accumulated by s if
// `includeRootStructuredEvents` is true.
func (s *crdbSpan) getChildrenStructuredEventsRecursivelyLocked(
buffer []*tracingpb.StructuredRecord, includeRootStructuredEvents, includeDetachedChildren bool,
) []*tracingpb.StructuredRecord {
s.mu.Lock()
defer s.mu.Unlock()
buffer = s.getStructuredEventsLocked(buffer)
if includeRootStructuredEvents {
buffer = s.getStructuredEventsLocked(buffer)
}

for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
sp.mu.Lock()
buffer = sp.getChildrenStructuredEventsRecursivelyLocked(buffer, true /* includeRootStructuredEvents */, includeDetachedChildren)
sp.mu.Unlock()
}
}
for _, c := range s.mu.recording.finishedChildren {
Expand All @@ -720,6 +829,51 @@ func (s *crdbSpan) getStructuredEventsRecursively(
return buffer
}

// getChildrenMetadataRecursivelyLocked populates `childrenMetadata` with
// OperationMetadata entries for all of s' children (open and finished),
// recursively.
//
// The method also populates `childrenMetadata` with an entry for the receiver
// if `includeRootMetadata` is true.
func (s *crdbSpan) getChildrenMetadataRecursivelyLocked(
childrenMetadata map[string]tracingpb.OperationMetadata,
includeRootMetadata, includeDetachedChildren bool,
) {
if includeRootMetadata {
// Record an entry for s' metadata.
prevMetadata := childrenMetadata[s.operation]
prevMetadata.Count++
if s.mu.duration == -1 {
prevMetadata.Duration += timeutil.Since(s.startTime)
prevMetadata.ContainsUnfinished = true
} else {
prevMetadata.Duration += s.mu.duration
}
childrenMetadata[s.operation] = prevMetadata
}

// Copy over s' Finish()ed children metadata.
for opName, metadata := range s.mu.recording.childrenMetadata {
childrenMetadata[opName] = childrenMetadata[opName].Combine(
tracingpb.OperationMetadata{
Count: metadata.Count,
Duration: metadata.Duration,
ContainsUnfinished: metadata.ContainsUnfinished,
})
}

// For each of s' open children, recurse to collect their metadata.
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
sp.mu.Lock()
sp.getChildrenMetadataRecursivelyLocked(childrenMetadata,
true /*includeRootMetadata */, includeDetachedChildren)
sp.mu.Unlock()
}
}
}

func (s *crdbSpan) getStructuredEventsLocked(
buffer []*tracingpb.StructuredRecord,
) []*tracingpb.StructuredRecord {
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,9 @@ func (sp *Span) reset(
openChildren: h.childrenAlloc[:0],
goroutineID: goroutineID,
recording: recordingState{
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
childrenMetadata: make(map[string]tracingpb.OperationMetadata),
},
tags: h.tagsAlloc[:0],
}
Expand Down
Loading

0 comments on commit e94c3ec

Please sign in to comment.