Skip to content

Commit

Permalink
Merge #81020
Browse files Browse the repository at this point in the history
81020: tracing: tighten the API to import remote traces r=adityamaru a=adityamaru

This change is a cosmetic one. It changes what was previously
called `ImportRemoteSpans` to `ImportRemoteRecording`. This is
because, the former suggests that the method can be used to
import spans from disjointed Recordings but this is not the case. We
subsume the remote spans into the receiving span as its children,
and while doing so we assume that they all belong to the same recording
with the root as the first element of the imported slice.

This change will make some of the work being done for #80391 easier
to reason about.

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed May 15, 2022
2 parents 3d433e1 + 407f220 commit bc5f5b7
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug_send_kv_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func sendKVBatchRequestWithTracingOption(

if sp != nil {
// Import the remotely collected spans, if any.
sp.ImportRemoteSpans(br.CollectedSpans)
sp.ImportRemoteRecording(br.CollectedSpans)

// Extract the recording.
rec = sp.GetRecording(tracing.RecordingVerbose)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (gt *grpcTransport) sendBatch(
return nil, errors.Errorf(
"trying to ingest remote spans but there is no recording span set up")
}
span.ImportRemoteSpans(reply.CollectedSpans)
span.ImportRemoteRecording(reply.CollectedSpans)
}
}
return reply, err
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *senderTransport) SendNext(
if span == nil {
panic("trying to ingest remote spans but there is no recording span set up")
}
span.ImportRemoteSpans(br.CollectedSpans)
span.ImportRemoteRecording(br.CollectedSpans)
}

return br, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ func delegateSnapshot(
"trying to ingest remote spans but there is no recording span set up",
)
} else {
span.ImportRemoteSpans(resp.CollectedSpans)
span.ImportRemoteRecording(resp.CollectedSpans)
}
}
switch resp.SnapResponse.Status {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, func()) {

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParentFromSpanMeta(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))
child.ImportRemoteRecording(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start another remote child span on "node 2" that we finish. This will have
// a different trace_id from the spans created above.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra
}
if len(meta.TraceData) > 0 {
if span := tracing.SpanFromContext(r.ctx); span != nil {
span.ImportRemoteSpans(meta.TraceData)
span.ImportRemoteRecording(meta.TraceData)
}
var ev roachpb.ContentionEvent
for i := range meta.TraceData {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/tablereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestLimitScans(t *testing.T) {

// Simulate what the DistSQLReceiver does and ingest the trace.
if meta != nil && len(meta.TraceData) > 0 {
sp.ImportRemoteSpans(meta.TraceData)
sp.ImportRemoteRecording(meta.TraceData)
}

if row == nil && meta == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, tracingpb.TraceID,

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParentFromSpanMeta(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))
child.ImportRemoteRecording(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start a root span on "node 2".
root2 := t2.StartSpan("root2", tracing.WithRecording(tracing.RecordingVerbose))
Expand Down
32 changes: 17 additions & 15 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,45 +457,47 @@ func (s *crdbSpan) getStructuredRecording(includeDetachedChildren bool) Recordin
return Recording{res}
}

// recordFinishedChildren adds children to s' recording.
// recordFinishedChildren adds the spans in childRecording to s' recording.
//
// s takes ownership of children; the caller is not allowed to use them anymore.
func (s *crdbSpan) recordFinishedChildren(children []tracingpb.RecordedSpan) {
if len(children) == 0 {
// s takes ownership of childRecording; the caller is not allowed to use them anymore.
func (s *crdbSpan) recordFinishedChildren(childRecording Recording) {
if len(childRecording) == 0 {
return
}

s.mu.Lock()
defer s.mu.Unlock()
s.recordFinishedChildrenLocked(children)
s.recordFinishedChildrenLocked(childRecording)
}

// s takes ownership of children; the caller is not allowed to use them anymore.
func (s *crdbSpan) recordFinishedChildrenLocked(children []tracingpb.RecordedSpan) {
if len(children) == 0 {
// s takes ownership of childRecording; the caller is not allowed to use them
// anymore.
func (s *crdbSpan) recordFinishedChildrenLocked(childRecording Recording) {
if len(childRecording) == 0 {
return
}

// Depending on the type of recording, we either keep all the information
// received, or only the structured events.
switch s.recordingType() {
case RecordingVerbose:
// Change the root of the remote recording to be a child of this Span. This is
// Change the root of the recording to be a child of this Span. This is
// usually already the case, except with DistSQL traces where remote
// processors run in spans that FollowFrom an RPC Span that we don't collect.
children[0].ParentSpanID = s.spanID
// processors run in spans that FollowFrom an RPC Span that we don't
// collect.
childRecording[0].ParentSpanID = s.spanID

if len(s.mu.recording.finishedChildren)+len(children) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, children...)
if len(s.mu.recording.finishedChildren)+len(childRecording) <= maxRecordedSpansPerTrace {
s.mu.recording.finishedChildren = append(s.mu.recording.finishedChildren, childRecording...)
break
}

// We don't have space for this recording. Let's collect just the structured
// records by falling through.
fallthrough
case RecordingStructured:
for ci := range children {
child := &children[ci]
for ci := range childRecording {
child := &childRecording[ci]
for i := range child.StructuredRecords {
s.recordInternalLocked(&child.StructuredRecords[i], &s.mu.recording.structured)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/grpc_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestGRPCInterceptors(t *testing.T) {
var rec tracingpb.RecordedSpan
require.NoError(t, types.UnmarshalAny(recAny, &rec))
require.Len(t, rec.StructuredRecords, 1)
sp.ImportRemoteSpans([]tracingpb.RecordedSpan{rec})
sp.ImportRemoteRecording([]tracingpb.RecordedSpan{rec})
var n int
finalRecs := sp.FinishAndGetRecording(tracing.RecordingVerbose)
for _, rec := range finalRecs {
Expand Down
12 changes: 7 additions & 5 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,14 @@ func (sp *Span) GetConfiguredRecording() Recording {
return sp.i.GetRecording(recType, false /* finishing */)
}

// ImportRemoteSpans adds RecordedSpan data to the recording of the given Span;
// these spans will be part of the result of GetRecording. Used to import
// recorded traces from other nodes.
func (sp *Span) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
// ImportRemoteRecording adds the spans in remoteRecording as children of the
// receiver. As a result of this, the imported recording will be a part of the
// GetRecording() output for the receiver.
//
// This function is used to import a recording from another node.
func (sp *Span) ImportRemoteRecording(remoteRecording Recording) {
if !sp.detectUseAfterFinish() {
sp.i.ImportRemoteSpans(remoteSpans)
sp.i.ImportRemoteRecording(remoteRecording)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/util/tracing/span_inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (s *spanInner) GetRecording(recType RecordingType, finishing bool) Recordin
return s.crdb.GetRecording(recType, finishing)
}

func (s *spanInner) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) {
s.crdb.recordFinishedChildren(remoteSpans)
func (s *spanInner) ImportRemoteRecording(remoteRecording []tracingpb.RecordedSpan) {
s.crdb.recordFinishedChildren(remoteRecording)
}

func (s *spanInner) Finish() {
Expand Down
11 changes: 5 additions & 6 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,17 @@ type remoteParent SpanMeta
// For the purposes of trace recordings, there's no mechanism ensuring that the
// child's recording will be passed to the parent span. When that's desired, it
// has to be done manually by calling Span.GetRecording() and propagating the
// result to the parent by calling Span.ImportRemoteSpans().
// result to the parent by calling Span.ImportRemoteRecording().
//
// The canonical use case for this is around RPC boundaries, where a server
// handling a request wants to create a child span descending from a parent on a
// remote machine.
//
// node 1 (network) node 2
// node 1 (network) node 2
// --------------------------------------------------------------------------
// Span.Meta() ----------> sp2 := Tracer.StartSpan(
// WithRemoteParentFromSpanMeta(.))
// doSomething(sp2)
// Span.ImportRemoteSpans(.) <---------- sp2.FinishAndGetRecording()
// Span.Meta() ----------> sp2 := Tracer.StartSpan(WithRemoteParentFromSpanMeta(.))
// doSomething(sp2)
// Span.ImportRemoteRecording(.) <---------- sp2.FinishAndGetRecording()
//
// By default, the child span is derived using a ChildOf relationship, which
// corresponds to the expectation that the parent span will usually wait for the
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestRecordingString(t *testing.T) {
remoteChild.Record("remote child 1")

remoteRec := remoteChild.FinishAndGetRecording(RecordingVerbose)
root.ImportRemoteSpans(remoteRec)
root.ImportRemoteRecording(remoteRec)

root.Record("root 3")

Expand Down Expand Up @@ -162,7 +162,7 @@ func TestRecordingInRecording(t *testing.T) {
// have to be imported into the parent manually (this would usually happen via
// code at the RPC boundaries).
grandChild := tr.StartSpan("grandchild", WithParent(child), WithDetachedRecording())
child.ImportRemoteSpans(grandChild.FinishAndGetRecording(RecordingVerbose))
child.ImportRemoteRecording(grandChild.FinishAndGetRecording(RecordingVerbose))
childRec := child.FinishAndGetRecording(RecordingVerbose)
require.NoError(t, CheckRecordedSpans(childRec, `
span: child
Expand All @@ -189,7 +189,7 @@ func TestRecordingInRecording(t *testing.T) {

// Verify that GetRecording propagates the structured events even when the
// receiving Span isn't verbose during import.
func TestImportRemoteSpans(t *testing.T) {
func TestImportRemoteRecording(t *testing.T) {
for _, verbose := range []bool{false, true} {
t.Run(fmt.Sprintf("%s=%t", "verbose-child=", verbose), func(t *testing.T) {
tr := NewTracerWithOpt(context.Background())
Expand All @@ -203,7 +203,7 @@ func TestImportRemoteSpans(t *testing.T) {
ch := tr.StartSpan("child", WithParent(sp), WithDetachedRecording())
ch.RecordStructured(&types.Int32Value{Value: 4})
ch.Record("foo")
sp.ImportRemoteSpans(ch.FinishAndGetRecording(RecordingVerbose))
sp.ImportRemoteRecording(ch.FinishAndGetRecording(RecordingVerbose))

if verbose {
require.NoError(t, CheckRecording(sp.FinishAndGetRecording(RecordingVerbose), `
Expand All @@ -223,15 +223,15 @@ func TestImportRemoteSpans(t *testing.T) {
}
}

func TestImportRemoteSpansMaintainsRightByteSize(t *testing.T) {
func TestImportRemoteRecordingMaintainsRightByteSize(t *testing.T) {
tr1 := NewTracer()

child := tr1.StartSpan("child", WithRecording(RecordingStructured))
child.RecordStructured(&types.Int32Value{Value: 42})
child.RecordStructured(&types.StringValue{Value: "test"})

root := tr1.StartSpan("root", WithRecording(RecordingStructured))
root.ImportRemoteSpans(child.GetRecording(RecordingStructured))
root.ImportRemoteRecording(child.GetRecording(RecordingStructured))
c := root.i.crdb
c.mu.Lock()
buf := c.mu.recording.structured
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/tracing/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func TestTracerInjectExtract(t *testing.T) {
t.Fatal(err)
}

s1.ImportRemoteSpans(rec)
s1.ImportRemoteRecording(rec)
if err := CheckRecordedSpans(s1.FinishAndGetRecording(RecordingVerbose), `
span: a
tags: _verbose=1
Expand Down Expand Up @@ -492,7 +492,7 @@ func TestTracer_VisitSpans(t *testing.T) {
childChild := tr2.StartSpan("root.child.remotechild", WithRemoteParentFromSpanMeta(child.Meta()))
childChildFinished := tr2.StartSpan("root.child.remotechilddone", WithRemoteParentFromSpanMeta(child.Meta()))
require.Len(t, tr2.activeSpansRegistry.mu.m, 2)
child.ImportRemoteSpans(childChildFinished.FinishAndGetRecording(RecordingVerbose))
child.ImportRemoteRecording(childChildFinished.FinishAndGetRecording(RecordingVerbose))
require.Len(t, tr2.activeSpansRegistry.mu.m, 1)

// All spans are part of the recording (root.child.remotechilddone was
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestSpanRecordingFinished(t *testing.T) {
tr2 := NewTracer()
childTraceInfo := child.Meta().ToProto()
remoteChildChild := tr2.StartSpan("root.child.remotechild", WithRemoteParentFromTraceInfo(&childTraceInfo))
child.ImportRemoteSpans(remoteChildChild.GetRecording(RecordingVerbose))
child.ImportRemoteRecording(remoteChildChild.GetRecording(RecordingVerbose))
remoteChildChild.Finish()

// All spans are un-finished.
Expand Down

0 comments on commit bc5f5b7

Please sign in to comment.