Skip to content

Commit

Permalink
sql: improve tracing of some things
Browse files Browse the repository at this point in the history
This commit makes it so that we create a tracing span for all
processors. Previously, out of performance considerations, we elided the
spans for the columnarizer, materializer, planNodeToRowSource, and
flowCoordinator, but given the improvements to tracing in the last year
or so it doesn't seem necessary to do that anymore. In particular so
given that we don't create tracing spans by default any way, only when
the tracing is enabled for the statement.

Additionally, this commit adds a couple of tags to the tracing span of
the vectorized outbox (similar to what we have in the row-by-row
engine).

Release justification: low-risk improvement.

Release note: None
  • Loading branch information
yuzefovich committed Sep 7, 2022
1 parent 694c41a commit 7291e4d
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 127 deletions.
14 changes: 10 additions & 4 deletions pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type Columnarizer struct {
metadataAllocator *colmem.Allocator
input execinfra.RowSource
da tree.DatumAlloc
// getWrappedExecStats, if non-nil, is the function to get the execution
// statistics of the wrapped row-by-row processor. We store it separately
// from execinfra.ProcessorBaseNoHelper.ExecStatsForTrace so that the
// function is not called when the columnarizer is being drained (which is
// after the vectorized stats are processed).
getWrappedExecStats func() *execinfrapb.ComponentStats

batch coldata.Batch
vecs coldata.TypedVecs
Expand Down Expand Up @@ -174,7 +180,7 @@ func (c *Columnarizer) Init(ctx context.Context) {
return
}
c.accumulatedMeta = make([]execinfrapb.ProducerMetadata, 0, 1)
ctx = c.StartInternalNoSpan(ctx)
ctx = c.StartInternal(ctx, "columnarizer" /* name */)
c.input.Start(ctx)
if execStatsHijacker, ok := c.input.(execinfra.ExecStatsForTraceHijacker); ok {
// The columnarizer is now responsible for propagating the execution
Expand All @@ -188,7 +194,7 @@ func (c *Columnarizer) Init(ctx context.Context) {
// Still, just to be safe, we delay the hijacking until Init so that in
// case the assumption is wrong, we still get the stats from the wrapped
// processor.
c.ExecStatsForTrace = execStatsHijacker.HijackExecStatsForTrace()
c.getWrappedExecStats = execStatsHijacker.HijackExecStatsForTrace()
}
}

Expand All @@ -200,10 +206,10 @@ func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats {
))
}
componentID := c.FlowCtx.ProcessorComponentID(c.ProcessorID)
if c.removedFromFlow || c.ExecStatsForTrace == nil {
if c.removedFromFlow || c.getWrappedExecStats == nil {
return &execinfrapb.ComponentStats{Component: componentID}
}
s := c.ExecStatsForTrace()
s := c.getWrappedExecStats()
s.Component = componentID
return s
}
Expand Down
14 changes: 1 addition & 13 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,7 @@ func (m *Materializer) OutputTypes() []*types.T {

// Start is part of the execinfra.RowSource interface.
func (m *Materializer) Start(ctx context.Context) {
if len(m.drainHelper.statsCollectors) > 0 {
// Since we're collecting stats, we'll derive a separate tracing span
// for them. If we don't do this, then the stats would be attached to
// the span of the materializer's user, and if that user itself has a
// lot of payloads to attach (e.g. a joinReader attaching the KV keys it
// looked up), then the stats might be dropped based on the maximum size
// of structured payload per tracing span of 10KiB (see
// tracing.maxStructuredBytesPerSpan). Deriving a separate span
// guarantees that the stats won't be dropped.
ctx = m.StartInternal(ctx, "materializer" /* name */)
} else {
ctx = m.StartInternalNoSpan(ctx)
}
ctx = m.StartInternal(ctx, "materializer" /* name */)
// We can encounter an expected error during Init (e.g. an operator
// attempts to allocate a batch, but the memory budget limit has been
// reached), so we need to wrap it with a catcher.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@io_opentelemetry_go_otel//attribute",
],
)

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
)

// flowStreamClient is a utility interface used to mock out the RPC layer.
Expand Down Expand Up @@ -165,6 +166,8 @@ func (o *Outbox) Run(
ctx, o.span = execinfra.ProcessorSpan(ctx, "outbox")
if o.span != nil {
defer o.span.Finish()
o.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(flowID.String()))
o.span.SetTag(execinfrapb.StreamIDTagKey, attribute.IntValue(int(streamID)))
}

o.runnerCtx = ctx
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/flow_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (f *FlowCoordinator) OutputTypes() []*types.T {

// Start is part of the execinfra.RowSource interface.
func (f *FlowCoordinator) Start(ctx context.Context) {
ctx = f.StartInternalNoSpan(ctx)
ctx = f.StartInternal(ctx, "flow coordinator" /* name */)
if err := colexecerror.CatchVectorizedRuntimeError(func() {
f.input.Start(ctx)
}); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,10 +1499,6 @@ type ExecutorTestingKnobs struct {
// to use a transaction, and, in doing so, more deterministically allocate
// descriptor IDs at the cost of decreased parallelism.
UseTransactionalDescIDGenerator bool

// NoStatsCollectionWithVerboseTracing is used to disable the execution
// statistics collection in presence of the verbose tracing.
NoStatsCollectionWithVerboseTracing bool
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
24 changes: 5 additions & 19 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,30 +838,16 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.
//
// so that the caller doesn't mistakenly use old ctx object.
func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context {
return pb.startImpl(ctx, true /* createSpan */, name)
}

// StartInternalNoSpan does the same as StartInternal except that it does not
// start a span. This is used by pass-through components whose goal is to be a
// silent translation layer for components that actually do work (e.g. a
// planNodeToRowSource wrapping an insertNode, or a columnarizer wrapping a
// rowexec flow).
func (pb *ProcessorBaseNoHelper) StartInternalNoSpan(ctx context.Context) context.Context {
return pb.startImpl(ctx, false /* createSpan */, "")
}

func (pb *ProcessorBaseNoHelper) startImpl(
ctx context.Context, createSpan bool, spanName string,
) context.Context {
pb.origCtx = ctx
if createSpan {
pb.Ctx, pb.span = ProcessorSpan(ctx, spanName)
pb.Ctx = ctx
noSpan := pb.FlowCtx != nil && pb.FlowCtx.Cfg != nil &&
pb.FlowCtx.Cfg.TestingKnobs.ProcessorNoTracingSpan
if !noSpan {
pb.Ctx, pb.span = ProcessorSpan(ctx, name)
if pb.span != nil && pb.span.IsVerbose() {
pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String()))
pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID)))
}
} else {
pb.Ctx = ctx
}
pb.EvalCtx.Context = pb.Ctx
return pb.Ctx
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ type TestingKnobs struct {
// IndexBackfillMergerTestingKnobs are the index backfill merger specific
// testing knobs.
IndexBackfillMergerTestingKnobs base.ModuleTestingKnobs

// ProcessorNoTracingSpan is used to disable the creation of a tracing span
// in ProcessorBase.StartInternal if the tracing is enabled.
ProcessorNoTracingSpan bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (ih *instrumentationHelper) Setup(
}

if sp := tracing.SpanFromContext(ctx); sp != nil {
if sp.IsVerbose() && !cfg.TestingKnobs.NoStatsCollectionWithVerboseTracing {
if sp.IsVerbose() {
// If verbose tracing was enabled at a higher level, stats
// collection is enabled so that stats are shown in the traces, but
// no extra work is needed by the instrumentationHelper.
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/opt/exec/execbuilder/testdata/delete
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%DelRange%' OR message LIKE '%DelRng%'
----
batch flow coordinator DelRange /Table/110/1 - /Table/110/2
dist sender send r52: sending batch 1 DelRng to (n1,s1):1
batch flow coordinator DelRange /Table/110/1/601/0 - /Table/110/2
dist sender send r52: sending batch 1 DelRng to (n1,s1):1
delete range DelRange /Table/110/1 - /Table/110/2
dist sender send r52: sending batch 1 DelRng to (n1,s1):1
delete range DelRange /Table/110/1/601/0 - /Table/110/2
dist sender send r52: sending batch 1 DelRng to (n1,s1):1

# Ensure that DelRange requests are autocommitted when DELETE FROM happens on a
# chunk of fewer than 600 keys.
Expand All @@ -251,8 +251,8 @@ query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%Del%' OR message LIKE '%sending batch%'
----
batch flow coordinator Del /Table/110/1/5/0
dist sender send r52: sending batch 1 Del, 1 EndTxn to (n1,s1):1
delete range Del /Table/110/1/5/0
dist sender send r52: sending batch 1 Del, 1 EndTxn to (n1,s1):1

# Ensure that we send DelRanges when doing a point delete operation on a table
# that has multiple column families.
Expand All @@ -270,8 +270,8 @@ query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE message LIKE '%Del%' OR message LIKE '%sending batch%'
----
batch flow coordinator DelRange /Table/111/1/5 - /Table/111/1/6
dist sender send r52: sending batch 1 DelRng to (n1,s1):1
delete range DelRange /Table/111/1/5 - /Table/111/1/6
dist sender send r52: sending batch 1 DelRng to (n1,s1):1

statement ok
CREATE TABLE xyz (
Expand Down
20 changes: 11 additions & 9 deletions pkg/sql/opt/exec/execbuilder/testdata/select
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ WHERE message LIKE '%SPAN START%' OR message LIKE '%pos%executing%';
5 === SPAN START: optimizer === optimizer
6 === SPAN START: consuming rows === consuming rows
7 === SPAN START: flow === flow
8 === SPAN START: values === values
3 [Open pos:?] executing ExecStmt: COMMIT TRANSACTION sql txn
8 === SPAN START: sql query === sql query
9 === SPAN START: commit sql txn === commit sql txn
9 === SPAN START: sql query === sql query
10 === SPAN START: commit sql txn === commit sql txn
0 [NoTxn pos:?] executing ExecStmt: SELECT 2 session recording
10 === SPAN START: sql txn === sql txn
10 [Open pos:?] executing ExecStmt: SELECT 2 sql txn
11 === SPAN START: sql query === sql query
12 === SPAN START: optimizer === optimizer
13 === SPAN START: consuming rows === consuming rows
14 === SPAN START: flow === flow
15 === SPAN START: commit sql txn === commit sql txn
11 === SPAN START: sql txn === sql txn
11 [Open pos:?] executing ExecStmt: SELECT 2 sql txn
12 === SPAN START: sql query === sql query
13 === SPAN START: optimizer === optimizer
14 === SPAN START: consuming rows === consuming rows
15 === SPAN START: flow === flow
16 === SPAN START: values === values
17 === SPAN START: commit sql txn === commit sql txn
0 [NoTxn pos:?] executing Sync session recording
0 [NoTxn pos:?] executing ExecStmt: SET TRACING = off session recording

Expand Down
Loading

0 comments on commit 7291e4d

Please sign in to comment.