Skip to content

Commit

Permalink
tracing: allow Finish() on nil *Span
Browse files Browse the repository at this point in the history
This is one of the immediate useful outcomes of moving off the
`opentracing.Span` interface: we now get to decide what `(nil).Finish()`
does, removing the need for workarounds.

Release note: None
  • Loading branch information
tbg committed Oct 28, 2020
1 parent 310492d commit a76969f
Show file tree
Hide file tree
Showing 30 changed files with 41 additions and 54 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func backupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || backupStmt.Options.Detached) {
return errors.Errorf("BACKUP cannot be used inside a transaction without DETACHED option")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newBackupDataProcessor(

func (cp *backupDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "backupDataProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func WriteDescriptors(
extra []roachpb.KeyValue,
) error {
ctx, span := tracing.ChildSpan(ctx, "WriteDescriptors")
defer tracing.FinishSpan(span)
defer span.Finish()
err := func() error {
b := txn.NewBatch()
wroteDBs := make(map[descpb.ID]catalog.DatabaseDescriptor)
Expand Down Expand Up @@ -580,7 +580,7 @@ func restore(
requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
g.GoCtx(func(ctx context.Context) error {
ctx, progressSpan := tracing.ChildSpan(ctx, "progress-log")
defer tracing.FinishSpan(progressSpan)
defer progressSpan.Finish()
return progressLogger.Loop(ctx, requestFinishedCh)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ func restorePlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

if !(p.ExtendedEvalContext().TxnImplicit || restoreStmt.Options.Detached) {
return errors.Errorf("RESTORE cannot be used inside a transaction without DETACHED option")
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func showBackupPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

str, err := toFn()
if err != nil {
Expand Down Expand Up @@ -487,7 +487,7 @@ func showBackupsInCollectionPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, backup.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

collection, err := collectionFn()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type entryNode struct {
// Run implements the execinfra.Processor interface.
func (ssp *splitAndScatterProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "splitAndScatterProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer ssp.output.ProducerDone()

numEntries := 0
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func changefeedPlanHook(

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sp *csvWriter) OutputTypes() []*types.T {

func (sp *csvWriter) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
defer tracing.FinishSpan(span)
defer span.Finish()

err := func() error {
typs := sp.input.OutputTypes()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newReadImportDataProcessor(

func (cp *readImportDataProcessor) Run(ctx context.Context) {
ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor")
defer tracing.FinishSpan(span)
defer span.Finish()
defer cp.output.ProducerDone()

progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
Expand Down Expand Up @@ -209,7 +209,7 @@ func ingestKvs(
kvCh <-chan row.KVBatch,
) (*roachpb.BulkOpSummary, error) {
ctx, span := tracing.ChildSpan(ctx, "ingestKVs")
defer tracing.FinishSpan(span)
defer span.Finish()

writeTS := hlc.Timestamp{WallTime: spec.WalltimeNanos}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func importPlanHook(
fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
// TODO(dan): Move this span into sql.
ctx, span := tracing.ChildSpan(ctx, importStmt.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

walltime := p.ExecCfg().Clock.Now().WallTime

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func runImport(
group.GoCtx(func(ctx context.Context) error {
defer close(kvCh)
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
defer tracing.FinishSpan(span)
defer span.Finish()
var inputs map[int32]string
if spec.ResumePos != nil {
// Filter out files that were completely processed.
Expand Down Expand Up @@ -540,7 +540,7 @@ func runParallelImport(
minEmited := make([]int64, parallelism)
group.GoCtx(func(ctx context.Context) error {
ctx, span := tracing.ChildSpan(ctx, "inputconverter")
defer tracing.FinishSpan(span)
defer span.Finish()
return ctxgroup.GroupWorkers(ctx, parallelism, func(ctx context.Context, id int) error {
return importer.importWorker(ctx, id, consumer, importCtx, fileCtx, minEmited)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func evalExport(
reply := resp.(*roachpb.ExportResponse)

ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)
defer span.Finish()

// For MVCC_All backups with no start time, they'll only be capturing the
// *revisions* since the gc threshold, so noting that in the reply allows the
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func evalWriteBatch(
ms := cArgs.Stats

_, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)
defer span.Finish()
if log.V(1) {
log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (rdc *RangeDescriptorCache) tryLookup(
var lookupRes EvictionToken
if err := rdc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup")
defer tracing.FinishSpan(reqSpan)
defer reqSpan.Finish()
// Clear the context's cancelation. This request services potentially many
// callers waiting for its result, and using the flight's leader's
// cancelation doesn't make sense.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func EvalAddSSTable(

// TODO(tschottdorf): restore the below in some form (gets in the way of testing).
// _, span := tracing.ChildSpan(ctx, fmt.Sprintf("AddSSTable [%s,%s)", args.Key, args.EndKey))
// defer tracing.FinishSpan(span)
// defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", mvccStartKey.Key, mvccEndKey.Key)

// IMPORT INTO should not proceed if any KVs from the SST shadow existing data
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (c *replicatedCmd) FinishNonLocal(ctx context.Context) {
}

func (c *replicatedCmd) finishTracingSpan() {
tracing.FinishSpan(c.sp)
c.sp.Finish()
c.ctx, c.sp = nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (proposal *ProposalData) finishApplication(ctx context.Context, pr proposal
proposal.ec.done(ctx, proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
if proposal.sp != nil {
tracing.FinishSpan(proposal.sp)
proposal.sp.Finish()
proposal.sp = nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (ds *ServerImpl) setupFlow(

sd, err := sessiondata.UnmarshalNonLocal(req.EvalContext.SessionData)
if err != nil {
tracing.FinishSpan(sp)
sp.Finish()
return ctx, nil, err
}
ie := &lazyInternalExecutor{
Expand Down Expand Up @@ -329,7 +329,7 @@ func (ds *ServerImpl) setupFlow(
// Flow.Cleanup will not be called, so we have to close the memory monitor
// and finish the span manually.
monitor.Stop(ctx)
tracing.FinishSpan(sp)
sp.Finish()
ctx = tracing.ContextWithSpan(ctx, nil)
return ctx, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ func (pb *ProcessorBase) InternalClose() bool {
}

pb.Closed = true
tracing.FinishSpan(pb.span)
pb.span.Finish()
pb.span = nil
// Reset the context so that any incidental uses after this point do not
// access the finished span.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
spanFinished := false
defer func() {
if !spanFinished {
tracing.FinishSpan(span)
span.Finish()
}
}()

Expand Down Expand Up @@ -286,7 +286,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error {
m.stats.BytesSent = 0
}
span.SetSpanStats(&m.stats)
tracing.FinishSpan(span)
span.Finish()
spanFinished = true
if trace := execinfra.GetTraceData(ctx); trace != nil {
err := m.addRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/grant_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR
sqltelemetry.IncIAMGrantCounter(n.AdminOption)

ctx, span := tracing.ChildSpan(ctx, n.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

hasAdminRole, err := p.HasAdminRole(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/revoke_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo
sqltelemetry.IncIAMRevokeCounter(n.AdminOption)

ctx, span := tracing.ChildSpan(ctx, n.StatementTag())
defer tracing.FinishSpan(span)
defer span.Finish()

hasAdminRole, err := p.HasAdminRole(ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/rowexec/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b *backfiller) Run(ctx context.Context) {
opName := fmt.Sprintf("%sBackfiller", b.name)
ctx = logtags.AddTag(ctx, opName, int(b.spec.Table.ID))
ctx, span := execinfra.ProcessorSpan(ctx, opName)
defer tracing.FinishSpan(span)
defer span.Finish()
meta := b.doRun(ctx)
execinfra.SendTraceData(ctx, b.output)
if emitHelper(ctx, &b.out, nil /* row */, meta, func(ctx context.Context) {}) {
Expand Down Expand Up @@ -332,7 +332,7 @@ func WriteResumeSpan(
jobsRegistry *jobs.Registry,
) error {
ctx, traceSpan := tracing.ChildSpan(ctx, "checkpoint")
defer tracing.FinishSpan(traceSpan)
defer traceSpan.Finish()

return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
resumeSpans, job, mutationIdx, error := GetResumeSpans(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (ib *indexBackfiller) runChunk(
}

ctx, traceSpan := tracing.ChildSpan(tctx, "chunk")
defer tracing.FinishSpan(traceSpan)
defer traceSpan.Finish()

var key roachpb.Key

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (rb *routerBase) Start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c
ro.stats.MaxAllocatedMem = ro.memoryMonitor.MaximumBytes()
ro.stats.MaxAllocatedDisk = ro.diskMonitor.MaximumBytes()
span.SetSpanStats(&ro.stats)
tracing.FinishSpan(span)
span.Finish()
if trace := execinfra.GetTraceData(ctx); trace != nil {
ro.mu.Unlock()
rb.semaphore <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID d
ctx, span := tracing.ForkCtxSpan(ctx, "refresh-table-stats")
// Perform an asynchronous refresh of the cache.
go func() {
defer tracing.FinishSpan(span)
defer span.Finish()
sc.refreshCacheEntry(ctx, tableID)
}()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/limit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) error {
}
// If not, start a span and begin waiting.
ctx, span := tracing.ChildSpan(ctx, l.spanName)
defer tracing.FinishSpan(span)
defer span.Finish()
return l.sem.Acquire(ctx, 1)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (s *Stopper) RunAsyncTask(
go func() {
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer tracing.FinishSpan(span)
defer span.Finish()

f(ctx)
}()
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *Stopper) RunLimitedAsyncTask(
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer alloc.Release()
defer tracing.FinishSpan(span)
defer span.Finish()

f(ctx)
}()
Expand Down
15 changes: 5 additions & 10 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,15 @@ func (s *Span) SetSpanStats(stats SpanStats) {
s.crdb.mu.Unlock()
}

// Finish is part of the opentracing.Span interface.
// Finish marks the Span as completed. Finishing a nil *Span is a noop.
func (s *Span) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}

// FinishWithOptions is part of the opentracing.Span interface.
func (s *Span) FinishWithOptions(opts opentracing.FinishOptions) {
if s.isNoop() {
if s == nil {
return
}
finishTime := opts.FinishTime
if finishTime.IsZero() {
finishTime = time.Now()
if s.isNoop() {
return
}
finishTime := time.Now()
s.crdb.mu.Lock()
s.crdb.mu.duration = finishTime.Sub(s.crdb.startTime)
s.crdb.mu.Unlock()
Expand Down
8 changes: 0 additions & 8 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,14 +552,6 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanContext,
return &sc, nil
}

// FinishSpan closes the given Span (if not nil). It is a convenience wrapper
// for Span.Finish() which tolerates nil spans.
func FinishSpan(span *Span) {
if span != nil {
span.Finish()
}
}

// ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span
// that "follows from" the original Span. This allows the resulting context to be
// used in an async task that might outlive the original operation.
Expand Down

0 comments on commit a76969f

Please sign in to comment.