tracing: aggregate OperationMetadata on span Finish()
This change adds a `ChildrenMetadata` map to `crdbSpan` that maps each
operation name to that operation's aggregated metadata. The map is
updated whenever a child of the `crdbSpan` finishes, with metadata
from all spans in the finishing child's Recording. The map is therefore
a bucketed view of all the operations being traced by a span.

The motivation for this change is to surface more metadata about the
suboperations being traced in a span's Recording. This could in turn provide
more observability into why a job is slow or stuck, or where the performance of a
distributed operation is bottlenecked.

As part of Finish()ing, a span fetches its Recording at the span's
configured verbosity. Prior to this change, the recording would then be
processed as follows:

*Verbose Recording*

In the case of a Verbose recording, the spans in the recording are added to the parent's
`finishedChildren` slice, provided we have not exceeded the maximum number of
children a parent can track.

*Structured Recording*

In the case of a Structured recording, only the StructuredEvents from the spans in
the recording are copied into the parent.
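
The two pre-change modes can be sketched as below. This is an illustrative model, not the code in `crdbspan.go`: `recordedSpan`, `parentState`, `recordFinishedChild`, and the limit of 3 are all hypothetical. The fall-through from Verbose to Structured handling when the limit is exceeded follows the `fallthrough` visible in `recordFinishedChildrenLocked`.

```go
package main

import "fmt"

// recordedSpan is a hypothetical stand-in for tracingpb.RecordedSpan.
type recordedSpan struct {
	Operation         string
	StructuredRecords []string
}

// parentState is a hypothetical stand-in for the parent's recording state.
type parentState struct {
	finishedChildren []recordedSpan
	structured       []string
}

// maxRecordedSpans is an illustrative limit, not the value of the real constant.
const maxRecordedSpans = 3

// recordFinishedChild models how a parent absorbed a finishing child's
// recording before this change.
func (p *parentState) recordFinishedChild(rec []recordedSpan, verbose bool) {
	if verbose && len(p.finishedChildren)+len(rec) <= maxRecordedSpans {
		// Verbose and under the limit: keep every span in the recording.
		p.finishedChildren = append(p.finishedChildren, rec...)
		return
	}
	// Structured mode, or Verbose over the limit: keep only structured events.
	for _, sp := range rec {
		p.structured = append(p.structured, sp.StructuredRecords...)
	}
}

func main() {
	rec := []recordedSpan{
		{Operation: "child", StructuredRecords: []string{"ev1"}},
		{Operation: "grandchild", StructuredRecords: []string{"ev2"}},
	}

	verbose := &parentState{}
	verbose.recordFinishedChild(rec, true)

	structured := &parentState{}
	structured.recordFinishedChild(rec, false)

	fmt.Println(len(verbose.finishedChildren), len(verbose.structured))
	fmt.Println(len(structured.finishedChildren), len(structured.structured))
}
```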

With this change, in both the Verbose and Structured recording modes, a finishing span
is also responsible for rolling up the OperationMetadata of all the spans in its
recording. This involves updating the parent's `childrenMetadata` mapping with:

1) an entry for the finishing span.
2) an entry for each of the finishing span's Finish()ed children.
3) an entry for each of the finishing span's open children, and their children recursively.

The logic for 2) and 3) is subsumed in the method responsible for getting
the finishing span's recording. Notably, GetRecording(...), for both Structured and Verbose
recordings, populates the root of the recording with the OperationMetadata of all
finished and open children in the recording.

As an example, when we are done finishing `child`:

```
parent
  child (finished_C: 4s, finished_D: 3s)
    open_A (finished_B: 1s)
      finished_B
    finished_C (finished_D: 3s)
      finished_D
```

We'd expect `parent` to have:
`{child: 10s, finished_C: 4s, finished_D: 3s, open_A: 3s, finished_B: 1s}`

Given that Finish()ing a child, and importing a remote recording into a span
share the same code path, the above semantics also apply to a remote recording
being imported into a parent span.

Fixes: #80391

Release note: None
adityamaru committed Jun 20, 2022
1 parent 8d34ef1 commit b31ca53
Showing 7 changed files with 455 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/server/node_tenant_test.go
@@ -108,6 +108,7 @@ func TestRedactRecordingForTenant(t *testing.T) {
GoroutineID uint64
Finished bool
StructuredRecords []tracingpb.StructuredRecord
ChildrenMetadata map[string]tracingpb.OperationMetadata
}
_ = (*calcifiedRecordedSpan)((*tracingpb.RecordedSpan)(nil))
})
210 changes: 177 additions & 33 deletions pkg/util/tracing/crdbspan.go
@@ -168,6 +168,14 @@ type recordingState struct {
//
// The spans are not maintained in a particular order.
finishedChildren []tracingpb.RecordedSpan

// childrenMetadata is a mapping from operation to the aggregated metadata of
// that operation.
//
// When a child of this span is Finish()ed, it updates the map with all the
// children in its Recording. childrenMetadata therefore provides a bucketed
// view of the various operations that are being traced as part of a span.
childrenMetadata map[string]tracingpb.OperationMetadata
}

// makeSizeLimitedBuffer creates a sizeLimitedBuffer.
@@ -379,6 +387,9 @@ func (s *crdbSpan) getRecordingImpl(

// getVerboseRecording returns the Span's recording, including its children.
//
// The root of the returned Recording contains the recursively collected
// OperationMetadata of all children (finished and open) in the recording.
//
// finishing indicates whether s is in the process of finishing. If it isn't,
// the recording will include an "_unfinished" tag.
func (s *crdbSpan) getVerboseRecording(
@@ -395,14 +406,51 @@ func (s *crdbSpan) getVerboseRecording(
result = append(result, s.getRecordingNoChildrenLocked(tracingpb.RecordingVerbose, finishing))
result = append(result, s.mu.recording.finishedChildren...)

for _, child := range s.mu.openChildren {
if child.collectRecording || includeDetachedChildren {
sp := child.Span.i.crdb
result = append(result, sp.getVerboseRecording(includeDetachedChildren, false /* finishing */)...)
// We recurse on s' open children to get their verbose recordings, and to
// aggregate OperationMetadata from s' open children, and their children.
openChildrenMetadata := make(map[string]tracingpb.OperationMetadata)
for _, openChild := range s.mu.openChildren {
if openChild.collectRecording || includeDetachedChildren {
openChildSp := openChild.Span.i.crdb
openChildRecording := openChildSp.getVerboseRecording(includeDetachedChildren, false /* finishing */)

// Record an entry for openChild's OperationMetadata.
prevMetadata := openChildrenMetadata[openChildSp.operation]
openChildSp.mu.Lock()
prevMetadata.Count++
prevMetadata.Duration += timeutil.Since(s.startTime)
prevMetadata.ContainsUnfinished = true
openChildrenMetadata[openChildSp.operation] = prevMetadata

// Copy over the OperationMetadata collected recursively from openChild's
// children.
for opName, metadata := range openChildRecording[0].ChildrenMetadata {
openChildrenMetadata[opName] = openChildrenMetadata[opName].Combine(
tracingpb.OperationMetadata{
Count: metadata.Count,
Duration: metadata.Duration,
ContainsUnfinished: metadata.ContainsUnfinished,
})
}
openChildSp.mu.Unlock()
result = append(result, openChildRecording...)
}
}

s.mu.Unlock()

// Copy over the OperationMetadata collected recursively from s' open children
// into the root of the recording.
if len(openChildrenMetadata) != 0 {
rootSpan := &result[0]
if rootSpan.ChildrenMetadata == nil {
rootSpan.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
}
for opName, metadata := range openChildrenMetadata {
rootSpan.ChildrenMetadata[opName] = rootSpan.ChildrenMetadata[opName].Combine(metadata)
}
}

// Sort the spans by StartTime, except the first Span (the root of this
// recording) which stays in place.
toSort := sortPool.Get().(*tracingpb.Recording) // avoids allocations in sort.Sort
@@ -419,6 +467,8 @@ func (s *crdbSpan) getVerboseRecording(
// nil if there are no structured events. If not nil, the Recording will have
// exactly one span corresponding to the receiver, with all events hanging from
// this span (even if the events had been recorded on different spans).
// This span will also have a `childrenMetadata` map that will contain an entry
// for all children in s' Recording.
//
// The caller does not take ownership of the events.
func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) tracingpb.Recording {
@@ -430,42 +480,61 @@ func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) tracingp
buffer = append(buffer, &c.StructuredRecords[i])
}
}

openChildrenMetadata := make(map[string]tracingpb.OperationMetadata)
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren)
}
}

if len(buffer) == 0 && s.mu.recording.structured.Len() == 0 {
// Optimize out the allocations below.
return nil
// finishedChildren have already copied their metadata entries into s on
// Finish(). These will be picked up in `getRecordingNoChildrenLocked`
// below.
//
// For open children, we need to recurse and fetch the metadata from their
// children.
sp.getChildrenMetadataRecursively(openChildrenMetadata, includeDetachedChildren)
}
}

res := s.getRecordingNoChildrenLocked(
tracingpb.RecordingStructured,
false, // finishing - since we're only asking for the structured recording, the argument doesn't matter
)
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]

if len(buffer) != 0 || s.mu.recording.structured.Len() != 0 {
// If necessary, grow res.StructuredRecords to have space for buffer.
var reservedSpace []tracingpb.StructuredRecord
if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) {
// res.StructuredRecords does not have enough capacity to accommodate the
// elements of buffer. We allocate a new, larger array and copy over the old
// entries.
old := res.StructuredRecords
res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer))
copy(res.StructuredRecords, old)
reservedSpace = res.StructuredRecords[len(old):]
} else {
// res.StructuredRecords has enough capacity for buffer. We extend it in
// place.
oldLen := len(res.StructuredRecords)
res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)]
reservedSpace = res.StructuredRecords[oldLen:]
}
for i, e := range buffer {
reservedSpace[i] = *e
}
}
for i, e := range buffer {
reservedSpace[i] = *e

// If s had any open children we must capture their metadata in res as well.
if len(openChildrenMetadata) != 0 {
if res.ChildrenMetadata == nil {
res.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
}
for opName, metadata := range openChildrenMetadata {
res.ChildrenMetadata[opName] = res.ChildrenMetadata[opName].Combine(metadata)
}
}

return tracingpb.Recording{res}
}

@@ -497,6 +566,8 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
return
}

rootChild := &childRecording[0]

// Depending on the type of recording, we either keep all the information
// received, or only the structured events.
switch s.recordingType() {
@@ -505,7 +576,7 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRecording[0].ParentSpanID = s.spanID
rootChild.ParentSpanID = s.spanID

if len(s.mu.recording.finishedChildren)+len(childRecording) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, childRecording...)
@@ -516,17 +587,47 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording tracingpb.Recordi
// records by falling through.
fallthrough
case tracingpb.RecordingStructured:
for ci := range childRecording {
child := &childRecording[ci]
for i := range child.StructuredRecords {
s.recordInternalLocked(&child.StructuredRecords[i], &s.mu.recording.structured)
}
if len(childRecording) != 1 {
panic(fmt.Sprintf("RecordingStructured has %d recordings; expected 1", len(childRecording)))
}

for i := range rootChild.StructuredRecords {
s.recordInternalLocked(&rootChild.StructuredRecords[i], &s.mu.recording.structured)
}
case tracingpb.RecordingOff:
break
default:
panic(fmt.Sprintf("unrecognized recording mode: %v", s.recordingType()))
}

// Update s' childrenMetadata to capture all the spans in `childRecording`.
//
// As an example where we are done finishing `child`:
//
// parent
//   child (finished_C: 4s, finished_D: 3s)
//     open_A (finished_B: 1s)
//       finished_B
//     finished_C (finished_D: 3s)
//       finished_D
//
// `parent` will have:
// {child: 10s, finished_C: 4s, finished_D: 3s, open_A: 3s, finished_B: 1s}
//
// Record finished rootChild's metadata.
s.mu.recording.childrenMetadata[rootChild.Operation] = s.mu.recording.childrenMetadata[rootChild.Operation].Combine(
tracingpb.OperationMetadata{
Count: 1,
Duration: rootChild.Duration,
ContainsUnfinished: false,
})
// Record the metadata of rootChild's children (finished and open).
//
// GetRecording(...) is responsible for recursively capturing the metadata for
// rootChild's open and finished children.
for childOp, metadata := range rootChild.ChildrenMetadata {
s.mu.recording.childrenMetadata[childOp] = s.mu.recording.childrenMetadata[childOp].Combine(metadata)
}
}

func (s *crdbSpan) setTagLocked(key string, value attribute.Value) {
@@ -720,6 +821,42 @@ func (s *crdbSpan) getStructuredEventsRecursively(
return buffer
}

func (s *crdbSpan) getChildrenMetadataRecursively(
childrenMetadata map[string]tracingpb.OperationMetadata, includeDetachedChildren bool,
) {
s.mu.Lock()
defer s.mu.Unlock()

// Record an entry for s' metadata.
prevMetadata := childrenMetadata[s.operation]
prevMetadata.Count++
if s.mu.duration == -1 {
prevMetadata.Duration += timeutil.Since(s.startTime)
prevMetadata.ContainsUnfinished = true
} else {
prevMetadata.Duration += s.mu.duration
}
childrenMetadata[s.operation] = prevMetadata

// Copy over s' Finish()ed children metadata.
for opName, metadata := range s.mu.recording.childrenMetadata {
childrenMetadata[opName] = childrenMetadata[opName].Combine(
tracingpb.OperationMetadata{
Count: metadata.Count,
Duration: metadata.Duration,
ContainsUnfinished: metadata.ContainsUnfinished,
})
}

// For each of s' open children, recurse to collect their metadata.
for _, c := range s.mu.openChildren {
if c.collectRecording || includeDetachedChildren {
sp := c.Span.i.crdb
sp.getChildrenMetadataRecursively(childrenMetadata, includeDetachedChildren)
}
}
}

func (s *crdbSpan) getStructuredEventsLocked(
buffer []*tracingpb.StructuredRecord,
) []*tracingpb.StructuredRecord {
@@ -801,6 +938,13 @@ func (s *crdbSpan) getRecordingNoChildrenLocked(
}
}

if numFinishedChildrenMetadata := len(s.mu.recording.childrenMetadata); numFinishedChildrenMetadata != 0 {
rs.ChildrenMetadata = make(map[string]tracingpb.OperationMetadata)
for childOp, metadata := range s.mu.recording.childrenMetadata {
rs.ChildrenMetadata[childOp] = metadata
}
}

if wantTags {
if s.logTags != nil {
setLogTags(s.logTags.Get(), func(remappedKey string, tag *logtags.Tag) {
5 changes: 3 additions & 2 deletions pkg/util/tracing/span.go
@@ -651,8 +651,9 @@ func (sp *Span) reset(
openChildren: h.childrenAlloc[:0],
goroutineID: goroutineID,
recording: recordingState{
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */),
structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]),
childrenMetadata: make(map[string]tracingpb.OperationMetadata),
},
tags: h.tagsAlloc[:0],
}