diff --git a/pkg/server/node_tenant_test.go b/pkg/server/node_tenant_test.go index 26cb2d5900b0..1a400cbcc8cd 100644 --- a/pkg/server/node_tenant_test.go +++ b/pkg/server/node_tenant_test.go @@ -94,20 +94,21 @@ func TestRedactRecordingForTenant(t *testing.T) { // that may leak from the KV layer to tenants. If it does, update // redactRecordingForTenant appropriately. type calcifiedRecordedSpan struct { - TraceID tracingpb.TraceID - SpanID tracingpb.SpanID - ParentSpanID tracingpb.SpanID - Operation string - Tags map[string]string - StartTime time.Time - Duration time.Duration - RedactableLogs bool - Logs []tracingpb.LogRecord - Verbose bool - RecordingMode tracingpb.RecordingMode - GoroutineID uint64 - Finished bool - StructuredRecords []tracingpb.StructuredRecord + TraceID tracingpb.TraceID + SpanID tracingpb.SpanID + ParentSpanID tracingpb.SpanID + Operation string + Tags map[string]string + StartTime time.Time + Duration time.Duration + RedactableLogs bool + Logs []tracingpb.LogRecord + Verbose bool + RecordingMode tracingpb.RecordingMode + GoroutineID uint64 + Finished bool + StructuredRecords []tracingpb.StructuredRecord + FinishedChildrenDurations map[string]time.Duration } _ = (*calcifiedRecordedSpan)((*tracingpb.RecordedSpan)(nil)) }) diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index e8f97768ee89..50bb6e760a3e 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -115,6 +115,13 @@ type crdbSpanMu struct { // The spans are not maintained in a particular order. openChildren []childRef + // finishedChildrenDurations maintains a mapping from operation to duration of + // all the Finish()ed children in the Recording rooted at this span. + // + // This map will also contain entries for all Finish()ed children in a remote + // recording imported into this span. 
+ finishedChildrenDurations map[string]time.Duration + recording recordingState // tags are a list of key/value pairs associated with the span through @@ -401,12 +408,11 @@ func (s *crdbSpan) getVerboseRecording(includeDetachedChildren bool, finishing b return result } -// getStructuredRecording returns the structured events in this span and -// in all the children. The results are returned as a Recording for the caller's -// convenience (and for optimizing memory allocations). The Recording will be -// nil if there are no structured events. If not nil, the Recording will have -// exactly one span corresponding to the receiver, will all events handing from -// this span (even if the events had been recorded on different spans). +// getStructuredRecording returns a Recording with exactly one span (for +// optimizing memory allocations) corresponding to the receiver. This span will +// contain all the structured events in this span and in all its children. +// This span will notably also have a `finishedChildrenDurations` map that will +// contain an entry for all Finish()ed children in s' Recording. // // The caller does not take ownership of the events. func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) Recording { @@ -418,42 +424,60 @@ func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) Recordin buffer = append(buffer, &c.StructuredRecords[i]) } } + + // finishedChildren have already copied their `finishedChildrenDurations` + // entries into s' on Finish(). + // + // For open children, we need to recurse and copy all the entries that + // correspond to *their* Finish()ed children. 
+ finishedChildrenDurations := make(map[string]time.Duration) for _, c := range s.mu.openChildren { if c.collectRecording || includeDetachedChildren { sp := c.Span.i.crdb buffer = sp.getStructuredEventsRecursively(buffer, includeDetachedChildren) + sp.getFinishedChildrenDurationsRecursively(finishedChildrenDurations, includeDetachedChildren) } } - if len(buffer) == 0 && s.mu.recording.structured.Len() == 0 { - // Optimize out the allocations below. - return nil - } - res := s.getRecordingNoChildrenLocked( RecordingStructured, false, // finishing - since we're only asking for the structured recording, the argument doesn't matter ) - // If necessary, grow res.StructuredRecords to have space for buffer. - var reservedSpace []tracingpb.StructuredRecord - if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) { - // res.StructuredRecords does not have enough capacity to accommodate the - // elements of buffer. We allocate a new, larger array and copy over the old - // entries. - old := res.StructuredRecords - res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer)) - copy(res.StructuredRecords, old) - reservedSpace = res.StructuredRecords[len(old):] - } else { - // res.StructuredRecords has enough capacity for buffer. We extend it in - // place. - oldLen := len(res.StructuredRecords) - res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)] - reservedSpace = res.StructuredRecords[oldLen:] + + if len(buffer) != 0 || s.mu.recording.structured.Len() != 0 { + // If necessary, grow res.StructuredRecords to have space for buffer. + var reservedSpace []tracingpb.StructuredRecord + if cap(res.StructuredRecords)-len(res.StructuredRecords) < len(buffer) { + // res.StructuredRecords does not have enough capacity to accommodate the + // elements of buffer. We allocate a new, larger array and copy over the old + // entries. 
+ old := res.StructuredRecords + res.StructuredRecords = make([]tracingpb.StructuredRecord, len(old)+len(buffer)) + copy(res.StructuredRecords, old) + reservedSpace = res.StructuredRecords[len(old):] + } else { + // res.StructuredRecords has enough capacity for buffer. We extend it in + // place. + oldLen := len(res.StructuredRecords) + res.StructuredRecords = res.StructuredRecords[:oldLen+len(buffer)] + reservedSpace = res.StructuredRecords[oldLen:] + } + for i, e := range buffer { + reservedSpace[i] = *e + } } - for i, e := range buffer { - reservedSpace[i] = *e + + // If any of the open children had `finishedChildrenDurations` to report, we + // need to copy those entries into res. + if len(finishedChildrenDurations) != 0 { + if res.FinishedChildrenDurations == nil { + res.FinishedChildrenDurations = make(map[string]time.Duration) + } + for operation, duration := range finishedChildrenDurations { + res.FinishedChildrenDurations[operation] += duration + } } + return Recording{res} } @@ -503,10 +527,53 @@ func (s *crdbSpan) recordFinishedChildrenLocked(childRecording Recording) { } } case RecordingOff: - break + return default: panic(fmt.Sprintf("unrecognized recording mode: %v", s.recordingType())) } + + // We need to update s' finishedChildrenDurations to capture all the + // Finish()ed spans in this Recording. + // + // 1) Updating the map with an entry for the root of the recording. + // + // 2) Copying all the entries from the root of the recording into s' map. These + // entries capture the Finish()ed children of the root of the recording. + // + // 3) Copying over the maps of all unfinished (open) children in the recording. + // These maps capture the Finish()ed children of the open children in the + // recording. These open children are considered "orphaned" and will be + // promoted to root spans in the registry but their Finish()ed children + // should be captured since they contributed to the recording's overall + // duration. 
+ // + // Note, recordFinishedChildrenLocked is called with either a Verbose or a + // Structured recording. In the case of a StructuredRecording we have already + // completed step 3) when fetching the Recording in GetRecording(...). + // + //As an example where we are done finishing `child`: + // + //parent + //child (finished_C: 4s, finished_D: 3s) + //open_A (finished_B: 1s) + //finished_B + //finished_C (finished_D: 3s) + //finished_D + // + //We'd expect `parent` to have: + //{child: 10s, finished_C: 4s, finished_D: 3s, finished_B: 1s} + root := childRecording[0] + s.mu.finishedChildrenDurations[root.Operation] += root.Duration + for operation, duration := range root.FinishedChildrenDurations { + s.mu.finishedChildrenDurations[operation] += duration + } + for _, rec := range childRecording[1:] { + if !rec.Finished { + for operation, duration := range rec.FinishedChildrenDurations { + s.mu.finishedChildrenDurations[operation] += duration + } + } + } } func (s *crdbSpan) setTagLocked(key string, value attribute.Value) { @@ -657,6 +724,28 @@ func (s *crdbSpan) getStructuredEventsRecursively( return buffer } +func (s *crdbSpan) getFinishedChildrenDurationsRecursively( + finishedChildrenDurations map[string]time.Duration, includeDetachedChildren bool, +) { + s.mu.Lock() + defer s.mu.Unlock() + s.getFinishedChildrenDurationsLocked(finishedChildrenDurations) + for _, c := range s.mu.openChildren { + if c.collectRecording || includeDetachedChildren { + sp := c.Span.i.crdb + sp.getFinishedChildrenDurationsRecursively(finishedChildrenDurations, includeDetachedChildren) + } + } +} + +func (s *crdbSpan) getFinishedChildrenDurationsLocked( + finishedChildrenDurations map[string]time.Duration, +) { + for operation, duration := range s.mu.finishedChildrenDurations { + finishedChildrenDurations[operation] += duration + } +} + func (s *crdbSpan) getStructuredEventsLocked( buffer []*tracingpb.StructuredRecord, ) []*tracingpb.StructuredRecord { @@ -738,6 +827,13 @@ func (s 
*crdbSpan) getRecordingNoChildrenLocked( } } + if numFinishedChildrenDuration := len(s.mu.finishedChildrenDurations); numFinishedChildrenDuration != 0 { + rs.FinishedChildrenDurations = make(map[string]time.Duration) + for operation, duration := range s.mu.finishedChildrenDurations { + rs.FinishedChildrenDurations[operation] = duration + } + } + if wantTags { if s.logTags != nil { setLogTags(s.logTags.Get(), func(remappedKey string, tag *logtags.Tag) { diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index dbd7f3a19345..71b18b498cfa 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -635,7 +635,8 @@ func (sp *Span) reset( logs: makeSizeLimitedBuffer(maxLogBytesPerSpan, nil /* scratch */), structured: makeSizeLimitedBuffer(maxStructuredBytesPerSpan, h.structuredEventsAlloc[:]), }, - tags: h.tagsAlloc[:0], + tags: h.tagsAlloc[:0], + finishedChildrenDurations: make(map[string]time.Duration), } if kind != oteltrace.SpanKindUnspecified { diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index aa85234ae7a0..764b695f22a5 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -657,6 +657,49 @@ func TestStructureRecording(t *testing.T) { } } +// TestStructuredRecordingFinishedChildrenDurations tests that the +// StructuredRecording of a span includes the `operation : duration` mapping of +// all finished children in that recording. 
+func TestStructuredRecordingFinishedChildrenDurations(t *testing.T) { + checkFinishedChildrenDurations := func(durations map[string]time.Duration, + expectedOperations []string) { + t.Helper() + require.Len(t, durations, len(expectedOperations)) + for _, operation := range expectedOperations { + dur, ok := durations[operation] + require.True(t, ok) + require.Greater(t, dur, time.Duration(0)) + } + } + tr := NewTracer() + root := tr.StartSpan("root", WithRecording(RecordingStructured)) + child := tr.StartSpan("child", WithParent(root)) + gc := tr.StartSpan("grandchild", WithParent(child)) + child2 := tr.StartSpan("child2", WithParent(root)) + + gc.Finish() + + // child has no StructuredEvents, but it should return information about its + // Finish()ed children. + rec := child.GetConfiguredRecording() + require.Len(t, rec, 1) + checkFinishedChildrenDurations(rec[0].FinishedChildrenDurations, []string{"grandchild"}) + + // Now, let's Finish() child2, but leave child as an open child of root. + child2.Finish() + + // Root should only have entries for Finish()ed spans in the recording. + rec = root.GetConfiguredRecording() + require.Len(t, rec, 1) + checkFinishedChildrenDurations(rec[0].FinishedChildrenDurations, []string{"child2", "grandchild"}) + + // Finish child, and re-check root's recording. + child.Finish() + rec = root.FinishAndGetConfiguredRecording() + require.Len(t, rec, 1) + checkFinishedChildrenDurations(rec[0].FinishedChildrenDurations, []string{"child", "child2", "grandchild"}) +} + // Test that a child span that's still open at the time when // parent.FinishAndGetRecording() is called is included in the parent's // recording. 
@@ -700,3 +743,55 @@ func TestWithRemoteParentFromTraceInfo(t *testing.T) { otelCtx := sp.i.otelSpan.SpanContext() require.Equal(t, oteltrace.TraceID(otelTraceID), otelCtx.TraceID()) } + +// TestChildrenDurationsOnFinish tests that on Finish() the parent span's +// `finishedChildrenDurations` map is populated with all finished children in +// the recording. +func TestChildrenDurationsOnFinish(t *testing.T) { + checkFinishedChildrenDurations := func(sp *Span, expectedOperations []string) { + t.Helper() + sp.i.crdb.mu.Lock() + defer sp.i.crdb.mu.Unlock() + + durations := sp.i.crdb.mu.finishedChildrenDurations + require.Len(t, durations, len(expectedOperations)) + for _, operation := range expectedOperations { + dur, ok := durations[operation] + require.True(t, ok) + require.Greater(t, dur, time.Duration(0)) + } + } + + tr := NewTracer() + root := tr.StartSpan("root", WithRecording(RecordingVerbose)) + child := tr.StartSpan("child", WithParent(root)) + + gc1 := tr.StartSpan("grandchild1", WithParent(child)) + ggc1 := tr.StartSpan("greatgrandchild1", WithParent(gc1)) + + gc2 := tr.StartSpan("grandchild2", WithParent(child)) + ggc2 := tr.StartSpan("greatgrandchild2", WithParent(gc2)) + + // First let's Finish() the lowest children. This should mean that their + // parents have entries for their duration. + ggc1.Finish() + checkFinishedChildrenDurations(gc1, []string{"greatgrandchild1"}) + + ggc2.Finish() + checkFinishedChildrenDurations(gc2, []string{"greatgrandchild2"}) + + // Finish() one of the grand children. + gc1.Finish() + checkFinishedChildrenDurations(child, []string{"grandchild1", "greatgrandchild1"}) + + // Now Finish() `child` since it has both finished and open children at this + // point. We expect to see entries for all finished spans in the recording, + // but not the open child gc2. 
+ child.Finish() + checkFinishedChildrenDurations(root, []string{"child", "grandchild1", "greatgrandchild1", "greatgrandchild2"}) + + // gc2 should now be a root span in the registry since it is orphaned. + gc2.Finish() + checkFinishedChildrenDurations(root, []string{"child", "grandchild1", "greatgrandchild1", "greatgrandchild2"}) + root.Finish() +} diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index e1e9c04fb6a7..999478da2ff9 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -85,7 +85,12 @@ func TestTracerRecording(t *testing.T) { } // Initial recording of this fresh (real) span. - require.Nil(t, s1.GetRecording(RecordingStructured)) + rec := s1.GetRecording(RecordingStructured) + require.Len(t, rec, 1) + require.Nil(t, rec[0].FinishedChildrenDurations) + require.Nil(t, rec[0].StructuredRecords) + require.Nil(t, rec[0].Logs) + require.Nil(t, rec[0].Tags) s1.RecordStructured(&types.Int32Value{Value: 5}) if err := CheckRecording(s1.GetRecording(RecordingStructured), ` diff --git a/pkg/util/tracing/tracingpb/recorded_span.proto b/pkg/util/tracing/tracingpb/recorded_span.proto index 67616ce3799a..d93ca4c64c32 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.proto +++ b/pkg/util/tracing/tracingpb/recorded_span.proto @@ -103,7 +103,13 @@ message RecordedSpan { // DeprecatedInternalStructured only stores the Payloads. repeated StructuredRecord structured_records = 14 [(gogoproto.nullable) = false]; + // FinishedChildrenDurations is a mapping from the operation to the duration + // of all finished children of this span. + map<string, int64> finished_children_durations = 18 [(gogoproto.castvalue) = "time.Duration"]; + reserved 5,10,11; + + // Next ID: 19 } // NormalizedSpan is a representation of a RecordedSpan from a trace with all