Skip to content

Commit

Permalink
bulk: introduce an ingestionTracer to the bulk ingestion pipeline
Browse files Browse the repository at this point in the history
This change introduces an `ingestionTracer` that is associated with
a tracing span and is responsible for adding ingestion information
to the trace in the form of lazy tags. It currently aggregates information
about AddSSTable, AdminSplit and AdminScatter requests that are issued during
ingestion.

As part of introducing the tracer, all RPC requests during ingestion are
now made in their own child span, and allow for us to inspect the returned trace
for relevant information. This is beneficial if egs: there is a stuck AdminScatter
as it will show up on the `tracez` page as a long running span.

The `Render` methods responsible for displaying the aggregated information are
just stubs that can be iterated on in the future. Egs: instead of displaying
the throughput/duration of each RPC request at the level of the import/restore
processor, we might want to bubble up the information to the flow coordinator
who is then responsible for aggregating this information across nodes and exposing
it. The ingestion tracer gives us a building block to capture and push relevant
trace information further up the stack.

Release note: None
  • Loading branch information
adityamaru committed Apr 20, 2022
1 parent 7506753 commit 40228bc
Show file tree
Hide file tree
Showing 9 changed files with 751 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func runBackupProcessor(
ReqSentTime: reqSentTime.String(),
})

rawRes, pErr = kv.SendWrappedWithAdmission(
_, rawRes, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ go_library(
srcs = [
"buffering_adder.go",
"bulk_metrics.go",
"ingestion_tracer.go",
"kv_buf.go",
"sender.go",
"setting.go",
"sst_batcher.go",
"stats.go",
Expand All @@ -29,16 +31,22 @@ go_library(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//types",
"@io_opentelemetry_go_otel//attribute",
],
)

go_test(
name = "bulk_test",
size = "medium",
srcs = [
"ingestion_tracer_test.go",
"kv_buf_test.go",
"main_test.go",
"sst_batcher_test.go",
Expand All @@ -55,7 +63,9 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/testutils/serverutils",
"//pkg/testutils/sstutil",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
22 changes: 17 additions & 5 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -85,6 +86,12 @@ func MakeBulkAdder(
opts.MaxBufferSize = func() int64 { return 128 << 20 }
}

sp := tracing.SpanFromContext(ctx)
i, err := newIngestionTracer(opts.Name, sp)
if err != nil {
return nil, err
}

b := &BufferingAdder{
name: opts.Name,
sink: SSTBatcher{
Expand All @@ -97,6 +104,7 @@ func MakeBulkAdder(
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
stats: ingestionPerformanceStats{sendWaitByStore: make(map[roachpb.StoreID]time.Duration)},
ingestionTracer: i,
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
Expand Down Expand Up @@ -363,7 +371,8 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
}
predicateKey := b.curBuf.Key(predicateAt)
log.VEventf(ctx, 1, "pre-splitting span %d of %d at %s", i, b.initialSplits, splitKey)
if err := b.sink.db.AdminSplit(ctx, splitKey, expire, predicateKey); err != nil {
rec, err := AdminSplitWithTracing(ctx, b.sink.db, splitKey, expire, predicateKey)
if err != nil {
// TODO(dt): a typed error would be nice here.
if strings.Contains(err.Error(), "predicate") {
log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s",
Expand All @@ -372,6 +381,7 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
}
return err
}
b.sink.ingestionTracer.notifyAdminSplit(ctx, rec)
toScatter = append(toScatter, splitKey)
}

Expand All @@ -383,18 +393,20 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
b.sink.stats.splitWait += splitsWait

for _, splitKey := range toScatter {
resp, err := b.sink.db.AdminScatter(ctx, splitKey, 0 /* maxSize */)
resp, rec, err := AdminScatterWithTracing(ctx, b.sink.db, splitKey, 0 /* maxSize */)
if err != nil {
log.Warningf(ctx, "failed to scatter: %v", err)
continue
}
b.sink.ingestionTracer.notifyAdminScatter(ctx, rec)
b.sink.stats.scatters++
if resp.MVCCStats != nil {
moved := sz(resp.MVCCStats.Total())
scatterResp := resp.(*roachpb.AdminScatterResponse)
if scatterResp.MVCCStats != nil {
moved := sz(scatterResp.MVCCStats.Total())
b.sink.stats.scatterMoved += moved
if moved > 0 {
log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
moved, resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
moved, scatterResp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
}
}
}
Expand Down
Loading

0 comments on commit 40228bc

Please sign in to comment.