diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ef97e114e136..02e28b9a307d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -513,6 +513,7 @@ ALL_TESTS = [ "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray_test", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller_test", "//pkg/util/cgroups:cgroups_test", @@ -1829,6 +1830,8 @@ GO_TARGETS = [ "//pkg/util/bufalloc:bufalloc", "//pkg/util/buildutil:buildutil", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller", @@ -2806,6 +2809,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/bitarray:get_x_data", "//pkg/util/bufalloc:get_x_data", "//pkg/util/buildutil:get_x_data", + "//pkg/util/bulk:get_x_data", "//pkg/util/cache:get_x_data", "//pkg/util/caller:get_x_data", "//pkg/util/cancelchecker:get_x_data", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 88b7fbbb6662..b28aade85583 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -116,6 +116,7 @@ go_library( "//pkg/storage", "//pkg/util", "//pkg/util/admission/admissionpb", + "//pkg/util/bulk", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index f95a48a5d835..3a3ba8e72295 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1467,5 +1467,5 @@ func updateBackupDetails( } func init() { - sql.AddPlanHook("backup", backupPlanHook) + sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook) } diff --git a/pkg/ccl/backupccl/backup_planning_tenant.go b/pkg/ccl/backupccl/backup_planning_tenant.go index 1d88c577d0bb..0c6258f9879d 100644 --- a/pkg/ccl/backupccl/backup_planning_tenant.go +++ b/pkg/ccl/backupccl/backup_planning_tenant.go @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, ) (descpb.TenantInfoWithUsage, error) { row, err := ie.QueryRow( - ctx, "backup-lookup-tenant", txn, + ctx, "backupccl.retrieveSingleTenantMetadata", txn, tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(), ) if err != nil { @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, ) ([]descpb.TenantInfoWithUsage, error) { rows, err := ie.QueryBuffered( - ctx, "backup-lookup-tenants", txn, + ctx, "backupccl.retrieveAllTenantsMetadata", txn, // XXX Should we add a `WHERE active`? We require the tenant to be active // when it is specified.. tenantMetadataQuery, diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d926450a3a40..0f6b11c3b970 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -106,6 +107,10 @@ type backupDataProcessor struct { // BoundAccount that reserves the memory usage of the backup processor. memAcc *mon.BoundAccount + + // Aggregator that aggregates StructuredEvents emitted in the + // backupDataProcessors' trace recording. + agg *bulk.TracingAggregator } var ( @@ -153,6 +158,12 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx = logtags.AddTag(ctx, "job", bp.spec.JobID) ctx = bp.StartInternal(ctx, backupProcessorName) ctx, cancel := context.WithCancel(ctx) + + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // bps' trace recording. + ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx, + fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer) + bp.cancelAndWaitForWorker = func() { cancel() for range bp.progCh { @@ -160,7 +171,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { } log.Infof(ctx, "starting backup data") if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ - TaskName: "backup-worker", + TaskName: "backupDataProcessor.runBackupProcessor", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() + bp.agg.Close() if bp.InternalClose() { bp.memAcc.Close(bp.Ctx) } @@ -387,26 +399,16 @@ func runBackupProcessor( Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, } - log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawResp roachpb.Response var pErr *roachpb.Error - var reqSentTime time.Time - var respReceivedTime time.Time + requestSentAt := timeutil.Now() exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - reqSentTime = timeutil.Now() - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{ - Span: span.span.String(), - Attempt: int32(span.attempts + 1), - Priority: header.UserPriority.String(), - ReqSentTime: reqSentTime.String(), - }) - rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() } @@ -419,9 +421,7 @@ func runBackupProcessor( todo <- span // TODO(dt): send a progress update to update job progress to note // the intents being hit. - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{ - RetryableError: tracing.RedactAndTruncateError(intentErr), - }) + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) continue } // TimeoutError improves the opaque `context deadline exceeded` error @@ -480,19 +480,12 @@ func runBackupProcessor( completedSpans = 1 } - duration := respReceivedTime.Sub(reqSentTime) - exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{ - Duration: duration.String(), - FileSummaries: make([]roachpb.RowCount, 0), - } - if len(resp.Files) > 1 { log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts) ret := exportedSpan{ // BackupManifest_File just happens to contain the exact fields @@ -521,8 +514,9 @@ func runBackupProcessor( return ctx.Err() } } - exportResponseTraceEvent.NumFiles = int32(len(resp.Files)) - backupProcessorSpan.RecordStructured(exportResponseTraceEvent) + + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) default: // No work left to do, so we can exit. Note that another worker could @@ -576,6 +570,22 @@ func runBackupProcessor( return grp.Wait() } +// recordExportStats emits a StructuredEvent containing the stats about the +// evaluated ExportRequest. +func recordExportStats( + sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration, +) { + if sp == nil { + return + } + exportStats := backuppb.ExportStats{Duration: exportDuration} + for _, f := range resp.Files { + exportStats.NumFiles++ + exportStats.DataSize += int64(len(f.SST)) + } + sp.RecordStructured(&exportStats) +} + func init() { rowexec.NewBackupDataProcessor = newBackupDataProcessor } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index eb518b2ef8d8..42d5bfd07059 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -45,7 +45,7 @@ func distBackupPlanSpecs( startTime, endTime hlc.Timestamp, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs") + ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. defer span.Finish() user := execCtx.User() @@ -158,7 +158,7 @@ func distBackup( progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { - ctx, span := tracing.ChildSpan(ctx, "backup-distsql") + ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") defer span.Finish() evalCtx := execCtx.ExtendedEvalContext() var noTxn *kv.Txn diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index b8643961005c..c9d57dabd390 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -48,9 +48,11 @@ go_library( "//pkg/sql/parser", "//pkg/sql/protoreflect", "//pkg/sql/sem/tree", + "//pkg/util/bulk", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index 0448199ebe06..1fc62b148fdd 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -10,15 +10,18 @@ package backuppb import ( "encoding/json" + "fmt" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/bulk" _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/otel/attribute" ) // IsIncremental returns if the BackupManifest corresponds to an incremental @@ -123,6 +126,64 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler return json.Marshal(m) } +var _ bulk.TracingAggregatorEvent = &ExportStats{} + +const ( + tagNumFiles = "num_files" + tagDataSize = "data_size" + tagThroughput = "throughput" +) + +// Render implements the LazyTag interface. +func (e *ExportStats) Render() []attribute.KeyValue { + const mb = 1 << 20 + tags := make([]attribute.KeyValue, 0) + if e.NumFiles > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagNumFiles, + Value: attribute.Int64Value(e.NumFiles), + }) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + tags = append(tags, attribute.KeyValue{ + Key: tagDataSize, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)), + }) + + if e.Duration > 0 { + throughput := dataSizeMB / e.Duration.Seconds() + tags = append(tags, attribute.KeyValue{ + Key: tagThroughput, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)), + }) + } + } + + return tags +} + +// Identity implements the AggregatorEvent interface. +func (e *ExportStats) Identity() bulk.TracingAggregatorEvent { + return &ExportStats{} +} + +// Combine implements the AggregatorEvent interface. +func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) { + otherExportStats, ok := other.(*ExportStats) + if !ok { + panic(fmt.Sprintf("`other` is not of type ExportStats: %T", other)) + } + e.NumFiles += otherExportStats.NumFiles + e.DataSize += otherExportStats.DataSize + e.Duration += otherExportStats.Duration +} + +// Tag implements the AggregatorEvent interface. +func (e *ExportStats) Tag() string { + return "ExportStats" +} + func init() { protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest") } diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index aa8dcc12e2ef..caa125169601 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -193,22 +193,16 @@ message BackupProgressTraceEvent { util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false]; } -// BackupExportTraceRequestEvent is the trace event recorded when an -// ExportRequest has been sent. -message BackupExportTraceRequestEvent { - string span = 1; - int32 attempt = 2; - string priority = 3; - string req_sent_time = 4; -} - -// BackupExportTraceResponseEvent is the trace event recorded when we receive a -// response from the ExportRequest. -message BackupExportTraceResponseEvent { - string duration = 1; - int32 num_files = 2; - repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false]; - reserved 4 ; - string retryable_error = 5; +// ExportStats is a message containing information about each +// Export{Request,Response}. +message ExportStats { + // NumFiles is the number of SST files produced by the ExportRequest. + int64 num_files = 1; + // DataSize is the byte size of all the SST files produced by the + // ExportRequest. + int64 data_size = 2; + // Duration is the total time taken to send an ExportRequest, receive an + // ExportResponse and push the response on a channel. + int64 duration = 3 [(gogoproto.casttype) = "time.Duration"]; } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 756a42b8399c..57057c9370f3 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -12,6 +12,7 @@ package jobs import ( "context" + "fmt" "strconv" "sync" @@ -389,7 +390,8 @@ func (r *Registry) runJob( // TODO(ajwerner): Move this writing up the trace ID down into // stepThroughStateMachine where we're already often (and soon with // exponential backoff, always) updating the job in that call. - ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...) + ctx, span := r.ac.Tracer.StartSpanCtx(ctx, + fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...) span.SetTag("job-id", attribute.Int64Value(int64(job.ID()))) defer span.Finish() if span.TraceID() != 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 7b1e399df078..bdc17bb136c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -102,7 +102,7 @@ func evalExport( h := cArgs.Header reply := resp.(*roachpb.ExportResponse) - ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) + ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport") defer evalExportSpan.Finish() var evalExportTrace types.StringValue diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index 57b64ca99e61..00d51673ab71 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -73,7 +74,7 @@ func (ttl *ttlProcessor) work(ctx context.Context) error { deleteRateLimit, ) - rowCount := int64(0) + processorRowCount := int64(0) var relationName string var pkColumns []string @@ -151,7 +152,7 @@ func (ttl *ttlProcessor) work(ctx context.Context) error { ttlSpec.PreSelectStatement, ) // add before returning err in case of partial success - atomic.AddInt64(&rowCount, rangeRowCount) + atomic.AddInt64(&processorRowCount, rangeRowCount) metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start))) if err != nil { // Continue until channel is fully read. @@ -224,18 +225,26 @@ func (ttl *ttlProcessor) work(ctx context.Context) error { return err } - job, err := serverCfg.JobRegistry.LoadJob(ctx, ttlSpec.JobID) - if err != nil { - return err - } - return db.Txn(ctx, func(_ context.Context, txn *kv.Txn) error { - return job.Update(ctx, txn, func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + jobID := ttlSpec.JobID + return serverCfg.JobRegistry.UpdateJobWithTxn( + ctx, + jobID, + nil, /* txn */ + true, /* useReadLock */ + func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { progress := md.Progress - progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += rowCount + existingRowCount := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount + progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.RowCount += processorRowCount ju.UpdateProgress(progress) + log.VInfof( + ctx, + 2, /* level */ + "TTL processorRowCount updated jobID=%d processorID=%d tableID=%d existingRowCount=%d processorRowCount=%d progress=%s", + jobID, ttl.ProcessorID, details.TableID, existingRowCount, processorRowCount, progress, + ) return nil - }) - }) + }, + ) } // rangeRowCount should be checked even if the function returns an error because it may have partially succeeded @@ -348,13 +357,13 @@ func runTTLOnRange( defer tokens.Consume() start := timeutil.Now() - rowCount, err := deleteBuilder.run(ctx, ie, txn, deleteBatch) + batchRowCount, err := deleteBuilder.run(ctx, ie, txn, deleteBatch) if err != nil { return err } metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start))) - rangeRowCount += int64(rowCount) + rangeRowCount += int64(batchRowCount) return nil }); err != nil { return rangeRowCount, errors.Wrapf(err, "error during row deletion") diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index cf67cf9578f4..8237ea2ecc3f 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -5726,7 +5726,7 @@ func MVCCExportToSST( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST") + ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST") defer span.Finish() sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel new file mode 100644 index 000000000000..7b528186d084 --- /dev/null +++ b/pkg/util/bulk/BUILD.bazel @@ -0,0 +1,27 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "bulk", + srcs = ["tracing_aggregator.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/tracing", + ], +) + +go_test( + name = "bulk_test", + srcs = ["tracing_aggregator_test.go"], + deps = [ + ":bulk", + "//pkg/ccl/backupccl/backuppb", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/bulk/tracing_aggregator.go b/pkg/util/bulk/tracing_aggregator.go new file mode 100644 index 000000000000..3bb29c7af489 --- /dev/null +++ b/pkg/util/bulk/tracing_aggregator.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// TracingAggregatorEvent describes an event that can be aggregated and stored by the +// TracingAggregator. A TracingAggregatorEvent also implements the tracing.LazyTag interface +// to render its information on the associated tracing span. +type TracingAggregatorEvent interface { + tracing.LazyTag + + // Identity returns a TracingAggregatorEvent that when combined with another + // event returns the other TracingAggregatorEvent unchanged. + Identity() TracingAggregatorEvent + // Combine combines two TracingAggregatorEvents together. + Combine(other TracingAggregatorEvent) + // Tag returns a string used to identify the TracingAggregatorEvent. + Tag() string +} + +// An TracingAggregator can be used to aggregate and render AggregatorEvents that are +// emitted as part of its tracing spans' recording. +type TracingAggregator struct { + mu struct { + syncutil.Mutex + // aggregatedEvents is a mapping from the tag identifying the + // TracingAggregatorEvent to the running aggregate of the TracingAggregatorEvent. + aggregatedEvents map[string]TracingAggregatorEvent + // sp is the tracing span managed by the TracingAggregator. + sp *tracing.Span + // closed is set to true if the TracingAggregator has already been closed. + closed bool + } +} + +// Notify implements the tracing.EventListener interface. +func (b *TracingAggregator) Notify(event tracing.Structured) { + bulkEvent, ok := event.(TracingAggregatorEvent) + if !ok { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + // If this is the first AggregatorEvent with this tag, set it as a LazyTag on + // the associated tracing span. + eventTag := bulkEvent.Tag() + if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { + b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity() + b.mu.sp.SetLazyTag(eventTag, b.mu.aggregatedEvents[eventTag]) + } + b.mu.aggregatedEvents[eventTag].Combine(bulkEvent) +} + +// Close is responsible for finishing the Aggregators' tracing span. +func (b *TracingAggregator) Close() { + b.mu.Lock() + defer b.mu.Unlock() + if !b.mu.closed { + b.mu.sp.Finish() + b.mu.closed = true + } +} + +var _ tracing.EventListener = &TracingAggregator{} + +// MakeTracingAggregatorWithSpan returns an instance of an TracingAggregator along with a +// newly created child context. The TracingAggregator is registered as a +// tracing.EventListener on the span associated with newly created context. +// +// The TracingAggregator instance is responsible for finishing the returned span, and +// so the user must call Close(). +func MakeTracingAggregatorWithSpan( + ctx context.Context, aggregatorName string, tracer *tracing.Tracer, +) (context.Context, *TracingAggregator) { + agg := &TracingAggregator{} + aggCtx, aggSpan := tracing.EnsureChildSpan(ctx, tracer, aggregatorName, + tracing.WithEventListeners(agg)) + + agg.mu.Lock() + defer agg.mu.Unlock() + agg.mu.aggregatedEvents = make(map[string]TracingAggregatorEvent) + agg.mu.sp = aggSpan + + return aggCtx, agg +} diff --git a/pkg/util/bulk/tracing_aggregator_test.go b/pkg/util/bulk/tracing_aggregator_test.go new file mode 100644 index 000000000000..f010a44deed6 --- /dev/null +++ b/pkg/util/bulk/tracing_aggregator_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/stretchr/testify/require" +) + +func TestAggregator(t *testing.T) { + tr := tracing.NewTracer() + ctx := context.Background() + ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer root.Finish() + + ctx, agg := bulk.MakeTracingAggregatorWithSpan(ctx, "mockAggregator", tr) + aggSp := tracing.SpanFromContext(ctx) + defer agg.Close() + + child := tr.StartSpan("child", tracing.WithParent(root), + tracing.WithEventListeners(agg)) + defer child.Finish() + child.RecordStructured(&backuppb.ExportStats{ + NumFiles: 10, + DataSize: 10, + Duration: time.Minute, + }) + + _, childChild := tracing.ChildSpan(ctx, "childChild") + defer childChild.Finish() + childChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 20, + DataSize: 20, + Duration: time.Minute, + }) + + remoteChild := tr.StartSpan("remoteChild", tracing.WithRemoteParentFromSpanMeta(childChild.Meta())) + remoteChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: time.Minute, + }) + + // We only expect to see the aggregated stats from the local children since we + // have not imported the remote children's Recording. + exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + require.True(t, found) + var es *backuppb.ExportStats + var ok bool + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: 2 * time.Minute, + }, *es) + + // Import the remote recording into its parent. + rec := remoteChild.FinishAndGetConfiguredRecording() + childChild.ImportRemoteRecording(rec) + + // Now, we expect the ExportStats from the remote child to show up in the + // aggregator. + exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + require.True(t, found) + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 60, + DataSize: 60, + Duration: 3 * time.Minute, + }, *es) +} diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 0b34aa6133fc..f9dd295cb019 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -461,7 +461,7 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte // // Note that we have to create the child in this parent goroutine; we can't // defer the creation to the spawned async goroutine since the parent span - // might get Finish()ed by then. However, we'll update the child'd goroutine + // might get Finish()ed by then. However, we'll update the child's goroutine // ID. var sp *tracing.Span switch opt.SpanOpt { diff --git a/pkg/util/tracing/bench_test.go b/pkg/util/tracing/bench_test.go index 7a3db23ae3cf..c636abab8b01 100644 --- a/pkg/util/tracing/bench_test.go +++ b/pkg/util/tracing/bench_test.go @@ -32,7 +32,7 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) { staticLogTags := logtags.Buffer{} staticLogTags.Add("foo", "bar") - mockListener := []EventListener{&mockEventListener{}} + mockListener := &mockEventListener{} for _, tc := range []struct { name string @@ -120,7 +120,7 @@ func BenchmarkSpan_GetRecording(b *testing.B) { func BenchmarkRecordingWithStructuredEvent(b *testing.B) { skip.UnderDeadlock(b, "span reuse triggers false-positives in the deadlock detector") ev := &types.Int32Value{Value: 5} - mockListener := []EventListener{&mockEventListener{}} + mockListener := &mockEventListener{} for _, tc := range []struct { name string diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index cbc062f32478..50a9cbf8540c 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -509,7 +510,11 @@ func (s *crdbSpan) recordFinishedChildren(childRecording tracingpb.Recording) { // children being added to s. for _, span := range childRecording { for _, record := range span.StructuredRecords { - s.notifyEventListeners(record.Payload) + var d types.DynamicAny + if err := types.UnmarshalAny(record.Payload, &d); err != nil { + continue + } + s.notifyEventListeners(d.Message.(protoutil.Message)) } } @@ -930,6 +935,9 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( childKey := string(tag.Key) childValue := tag.Value.Emit() + if rs.Tags == nil { + rs.Tags = make(map[string]string) + } rs.Tags[childKey] = childValue tagGroup.Tags = append(tagGroup.Tags, diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go index 260073f325fc..9b0a74f44561 100644 --- a/pkg/util/tracing/span_options.go +++ b/pkg/util/tracing/span_options.go @@ -474,6 +474,6 @@ func (ev eventListenersOption) apply(opts spanOptions) spanOptions { // // The caller should not mutate `eventListeners` after calling // WithEventListeners. -func WithEventListeners(eventListeners []EventListener) SpanOption { +func WithEventListeners(eventListeners ...EventListener) SpanOption { return (eventListenersOption)(eventListeners) } diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index e4740388484a..efbc7968a84f 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -959,7 +959,7 @@ func TestEventListener(t *testing.T) { tr := NewTracer() rootEventListener := &mockEventListener{} sp := tr.StartSpan("root", WithRecording(tracingpb.RecordingStructured), - WithEventListeners([]EventListener{rootEventListener})) + WithEventListeners(rootEventListener)) // Record a few Structured events. sp.RecordStructured(&types.Int32Value{Value: 4}) @@ -969,7 +969,7 @@ func TestEventListener(t *testing.T) { // Register another event listener on only the child span. childEventListener := &mockEventListener{} childSp := tr.StartSpan("child", WithParent(sp), - WithEventListeners([]EventListener{childEventListener})) + WithEventListeners(childEventListener)) childSp.RecordStructured(&types.Int32Value{Value: 6}) childSp.RecordStructured(&types.Int32Value{Value: 7}) @@ -1015,7 +1015,7 @@ func TestEventListenerNotifiedWithoutHoldingSpanMutex(t *testing.T) { tr := NewTracer() rootEventListener := &mockEventListener{} sp := tr.StartSpan("root", WithRecording(tracingpb.RecordingStructured), - WithEventListeners([]EventListener{rootEventListener})) + WithEventListeners(rootEventListener)) defer sp.Finish() // Set the EventListeners Notify() method to acquire the span's mutex.