diff --git a/pkg/bench/ddl_analysis/ddl_analysis_bench.go b/pkg/bench/ddl_analysis/ddl_analysis_bench.go index 7fecd64d9a3e..60e74b569030 100644 --- a/pkg/bench/ddl_analysis/ddl_analysis_bench.go +++ b/pkg/bench/ddl_analysis/ddl_analysis_bench.go @@ -47,7 +47,7 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) { beforePlan := func(sp *tracing.Span, stmt string) { if _, ok := stmtToKvBatchRequests.Load(stmt); ok { sp.Finish() - trace := tracing.GetRecording(sp) + trace := sp.GetRecording() count := countKvBatchRequestsInRecording(trace) stmtToKvBatchRequests.Store(stmt, count) } diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index d7cd59de2a09..fe50c3e88a73 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -541,7 +541,7 @@ func backupPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) { return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option") diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index ff84afa76948..ff67d54fc012 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -88,7 +88,7 @@ func newBackupDataProcessor( func (cp *backupDataProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "backupDataProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer cp.output.ProducerDone() progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index ce5b50f5d6b7..29b23890df86 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -294,7 +294,7 @@ func WriteDescriptors( extra []roachpb.KeyValue, ) error { ctx, span := tracing.ChildSpan(ctx, "WriteDescriptors") - defer tracing.FinishSpan(span) + defer span.Finish() err := func() error { b := txn.NewBatch() wroteDBs := make(map[descpb.ID]catalog.DatabaseDescriptor) @@ -580,7 +580,7 @@ func restore( requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block g.GoCtx(func(ctx context.Context) error { ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log") - defer tracing.FinishSpan(progressSpan) + defer progressSpan.Finish() return progressLogger.Loop(ctx, requestFinishedCh) }) diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index add5516452f8..fa05427452c5 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1225,7 +1225,7 @@ func restorePlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. 
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() if !(p.ExtendedEvalContext().TxnImplicit || restoreStmt.Options.Detached) { return errors.Errorf("RESTORE cannot be used inside a transaction without DETACHED option") diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index d8aaca756340..c1804033066a 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -95,7 +95,7 @@ func showBackupPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() str, err := toFn() if err != nil { @@ -487,7 +487,7 @@ func showBackupsInCollectionPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { ctx, span := tracing.ChildSpan(ctx, backup.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() collection, err := collectionFn() if err != nil { diff --git a/pkg/ccl/backupccl/split_and_scatter_processor.go b/pkg/ccl/backupccl/split_and_scatter_processor.go index f234aee46780..1a35b8d09852 100644 --- a/pkg/ccl/backupccl/split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/split_and_scatter_processor.go @@ -158,7 +158,7 @@ type entryNode struct { // Run implements the execinfra.Processor interface. func (ssp *splitAndScatterProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "splitAndScatterProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer ssp.output.ProducerDone() numEntries := 0 diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 28a2d5f60d96..beecd945033c 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -108,7 +108,7 @@ func changefeedPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) if err != nil { diff --git a/pkg/ccl/importccl/exportcsv.go b/pkg/ccl/importccl/exportcsv.go index e1fdfa63a1cd..c01d5d20b2da 100644 --- a/pkg/ccl/importccl/exportcsv.go +++ b/pkg/ccl/importccl/exportcsv.go @@ -167,7 +167,7 @@ func (sp *csvWriter) OutputTypes() []*types.T { func (sp *csvWriter) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "csvWriter") - defer tracing.FinishSpan(span) + defer span.Finish() err := func() error { typs := sp.input.OutputTypes() diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 2b4cc08dfe5e..1072c775c83f 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -77,7 +77,7 @@ func newReadImportDataProcessor( func (cp *readImportDataProcessor) Run(ctx context.Context) { ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor") - defer tracing.FinishSpan(span) + defer span.Finish() defer cp.output.ProducerDone() progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) @@ -209,7 +209,7 @@ func ingestKvs( kvCh <-chan row.KVBatch, ) (*roachpb.BulkOpSummary, error) { ctx, span := tracing.ChildSpan(ctx, "ingestKVs") - defer tracing.FinishSpan(span) + defer span.Finish() writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos} diff --git a/pkg/ccl/importccl/import_stmt.go 
b/pkg/ccl/importccl/import_stmt.go index 9284f353b7dd..ede03475e085 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -283,7 +283,7 @@ func importPlanHook( fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { // TODO(dan): Move this span into sql. ctx, span := tracing.ChildSpan(ctx, importStmt.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() walltime := p.ExecCfg().Clock.Now().WallTime diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index da7087ce9307..4d379443a861 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -85,7 +85,7 @@ func runImport( group.GoCtx(func(ctx context.Context) error { defer close(kvCh) ctx, span := tracing.ChildSpan(ctx, "readImportFiles") - defer tracing.FinishSpan(span) + defer span.Finish() var inputs map[int32]string if spec.ResumePos != nil { // Filter out files that were completely processed. @@ -540,7 +540,7 @@ func runParallelImport( minEmited := make([]int64, parallelism) group.GoCtx(func(ctx context.Context) error { ctx, span := tracing.ChildSpan(ctx, "inputconverter") - defer tracing.FinishSpan(span) + defer span.Finish() return ctxgroup.GroupWorkers(ctx, parallelism, func(ctx context.Context, id int) error { return importer.importWorker(ctx, id, consumer, importCtx, fileCtx, minEmited) }) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index a020beb955ea..602c1dc8ee58 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -229,7 +229,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) { SQLExecutor: &sql.ExecutorTestingKnobs{ WithStatementTrace: func(sp *tracing.Span, stmt string) { if stmt == historicalQuery { - recCh <- tracing.GetRecording(sp) + recCh <- sp.GetRecording() } }, }, diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 97af3bf04916..1ab8645f13a6 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -76,7 +76,7 @@ func evalExport( reply := resp.(*roachpb.ExportResponse) ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) - defer tracing.FinishSpan(span) + defer span.Finish() // For MVCC_All backups with no start time, they'll only be capturing the // *revisions* since the gc threshold, so noting that in the reply allows the diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go index 411fe4410b84..4e87b4840c6b 100644 --- a/pkg/ccl/storageccl/writebatch.go +++ b/pkg/ccl/storageccl/writebatch.go @@ -41,7 +41,7 @@ func evalWriteBatch( ms := cArgs.Stats _, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey)) - defer tracing.FinishSpan(span) + defer span.Finish() if log.V(1) { log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey) } diff --git a/pkg/kv/kvclient/kvcoord/range_cache.go b/pkg/kv/kvclient/kvcoord/range_cache.go index f488f6c62959..3e126247cc26 100644 --- a/pkg/kv/kvclient/kvcoord/range_cache.go +++ b/pkg/kv/kvclient/kvcoord/range_cache.go @@ -655,7 +655,7 @@ func (rdc *RangeDescriptorCache) tryLookup( var lookupRes EvictionToken if err := rdc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error { ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup") - defer tracing.FinishSpan(reqSpan) + defer 
reqSpan.Finish() // Clear the context's cancelation. This request services potentially many // callers waiting for its result, and using the flight's leader's // cancelation doesn't make sense. diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 158aa6e54142..9deb3852d4af 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -169,7 +169,7 @@ func (gt *grpcTransport) sendBatch( return nil, errors.Errorf( "trying to ingest remote spans but there is no recording span set up") } - if err := tracing.ImportRemoteSpans(span, reply.CollectedSpans); err != nil { + if err := span.ImportRemoteSpans(reply.CollectedSpans); err != nil { return nil, errors.Wrap(err, "error ingesting remote spans") } } @@ -305,7 +305,7 @@ func (s *senderTransport) SendNext( if span == nil { panic("trying to ingest remote spans but there is no recording span set up") } - if err := tracing.ImportRemoteSpans(span, br.CollectedSpans); err != nil { + if err := span.ImportRemoteSpans(br.CollectedSpans); err != nil { panic(err) } } diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 6ce953ca65c4..b7d5c3b01252 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -146,13 +146,13 @@ func (m *mockInternalClient) Batch( ) (*roachpb.BatchResponse, error) { sp := m.tr.StartRootSpan("mock", nil /* logTags */, tracing.RecordableSpan) defer sp.Finish() - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx = tracing.ContextWithSpan(ctx, sp) log.Eventf(ctx, "mockInternalClient processing batch") br := &roachpb.BatchResponse{} br.Error = m.pErr - if rec := tracing.GetRecording(sp); rec != nil { + if rec := sp.GetRecording(); rec != nil { br.CollectedSpans = append(br.CollectedSpans, rec...) 
} return br, nil diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index ec65b7f4d70b..9d3cd6b920c8 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" @@ -472,7 +471,7 @@ func (tc *TxnCoordSender) Send( if tc.mu.txn.ID == (uuid.UUID{}) { log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender") } - if !tracing.IsBlackHoleSpan(sp) { + if !sp.IsBlackHole() { sp.SetBaggageItem("txnID", tc.mu.txn.ID.String()) } ctx = logtags.AddTag(ctx, "txn", uuid.ShortStringer(tc.mu.txn.ID)) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go index 2f6505a66150..e9d538cf5460 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go @@ -152,7 +152,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) { tracer := tracing.NewTracer() sp := tracer.StartSpan("test", tracing.Recordable) - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) txnCtx := tracing.ContextWithSpan(context.Background(), sp) push := func(ctx context.Context, key roachpb.Key) error { @@ -179,7 +179,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) { t.Fatalf("expected 2 attempts, got: %d", attempts) } sp.Finish() - recording := tracing.GetRecording(sp) + recording := sp.GetRecording() var foundHeartbeatLoop bool for _, sp := range recording { if strings.Contains(sp.Operation, "heartbeat loop") { diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index f824a1eefed0..39f80ed4478f 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -42,7 +42,7 @@ func EvalAddSSTable( // TODO(tschottdorf): restore the below in some form (gets in the way of testing). 
// _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey)) - // defer tracing.FinishSpan(span) + // defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key) // IMPORT INTO should not proceed if any KVs from the SST shadow existing data diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index d348950c0605..7e4c51d9903f 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -181,7 +181,7 @@ func (c *replicatedCmd) FinishNonLocal(ctx context.Context) { } func (c *replicatedCmd) finishTracingSpan() { - tracing.FinishSpan(c.sp) + c.sp.Finish() c.ctx, c.sp = nil, nil } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index ea52188001c8..790eca3d16ef 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -137,7 +137,7 @@ func (proposal *ProposalData) finishApplication(ctx context.Context, pr proposal proposal.ec.done(ctx, proposal.Request, pr.Reply, pr.Err) proposal.signalProposalResult(pr) if proposal.sp != nil { - tracing.FinishSpan(proposal.sp) + proposal.sp.Finish() proposal.sp = nil } } @@ -891,7 +891,7 @@ func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier { if sp == nil { return nil } - if tracing.IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { return nil } traceData := opentracing.TextMapCarrier{} diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index a77307aef9d7..e035385ffefb 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -12561,7 +12561,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) { tracedCtx := tracing.ContextWithSpan(ctx, sp) // Go out of our way to enable recording so that expensive logging is enabled // for this context. - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease) if pErr != nil { t.Fatal(pErr) diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index 465d1029725c..2b574fb35f98 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -56,7 +56,7 @@ func TestTxnSnowballTrace(t *testing.T) { } log.Event(ctx, "txn complete") sp.Finish() - collectedSpans := tracing.GetRecording(sp) + collectedSpans := sp.GetRecording() dump := collectedSpans.String() // dump: // 0.105ms 0.000ms event:inside txn diff --git a/pkg/server/node.go b/pkg/server/node.go index 30bc0407122e..41a6bfa90c5b 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1000,7 +1000,7 @@ func (n *Node) setupSpanForIncomingRPC( // spans in the BatchResponse at the end of the request. // We don't want to do this if the operation is on the same host, in which // case everything is already part of the same recording. - if rec := tracing.GetRecording(grpcSpan); rec != nil { + if rec := grpcSpan.GetRecording(); rec != nil { br.CollectedSpans = append(br.CollectedSpans, rec...) 
} } diff --git a/pkg/sql/colexec/stats.go b/pkg/sql/colexec/stats.go index 64f4cfaff78d..d2d50af719b1 100644 --- a/pkg/sql/colexec/stats.go +++ b/pkg/sql/colexec/stats.go @@ -164,6 +164,6 @@ func (vsc *VectorizedStatsCollector) OutputStats( vsc.NumBatches = 0 vsc.BytesRead = 0 } - tracing.SetSpanStats(span, &vsc.VectorizedStats) + span.SetSpanStats(&vsc.VectorizedStats) span.Finish() } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 2f69ad88f3fa..132d9c98a9e0 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -179,7 +179,7 @@ func (f *vectorizedFlow) Setup( log.Infof(ctx, "setting up vectorize flow %s", f.ID.Short()) } recordingStats := false - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { recordingStats = true } helper := &vectorizedFlowCreatorHelper{f: f.FlowBase} @@ -894,7 +894,7 @@ func (s *vectorizedFlowCreator) setupOutput( finishVectorizedStatsCollectors( ctx, flowCtx.ID, flowCtx.Cfg.TestingKnobs.DeterministicStats, vscs, ) - return []execinfrapb.ProducerMetadata{{TraceData: tracing.GetRecording(span)}} + return []execinfrapb.ProducerMetadata{{TraceData: span.GetRecording()}} }, }, ) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 4b332459d35d..8046a257df66 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState( // Record the statement information that we've collected. // Note that in case of implicit transactions, the trace contains the auto-commit too. sp.Finish() - trace := tracing.GetRecording(sp) + trace := sp.GetRecording() ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor) placeholders := p.extendedEvalCtx.Placeholders if finishCollectionDiagnostics != nil { diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index e833b0418c41..3e95b2cd7f08 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -263,7 +263,7 @@ func (ds *ServerImpl) setupFlow( sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData) if err != nil { - tracing.FinishSpan(sp) + sp.Finish() return ctx, nil, err } ie := &lazyInternalExecutor{ @@ -329,7 +329,7 @@ func (ds *ServerImpl) setupFlow( // Flow.Cleanup will not be called, so we have to close the memory monitor // and finish the span manually. 
monitor.Stop(ctx) - tracing.FinishSpan(sp) + sp.Finish() ctx = tracing.ContextWithSpan(ctx, nil) return ctx, nil, err } diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 70b9fdd42fc5..6cc0c83c27ea 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -685,7 +685,7 @@ func (r *DistSQLReceiver) Push( if span == nil { r.resultWriter.SetError( errors.New("trying to ingest remote spans but there is no recording span set up")) - } else if err := tracing.ImportRemoteSpans(span, meta.TraceData); err != nil { + } else if err := span.ImportRemoteSpans(meta.TraceData); err != nil { r.resultWriter.SetError(errors.Errorf("error ingesting remote spans: %s", err)) } } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index a7fc18782885..77e7ff076429 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1435,9 +1435,9 @@ func (st *SessionTracing) getSessionTrace() ([]traceRow, error) { func (st *SessionTracing) getRecording() []tracingpb.RecordedSpan { var spans []tracingpb.RecordedSpan if st.firstTxnSpan != nil { - spans = append(spans, tracing.GetRecording(st.firstTxnSpan)...) + spans = append(spans, st.firstTxnSpan.GetRecording()...) } - return append(spans, tracing.GetRecording(st.connSpan)...) + return append(spans, st.connSpan.GetRecording()...) } // StartTracing starts "session tracing". From this moment on, everything @@ -1498,7 +1498,7 @@ func (st *SessionTracing) StartTracing( if sp == nil { return errors.Errorf("no txn span for SessionTracing") } - tracing.StartRecording(sp, recType) + sp.StartRecording(recType) st.firstTxnSpan = sp } @@ -1530,7 +1530,7 @@ func (st *SessionTracing) StartTracing( tracing.LogTagsFromCtx(connCtx), ) } - tracing.StartRecording(sp, recType) + sp.StartRecording(recType) st.connSpan = sp // Hijack the connections context. @@ -1555,15 +1555,15 @@ func (st *SessionTracing) StopTracing() error { var spans []tracingpb.RecordedSpan if st.firstTxnSpan != nil { - spans = append(spans, tracing.GetRecording(st.firstTxnSpan)...) - tracing.StopRecording(st.firstTxnSpan) + spans = append(spans, st.firstTxnSpan.GetRecording()...) + st.firstTxnSpan.StopRecording() } st.connSpan.Finish() - spans = append(spans, tracing.GetRecording(st.connSpan)...) + spans = append(spans, st.connSpan.GetRecording()...) // NOTE: We're stopping recording on the connection's ctx only; the stopping // is not inherited by children. If we are inside of a txn, that span will // continue recording, even though nobody will collect its recording again. - tracing.StopRecording(st.connSpan) + st.connSpan.StopRecording() st.ex.ctxHolder.unhijack() var err error diff --git a/pkg/sql/execinfra/base.go b/pkg/sql/execinfra/base.go index b871724f85d0..ee12bec0f990 100644 --- a/pkg/sql/execinfra/base.go +++ b/pkg/sql/execinfra/base.go @@ -236,7 +236,7 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver // GetTraceData returns the trace data. func GetTraceData(ctx context.Context) []tracingpb.RecordedSpan { if sp := tracing.SpanFromContext(ctx); sp != nil { - return tracing.GetRecording(sp) + return sp.GetRecording() } return nil } diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 2786be690ebe..b8b6e0c67bc8 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -857,7 +857,7 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. 
func (pb *ProcessorBase) StartInternal(ctx context.Context, name string) context.Context { pb.origCtx = ctx pb.Ctx, pb.span = ProcessorSpan(ctx, name) - if pb.span != nil && tracing.IsRecording(pb.span) { + if pb.span != nil && pb.span.IsRecording() { pb.span.SetTag(execinfrapb.FlowIDTagKey, pb.FlowCtx.ID.String()) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, pb.processorID) } @@ -885,7 +885,7 @@ func (pb *ProcessorBase) InternalClose() bool { } pb.Closed = true - tracing.FinishSpan(pb.span) + pb.span.Finish() pb.span = nil // Reset the context so that any incidental uses after this point do not // access the finished span. diff --git a/pkg/sql/explain_distsql.go b/pkg/sql/explain_distsql.go index 6bbe636a5abb..5b76ca9a88d7 100644 --- a/pkg/sql/explain_distsql.go +++ b/pkg/sql/explain_distsql.go @@ -165,7 +165,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { // don't get a noopSpan. var sp *tracing.Span if parentSp := tracing.SpanFromContext(params.ctx); parentSp != nil && - !tracing.IsRecording(parentSp) { + !parentSp.IsRecording() { tracer := parentSp.Tracer() sp = tracer.StartSpan( "explain-distsql", tracing.Recordable, @@ -177,7 +177,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { "explain-distsql", tracing.Recordable, tracing.LogTagsFromCtx(params.ctx)) } - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx := tracing.ContextWithSpan(params.ctx, sp) planCtx.ctx = ctx // Make a copy of the evalContext with the recording span in it; we can't @@ -218,7 +218,7 @@ func (n *explainDistSQLNode) startExec(params runParams) error { n.run.executedStatement = true sp.Finish() - spans := tracing.GetRecording(sp) + spans := sp.GetRecording() if err := rw.Err(); err != nil { return err diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 7138719bf22f..7b6812ab6e24 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -202,7 +202,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { var span *tracing.Span ctx, span = execinfra.ProcessorSpan(ctx, "outbox") - if span != nil && tracing.IsRecording(span) { + if span != nil && span.IsRecording() { m.statsCollectionEnabled = true span.SetTag(execinfrapb.FlowIDTagKey, m.flowCtx.ID.String()) span.SetTag(execinfrapb.StreamIDTagKey, m.streamID) @@ -213,7 +213,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { spanFinished := false defer func() { if !spanFinished { - tracing.FinishSpan(span) + span.Finish() } }() @@ -285,8 +285,8 @@ func (m *Outbox) mainLoop(ctx context.Context) error { if m.flowCtx.Cfg.TestingKnobs.DeterministicStats { m.stats.BytesSent = 0 } - tracing.SetSpanStats(span, &m.stats) - tracing.FinishSpan(span) + span.SetSpanStats(&m.stats) + span.Finish() spanFinished = true if trace := execinfra.GetTraceData(ctx); trace != nil { err := m.addRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace}) diff --git a/pkg/sql/grant_role.go b/pkg/sql/grant_role.go index 5dd58335e0e0..ac1fc3027d64 100644 --- a/pkg/sql/grant_role.go +++ b/pkg/sql/grant_role.go @@ -48,7 +48,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR sqltelemetry.IncIAMGrantCounter(n.AdminOption) ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() hasAdminRole, err := p.HasAdminRole(ctx) if err != nil { diff --git a/pkg/sql/revoke_role.go b/pkg/sql/revoke_role.go index 2ac4e4e5d098..998dab33d884 100644 --- 
a/pkg/sql/revoke_role.go +++ b/pkg/sql/revoke_role.go @@ -45,7 +45,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo sqltelemetry.IncIAMRevokeCounter(n.AdminOption) ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) - defer tracing.FinishSpan(span) + defer span.Finish() hasAdminRole, err := p.HasAdminRole(ctx) if err != nil { diff --git a/pkg/sql/rowexec/aggregator.go b/pkg/sql/rowexec/aggregator.go index 4512745f0333..b02ee41ef843 100644 --- a/pkg/sql/rowexec/aggregator.go +++ b/pkg/sql/rowexec/aggregator.go @@ -95,7 +95,7 @@ func (ag *aggregatorBase) init( ) error { ctx := flowCtx.EvalCtx.Ctx() memMonitor := execinfra.NewMonitor(ctx, flowCtx.EvalCtx.Mon, "aggregator-mem") - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { input = newInputStatCollector(input) ag.FinishTrace = ag.outputStatsToTrace } @@ -182,8 +182,7 @@ func (ag *aggregatorBase) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ag.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &AggregatorStats{ InputStats: is, MaxAllocatedMem: ag.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go index 1a52d7a030bc..4edcaa344d01 100644 --- a/pkg/sql/rowexec/backfiller.go +++ b/pkg/sql/rowexec/backfiller.go @@ -119,7 +119,7 @@ func (b *backfiller) Run(ctx context.Context) { opName := fmt.Sprintf("%sBackfiller", b.name) ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID)) ctx, span := execinfra.ProcessorSpan(ctx, opName) - defer tracing.FinishSpan(span) + defer span.Finish() meta := b.doRun(ctx) execinfra.SendTraceData(ctx, b.output) if emitHelper(ctx, &b.out, nil /* row */, meta, func(ctx context.Context) {}) { @@ -332,7 +332,7 @@ func WriteResumeSpan( jobsRegistry *jobs.Registry, ) error { ctx, traceSpan := tracing.ChildSpan(ctx, "checkpoint") - defer tracing.FinishSpan(traceSpan) + defer traceSpan.Finish() return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { resumeSpans, job, mutationIdx, error := GetResumeSpans( diff --git a/pkg/sql/rowexec/countrows.go b/pkg/sql/rowexec/countrows.go index 3f1eb1b43dc9..18f7ba2896f7 100644 --- a/pkg/sql/rowexec/countrows.go +++ b/pkg/sql/rowexec/countrows.go @@ -47,7 +47,7 @@ func newCountAggregator( ag := &countAggregator{} ag.input = input - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { ag.input = newInputStatCollector(input) ag.FinishTrace = ag.outputStatsToTrace } @@ -115,8 +115,8 @@ func (ag *countAggregator) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ag.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &AggregatorStats{InputStats: is}, + sp.SetSpanStats( + &AggregatorStats{InputStats: is}, ) } } diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index 9402466ff7fc..32c7c9568ac7 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -134,7 +134,7 @@ func newDistinct( // So we have to set up the account here. 
d.arena = stringarena.Make(&d.memAcc) - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { d.input = newInputStatCollector(d.input) d.FinishTrace = d.outputStatsToTrace } @@ -376,8 +376,8 @@ func (d *distinct) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(d.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &DistinctStats{InputStats: is, MaxAllocatedMem: d.MemMonitor.MaximumBytes()}, + sp.SetSpanStats( + &DistinctStats{InputStats: is, MaxAllocatedMem: d.MemMonitor.MaximumBytes()}, ) } } diff --git a/pkg/sql/rowexec/hashjoiner.go b/pkg/sql/rowexec/hashjoiner.go index 823e98585efd..a79e7894f93c 100644 --- a/pkg/sql/rowexec/hashjoiner.go +++ b/pkg/sql/rowexec/hashjoiner.go @@ -191,7 +191,7 @@ func newHashJoiner( } // If the trace is recording, instrument the hashJoiner to collect stats. - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { h.leftSource = newInputStatCollector(h.leftSource) h.rightSource = newInputStatCollector(h.rightSource) h.FinishTrace = h.outputStatsToTrace @@ -804,8 +804,7 @@ func (h *hashJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(h.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &HashJoinerStats{ LeftInputStats: lis, RightInputStats: ris, diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index fbbdeeb9fec3..19dbf2b980cc 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -161,7 +161,7 @@ func (ib *indexBackfiller) runChunk( } ctx, traceSpan := tracing.ChildSpan(tctx, "chunk") - defer tracing.FinishSpan(traceSpan) + defer traceSpan.Finish() var key roachpb.Key diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index 18fd51d58011..3d5037b1332e 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -125,7 +125,7 @@ func newInvertedFilterer( ifr.diskMonitor, ) - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { ifr.input = newInputStatCollector(ifr.input) ifr.FinishTrace = ifr.outputStatsToTrace } @@ -321,8 +321,7 @@ func (ifr *invertedFilterer) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ifr.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &InvertedFiltererStats{ InputStats: is, MaxAllocatedMem: ifr.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index f8f15e869354..49f94d8984d9 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -285,7 +285,7 @@ func newInvertedJoiner( } collectingStats := false - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { collectingStats = true } if collectingStats { @@ -738,8 +738,7 @@ func (ij *invertedJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(ij.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &InvertedJoinerStats{ InputStats: is, IndexScanStats: fis, diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index ad55a480e784..7d738fd7ee44 100644 --- 
a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -253,7 +253,7 @@ func newJoinReader( } collectingStats := false - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { collectingStats = true } @@ -690,7 +690,7 @@ func (jr *joinReader) outputStatsToTrace() { IndexLookupStats: ils, } if sp := tracing.SpanFromContext(jr.Ctx); sp != nil { - tracing.SetSpanStats(sp, jrs) + sp.SetSpanStats(jrs) } } diff --git a/pkg/sql/rowexec/mergejoiner.go b/pkg/sql/rowexec/mergejoiner.go index a7f0e9284c9b..57bf911fcb55 100644 --- a/pkg/sql/rowexec/mergejoiner.go +++ b/pkg/sql/rowexec/mergejoiner.go @@ -74,7 +74,7 @@ func newMergeJoiner( rightSource: rightSource, } - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { m.leftSource = newInputStatCollector(m.leftSource) m.rightSource = newInputStatCollector(m.rightSource) m.FinishTrace = m.outputStatsToTrace @@ -295,8 +295,7 @@ func (m *mergeJoiner) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(m.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &MergeJoinerStats{ LeftInputStats: lis, RightInputStats: ris, diff --git a/pkg/sql/rowexec/ordinality.go b/pkg/sql/rowexec/ordinality.go index 1eee1af48045..5d7075c7085b 100644 --- a/pkg/sql/rowexec/ordinality.go +++ b/pkg/sql/rowexec/ordinality.go @@ -67,7 +67,7 @@ func newOrdinalityProcessor( return nil, err } - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { o.input = newInputStatCollector(o.input) o.FinishTrace = o.outputStatsToTrace } @@ -134,8 +134,6 @@ func (o *ordinalityProcessor) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(o.Ctx); sp != nil { - tracing.SetSpanStats( - sp, &OrdinalityStats{InputStats: is}, - ) + sp.SetSpanStats(&OrdinalityStats{InputStats: is}) } } diff --git a/pkg/sql/rowexec/sample_aggregator.go b/pkg/sql/rowexec/sample_aggregator.go index 6359c7e7d746..ef19f11b4e0b 100644 --- a/pkg/sql/rowexec/sample_aggregator.go +++ b/pkg/sql/rowexec/sample_aggregator.go @@ -382,7 +382,7 @@ func (s *sampleAggregator) sampleRow( func (s *sampleAggregator) writeResults(ctx context.Context) error { // Turn off tracing so these writes don't affect the results of EXPLAIN // ANALYZE. - if span := tracing.SpanFromContext(ctx); span != nil && tracing.IsRecording(span) { + if span := tracing.SpanFromContext(ctx); span != nil && span.IsRecording() { // TODO(rytaft): this also hides writes in this function from SQL session // traces. 
ctx = tracing.ContextWithSpan(ctx, nil) diff --git a/pkg/sql/rowexec/sorter.go b/pkg/sql/rowexec/sorter.go index 9d37ecde4c0e..1f98122b9815 100644 --- a/pkg/sql/rowexec/sorter.go +++ b/pkg/sql/rowexec/sorter.go @@ -54,7 +54,7 @@ func (s *sorterBase) init( opts execinfra.ProcStateOpts, ) error { ctx := flowCtx.EvalCtx.Ctx() - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { input = newInputStatCollector(input) s.FinishTrace = s.outputStatsToTrace } @@ -163,8 +163,7 @@ func (s *sorterBase) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(s.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &SorterStats{ InputStats: is, MaxAllocatedMem: s.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 176f3127651a..01f95627a076 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -159,7 +159,7 @@ func newTableReader( tr.spans[i] = s.Span } - if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(flowCtx.EvalCtx.Ctx()); sp != nil && sp.IsRecording() { tr.fetcher = newRowFetcherStatCollector(&fetcher) tr.FinishTrace = tr.outputStatsToTrace } else { @@ -299,7 +299,7 @@ func (tr *tableReader) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(tr.Ctx); sp != nil { - tracing.SetSpanStats(sp, &TableReaderStats{ + sp.SetSpanStats(&TableReaderStats{ InputStats: is, BytesRead: tr.GetBytesRead(), }) diff --git a/pkg/sql/rowexec/tablereader_test.go b/pkg/sql/rowexec/tablereader_test.go index eea4c11714f5..19c10ef31859 100644 --- a/pkg/sql/rowexec/tablereader_test.go +++ b/pkg/sql/rowexec/tablereader_test.go @@ -393,7 +393,7 @@ func TestLimitScans(t *testing.T) { // Now we're going to run the tableReader and trace it. tracer := tracing.NewTracer() sp := tracer.StartSpan("root", tracing.Recordable) - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx = tracing.ContextWithSpan(ctx, sp) flowCtx.EvalCtx.Context = ctx @@ -412,7 +412,7 @@ func TestLimitScans(t *testing.T) { // Simulate what the DistSQLReceiver does and ingest the trace. if meta != nil && len(meta.TraceData) > 0 { - if err := tracing.ImportRemoteSpans(sp, meta.TraceData); err != nil { + if err := sp.ImportRemoteSpans(meta.TraceData); err != nil { t.Fatal(err) } } @@ -433,7 +433,7 @@ func TestLimitScans(t *testing.T) { // scans from the same key as the DistSender retries scans when it detects // splits. re := regexp.MustCompile(fmt.Sprintf(`querying next range at /Table/%d/1(\S.*)?`, tableDesc.ID)) - spans := tracing.GetRecording(sp) + spans := sp.GetRecording() ranges := make(map[string]struct{}) for _, span := range spans { if span.Operation == tableReaderProcName { diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index f0d230976dc6..5b35efd2aeea 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -206,7 +206,7 @@ func newWindower( // them to reuse the same shared memory account with the windower. 
evalCtx.SingleDatumAggMemAccount = &w.acc - if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) { + if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() { w.input = newInputStatCollector(w.input) w.FinishTrace = w.outputStatsToTrace } @@ -878,8 +878,7 @@ func (w *windower) outputStatsToTrace() { return } if sp := tracing.SpanFromContext(w.Ctx); sp != nil { - tracing.SetSpanStats( - sp, + sp.SetSpanStats( &WindowerStats{ InputStats: is, MaxAllocatedMem: w.MemMonitor.MaximumBytes(), diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 30305c87f4f9..1f49afcf59a0 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -262,7 +262,7 @@ func (rb *routerBase) setupStreams( // init must be called after setupStreams but before Start. func (rb *routerBase) init(ctx context.Context, flowCtx *execinfra.FlowCtx, types []*types.T) { // Check if we're recording stats. - if s := tracing.SpanFromContext(ctx); s != nil && tracing.IsRecording(s) { + if s := tracing.SpanFromContext(ctx); s != nil && s.IsRecording() { rb.statsCollectionEnabled = true } @@ -373,8 +373,8 @@ func (rb *routerBase) Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c if rb.statsCollectionEnabled { ro.stats.MaxAllocatedMem = ro.memoryMonitor.MaximumBytes() ro.stats.MaxAllocatedDisk = ro.diskMonitor.MaximumBytes() - tracing.SetSpanStats(span, &ro.stats) - tracing.FinishSpan(span) + span.SetSpanStats(&ro.stats) + span.Finish() if trace := execinfra.GetTraceData(ctx); trace != nil { ro.mu.Unlock() rb.semaphore <- struct{}{} diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go index 97ca352cd5a2..00c1c53e019e 100644 --- a/pkg/sql/rowflow/routers_test.go +++ b/pkg/sql/rowflow/routers_test.go @@ -753,7 +753,7 @@ func TestRouterDiskSpill(t *testing.T) { // Enable stats recording. tracer := tracing.NewTracer() sp := tracer.StartSpan("root", tracing.Recordable) - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ctx := tracing.ContextWithSpan(context.Background(), sp) st := cluster.MakeTestingClusterSettings() diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 379e95f08654..08ff89504b7b 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -318,7 +318,7 @@ func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID d ctx, span := tracing.ForkCtxSpan(ctx, "refresh-table-stats") // Perform an asynchronous refresh of the cache. 
go func() { - defer tracing.FinishSpan(span) + defer span.Finish() sc.refreshCacheEntry(ctx, tableID) }() } diff --git a/pkg/sql/trace_test.go b/pkg/sql/trace_test.go index c1f43a14387c..648671afd2f8 100644 --- a/pkg/sql/trace_test.go +++ b/pkg/sql/trace_test.go @@ -589,7 +589,7 @@ func TestTraceDistSQL(t *testing.T) { SQLExecutor: &sql.ExecutorTestingKnobs{ WithStatementTrace: func(sp *tracing.Span, stmt string) { if stmt == countStmt { - recCh <- tracing.GetRecording(sp) + recCh <- sp.GetRecording() } }, }, diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index be8040fa0ac9..ffcb8c42d2ee 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -189,7 +189,7 @@ func (ts *txnState) resetForNewSQLTxn( alreadyRecording := tranCtx.sessionTracing.Enabled() duration := traceTxnThreshold.Get(&tranCtx.settings.SV) if !alreadyRecording && (duration > 0) { - tracing.StartRecording(sp, tracing.SnowballRecording) + sp.StartRecording(tracing.SnowballRecording) ts.recordingThreshold = duration ts.recordingStart = timeutil.Now() } @@ -197,10 +197,6 @@ func (ts *txnState) resetForNewSQLTxn( // Put the new span in the context. txnCtx := tracing.ContextWithSpan(connCtx, sp) - if !tracing.IsRecordable(sp) { - log.Fatalf(connCtx, "non-recordable transaction span of type: %T", sp) - } - ts.sp = sp ts.Ctx, ts.cancel = contextutil.WithCancel(txnCtx) @@ -243,7 +239,7 @@ func (ts *txnState) finishSQLTxn() { } if ts.recordingThreshold > 0 { - if r := tracing.GetRecording(ts.sp); r != nil { + if r := ts.sp.GetRecording(); r != nil { if elapsed := timeutil.Since(ts.recordingStart); elapsed >= ts.recordingThreshold { dump := r.String() if len(dump) > 0 { diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go index 6a0acc88780d..962a1003d9d8 100644 --- a/pkg/util/limit/limiter.go +++ b/pkg/util/limit/limiter.go @@ -44,7 +44,7 @@ func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) error { } // If not, start a span and begin waiting. ctx, span := tracing.ChildSpan(ctx, l.spanName) - defer tracing.FinishSpan(span) + defer span.Finish() return l.sem.Acquire(ctx, 1) } diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index c835d17566da..3fa097003eae 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -145,6 +145,7 @@ go_test( "//vendor/github.com/kr/pretty", "//vendor/github.com/pmezard/go-difflib/difflib", "//vendor/github.com/stretchr/testify/assert", + "//vendor/github.com/stretchr/testify/require", "//vendor/golang.org/x/net/trace", ] + select({ "@io_bazel_rules_go//go/platform:aix": [ diff --git a/pkg/util/log/ambient_context_test.go b/pkg/util/log/ambient_context_test.go index 25c397fdaa4f..bbae949066df 100644 --- a/pkg/util/log/ambient_context_test.go +++ b/pkg/util/log/ambient_context_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/logtags" + "github.com/stretchr/testify/require" ) func TestAnnotateCtxTags(t *testing.T) { @@ -40,15 +41,14 @@ func TestAnnotateCtxTags(t *testing.T) { func TestAnnotateCtxSpan(t *testing.T) { tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) ac := AmbientContext{Tracer: tracer} ac.AddLogTag("ambient", nil) // Annotate a context that has an open span. 
- sp1 := tracer.StartSpan("root") - tracing.StartRecording(sp1, tracing.SingleNodeRecording) + sp1 := tracer.StartRootSpan("root", nil /* logTags */, tracing.RecordableSpan) + sp1.StartRecording(tracing.SingleNodeRecording) ctx1 := tracing.ContextWithSpan(context.Background(), sp1) Event(ctx1, "a") @@ -59,7 +59,7 @@ func TestAnnotateCtxSpan(t *testing.T) { sp2.Finish() sp1.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp1), ` + if err := tracing.TestingCheckRecordedSpans(sp1.GetRecording(), ` Span root: event: a event: c @@ -70,20 +70,15 @@ func TestAnnotateCtxSpan(t *testing.T) { t.Fatal(err) } - // Annotate a context that has no span. + // Annotate a context that has no span. The tracer will create a non-recordable + // span. We just check here that AnnotateCtxWithSpan properly returns it to the + // caller. ac.Tracer = tracer ctx, sp := ac.AnnotateCtxWithSpan(context.Background(), "s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) - Event(ctx, "a") - sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` - Span s: - tags: ambient= - event: [ambient] a - `); err != nil { - t.Fatal(err) - } + require.Equal(t, sp, tracing.SpanFromContext(ctx)) + require.NotNil(t, sp) + require.True(t, sp.IsBlackHole()) } func TestAnnotateCtxNodeStoreReplica(t *testing.T) { diff --git a/pkg/util/log/log.go b/pkg/util/log/log.go index f7e921bcb0d1..831c01979c50 100644 --- a/pkg/util/log/log.go +++ b/pkg/util/log/log.go @@ -215,7 +215,7 @@ func V(level Level) bool { // func ExpensiveLogEnabled(ctx context.Context, level Level) bool { if sp := tracing.SpanFromContext(ctx); sp != nil { - if tracing.IsRecording(sp) { + if sp.IsRecording() { return true } } diff --git a/pkg/util/log/trace.go b/pkg/util/log/trace.go index d0c67f9fe93a..2ca2f70bf407 100644 --- a/pkg/util/log/trace.go +++ b/pkg/util/log/trace.go @@ -91,7 +91,7 @@ func FinishEventLog(ctx context.Context) { // false. 
func getSpanOrEventLog(ctx context.Context) (*tracing.Span, *ctxEventLog, bool) { if sp := tracing.SpanFromContext(ctx); sp != nil { - if tracing.IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { return nil, nil, false } return sp, nil, true diff --git a/pkg/util/log/trace_test.go b/pkg/util/log/trace_test.go index ee7ddd6c1e21..d4d2de9b096c 100644 --- a/pkg/util/log/trace_test.go +++ b/pkg/util/log/trace_test.go @@ -65,9 +65,8 @@ func TestTrace(t *testing.T) { Event(ctx, "should-not-show-up") tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) + sp.StartRecording(tracing.SingleNodeRecording) ctxWithSpan := tracing.ContextWithSpan(ctx, sp) Event(ctxWithSpan, "test1") VEvent(ctxWithSpan, noLogV(), "test2") @@ -79,7 +78,7 @@ func TestTrace(t *testing.T) { sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: test1 event: test2 @@ -95,10 +94,9 @@ func TestTraceWithTags(t *testing.T) { ctx = logtags.AddTag(ctx, "tag", 1) tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) ctxWithSpan := tracing.ContextWithSpan(ctx, sp) - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp.StartRecording(tracing.SingleNodeRecording) Event(ctxWithSpan, "test1") VEvent(ctxWithSpan, noLogV(), "test2") @@ -106,7 +104,7 @@ func TestTraceWithTags(t *testing.T) { Info(ctxWithSpan, "log") sp.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: [tag=1] test1 event: [tag=1] test2 @@ -182,9 +180,8 @@ func TestEventLogAndTrace(t *testing.T) { VErrEvent(ctxWithEventLog, noLogV(), "testerr") tracer := tracing.NewTracer() - tracer.SetForceRealSpans(true) - sp := tracer.StartSpan("s") - tracing.StartRecording(sp, tracing.SingleNodeRecording) + sp := tracer.StartRootSpan("s", nil /* logTags */, tracing.RecordableSpan) + sp.StartRecording(tracing.SingleNodeRecording) ctxWithBoth := tracing.ContextWithSpan(ctxWithEventLog, sp) // Events should only go to the trace. 
Event(ctxWithBoth, "test3") @@ -197,7 +194,7 @@ func TestEventLogAndTrace(t *testing.T) { sp.Finish() el.Finish() - if err := tracing.TestingCheckRecordedSpans(tracing.GetRecording(sp), ` + if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` Span s: event: test3 event: test4 diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index f79ce3b2fc4f..f2f8bfb460fe 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -341,7 +341,7 @@ func (s *Stopper) RunAsyncTask( go func() { defer s.Recover(ctx) defer s.runPostlude(taskName) - defer tracing.FinishSpan(span) + defer span.Finish() f(ctx) }() @@ -400,7 +400,7 @@ func (s *Stopper) RunLimitedAsyncTask( defer s.Recover(ctx) defer s.runPostlude(taskName) defer alloc.Release() - defer tracing.FinishSpan(span) + defer span.Finish() f(ctx) }() diff --git a/pkg/util/tracing/BUILD.bazel b/pkg/util/tracing/BUILD.bazel index 1e8fca620df2..c181e61562b2 100644 --- a/pkg/util/tracing/BUILD.bazel +++ b/pkg/util/tracing/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "annotate.go", "annotate_nocgo.go", "grpc_interceptor.go", + "recording.go", "shadow.go", "span.go", "tags.go", diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index bb76cc7bba67..7edb1add5972 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -17,6 +17,7 @@ import ( "strings" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" @@ -70,7 +71,7 @@ func extractSpanContext(ctx context.Context, tracer *Tracer) (*SpanContext, erro // create a span. func spanInclusionFuncForServer(t *Tracer, parentSpanCtx *SpanContext) bool { // Is client tracing? - return (parentSpanCtx != nil && !IsNoopContext(parentSpanCtx)) || + return (parentSpanCtx != nil && !parentSpanCtx.IsNoop()) || // Should we trace regardless of the client? This is useful for calls coming // through the HTTP->RPC gateway (i.e. the AdminUI), where client is never // tracing. @@ -206,7 +207,7 @@ func (ss *tracingServerStream) Context() context.Context { // // See #17177. func spanInclusionFuncForClient(parentSpanCtx *SpanContext) bool { - return parentSpanCtx != nil && !IsNoopContext(parentSpanCtx) + return parentSpanCtx != nil && !parentSpanCtx.IsNoop() } func injectSpanContext(ctx context.Context, tracer *Tracer, clientSpan *Span) context.Context { @@ -425,3 +426,7 @@ func (cs *tracingClientStream) CloseSend() error { } return err } + +// Recording represents a group of RecordedSpans, as returned by GetRecording. +// Spans are sorted by StartTime. +type Recording []tracingpb.RecordedSpan diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go new file mode 100644 index 000000000000..9942959c2fbc --- /dev/null +++ b/pkg/util/tracing/recording.go @@ -0,0 +1,463 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package tracing + +import ( + "encoding/json" + "fmt" + "regexp" + "sort" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/caller" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" + jaegerjson "github.com/jaegertracing/jaeger/model/json" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" +) + +// RecordingType is the type of recording that a Span might be performing. +type RecordingType int + +const ( + // NoRecording means that the Span isn't recording. Child spans created from + // it similarly won't be recording by default. + NoRecording RecordingType = iota + // SnowballRecording means that the Span is recording and that derived + // spans will be as well, in the same mode (this includes remote spans, + // i.e. this mode crosses RPC boundaries). Derived spans will maintain + // their own recording, and this recording will be included in that of + // any local parent spans. + SnowballRecording + // SingleNodeRecording means that the Span is recording and that locally + // derived spans will as well (i.e. a remote Span typically won't be + // recording by default, in contrast to SnowballRecording). Similar to + // SnowballRecording, children have their own recording which is also + // included in that of their parents. + SingleNodeRecording +) + +type traceLogData struct { + opentracing.LogRecord + depth int + // timeSincePrev represents the duration since the previous log line (previous in the + // set of log lines that this is part of). This is always computed relative to a log line + // from the same Span, except for start of Span in which case the duration is computed relative + // to the last log in the parent occurring before this start. For example: + // start Span A + // log 1 // duration relative to "start Span A" + // start Span B // duration relative to "log 1" + // log 2 // duration relative to "start Span B" + // log 3 // duration relative to "log 1" + timeSincePrev time.Duration +} + +// String formats the given spans for human consumption, showing the +// relationship using nesting and times as both relative to the previous event +// and cumulative. +// +// Child spans are inserted into the parent at the point of the child's +// StartTime; see the diagram on generateSessionTraceVTable() for the ordering +// of messages. +// +// Each log line shows the time since the beginning of the trace +// and since the previous log line. Span starts are shown with special "=== +// <operation>" lines. For a Span start, the time since the relative log line +// can be negative when the Span start follows a message from the parent that +// was generated after the child Span started (or even after the child +// finished). +// +// TODO(andrei): this should be unified with +// SessionTracing.generateSessionTraceVTable().
+func (r Recording) String() string { + if len(r) == 0 { + return "" + } + + var buf strings.Builder + start := r[0].StartTime + writeLogs := func(logs []traceLogData) { + for _, entry := range logs { + fmt.Fprintf(&buf, "% 10.3fms % 10.3fms%s", + 1000*entry.Timestamp.Sub(start).Seconds(), + 1000*entry.timeSincePrev.Seconds(), + strings.Repeat(" ", entry.depth+1)) + for i, f := range entry.Fields { + if i != 0 { + buf.WriteByte(' ') + } + fmt.Fprintf(&buf, "%s:%v", f.Key(), f.Value()) + } + buf.WriteByte('\n') + } + } + + logs := r.visitSpan(r[0], 0 /* depth */) + writeLogs(logs) + + // Check if there's any orphan spans (spans for which the parent is missing). + // This shouldn't happen, but we're protecting against incomplete traces. For + // example, ingesting of remote spans through DistSQL is complex. Orphan spans + // would not be reflected in the output string at all without this. + orphans := r.OrphanSpans() + if len(orphans) > 0 { + // This shouldn't happen. + buf.WriteString("orphan spans (trace is missing spans):\n") + for _, o := range orphans { + logs := r.visitSpan(o, 0 /* depth */) + writeLogs(logs) + } + } + return buf.String() +} + +// OrphanSpans returns the spans with parents missing from the recording. +func (r Recording) OrphanSpans() []tracingpb.RecordedSpan { + spanIDs := make(map[uint64]struct{}) + for _, sp := range r { + spanIDs[sp.SpanID] = struct{}{} + } + + var orphans []tracingpb.RecordedSpan + for i, sp := range r { + if i == 0 { + // The first Span can be a root Span. Note that any other root Span will + // be considered an orphan. + continue + } + if _, ok := spanIDs[sp.ParentSpanID]; !ok { + orphans = append(orphans, sp) + } + } + return orphans +} + +// FindLogMessage returns the first log message in the recording that matches +// the given regexp. The bool return value is true if such a message is found. +func (r Recording) FindLogMessage(pattern string) (string, bool) { + re := regexp.MustCompile(pattern) + for _, sp := range r { + for _, l := range sp.Logs { + msg := l.Msg() + if re.MatchString(msg) { + return msg, true + } + } + } + return "", false +} + +// FindSpan returns the Span with the given operation. The bool retval is false +// if the Span is not found. +func (r Recording) FindSpan(operation string) (tracingpb.RecordedSpan, bool) { + for _, sp := range r { + if sp.Operation == operation { + return sp, true + } + } + return tracingpb.RecordedSpan{}, false +} + +// visitSpan returns the log messages for sp, and all of sp's children. +// +// All messages from a Span are kept together. Sibling spans are ordered within +// the parent in their start order. +func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogData { + ownLogs := make([]traceLogData, 0, len(sp.Logs)+1) + + conv := func(l opentracing.LogRecord, ref time.Time) traceLogData { + var timeSincePrev time.Duration + if ref != (time.Time{}) { + timeSincePrev = l.Timestamp.Sub(ref) + } + return traceLogData{ + LogRecord: l, + depth: depth, + timeSincePrev: timeSincePrev, + } + } + + // Add a log line representing the start of the Span. 
+	lr := opentracing.LogRecord{
+		Timestamp: sp.StartTime,
+		Fields:    []otlog.Field{otlog.String("=== operation", sp.Operation)},
+	}
+	if len(sp.Tags) > 0 {
+		tags := make([]string, 0, len(sp.Tags))
+		for k := range sp.Tags {
+			tags = append(tags, k)
+		}
+		sort.Strings(tags)
+		for _, k := range tags {
+			lr.Fields = append(lr.Fields, otlog.String(k, sp.Tags[k]))
+		}
+	}
+	ownLogs = append(ownLogs, conv(
+		lr,
+		// ref - this entry's timeSincePrev will be computed when we merge it into the parent
+		time.Time{}))
+
+	for _, l := range sp.Logs {
+		lr := opentracing.LogRecord{
+			Timestamp: l.Time,
+			Fields:    make([]otlog.Field, len(l.Fields)),
+		}
+		for i, f := range l.Fields {
+			lr.Fields[i] = otlog.String(f.Key, f.Value)
+		}
+		lastLog := ownLogs[len(ownLogs)-1]
+		ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp))
+	}
+
+	childSpans := make([][]traceLogData, 0)
+	for _, osp := range r {
+		if osp.ParentSpanID != sp.SpanID {
+			continue
+		}
+		childSpans = append(childSpans, r.visitSpan(osp, depth+1))
+	}
+
+	// Merge ownLogs with childSpans.
+	mergedLogs := make([]traceLogData, 0, len(ownLogs))
+	timeMax := time.Date(2200, 0, 0, 0, 0, 0, 0, time.UTC)
+	i, j := 0, 0
+	var lastTimestamp time.Time
+	for i < len(ownLogs) || j < len(childSpans) {
+		if len(mergedLogs) > 0 {
+			lastTimestamp = mergedLogs[len(mergedLogs)-1].Timestamp
+		}
+		nextLog, nextChild := timeMax, timeMax
+		if i < len(ownLogs) {
+			nextLog = ownLogs[i].Timestamp
+		}
+		if j < len(childSpans) {
+			nextChild = childSpans[j][0].Timestamp
+		}
+		if nextLog.After(nextChild) {
+			// Fill in timeSincePrev for the first one of the child's entries.
+			if lastTimestamp != (time.Time{}) {
+				childSpans[j][0].timeSincePrev = childSpans[j][0].Timestamp.Sub(lastTimestamp)
+			}
+			mergedLogs = append(mergedLogs, childSpans[j]...)
+			lastTimestamp = childSpans[j][0].Timestamp
+			j++
+		} else {
+			mergedLogs = append(mergedLogs, ownLogs[i])
+			lastTimestamp = ownLogs[i].Timestamp
+			i++
+		}
+	}
+
+	return mergedLogs
+}
+
+// ToJaegerJSON returns the trace as a JSON that can be imported into Jaeger for
+// visualization.
+//
+// The format is described here: https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826
+//
+// The statement is passed in so it can be included in the trace.
+func (r Recording) ToJaegerJSON(stmt string) (string, error) {
+	if len(r) == 0 {
+		return "", nil
+	}
+
+	cpy := make(Recording, len(r))
+	copy(cpy, r)
+	r = cpy
+	tagsCopy := make(map[string]string)
+	for k, v := range r[0].Tags {
+		tagsCopy[k] = v
+	}
+	tagsCopy["statement"] = stmt
+	r[0].Tags = tagsCopy
+
+	toJaegerSpanID := func(spanID uint64) jaegerjson.SpanID {
+		return jaegerjson.SpanID(strconv.FormatUint(spanID, 10))
+	}
+
+	// Each Span in Jaeger belongs to a "process" that generated it. Spans
+	// belonging to different processes are colored differently in Jaeger. We're
+	// going to map our different nodes to different processes.
+	processes := make(map[jaegerjson.ProcessID]jaegerjson.Process)
+	// getProcessID figures out what "process" a Span belongs to. It looks for a
+	// "node: <node id>" tag. The processes map is populated with an entry for every
+	// node present in the trace.
+	getProcessID := func(sp tracingpb.RecordedSpan) jaegerjson.ProcessID {
+		node := "unknown node"
+		for k, v := range sp.Tags {
+			if k == "node" {
+				node = fmt.Sprintf("node %s", v)
+				break
+			}
+		}
+		pid := jaegerjson.ProcessID(node)
+		if _, ok := processes[pid]; !ok {
+			processes[pid] = jaegerjson.Process{
+				ServiceName: node,
+				Tags:        nil,
+			}
+		}
+		return pid
+	}
+
+	var t jaegerjson.Trace
+	t.TraceID = jaegerjson.TraceID(strconv.FormatUint(r[0].TraceID, 10))
+	t.Processes = processes
+
+	for _, sp := range r {
+		var s jaegerjson.Span
+
+		s.TraceID = t.TraceID
+		s.Duration = uint64(sp.Duration.Microseconds())
+		s.StartTime = uint64(sp.StartTime.UnixNano() / 1000)
+		s.SpanID = toJaegerSpanID(sp.SpanID)
+		s.OperationName = sp.Operation
+		s.ProcessID = getProcessID(sp)
+
+		if sp.ParentSpanID != 0 {
+			s.References = []jaegerjson.Reference{{
+				RefType: jaegerjson.ChildOf,
+				TraceID: s.TraceID,
+				SpanID:  toJaegerSpanID(sp.ParentSpanID),
+			}}
+		}
+
+		for k, v := range sp.Tags {
+			s.Tags = append(s.Tags, jaegerjson.KeyValue{
+				Key:   k,
+				Value: v,
+				Type:  "STRING",
+			})
+		}
+		for _, l := range sp.Logs {
+			jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)}
+			for _, field := range l.Fields {
+				jl.Fields = append(jl.Fields, jaegerjson.KeyValue{
+					Key:   field.Key,
+					Value: field.Value,
+					Type:  "STRING",
+				})
+			}
+			s.Logs = append(s.Logs, jl)
+		}
+		t.Spans = append(t.Spans, s)
+	}
+
+	data := TraceCollection{
+		Data: []jaegerjson.Trace{t},
+		// Add a comment that will show up at the top of the JSON file if someone opens the file.
+		// NOTE: This comment is scarce on newlines because they appear as \n in the
+		// generated file, doing more harm than good.
+		Comment: fmt.Sprintf(`This is a trace for SQL statement: %s
+This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select JSON File.
+Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17
+The UI can then be accessed at http://localhost:16686/search`,
+			stmt),
+	}
+	json, err := json.MarshalIndent(data, "" /* prefix */, "\t" /* indent */)
+	if err != nil {
+		return "", err
+	}
+	return string(json), nil
+}
+
+// TraceCollection is the format accepted by the Jaeger upload feature, as per
+// https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826
+type TraceCollection struct {
+	// Comment is a dummy field we use to put instructions on how to load the trace.
+	Comment string             `json:"_comment"`
+	Data    []jaegerjson.Trace `json:"data"`
+}
+
+// TestingCheckRecordedSpans checks whether a recording looks like an expected
+// one represented by a string with one line per expected Span and one line per
+// expected event (i.e. log message).
+//
+// Use with something like:
+// if err := TestingCheckRecordedSpans(Span.GetRecording(), `
+// Span root:
+// event: a
+// event: c
+// Span child:
+// event: [ambient] b
+// `); err != nil {
+// t.Fatal(err)
+// }
+//
+// The event lines can (and generally should) omit the file:line part that they
+// might contain (depending on the level at which they were logged).
+//
+// Note: this test function is in this file because it needs to be used by
+// both tests in the tracing package and tests outside of it, and the function
+// itself depends on tracing.
+func TestingCheckRecordedSpans(recSpans []tracingpb.RecordedSpan, expected string) error {
+	expected = strings.TrimSpace(expected)
+	var rows []string
+	row := func(format string, args ...interface{}) {
+		rows = append(rows, fmt.Sprintf(format, args...))
+	}
+
+	for _, rs := range recSpans {
+		row("Span %s:", rs.Operation)
+		if len(rs.Tags) > 0 {
+			var tags []string
+			for k, v := range rs.Tags {
+				tags = append(tags, fmt.Sprintf("%s=%v", k, v))
+			}
+			sort.Strings(tags)
+			row(" tags: %s", strings.Join(tags, " "))
+		}
+		for _, l := range rs.Logs {
+			msg := ""
+			for _, f := range l.Fields {
+				msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value)
+			}
+			row("%s", msg)
+		}
+	}
+	var expRows []string
+	if expected != "" {
+		expRows = strings.Split(expected, "\n")
+	}
+	match := false
+	if len(expRows) == len(rows) {
+		match = true
+		for i := range expRows {
+			e := strings.Trim(expRows[i], " \t")
+			r := strings.Trim(rows[i], " \t")
+			if e != r && !matchesWithoutFileLine(r, e) {
+				match = false
+				break
+			}
+		}
+	}
+	if !match {
+		file, line, _ := caller.Lookup(1)
+		return errors.Errorf(
+			"%s:%d expected:\n%s\ngot:\n%s",
+			file, line, expected, strings.Join(rows, "\n"))
+	}
+	return nil
+}
+
+// matchesWithoutFileLine tries to match an event by stripping a file:line from
+// it. For example:
+// "event: util/log/trace_test.go:111 log" will match "event: log".
+//
+// Returns true if it matches.
+func matchesWithoutFileLine(msg string, expected string) bool {
+	groups := regexp.MustCompile(`^(event: ).*:[0-9]* (.*)$`).FindStringSubmatch(msg)
+	return len(groups) == 3 && fmt.Sprintf("event: %s", groups[2]) == expected
+}
diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go
index 3e9375119264..760b299112f9 100644
--- a/pkg/util/tracing/span.go
+++ b/pkg/util/tracing/span.go
@@ -12,12 +12,8 @@ package tracing
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
-	"regexp"
 	"sort"
-	"strconv"
-	"strings"
 	"sync/atomic"
 	"time"
 
@@ -28,7 +24,6 @@ import (
 	"github.com/cockroachdb/logtags"
 	proto "github.com/gogo/protobuf/proto"
 	"github.com/gogo/protobuf/types"
-	jaegerjson "github.com/jaegertracing/jaeger/model/json"
 	opentracing "github.com/opentracing/opentracing-go"
 	otlog "github.com/opentracing/opentracing-go/log"
 	"golang.org/x/net/trace"
@@ -95,27 +90,6 @@ func (sc *SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
 	}
 }
 
-// RecordingType is the type of recording that a Span might be performing.
-type RecordingType int
-
-const (
-	// NoRecording means that the Span isn't recording. Child spans created from
-	// it similarly won't be recording by default.
-	NoRecording RecordingType = iota
-	// SnowballRecording means that the Span is recording and that derived
-	// spans will be as well, in the same mode (this includes remote spans,
-	// i.e. this mode crosses RPC boundaries). Derived spans will maintain
-	// their own recording, and this recording will be included in that of
-	// any local parent spans.
-	SnowballRecording
-	// SingleNodeRecording means that the Span is recording and that locally
-	// derived spans will as well (i.e. a remote Span typically won't be
-	// recording by default, in contrast to SnowballRecording). Similar to
-	// SnowballRecording, children have their own recording which is also
-	// included in that of their parents.
-	SingleNodeRecording
-)
-
 type crdbSpan struct {
 	spanMeta
 
@@ -171,10 +145,15 @@ func (s *crdbSpan) isRecording() bool {
 	return s != nil && atomic.LoadInt32(&s.recording) != 0
 }
 
+// otSpan is a span for an external opentracing compatible tracer
+// such as lightstep, zipkin, jaeger, etc.
 type otSpan struct {
-	// TODO(tbg): see if we can lose the shadowTr here and rely on shadowSpan.Tracer().
-	// Probably not - but worth checking.
-	// TODO(tbg): consider renaming 'shadow' -> 'ot' or 'external'.
+	// shadowTr is the shadowTracer this span was created from. We need
+	// to hold on to it separately because shadowSpan.Tracer() returns
+	// the wrapper tracer and we lose the ability to find out
+	// what tracer it is. This is important when deriving children from
+	// this span, as we want to avoid mixing different tracers, which
+	// could otherwise happen when the cluster settings change.
 	shadowTr   *shadowTracer
 	shadowSpan opentracing.Span
 }
@@ -230,8 +209,8 @@ func (s *Span) isNoop() bool {
 }
 
 // IsRecording returns true if the Span is recording its events.
-func IsRecording(sp *Span) bool {
-	return sp.crdb.isRecording()
+func (s *Span) IsRecording() bool {
+	return s.crdb.isRecording()
 }
 
 // enableRecording start recording on the Span. From now on, log events and child spans
@@ -274,18 +253,18 @@ func (s *crdbSpan) enableRecording(
 //
 // Children spans created from the Span while it is *not* recording will not
 // necessarily be recordable.
-func StartRecording(sp *Span, recType RecordingType) {
+func (s *Span) StartRecording(recType RecordingType) {
 	if recType == NoRecording {
 		panic("StartRecording called with NoRecording")
 	}
-	if sp.isNoop() {
+	if s.isNoop() {
 		panic("StartRecording called on NoopSpan; use the Recordable option for StartSpan")
 	}
 
 	// If we're already recording (perhaps because the parent was recording when
 	// this Span was created), there's nothing to do.
-	if !sp.crdb.isRecording() {
-		sp.crdb.enableRecording(nil /* parent */, recType, false /* separateRecording */)
+	if !s.crdb.isRecording() {
+		s.crdb.enableRecording(nil /* parent */, recType, false /* separateRecording */)
 	}
 }
 
@@ -296,11 +275,7 @@ func StartRecording(sp *Span, recType RecordingType) {
 // when all the spans finish.
 //
 // StopRecording() can be called on a Finish()ed Span.
-func StopRecording(sp *Span) {
-	sp.disableRecording()
-}
-
-func (s *Span) disableRecording() {
+func (s *Span) StopRecording() {
 	if s.isNoop() {
 		panic("can't disable recording a noop Span")
 	}
@@ -321,24 +296,11 @@ func (s *crdbSpan) disableRecording() {
 	}
 }
 
-// IsRecordable returns true if {Start,Stop}Recording() can be called on this
-// Span.
-//
-// In other words, this tests if the Span is our custom type, and not a noopSpan
-// or anything else.
-func IsRecordable(sp *Span) bool {
-	return !sp.isNoop()
-}
-
-// Recording represents a group of RecordedSpans, as returned by GetRecording.
-// Spans are sorted by StartTime.
-type Recording []tracingpb.RecordedSpan
-
 // GetRecording retrieves the current recording, if the Span has recording
 // enabled. This can be called while spans that are part of the recording are
 // still open; it can run concurrently with operations on those spans.
-func GetRecording(sp *Span) Recording { - return sp.crdb.getRecording() +func (s *Span) GetRecording() Recording { + return s.crdb.getRecording() } func (s *crdbSpan) getRecording() Recording { @@ -368,344 +330,11 @@ func (s *crdbSpan) getRecording() Recording { return result } -type traceLogData struct { - opentracing.LogRecord - depth int - // timeSincePrev represents the duration since the previous log line (previous in the - // set of log lines that this is part of). This is always computed relative to a log line - // from the same Span, except for start of Span in which case the duration is computed relative - // to the last log in the parent occurring before this start. For example: - // start Span A - // log 1 // duration relative to "start Span A" - // start Span B // duration relative to "log 1" - // log 2 // duration relative to "start Span B" - // log 3 // duration relative to "log 1" - timeSincePrev time.Duration -} - -// String formats the given spans for human consumption, showing the -// relationship using nesting and times as both relative to the previous event -// and cumulative. -// -// Child spans are inserted into the parent at the point of the child's -// StartTime; see the diagram on generateSessionTraceVTable() for the ordering -// of messages. -// -// Each log line show the time since the beginning of the trace -// and since the previous log line. Span starts are shown with special "=== -// " lines. For a Span start, the time since the relative log line -// can be negative when the Span start follows a message from the parent that -// was generated after the child Span started (or even after the child -// finished). -// -// TODO(andrei): this should be unified with -// SessionTracing.generateSessionTraceVTable(). -func (r Recording) String() string { - if len(r) == 0 { - return "" - } - - var buf strings.Builder - start := r[0].StartTime - writeLogs := func(logs []traceLogData) { - for _, entry := range logs { - fmt.Fprintf(&buf, "% 10.3fms % 10.3fms%s", - 1000*entry.Timestamp.Sub(start).Seconds(), - 1000*entry.timeSincePrev.Seconds(), - strings.Repeat(" ", entry.depth+1)) - for i, f := range entry.Fields { - if i != 0 { - buf.WriteByte(' ') - } - fmt.Fprintf(&buf, "%s:%v", f.Key(), f.Value()) - } - buf.WriteByte('\n') - } - } - - logs := r.visitSpan(r[0], 0 /* depth */) - writeLogs(logs) - - // Check if there's any orphan spans (spans for which the parent is missing). - // This shouldn't happen, but we're protecting against incomplete traces. For - // example, ingesting of remote spans through DistSQL is complex. Orphan spans - // would not be reflected in the output string at all without this. - orphans := r.OrphanSpans() - if len(orphans) > 0 { - // This shouldn't happen. - buf.WriteString("orphan spans (trace is missing spans):\n") - for _, o := range orphans { - logs := r.visitSpan(o, 0 /* depth */) - writeLogs(logs) - } - } - return buf.String() -} - -// OrphanSpans returns the spans with parents missing from the recording. -func (r Recording) OrphanSpans() []tracingpb.RecordedSpan { - spanIDs := make(map[uint64]struct{}) - for _, sp := range r { - spanIDs[sp.SpanID] = struct{}{} - } - - var orphans []tracingpb.RecordedSpan - for i, sp := range r { - if i == 0 { - // The first Span can be a root Span. Note that any other root Span will - // be considered an orphan. 
- continue - } - if _, ok := spanIDs[sp.ParentSpanID]; !ok { - orphans = append(orphans, sp) - } - } - return orphans -} - -// FindLogMessage returns the first log message in the recording that matches -// the given regexp. The bool return value is true if such a message is found. -func (r Recording) FindLogMessage(pattern string) (string, bool) { - re := regexp.MustCompile(pattern) - for _, sp := range r { - for _, l := range sp.Logs { - msg := l.Msg() - if re.MatchString(msg) { - return msg, true - } - } - } - return "", false -} - -// FindSpan returns the Span with the given operation. The bool retval is false -// if the Span is not found. -func (r Recording) FindSpan(operation string) (tracingpb.RecordedSpan, bool) { - for _, sp := range r { - if sp.Operation == operation { - return sp, true - } - } - return tracingpb.RecordedSpan{}, false -} - -// visitSpan returns the log messages for sp, and all of sp's children. -// -// All messages from a Span are kept together. Sibling spans are ordered within -// the parent in their start order. -func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogData { - ownLogs := make([]traceLogData, 0, len(sp.Logs)+1) - - conv := func(l opentracing.LogRecord, ref time.Time) traceLogData { - var timeSincePrev time.Duration - if ref != (time.Time{}) { - timeSincePrev = l.Timestamp.Sub(ref) - } - return traceLogData{ - LogRecord: l, - depth: depth, - timeSincePrev: timeSincePrev, - } - } - - // Add a log line representing the start of the Span. - lr := opentracing.LogRecord{ - Timestamp: sp.StartTime, - Fields: []otlog.Field{otlog.String("=== operation", sp.Operation)}, - } - if len(sp.Tags) > 0 { - tags := make([]string, 0, len(sp.Tags)) - for k := range sp.Tags { - tags = append(tags, k) - } - sort.Strings(tags) - for _, k := range tags { - lr.Fields = append(lr.Fields, otlog.String(k, sp.Tags[k])) - } - } - ownLogs = append(ownLogs, conv( - lr, - // ref - this entries timeSincePrev will be computed when we merge it into the parent - time.Time{})) - - for _, l := range sp.Logs { - lr := opentracing.LogRecord{ - Timestamp: l.Time, - Fields: make([]otlog.Field, len(l.Fields)), - } - for i, f := range l.Fields { - lr.Fields[i] = otlog.String(f.Key, f.Value) - } - lastLog := ownLogs[len(ownLogs)-1] - ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp)) - } - - childSpans := make([][]traceLogData, 0) - for _, osp := range r { - if osp.ParentSpanID != sp.SpanID { - continue - } - childSpans = append(childSpans, r.visitSpan(osp, depth+1)) - } - - // Merge ownLogs with childSpans. - mergedLogs := make([]traceLogData, 0, len(ownLogs)) - timeMax := time.Date(2200, 0, 0, 0, 0, 0, 0, time.UTC) - i, j := 0, 0 - var lastTimestamp time.Time - for i < len(ownLogs) || j < len(childSpans) { - if len(mergedLogs) > 0 { - lastTimestamp = mergedLogs[len(mergedLogs)-1].Timestamp - } - nextLog, nextChild := timeMax, timeMax - if i < len(ownLogs) { - nextLog = ownLogs[i].Timestamp - } - if j < len(childSpans) { - nextChild = childSpans[j][0].Timestamp - } - if nextLog.After(nextChild) { - // Fill in timeSincePrev for the first one of the child's entries. - if lastTimestamp != (time.Time{}) { - childSpans[j][0].timeSincePrev = childSpans[j][0].Timestamp.Sub(lastTimestamp) - } - mergedLogs = append(mergedLogs, childSpans[j]...) 
- lastTimestamp = childSpans[j][0].Timestamp - j++ - } else { - mergedLogs = append(mergedLogs, ownLogs[i]) - lastTimestamp = ownLogs[i].Timestamp - i++ - } - } - - return mergedLogs -} - -// ToJaegerJSON returns the trace as a JSON that can be imported into Jaeger for -// visualization. -// -// The format is described here: https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 -// -// The statement is passed in so it can be included in the trace. -func (r Recording) ToJaegerJSON(stmt string) (string, error) { - if len(r) == 0 { - return "", nil - } - - cpy := make(Recording, len(r)) - copy(cpy, r) - r = cpy - tagsCopy := make(map[string]string) - for k, v := range r[0].Tags { - tagsCopy[k] = v - } - tagsCopy["statement"] = stmt - r[0].Tags = tagsCopy - - toJaegerSpanID := func(spanID uint64) jaegerjson.SpanID { - return jaegerjson.SpanID(strconv.FormatUint(spanID, 10)) - } - - // Each Span in Jaeger belongs to a "process" that generated it. Spans - // belonging to different colors are colored differently in Jaeger. We're - // going to map our different nodes to different processes. - processes := make(map[jaegerjson.ProcessID]jaegerjson.Process) - // getProcessID figures out what "process" a Span belongs to. It looks for an - // "node: " tag. The processes map is populated with an entry for every - // node present in the trace. - getProcessID := func(sp tracingpb.RecordedSpan) jaegerjson.ProcessID { - node := "unknown node" - for k, v := range sp.Tags { - if k == "node" { - node = fmt.Sprintf("node %s", v) - break - } - } - pid := jaegerjson.ProcessID(node) - if _, ok := processes[pid]; !ok { - processes[pid] = jaegerjson.Process{ - ServiceName: node, - Tags: nil, - } - } - return pid - } - - var t jaegerjson.Trace - t.TraceID = jaegerjson.TraceID(strconv.FormatUint(r[0].TraceID, 10)) - t.Processes = processes - - for _, sp := range r { - var s jaegerjson.Span - - s.TraceID = t.TraceID - s.Duration = uint64(sp.Duration.Microseconds()) - s.StartTime = uint64(sp.StartTime.UnixNano() / 1000) - s.SpanID = toJaegerSpanID(sp.SpanID) - s.OperationName = sp.Operation - s.ProcessID = getProcessID(sp) - - if sp.ParentSpanID != 0 { - s.References = []jaegerjson.Reference{{ - RefType: jaegerjson.ChildOf, - TraceID: s.TraceID, - SpanID: toJaegerSpanID(sp.ParentSpanID), - }} - } - - for k, v := range sp.Tags { - s.Tags = append(s.Tags, jaegerjson.KeyValue{ - Key: k, - Value: v, - Type: "STRING", - }) - } - for _, l := range sp.Logs { - jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)} - for _, field := range l.Fields { - jl.Fields = append(jl.Fields, jaegerjson.KeyValue{ - Key: field.Key, - Value: field.Value, - Type: "STRING", - }) - } - s.Logs = append(s.Logs, jl) - } - t.Spans = append(t.Spans, s) - } - - data := TraceCollection{ - Data: []jaegerjson.Trace{t}, - // Add a comment that will show-up at the top of the JSON file, is someone opens the file. - // NOTE: This comment is scarce on newlines because they appear as \n in the - // generated file doing more harm than good. - Comment: fmt.Sprintf(`This is a trace for SQL statement: %s -This trace can be imported into Jaeger for visualization. From the Jaeger Search screen, select JSON File. 
-Jaeger can be started using docker with: docker run -d --name jaeger -p 16686:16686 jaegertracing/all-in-one:1.17 -The UI can then be accessed at http://localhost:16686/search`, - stmt), - } - json, err := json.MarshalIndent(data, "" /* prefix */, "\t" /* indent */) - if err != nil { - return "", err - } - return string(json), nil -} - -// TraceCollection is the format accepted by the Jaegar upload feature, as per -// https://github.com/jaegertracing/jaeger-ui/issues/381#issuecomment-494150826 -type TraceCollection struct { - // Comment is a dummy field we use to put instructions on how to load the trace. - Comment string `json:"_comment"` - Data []jaegerjson.Trace `json:"data"` -} - // ImportRemoteSpans adds RecordedSpan data to the recording of the given Span; // these spans will be part of the result of GetRecording. Used to import // recorded traces from other nodes. -func ImportRemoteSpans(sp *Span, remoteSpans []tracingpb.RecordedSpan) error { - return sp.crdb.ImportRemoteSpans(remoteSpans) +func (s *Span) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error { + return s.crdb.ImportRemoteSpans(remoteSpans) } func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error { @@ -723,7 +352,7 @@ func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error return nil } -// IsBlackHoleSpan returns true if events for this Span are just dropped. This +// IsBlackHole returns true if events for this Span are just dropped. This // is the case when the Span is not recording and no external tracer is configured. // Tracing clients can use this method to figure out if they can short-circuit some // tracing-related work that would be discarded anyway. @@ -731,51 +360,42 @@ func (s *crdbSpan) ImportRemoteSpans(remoteSpans []tracingpb.RecordedSpan) error // The child of a blackhole Span is a non-recordable blackhole Span[*]. These incur // only minimal overhead. It is therefore not worth it to call this method to avoid // starting spans. -func IsBlackHoleSpan(sp *Span) bool { - return sp.isBlackHole() +func (s *Span) IsBlackHole() bool { + return s.isBlackHole() } -// IsNoopContext returns true if the Span context is from a "no-op" Span. If +// IsNoop returns true if the Span context is from a "no-op" Span. If // this is true, any Span derived from this context will be a "black hole Span". // // You should never need to care about this method. It is exported for technical // reasons. -func IsNoopContext(sc *SpanContext) bool { - return sc.isNoop() -} - -func (sc *SpanContext) isNoop() bool { +func (sc *SpanContext) IsNoop() bool { return sc.recordingType == NoRecording && sc.shadowTr == nil } // SetSpanStats sets the stats on a Span. stats.Stats() will also be added to // the Span tags. -func SetSpanStats(sp *Span, stats SpanStats) { - if sp.isNoop() { +func (s *Span) SetSpanStats(stats SpanStats) { + if s.isNoop() { return } - sp.crdb.mu.Lock() - sp.crdb.mu.stats = stats + s.crdb.mu.Lock() + s.crdb.mu.stats = stats for name, value := range stats.Stats() { - sp.setTagInner(StatTagPrefix+name, value, true /* locked */) + s.setTagInner(StatTagPrefix+name, value, true /* locked */) } - sp.crdb.mu.Unlock() + s.crdb.mu.Unlock() } -// Finish is part of the opentracing.Span interface. +// Finish marks the Span as completed. Finishing a nil *Span is a noop. func (s *Span) Finish() { - s.FinishWithOptions(opentracing.FinishOptions{}) -} - -// FinishWithOptions is part of the opentracing.Span interface. 
-func (s *Span) FinishWithOptions(opts opentracing.FinishOptions) { - if s.isNoop() { + if s == nil { return } - finishTime := opts.FinishTime - if finishTime.IsZero() { - finishTime = time.Now() + if s.isNoop() { + return } + finishTime := time.Now() s.crdb.mu.Lock() s.crdb.mu.duration = finishTime.Sub(s.crdb.startTime) s.crdb.mu.Unlock() diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index ceaa76a82a52..9b4fbff73d4d 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -28,7 +28,7 @@ func TestRecordingString(t *testing.T) { tr2 := NewTracer() root := tr.StartSpan("root", Recordable) - StartRecording(root, SnowballRecording) + root.StartRecording(SnowballRecording) root.LogFields(otlog.String(tracingpb.LogMessageField, "root 1")) // Hackily fix the timing on the first log message, so that we can check it later. root.crdb.mu.recording.recordedLogs[0].Timestamp = root.crdb.startTime.Add(time.Millisecond) @@ -45,8 +45,8 @@ func TestRecordingString(t *testing.T) { remoteChild.LogFields(otlog.String(tracingpb.LogMessageField, "remote child 1")) require.NoError(t, err) remoteChild.Finish() - remoteRec := GetRecording(remoteChild) - err = ImportRemoteSpans(root, remoteRec) + remoteRec := remoteChild.GetRecording() + err = root.ImportRemoteSpans(remoteRec) require.NoError(t, err) root.Finish() @@ -60,7 +60,7 @@ func TestRecordingString(t *testing.T) { root.LogFields(otlog.String(tracingpb.LogMessageField, "root 5")) root.Finish() - rec := GetRecording(root) + rec := root.GetRecording() // Sanity check that the recording looks like we want. Note that this is not // its String() representation; this just list all the spans in order. err = TestingCheckRecordedSpans(rec, ` @@ -146,15 +146,15 @@ func TestRecordingInRecording(t *testing.T) { tr := NewTracer() root := tr.StartSpan("root", Recordable) - StartRecording(root, SnowballRecording) + root.StartRecording(SnowballRecording) child := tr.StartSpan("child", opentracing.ChildOf(root.Context()), Recordable) - StartRecording(child, SnowballRecording) + child.StartRecording(SnowballRecording) grandChild := tr.StartSpan("grandchild", opentracing.ChildOf(child.Context())) grandChild.Finish() child.Finish() root.Finish() - rootRec := GetRecording(root) + rootRec := root.GetRecording() require.NoError(t, TestingCheckRecordedSpans(rootRec, ` Span root: tags: sb=1 @@ -164,7 +164,7 @@ Span grandchild: tags: sb=1 `)) - childRec := GetRecording(child) + childRec := child.GetRecording() require.NoError(t, TestingCheckRecordedSpans(childRec, ` Span child: tags: sb=1 diff --git a/pkg/util/tracing/tags_test.go b/pkg/util/tracing/tags_test.go index 7f90825ce0a2..592a3a675050 100644 --- a/pkg/util/tracing/tags_test.go +++ b/pkg/util/tracing/tags_test.go @@ -25,9 +25,9 @@ func TestLogTags(t *testing.T) { l := logtags.SingleTagBuffer("tag1", "val1") l = l.Add("tag2", "val2") sp1 := tr.StartSpan("foo", Recordable, LogTags(l)) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp1.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp1), ` + require.NoError(t, TestingCheckRecordedSpans(sp1.GetRecording(), ` Span foo: tags: tag1=val1 tag2=val2 `)) @@ -38,9 +38,9 @@ func TestLogTags(t *testing.T) { RegisterTagRemapping("tag2", "two") sp2 := tr.StartSpan("bar", Recordable, LogTags(l)) - StartRecording(sp2, SingleNodeRecording) + sp2.StartRecording(SingleNodeRecording) sp2.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp2), ` + require.NoError(t, 
TestingCheckRecordedSpans(sp2.GetRecording(), ` Span bar: tags: one=val1 two=val2 `)) @@ -48,9 +48,9 @@ func TestLogTags(t *testing.T) { shadowTracer.clear() sp3 := tr.StartRootSpan("baz", l, RecordableSpan) - StartRecording(sp3, SingleNodeRecording) + sp3.StartRecording(SingleNodeRecording) sp3.Finish() - require.NoError(t, TestingCheckRecordedSpans(GetRecording(sp3), ` + require.NoError(t, TestingCheckRecordedSpans(sp3.GetRecording(), ` Span baz: tags: one=val1 two=val2 `)) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 384d47014c29..07f6c3f79254 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -12,10 +12,7 @@ package tracing import ( "context" - "fmt" "math/rand" - "regexp" - "sort" "strconv" "strings" "sync/atomic" @@ -23,10 +20,7 @@ import ( "unsafe" "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/util/caller" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" - "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/trace" @@ -94,12 +88,6 @@ type Tracer struct { // x/net/trace or lightstep and we are not recording. noopSpan *Span - // If forceRealSpans is set, this Tracer will always create real spans (never - // noopSpans), regardless of the recording or lightstep configuration. Used - // by tests for situations when they need to indirectly create spans and don't - // have the option of passing the Recordable option to their constructor. - forceRealSpans bool - // True if tracing to the debug/requests endpoint. Accessed via t.useNetTrace(). _useNetTrace int32 // updated atomically @@ -151,14 +139,6 @@ func (t *Tracer) Close() { t.setShadowTracer(nil, nil) } -// SetForceRealSpans sets forceRealSpans option to v and returns the previous -// value. -func (t *Tracer) SetForceRealSpans(v bool) bool { - prevVal := t.forceRealSpans - t.forceRealSpans = v - return prevVal -} - func (t *Tracer) setShadowTracer(manager shadowTracerManager, tr opentracing.Tracer) { var shadow *shadowTracer if manager != nil { @@ -202,7 +182,7 @@ func (t *Tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOp // case) with a noop context, return a noop Span now. if len(opts) == 1 { if o, ok := opts[0].(opentracing.SpanReference); ok { - if IsNoopContext(o.ReferencedContext.(*SpanContext)) { + if o.ReferencedContext.(*SpanContext).IsNoop() { return t.noopSpan } } @@ -243,7 +223,7 @@ func (t *Tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOp if r.ReferencedContext == nil { continue } - if IsNoopContext(r.ReferencedContext.(*SpanContext)) { + if r.ReferencedContext.(*SpanContext).IsNoop() { continue } parentType = r.Type @@ -276,7 +256,7 @@ const ( // context. func (t *Tracer) AlwaysTrace() bool { shadowTracer := t.getShadowTracer() - return t.useNetTrace() || shadowTracer != nil || t.forceRealSpans + return t.useNetTrace() || shadowTracer != nil } // StartRootSpan creates a root Span. This is functionally equivalent to: @@ -428,7 +408,7 @@ func (fn textMapWriterFn) Set(key, val string) { func (t *Tracer) Inject( osc opentracing.SpanContext, format interface{}, carrier interface{}, ) error { - if IsNoopContext(osc.(*SpanContext)) { + if osc.(*SpanContext).IsNoop() { // Fast path when tracing is disabled. Extract will accept an empty map as a // noop context. 
return nil @@ -558,14 +538,6 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanContext, return &sc, nil } -// FinishSpan closes the given Span (if not nil). It is a convenience wrapper -// for Span.Finish() which tolerates nil spans. -func FinishSpan(span *Span) { - if span != nil { - span.Finish() - } -} - // ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span // that "follows from" the original Span. This allows the resulting context to be // used in an async task that might outlive the original operation. @@ -581,7 +553,7 @@ func ForkCtxSpan(ctx context.Context, opName string) (context.Context, *Span) { return ctx, sp } tr := sp.Tracer() - if IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { ns := tr.noopSpan return ContextWithSpan(ctx, ns), ns } @@ -616,7 +588,7 @@ func childSpan( return ctx, sp } tr := sp.Tracer() - if IsBlackHoleSpan(sp) { + if sp.IsBlackHole() { ns := tr.noopSpan return ContextWithSpan(ctx, ns), ns } @@ -685,91 +657,10 @@ func StartSnowballTrace( } else { span = tracer.StartSpan(opName, Recordable, LogTagsFromCtx(ctx)) } - StartRecording(span, SnowballRecording) + span.StartRecording(SnowballRecording) return ContextWithSpan(ctx, span), span } -// TestingCheckRecordedSpans checks whether a recording looks like an expected -// one represented by a string with one line per expected Span and one line per -// expected event (i.e. log message). -// -// Use with something like: -// if err := TestingCheckRecordedSpans(tracing.GetRecording(Span), ` -// Span root: -// event: a -// event: c -// Span child: -// event: [ambient] b -// `); err != nil { -// t.Fatal(err) -// } -// -// The event lines can (and generally should) omit the file:line part that they -// might contain (depending on the level at which they were logged). -// -// Note: this test function is in this file because it needs to be used by -// both tests in the tracing package and tests outside of it, and the function -// itself depends on tracing. -func TestingCheckRecordedSpans(recSpans []tracingpb.RecordedSpan, expected string) error { - expected = strings.TrimSpace(expected) - var rows []string - row := func(format string, args ...interface{}) { - rows = append(rows, fmt.Sprintf(format, args...)) - } - - for _, rs := range recSpans { - row("Span %s:", rs.Operation) - if len(rs.Tags) > 0 { - var tags []string - for k, v := range rs.Tags { - tags = append(tags, fmt.Sprintf("%s=%v", k, v)) - } - sort.Strings(tags) - row(" tags: %s", strings.Join(tags, " ")) - } - for _, l := range rs.Logs { - msg := "" - for _, f := range l.Fields { - msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value) - } - row("%s", msg) - } - } - var expRows []string - if expected != "" { - expRows = strings.Split(expected, "\n") - } - match := false - if len(expRows) == len(rows) { - match = true - for i := range expRows { - e := strings.Trim(expRows[i], " \t") - r := strings.Trim(rows[i], " \t") - if e != r && !matchesWithoutFileLine(r, e) { - match = false - break - } - } - } - if !match { - file, line, _ := caller.Lookup(1) - return errors.Errorf( - "%s:%d expected:\n%s\ngot:\n%s", - file, line, expected, strings.Join(rows, "\n")) - } - return nil -} - -// matchesWithoutFileLine tries to match an event by stripping a file:line from -// it. For example: -// "event: util/log/trace_test.go:111 log" will match "event: log". -// -// Returns true if it matches. 
-func matchesWithoutFileLine(msg string, expected string) bool { - groups := regexp.MustCompile(`^(event: ).*:[0-9]* (.*)$`).FindStringSubmatch(msg) - return len(groups) == 3 && fmt.Sprintf("event: %s", groups[2]) == expected -} - // ContextWithRecordingSpan returns a context with an embedded trace Span which // returns its contents when getRecording is called and must be stopped by // calling the cancel method when done with the context (getRecording() needs to @@ -782,18 +673,15 @@ func ContextWithRecordingSpan( ) (retCtx context.Context, getRecording func() Recording, cancel func()) { tr := NewTracer() sp := tr.StartSpan(opName, Recordable, LogTagsFromCtx(ctx)) - StartRecording(sp, SnowballRecording) + sp.StartRecording(SnowballRecording) ctx, cancelCtx := context.WithCancel(ctx) ctx = ContextWithSpan(ctx, sp) - getRecording = func() Recording { - return GetRecording(sp) - } cancel = func() { cancelCtx() - StopRecording(sp) + sp.StopRecording() sp.Finish() tr.Close() } - return ctx, getRecording, cancel + return ctx, sp.GetRecording, cancel } diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 6afa4a1c96d7..26c4473f9476 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -38,7 +38,7 @@ func TestTracerRecording(t *testing.T) { if s1.isNoop() { t.Error("Recordable (but not recording) Span should not be noop") } - if !IsBlackHoleSpan(s1) { + if !s1.IsBlackHole() { t.Error("Recordable Span should be black hole") } @@ -50,15 +50,15 @@ func TestTracerRecording(t *testing.T) { noop3.Finish() s1.LogKV("x", 1) - StartRecording(s1, SingleNodeRecording) + s1.StartRecording(SingleNodeRecording) s1.LogKV("x", 2) s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context())) - if IsBlackHoleSpan(s2) { + if s2.IsBlackHole() { t.Error("recording Span should not be black hole") } s2.LogKV("x", 3) - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -69,7 +69,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(s2), ` + if err := TestingCheckRecordedSpans(s2.GetRecording(), ` Span b: tags: unfinished= x: 3 @@ -83,7 +83,7 @@ func TestTracerRecording(t *testing.T) { s2.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -96,7 +96,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } s3.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: unfinished= x: 2 @@ -108,15 +108,15 @@ func TestTracerRecording(t *testing.T) { `); err != nil { t.Fatal(err) } - StopRecording(s1) + s1.StopRecording() s1.LogKV("x", 100) - if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil { + if err := TestingCheckRecordedSpans(s1.GetRecording(), ``); err != nil { t.Fatal(err) } // The child Span is still recording. 
s3.LogKV("x", 5) - if err := TestingCheckRecordedSpans(GetRecording(s3), ` + if err := TestingCheckRecordedSpans(s3.GetRecording(), ` Span c: tags: tag=val x: 4 @@ -130,11 +130,11 @@ func TestTracerRecording(t *testing.T) { func TestStartChildSpan(t *testing.T) { tr := NewTracer() sp1 := tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 := tr.StartChildSpan("child", sp1.SpanContext(), nil /* logTags */, false /* recordable */, false /*separateRecording*/) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: Span child: `); err != nil { @@ -142,29 +142,29 @@ func TestStartChildSpan(t *testing.T) { } sp1 = tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 = tr.StartChildSpan("child", sp1.SpanContext(), nil /* logTags */, false /* recordable */, true /*separateRecording*/) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: `); err != nil { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(sp2), ` + if err := TestingCheckRecordedSpans(sp2.GetRecording(), ` Span child: `); err != nil { t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) - StartRecording(sp1, SingleNodeRecording) + sp1.StartRecording(SingleNodeRecording) sp2 = tr.StartChildSpan( "child", sp1.SpanContext(), logtags.SingleTagBuffer("key", "val"), false /* recordable */, false, /*separateRecording*/ ) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(sp1), ` + if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` Span parent: Span child: tags: key=val @@ -195,7 +195,7 @@ func TestTracerInjectExtract(t *testing.T) { if err != nil { t.Fatal(err) } - if !wireContext.isNoop() { + if !wireContext.IsNoop() { t.Errorf("expected noop context: %v", wireContext) } noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) @@ -209,7 +209,7 @@ func TestTracerInjectExtract(t *testing.T) { // remote side. s1 := tr.StartSpan("a", Recordable) - StartRecording(s1, SnowballRecording) + s1.StartRecording(SnowballRecording) carrier = make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil { @@ -232,7 +232,7 @@ func TestTracerInjectExtract(t *testing.T) { s2.Finish() // Verify that recording was started automatically. - rec := GetRecording(s2) + rec := s2.GetRecording() if err := TestingCheckRecordedSpans(rec, ` Span remote op: tags: sb=1 @@ -241,19 +241,19 @@ func TestTracerInjectExtract(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: sb=1 unfinished= `); err != nil { t.Fatal(err) } - if err := ImportRemoteSpans(s1, rec); err != nil { + if err := s1.ImportRemoteSpans(rec); err != nil { t.Fatal(err) } s1.Finish() - if err := TestingCheckRecordedSpans(GetRecording(s1), ` + if err := TestingCheckRecordedSpans(s1.GetRecording(), ` Span a: tags: sb=1 Span remote op: