Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: clean up receivers #56045

Merged
merged 5 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bench/ddl_analysis/ddl_analysis_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func RunRoundTripBenchmark(b *testing.B, tests []RoundTripBenchTestCase) {
beforePlan := func(sp *tracing.Span, stmt string) {
if _, ok := stmtToKvBatchRequests.Load(stmt); ok {
sp.Finish()
trace := tracing.GetRecording(sp)
trace := sp.GetRecording()
count := countKvBatchRequestsInRecording(trace)
stmtToKvBatchRequests.Store(stmt, count)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func backupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newBackupDataProcessor(

func (cp *backupDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "backupDataProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func WriteDescriptors(
extra []roachpb.KeyValue,
) error {
ctx, span := tracing.ChildSpan(ctx, "WriteDescriptors")
defer tracing.FinishSpan(span)
defer span.Finish()
err := func() error {
b := txn.NewBatch()
wroteDBs := make(map[descpb.ID]catalog.DatabaseDescriptor)
Expand Down Expand Up @@ -580,7 +580,7 @@ func restore(
requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
g.GoCtx(func(ctx context.Context) error {
ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log")
defer tracing.FinishSpan(progressSpan)
defer progressSpan.Finish()
return progressLogger.Loop(ctx, requestFinishedCh)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ func restorePlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || restoreStmt.Options.Detached) {
return errors.Errorf("RESTORE cannot be used inside a transaction without DETACHED option")
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func showBackupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

str, err := toFn()
if err != nil {
Expand Down Expand Up @@ -487,7 +487,7 @@ func showBackupsInCollectionPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, backup.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

collection, err := collectionFn()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type entryNode struct {
// Run implements the execinfra.Processor interface.
func (ssp *splitAndScatterProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "splitAndScatterProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer ssp.output.ProducerDone()

numEntries := 0
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func changefeedPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sp *csvWriter) OutputTypes() []*types.T {

func (sp *csvWriter) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
defer tracing.FinishSpan(span)
defer span.Finish()

err := func() error {
typs := sp.input.OutputTypes()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newReadImportDataProcessor(

func (cp *readImportDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
Expand Down Expand Up @@ -209,7 +209,7 @@ func ingestKvs(
kvCh <-chan row.KVBatch,
) (*roachpb.BulkOpSummary, error) {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
defer tracing.FinishSpan(span)
defer span.Finish()

writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func importPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, importStmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

walltime := p.ExecCfg().Clock.Now().WallTime

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func runImport(
group.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
defer tracing.FinishSpan(span)
defer span.Finish()
var inputs map[int32]string
if spec.ResumePos != nil {
// Filter out files that were completely processed.
Expand Down Expand Up @@ -540,7 +540,7 @@ func runParallelImport(
minEmited := make([]int64, parallelism)
group.GoCtx(func(ctx context.Context) error {
ctx, span := tracing.ChildSpan(ctx, "inputconverter")
defer tracing.FinishSpan(span)
defer span.Finish()
return ctxgroup.GroupWorkers(ctx, parallelism, func(ctx context.Context, id int) error {
return importer.importWorker(ctx, id, consumer, importCtx, fileCtx, minEmited)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(sp *tracing.Span, stmt string) {
if stmt == historicalQuery {
recCh <- tracing.GetRecording(sp)
recCh <- sp.GetRecording()
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func evalExport(
reply := resp.(*roachpb.ExportResponse)

ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)
defer span.Finish()

// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func evalWriteBatch(
ms := cArgs.Stats

_, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)
defer span.Finish()
if log.V(1) {
log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (rdc *RangeDescriptorCache) tryLookup(
var lookupRes EvictionToken
if err := rdc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup")
defer tracing.FinishSpan(reqSpan)
defer reqSpan.Finish()
// Clear the context's cancelation. This request services potentially many
// callers waiting for its result, and using the flight's leader's
// cancelation doesn't make sense.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (gt *grpcTransport) sendBatch(
return nil, errors.Errorf(
"trying to ingest remote spans but there is no recording span set up")
}
if err := tracing.ImportRemoteSpans(span, reply.CollectedSpans); err != nil {
if err := span.ImportRemoteSpans(reply.CollectedSpans); err != nil {
return nil, errors.Wrap(err, "error ingesting remote spans")
}
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s *senderTransport) SendNext(
if span == nil {
panic("trying to ingest remote spans but there is no recording span set up")
}
if err := tracing.ImportRemoteSpans(span, br.CollectedSpans); err != nil {
if err := span.ImportRemoteSpans(br.CollectedSpans); err != nil {
panic(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ func (m *mockInternalClient) Batch(
) (*roachpb.BatchResponse, error) {
sp := m.tr.StartRootSpan("mock", nil /* logTags */, tracing.RecordableSpan)
defer sp.Finish()
tracing.StartRecording(sp, tracing.SnowballRecording)
sp.StartRecording(tracing.SnowballRecording)
ctx = tracing.ContextWithSpan(ctx, sp)

log.Eventf(ctx, "mockInternalClient processing batch")
br := &roachpb.BatchResponse{}
br.Error = m.pErr
if rec := tracing.GetRecording(sp); rec != nil {
if rec := sp.GetRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
}
return br, nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -472,7 +471,7 @@ func (tc *TxnCoordSender) Send(
if tc.mu.txn.ID == (uuid.UUID{}) {
log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender")
}
if !tracing.IsBlackHoleSpan(sp) {
if !sp.IsBlackHole() {
sp.SetBaggageItem("txnID", tc.mu.txn.ID.String())
}
ctx = logtags.AddTag(ctx, "txn", uuid.ShortStringer(tc.mu.txn.ID))
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) {

tracer := tracing.NewTracer()
sp := tracer.StartSpan("test", tracing.Recordable)
tracing.StartRecording(sp, tracing.SingleNodeRecording)
sp.StartRecording(tracing.SingleNodeRecording)
txnCtx := tracing.ContextWithSpan(context.Background(), sp)

push := func(ctx context.Context, key roachpb.Key) error {
Expand All @@ -179,7 +179,7 @@ func TestNoDuplicateHeartbeatLoops(t *testing.T) {
t.Fatalf("expected 2 attempts, got: %d", attempts)
}
sp.Finish()
recording := tracing.GetRecording(sp)
recording := sp.GetRecording()
var foundHeartbeatLoop bool
for _, sp := range recording {
if strings.Contains(sp.Operation, "heartbeat loop") {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func EvalAddSSTable(

// TODO(tschottdorf): restore the below in some form (gets in the way of testing).
// _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
// defer tracing.FinishSpan(span)
// defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// IMPORT INTO should not proceed if any KVs from the SST shadow existing data
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *replicatedCmd) FinishNonLocal(ctx context.Context) {
}

func (c *replicatedCmd) finishTracingSpan() {
tracing.FinishSpan(c.sp)
c.sp.Finish()
c.ctx, c.sp = nil, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (proposal *ProposalData) finishApplication(ctx context.Context, pr proposal
proposal.ec.done(ctx, proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
if proposal.sp != nil {
tracing.FinishSpan(proposal.sp)
proposal.sp.Finish()
proposal.sp = nil
}
}
Expand Down Expand Up @@ -894,7 +894,7 @@ func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier {
if sp == nil {
return nil
}
if tracing.IsBlackHoleSpan(sp) {
if sp.IsBlackHole() {
return nil
}
traceData := opentracing.TextMapCarrier{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12560,7 +12560,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
tracedCtx := tracing.ContextWithSpan(ctx, sp)
// Go out of our way to enable recording so that expensive logging is enabled
// for this context.
tracing.StartRecording(sp, tracing.SingleNodeRecording)
sp.StartRecording(tracing.SingleNodeRecording)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease)
if pErr != nil {
t.Fatal(pErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestTxnSnowballTrace(t *testing.T) {
}
log.Event(ctx, "txn complete")
sp.Finish()
collectedSpans := tracing.GetRecording(sp)
collectedSpans := sp.GetRecording()
dump := collectedSpans.String()
// dump:
// 0.105ms 0.000ms event:inside txn
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (n *Node) setupSpanForIncomingRPC(
// spans in the BatchResponse at the end of the request.
// We don't want to do this if the operation is on the same host, in which
// case everything is already part of the same recording.
if rec := tracing.GetRecording(grpcSpan); rec != nil {
if rec := grpcSpan.GetRecording(); rec != nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ func (vsc *VectorizedStatsCollector) OutputStats(
vsc.NumBatches = 0
vsc.BytesRead = 0
}
tracing.SetSpanStats(span, &vsc.VectorizedStats)
span.SetSpanStats(&vsc.VectorizedStats)
span.Finish()
}
4 changes: 2 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (f *vectorizedFlow) Setup(
}
log.VEventf(ctx, 1, "setting up vectorize flow %s", f.ID.Short())
recordingStats := false
if sp := tracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
if sp := tracing.SpanFromContext(ctx); sp != nil && sp.IsRecording() {
recordingStats = true
}
helper := &vectorizedFlowCreatorHelper{f: f.FlowBase}
Expand Down Expand Up @@ -885,7 +885,7 @@ func (s *vectorizedFlowCreator) setupOutput(
finishVectorizedStatsCollectors(
ctx, flowCtx.ID, flowCtx.Cfg.TestingKnobs.DeterministicStats, vscs,
)
return []execinfrapb.ProducerMetadata{{TraceData: tracing.GetRecording(span)}}
return []execinfrapb.ProducerMetadata{{TraceData: span.GetRecording()}}
},
},
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (ex *connExecutor) execStmtInOpenState(
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
trace := tracing.GetRecording(sp)
trace := sp.GetRecording()
ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor)
placeholders := p.extendedEvalCtx.Placeholders
if finishCollectionDiagnostics != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (ds *ServerImpl) setupFlow(

sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
tracing.FinishSpan(sp)
sp.Finish()
return ctx, nil, err
}
ie := &lazyInternalExecutor{
Expand Down Expand Up @@ -329,7 +329,7 @@ func (ds *ServerImpl) setupFlow(
// Flow.Cleanup will not be called, so we have to close the memory monitor
// and finish the span manually.
monitor.Stop(ctx)
tracing.FinishSpan(sp)
sp.Finish()
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (r *DistSQLReceiver) Push(
if span == nil {
r.resultWriter.SetError(
errors.New("trying to ingest remote spans but there is no recording span set up"))
} else if err := tracing.ImportRemoteSpans(span, meta.TraceData); err != nil {
} else if err := span.ImportRemoteSpans(meta.TraceData); err != nil {
r.resultWriter.SetError(errors.Errorf("error ingesting remote spans: %s", err))
}
}
Expand Down
Loading