Merge #56045
56045: tracing: clean up receivers r=RaduBerinde a=tbg

Assorted cleanups that are possible now that we have moved off the opentracing
interfaces. More cleanups will follow.

- tracing: move recordings to new file
- tracing: use receivers for everything
- tracing: allow Finish() on nil *Span
- tracing: remove (*Tracer).SetForceRealSpans
- tracing: improve comment on otSpan
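
A minimal sketch (not part of this commit) of what the receiver-based API looks like after these cleanups. Every tracing call below appears somewhere in the diff; the package clause, the example function, and its use of a throwaway Tracer are illustrative assumptions.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// exampleRecordedSpan is a hypothetical caller showing the methods that
// replace the old package-level helpers.
func exampleRecordedSpan(ctx context.Context) {
	tr := tracing.NewTracer()
	sp := tr.StartSpan("example", tracing.Recordable)
	sp.StartRecording(tracing.SingleNodeRecording) // was tracing.StartRecording(sp, ...)
	ctx = tracing.ContextWithSpan(ctx, sp)

	log.Event(ctx, "doing some traced work")

	// Finish is now a method on *Span and, per the commit message, is
	// allowed on a nil *Span.
	sp.Finish() // was tracing.FinishSpan(sp)

	rec := sp.GetRecording() // was tracing.GetRecording(sp)
	_ = rec.String()
}
```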


Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Oct 29, 2020
2 parents 157b6dc + b25946e commit f945cae
Showing 74 changed files with 680 additions and 724 deletions.
2 changes: 1 addition & 1 deletion pkg/bench/ddl_analysis/ddl_analysis_bench.go
@@ -47,7 +47,7 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) {
beforePlan := func(sp *tracing.Span, stmt string) {
if _, ok := stmtToKvBatchRequests.Load(stmt); ok {
sp.Finish()
-trace := tracing.GetRecording(sp)
+trace := sp.GetRecording()
count := countKvBatchRequestsInRecording(trace)
stmtToKvBatchRequests.Store(stmt, count)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
@@ -541,7 +541,7 @@ func backupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option")
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
@@ -88,7 +88,7 @@ func newBackupDataProcessor(

func (cp *backupDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "backupDataProcessor")
-defer tracing.FinishSpan(span)
+defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
@@ -294,7 +294,7 @@ func WriteDescriptors(
extra []roachpb.KeyValue,
) error {
ctx, span := tracing.ChildSpan(ctx, "WriteDescriptors")
-defer tracing.FinishSpan(span)
+defer span.Finish()
err := func() error {
b := txn.NewBatch()
wroteDBs := make(map[descpb.ID]catalog.DatabaseDescriptor)
@@ -580,7 +580,7 @@ func restore(
requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
g.GoCtx(func(ctx context.Context) error {
ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log")
-defer tracing.FinishSpan(progressSpan)
+defer progressSpan.Finish()
return progressLogger.Loop(ctx, requestFinishedCh)
})

2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
@@ -1225,7 +1225,7 @@ func restorePlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || restoreStmt.Options.Detached) {
return errors.Errorf("RESTORE cannot be used inside a transaction without DETACHED option")
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show.go
@@ -95,7 +95,7 @@ func showBackupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

str, err := toFn()
if err != nil {
@@ -487,7 +487,7 @@ func showBackupsInCollectionPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, backup.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

collection, err := collectionFn()
if err != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -158,7 +158,7 @@ type entryNode struct {
// Run implements the execinfra.Processor interface.
func (ssp *splitAndScatterProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "splitAndScatterProcessor")
-defer tracing.FinishSpan(span)
+defer span.Finish()
defer ssp.output.ProducerDone()

numEntries := 0
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -108,7 +108,7 @@ func changefeedPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/exportcsv.go
@@ -167,7 +167,7 @@ func (sp *csvWriter) OutputTypes() []*types.T {

func (sp *csvWriter) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
-defer tracing.FinishSpan(span)
+defer span.Finish()

err := func() error {
typs := sp.input.OutputTypes()
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_processor.go
@@ -77,7 +77,7 @@ func newReadImportDataProcessor(

func (cp *readImportDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
-defer tracing.FinishSpan(span)
+defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
@@ -209,7 +209,7 @@ func ingestKvs(
kvCh <-chan row.KVBatch,
) (*roachpb.BulkOpSummary, error) {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
-defer tracing.FinishSpan(span)
+defer span.Finish()

writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -283,7 +283,7 @@ func importPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, importStmt.StatementTag())
-defer tracing.FinishSpan(span)
+defer span.Finish()

walltime := p.ExecCfg().Clock.Now().WallTime

4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_base.go
@@ -85,7 +85,7 @@ func runImport(
group.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
-defer tracing.FinishSpan(span)
+defer span.Finish()
var inputs map[int32]string
if spec.ResumePos != nil {
// Filter out files that were completely processed.
@@ -540,7 +540,7 @@ func runParallelImport(
minEmited := make([]int64, parallelism)
group.GoCtx(func(ctx context.Context) error {
ctx, span := tracing.ChildSpan(ctx, "inputconverter")
-defer tracing.FinishSpan(span)
+defer span.Finish()
return ctxgroup.GroupWorkers(ctx, parallelism, func(ctx context.Context, id int) error {
return importer.importWorker(ctx, id, consumer, importCtx, fileCtx, minEmited)
})
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -229,7 +229,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(sp *tracing.Span, stmt string) {
if stmt == historicalQuery {
-recCh <- tracing.GetRecording(sp)
+recCh <- sp.GetRecording()
}
},
},
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
@@ -76,7 +76,7 @@ func evalExport(
reply := resp.(*roachpb.ExportResponse)

ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
-defer tracing.FinishSpan(span)
+defer span.Finish()

// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
@@ -41,7 +41,7 @@ func evalWriteBatch(
ms := cArgs.Stats

_, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey))
-defer tracing.FinishSpan(span)
+defer span.Finish()
if log.V(1) {
log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/range_cache.go
@@ -655,7 +655,7 @@ func (rdc *RangeDescriptorCache) tryLookup(
var lookupRes EvictionToken
if err := rdc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup")
-defer tracing.FinishSpan(reqSpan)
+defer reqSpan.Finish()
// Clear the context's cancelation. This request services potentially many
// callers waiting for its result, and using the flight's leader's
// cancelation doesn't make sense.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport.go
@@ -169,7 +169,7 @@ func (gt *grpcTransport) sendBatch(
return nil, errors.Errorf(
"trying to ingest remote spans but there is no recording span set up")
}
-if err := tracing.ImportRemoteSpans(span, reply.CollectedSpans); err != nil {
+if err := span.ImportRemoteSpans(reply.CollectedSpans); err != nil {
return nil, errors.Wrap(err, "error ingesting remote spans")
}
}
@@ -305,7 +305,7 @@ func (s *senderTransport) SendNext(
if span == nil {
panic("trying to ingest remote spans but there is no recording span set up")
}
-if err := tracing.ImportRemoteSpans(span, br.CollectedSpans); err != nil {
+if err := span.ImportRemoteSpans(br.CollectedSpans); err != nil {
panic(err)
}
}
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport_test.go
@@ -146,13 +146,13 @@ func (m *mockInternalClient) Batch(
) (*roachpb.BatchResponse, error) {
sp := m.tr.StartRootSpan("mock", nil /* logTags */, tracing.RecordableSpan)
defer sp.Finish()
-tracing.StartRecording(sp, tracing.SnowballRecording)
+sp.StartRecording(tracing.SnowballRecording)
ctx = tracing.ContextWithSpan(ctx, sp)

log.Eventf(ctx, "mockInternalClient processing batch")
br := &roachpb.BatchResponse{}
br.Error = m.pErr
-if rec := tracing.GetRecording(sp); rec != nil {
+if rec := sp.GetRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
}
return br, nil
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
@@ -472,7 +471,7 @@ func (tc *TxnCoordSender) Send(
if tc.mu.txn.ID == (uuid.UUID{}) {
log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender")
}
-if !tracing.IsBlackHoleSpan(sp) {
+if !sp.IsBlackHole() {
sp.SetBaggageItem("txnID", tc.mu.txn.ID.String())
}
ctx = logtags.AddTag(ctx, "txn", uuid.ShortStringer(tc.mu.txn.ID))
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
@@ -152,7 +152,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) {

tracer := tracing.NewTracer()
sp := tracer.StartSpan("test", tracing.Recordable)
-tracing.StartRecording(sp, tracing.SingleNodeRecording)
+sp.StartRecording(tracing.SingleNodeRecording)
txnCtx := tracing.ContextWithSpan(context.Background(), sp)

push := func(ctx context.Context, key roachpb.Key) error {
@@ -179,7 +179,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) {
t.Fatalf("expected 2 attempts, got: %d", attempts)
}
sp.Finish()
-recording := tracing.GetRecording(sp)
+recording := sp.GetRecording()
var foundHeartbeatLoop bool
for _, sp := range recording {
if strings.Contains(sp.Operation, "heartbeat loop") {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
@@ -42,7 +42,7 @@ func EvalAddSSTable(

// TODO(tschottdorf): restore the below in some form (gets in the way of testing).
// _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
-// defer tracing.FinishSpan(span)
+// defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// IMPORT INTO should not proceed if any KVs from the SST shadow existing data
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
@@ -181,7 +181,7 @@ func (c *replicatedCmd) FinishNonLocal(ctx context.Context) {
}

func (c *replicatedCmd) finishTracingSpan() {
-tracing.FinishSpan(c.sp)
+c.sp.Finish()
c.ctx, c.sp = nil, nil
}

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
@@ -137,7 +137,7 @@ func (proposal *ProposalData) finishApplication(ctx context.Context, pr proposal
proposal.ec.done(ctx, proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
if proposal.sp != nil {
-tracing.FinishSpan(proposal.sp)
+proposal.sp.Finish()
proposal.sp = nil
}
}
@@ -891,7 +891,7 @@ func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier {
if sp == nil {
return nil
}
-if tracing.IsBlackHoleSpan(sp) {
+if sp.IsBlackHole() {
return nil
}
traceData := opentracing.TextMapCarrier{}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -12561,7 +12561,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
tracedCtx := tracing.ContextWithSpan(ctx, sp)
// Go out of our way to enable recording so that expensive logging is enabled
// for this context.
-tracing.StartRecording(sp, tracing.SingleNodeRecording)
+sp.StartRecording(tracing.SingleNodeRecording)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease)
if pErr != nil {
t.Fatal(pErr)
2 changes: 1 addition & 1 deletion pkg/kv/txn_test.go
@@ -56,7 +56,7 @@ func TestTxnSnowballTrace(t *testing.T) {
}
log.Event(ctx, "txn complete")
sp.Finish()
-collectedSpans := tracing.GetRecording(sp)
+collectedSpans := sp.GetRecording()
dump := collectedSpans.String()
// dump:
// 0.105ms 0.000ms event:inside txn
2 changes: 1 addition & 1 deletion pkg/server/node.go
@@ -1000,7 +1000,7 @@ func (n *Node) setupSpanForIncomingRPC(
// spans in the BatchResponse at the end of the request.
// We don't want to do this if the operation is on the same host, in which
// case everything is already part of the same recording.
-if rec := tracing.GetRecording(grpcSpan); rec != nil {
+if rec := grpcSpan.GetRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
}
}
2 changes: 1 addition & 1 deletion pkg/sql/colexec/stats.go
@@ -164,6 +164,6 @@ func (vsc *VectorizedStatsCollector) OutputStats(
vsc.NumBatches = 0
vsc.BytesRead = 0
}
-tracing.SetSpanStats(span, &vsc.VectorizedStats)
+span.SetSpanStats(&vsc.VectorizedStats)
span.Finish()
}
4 changes: 2 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
@@ -179,7 +179,7 @@ func (f *vectorizedFlow) Setup(
log.Infof(ctx, "setting up vectorize flow %s", f.ID.Short())
}
recordingStats := false
-if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
+if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() {
recordingStats = true
}
helper := &vectorizedFlowCreatorHelper{f: f.FlowBase}
@@ -894,7 +894,7 @@ func (s *vectorizedFlowCreator) setupOutput(
finishVectorizedStatsCollectors(
ctx, flowCtx.ID, flowCtx.Cfg.TestingKnobs.DeterministicStats, vscs,
)
-return []execinfrapb.ProducerMetadata{{TraceData: tracing.GetRecording(span)}}
+return []execinfrapb.ProducerMetadata{{TraceData: span.GetRecording()}}
},
},
)
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
@@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState(
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
-trace := tracing.GetRecording(sp)
+trace := sp.GetRecording()
ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor)
placeholders := p.extendedEvalCtx.Placeholders
if finishCollectionDiagnostics != nil {
4 changes: 2 additions & 2 deletions pkg/sql/distsql/server.go
@@ -263,7 +263,7 @@ func (ds *ServerImpl) setupFlow(

sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
-tracing.FinishSpan(sp)
+sp.Finish()
return ctx, nil, err
}
ie := &lazyInternalExecutor{
@@ -329,7 +329,7 @@ func (ds *ServerImpl) setupFlow(
// Flow.Cleanup will not be called, so we have to close the memory monitor
// and finish the span manually.
monitor.Stop(ctx)
-tracing.FinishSpan(sp)
+sp.Finish()
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, err
}
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running.go
@@ -685,7 +685,7 @@ func (r *DistSQLReceiver) Push(
if span == nil {
r.resultWriter.SetError(
errors.New("trying to ingest remote spans but there is no recording span set up"))
-} else if err := tracing.ImportRemoteSpans(span, meta.TraceData); err != nil {
+} else if err := span.ImportRemoteSpans(meta.TraceData); err != nil {
r.resultWriter.SetError(errors.Errorf("error ingesting remote spans: %s", err))
}
}