diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 1297a1473261..de93cc8e495c 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -112,6 +112,7 @@ go_library( "//pkg/storage", "//pkg/util", "//pkg/util/admission/admissionpb", + "//pkg/util/bulk", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/ccl/backupccl/backup.proto b/pkg/ccl/backupccl/backup.proto index 5ebb34b24e57..c68fc55b9c11 100644 --- a/pkg/ccl/backupccl/backup.proto +++ b/pkg/ccl/backupccl/backup.proto @@ -193,22 +193,3 @@ message BackupProgressTraceEvent { util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false]; } -// BackupExportTraceRequestEvent is the trace event recorded when an -// ExportRequest has been sent. -message BackupExportTraceRequestEvent { - string span = 1; - int32 attempt = 2; - string priority = 3; - string req_sent_time = 4; -} - -// BackupExportTraceResponseEvent is the trace event recorded when we receive a -// response from the ExportRequest. -message BackupExportTraceResponseEvent { - string duration = 1; - int32 num_files = 2; - repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false]; - reserved 4 ; - string retryable_error = 5; -} - diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index e2b39571758e..ee4294f36eb5 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -170,8 +171,14 @@ func newBackupDataProcessor( // Start is part of the RowSource interface. func (bp *backupDataProcessor) Start(ctx context.Context) { + agg := bulk.MakeAggregator() ctx = logtags.AddTag(ctx, "job", bp.spec.JobID) - ctx = bp.StartInternal(ctx, backupProcessorName) + ctx = bp.StartInternal(ctx, backupProcessorName, agg) + + // Initialize the aggregator with the tracing span that will live as long as + // this processor is running. + agg.Init(tracing.SpanFromContext(ctx)) + ctx, cancel := context.WithCancel(ctx) bp.cancelAndWaitForWorker = func() { cancel() @@ -254,7 +261,6 @@ func runBackupProcessor( progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, memAcc *mon.BoundAccount, ) error { - backupProcessorSpan := tracing.SpanFromContext(ctx) clusterSettings := flowCtx.Cfg.Settings totalSpans := len(spec.Spans) + len(spec.IntroducedSpans) @@ -411,37 +417,23 @@ func runBackupProcessor( span.span, span.attempts+1, header.UserPriority.String()) var rawResp roachpb.Response var pErr *roachpb.Error - var reqSentTime time.Time - var respReceivedTime time.Time exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - reqSentTime = timeutil.Now() - backupProcessorSpan.RecordStructured(&BackupExportTraceRequestEvent{ - Span: span.span.String(), - Attempt: int32(span.attempts + 1), - Priority: header.UserPriority.String(), - ReqSentTime: reqSentTime.String(), - }) - rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() } return nil }) if exportRequestErr != nil { - if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { + if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { span.lastTried = timeutil.Now() span.attempts++ todo <- span // TODO(dt): send a progress update to update job progress to note // the intents being hit. - backupProcessorSpan.RecordStructured(&BackupExportTraceResponseEvent{ - RetryableError: tracing.RedactAndTruncateError(intentErr), - }) continue } // TimeoutError improves the opaque `context deadline exceeded` error @@ -500,20 +492,12 @@ func runBackupProcessor( completedSpans = 1 } - duration := respReceivedTime.Sub(reqSentTime) - exportResponseTraceEvent := &BackupExportTraceResponseEvent{ - Duration: duration.String(), - FileSummaries: make([]roachpb.RowCount, 0), - } - if len(resp.Files) > 1 { log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts) - ret := exportedSpan{ // BackupManifest_File just happens to contain the exact fields // to store the metadata we need, but there's no actual File @@ -541,8 +525,6 @@ func runBackupProcessor( return ctx.Err() } } - exportResponseTraceEvent.NumFiles = int32(len(resp.Files)) - backupProcessorSpan.RecordStructured(exportResponseTraceEvent) default: // No work left to do, so we can exit. Note that another worker could diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 3f58cf52220a..3b78c1fbf47b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/types" ) // SSTTargetSizeSetting is the cluster setting name for the @@ -105,14 +104,6 @@ func evalExport( ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) defer evalExportSpan.Finish() - var evalExportTrace types.StringValue - if cArgs.EvalCtx.NodeID() == h.GatewayNodeID { - evalExportTrace.Value = fmt.Sprintf("evaluating Export on gateway node %d", cArgs.EvalCtx.NodeID()) - } else { - evalExportTrace.Value = fmt.Sprintf("evaluating Export on remote node %d", cArgs.EvalCtx.NodeID()) - } - evalExportSpan.RecordStructured(&evalExportTrace) - // Table's marked to be excluded from backup are expected to be configured // with a short GC TTL. Additionally, backup excludes such table's from being // protected from GC when writing ProtectedTimestamp records. The @@ -270,5 +261,30 @@ func evalExport( } } + // If we have a trace, emit the export stats corresponding to this + // ExportRequest. + sp := tracing.SpanFromContext(ctx) + recordExportStats(sp, h.GatewayNodeID, cArgs.EvalCtx.NodeID(), reply) + return result.Result{}, nil } + +// recordExportStats emits a StructuredEvent containing the stats about the +// evaluated ExportRequest. +func recordExportStats( + sp *tracing.Span, sendNode, evalNode roachpb.NodeID, resp *roachpb.ExportResponse, +) { + if sp == nil { + return + } + exportStats := roachpb.ExportStats{ + SentPerNode: map[roachpb.NodeID]int32{sendNode: 1}, + EvaluatedPerNode: map[roachpb.NodeID]int32{evalNode: 1}, + } + for _, f := range resp.Files { + file := f + exportStats.NumFiles++ + exportStats.DataSize += int64(len(file.SST)) + } + sp.RecordStructured(&exportStats) +} diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index 375e0c334502..65ddd64fb000 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/bitarray", + "//pkg/util/bulk", "//pkg/util/caller", "//pkg/util/duration", "//pkg/util/encoding", @@ -60,6 +61,7 @@ go_library( "@com_github_gogo_protobuf//proto", "@com_github_golang_mock//gomock", # keep "@io_etcd_go_etcd_raft_v3//raftpb", + "@io_opentelemetry_go_otel//attribute", "@org_golang_google_grpc//metadata", # keep ], ) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 4618a3a8b527..95832ce2b4dc 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -14,6 +14,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/dustin/go-humanize" + "go.opentelemetry.io/otel/attribute" ) //go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient @@ -1800,3 +1802,69 @@ const ( // with the SpecificTenantOverrides precedence.. AllTenantsOverrides ) + +var _ bulk.AggregatorEvent = &ExportStats{} + +// Combine implements the AggregatorEvent interface. +func (e *ExportStats) Combine(other bulk.AggregatorEvent) { + if e == nil { + e.SentPerNode = make(map[NodeID]int32) + e.EvaluatedPerNode = make(map[NodeID]int32) + } + + otherExportStats, ok := other.(*ExportStats) + if !ok { + panic("`other` is not of type ExportStats") + } + e.NumFiles += otherExportStats.NumFiles + e.DataSize += otherExportStats.DataSize + for nodeID, count := range otherExportStats.SentPerNode { + e.SentPerNode[nodeID] += count + } + for nodeID, count := range otherExportStats.EvaluatedPerNode { + e.EvaluatedPerNode[nodeID] += count + } +} + +// Tag implements the AggregatorEvent interface. +func (e *ExportStats) Tag() string { + return "ExportStats" +} + +const ( + tagNumFiles = "export_num_files" + tagDataSize = "export_data_size" + tagRequestSent = "export_sent_per_node" + tagRequestEval = "export_eval_per_node" +) + +// Render implements the AggregatorEvent interface. +func (e *ExportStats) Render() []attribute.KeyValue { + tags := make([]attribute.KeyValue, 0) + if e.NumFiles > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagNumFiles, + Value: attribute.Int64Value(e.NumFiles), + }) + } + if e.DataSize > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagDataSize, + Value: attribute.Int64Value(e.DataSize), + }) + } + if len(e.SentPerNode) > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagRequestSent, + Value: attribute.StringValue(fmt.Sprintf("%v", e.SentPerNode)), + }) + } + if len(e.EvaluatedPerNode) > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagRequestEval, + Value: attribute.StringValue(fmt.Sprintf("%v", e.EvaluatedPerNode)), + }) + } + + return tags +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 80fc56260505..ddca579b572c 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -3020,3 +3020,28 @@ message ScanStats { uint64 point_count = 9; uint64 points_covered_by_range_tombstones = 10; } + +// ExportStats is a message containing information about each +// Export{Request,Response}. +message ExportStats { + // NumFiles is the number of SST files produced by the ExportRequest. + int64 num_files = 1; + // DataSize is the byte size of all the SST files produced by the + // ExportRequest. + int64 data_size = 2; + + + // SentPerNode is a mapping from NodeID to the number of ExportRequests sent + // by the node for evaluation. + map sent_per_node = 3 [ + (gogoproto.nullable) = false, + (gogoproto.castkey) = "NodeID" + ]; + + // EvaluatedPerNode is a mapping from NodeID to the number of ExportRequests + // evaluated by the node. + map evaluated_per_node = 4 [ + (gogoproto.nullable) = false, + (gogoproto.castkey) = "NodeID" + ]; +} diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index cf68e2437f84..727e4c5fa747 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -808,13 +808,18 @@ func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata) { // ProcessorSpan creates a child span for a processor (if we are doing any // tracing). The returned span needs to be finished using tracing.FinishSpan. -func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) { +func ProcessorSpan( + ctx context.Context, name string, tracingEventListeners ...tracing.EventListener, +) (context.Context, *tracing.Span) { sp := tracing.SpanFromContext(ctx) if sp == nil { return ctx, nil } - return sp.Tracer().StartSpanCtx(ctx, name, - tracing.WithParent(sp), tracing.WithDetachedRecording()) + spanOpts := []tracing.SpanOption{tracing.WithParent(sp), tracing.WithDetachedRecording()} + if tracingEventListeners != nil { + spanOpts = append(spanOpts, tracing.WithEventListeners(tracingEventListeners)) + } + return sp.Tracer().StartSpanCtx(ctx, name, spanOpts...) } // StartInternal prepares the ProcessorBase for execution. It returns the @@ -826,8 +831,10 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing. // < inputs >.Start(ctx) // if there are any inputs-RowSources to pb // < other initialization > // so that the caller doesn't mistakenly use old ctx object. -func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context { - return pb.startImpl(ctx, true /* createSpan */, name) +func (pb *ProcessorBaseNoHelper) StartInternal( + ctx context.Context, name string, tracingEventListeners ...tracing.EventListener, +) context.Context { + return pb.startImpl(ctx, true /* createSpan */, name, tracingEventListeners...) } // StartInternalNoSpan does the same as StartInternal except that it does not @@ -840,11 +847,14 @@ func (pb *ProcessorBaseNoHelper) StartInternalNoSpan(ctx context.Context) contex } func (pb *ProcessorBaseNoHelper) startImpl( - ctx context.Context, createSpan bool, spanName string, + ctx context.Context, + createSpan bool, + spanName string, + tracingEventListeners ...tracing.EventListener, ) context.Context { pb.origCtx = ctx if createSpan { - pb.Ctx, pb.span = ProcessorSpan(ctx, spanName) + pb.Ctx, pb.span = ProcessorSpan(ctx, spanName, tracingEventListeners...) if pb.span != nil && pb.span.IsVerbose() { pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String())) pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID))) diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel new file mode 100644 index 000000000000..d50706260f45 --- /dev/null +++ b/pkg/util/bulk/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "bulk", + srcs = ["aggregator.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/tracing", + ], +) diff --git a/pkg/util/bulk/aggregator.go b/pkg/util/bulk/aggregator.go new file mode 100644 index 000000000000..976df1a3bcf6 --- /dev/null +++ b/pkg/util/bulk/aggregator.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// AggregatorEvent describes an event that can be aggregated and stored by the +// Aggregator. An AggregatorEvent also implements the tracing.LazyTag interface +// to render its information on the associated tracing span. +type AggregatorEvent interface { + tracing.LazyTag + + // Combine combines two AggregatorEvents together. + Combine(other AggregatorEvent) + // Tag returns a string used to identify the AggregatorEvent. + Tag() string +} + +// Aggregator implements the tracing.EventListener interface. +// +// An Aggregator when registered as an EventListener with a tracing span, will +// aggregate all AggregatorEvents that are emitted in the span's recording. +// +// Each AggregatorEvent will set itself as a LazyTag on the associated span and +// thereby Render() its aggregated information whenever the span's recording is +// pulled. +type Aggregator struct { + mu struct { + syncutil.Mutex + // aggregatedEvents is a mapping from the tag identifying the + // AggregatorEvent to the running aggregate of the AggregatorEvent. + aggregatedEvents map[string]AggregatorEvent + // sp is the tracing span associated with the Aggregator. + sp *tracing.Span + } +} + +// Notify implements the tracing.EventListener interface. +func (b *Aggregator) Notify(event tracing.Structured) { + bulkEvent, ok := event.(AggregatorEvent) + if !ok { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + if b.mu.sp == nil { + panic("Init not called on Aggregator before use") + } + + // If this is the first AggregatorEvent with this tag, set it as a LazyTag on + // the associated tracing span. This way the AggregatorEvent will be + // dynamically Render()ed everytime we pull the tracing for the associated + // span. + if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { + b.mu.aggregatedEvents[bulkEvent.Tag()] = bulkEvent + b.mu.sp.SetLazyTag(bulkEvent.Tag(), b.mu.aggregatedEvents[bulkEvent.Tag()]) + } else { + b.mu.aggregatedEvents[bulkEvent.Tag()].Combine(bulkEvent) + } +} + +// Init initializes the Aggregator. The Aggregator is only ready to use after +// Init has been called. +func (b *Aggregator) Init(sp *tracing.Span) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.sp = sp +} + +var _ tracing.EventListener = &Aggregator{} + +// MakeAggregator returns an instance of an Aggregator. +func MakeAggregator() *Aggregator { + agg := &Aggregator{} + agg.mu.aggregatedEvents = make(map[string]AggregatorEvent) + return agg +}