Skip to content

Commit

Permalink
Merge #84043 #86322
Browse files Browse the repository at this point in the history
84043: bulk, backupccl: introduce a Structured event Aggregator r=adityamaru a=adityamaru

This change introduces an Aggregator object that is capable
of listening for Structured events emitted in a recording,
aggregating them and rendering them as LazyTags.
We also introduce an AggregatorEvent interface that can be
implemented by a Structured event thereby making it eligible
for aggregation in the Aggregator.

The first user of the Aggregator will be every backup data
processor that is setup on the nodes in the cluster during
a backup. The Aggregator lives as long as the processor, and
listens for Aggregator events emitted by any span that is a child
of the processors' span. This includes both local children as
well as remote children whose recordings have been imported into
a local span. The Aggregator stores running aggregates of each
AggregatorEvent it is notified about, bucketed by the events'
tag name. This aggregate will be rendered on the tracing span as a LazyTag.

This change teaches every ExportRequest to emit an AggregatorEvent.
Going forward we expect many more operations in bulk jobs to
define and emit such events providing visibility into otherwise
opaque operations.

We cleanup some of the StructuredEvents that were previously
added but have not proved useful, and also change some of the
tracing span operation names to be more intuitive.

To view these aggregated events once can navigate to the
`/tracez` endpoint of the debug console to take a snapshot and search for either
`BACKUP` or the job ID to filter for tracing spans on that node.
The span associated with the backup processor will be decorated with
tags that correspond to the fields in the introduced `ExportStats`
proto message. Note these stats are only aggregated on a per node
basis.

Fixes: #80388

Release note: None

Release justification: low risk change for improved observability into backups.

86322: ttl: use JobRegistry.UpdateJobWithTxn to avoid race condition r=rafiss a=ecwall

fixes #85800

UpdateJobWithTxn updates the job progress with useReadLock=true to prevent
concurrent update attempts from undoing each other.

Release justification: Fix test flake.

Release note: None

Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
  • Loading branch information
3 people committed Aug 17, 2022
3 parents 85ae31d + ae1804b + 83776cb commit 395cf17
Show file tree
Hide file tree
Showing 21 changed files with 380 additions and 71 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ ALL_TESTS = [
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray_test",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller_test",
"//pkg/util/cgroups:cgroups_test",
Expand Down Expand Up @@ -1829,6 +1830,8 @@ GO_TARGETS = [
"//pkg/util/bufalloc:bufalloc",
"//pkg/util/buildutil:buildutil",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache",
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller",
Expand Down Expand Up @@ -2806,6 +2809,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/bitarray:get_x_data",
"//pkg/util/bufalloc:get_x_data",
"//pkg/util/buildutil:get_x_data",
"//pkg/util/bulk:get_x_data",
"//pkg/util/cache:get_x_data",
"//pkg/util/caller:get_x_data",
"//pkg/util/cancelchecker:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/bulk",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,5 +1467,5 @@ func updateBackupDetails(
}

func init() {
sql.AddPlanHook("backup", backupPlanHook)
sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook)
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
) (descpb.TenantInfoWithUsage, error) {
row, err := ie.QueryRow(
ctx, "backup-lookup-tenant", txn,
ctx, "backupccl.retrieveSingleTenantMetadata", txn,
tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(),
)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn,
) ([]descpb.TenantInfoWithUsage, error) {
rows, err := ie.QueryBuffered(
ctx, "backup-lookup-tenants", txn,
ctx, "backupccl.retrieveAllTenantsMetadata", txn,
// XXX Should we add a `WHERE active`? We require the tenant to be active
// when it is specified..
tenantMetadataQuery,
Expand Down
60 changes: 35 additions & 25 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -106,6 +107,10 @@ type backupDataProcessor struct {

// BoundAccount that reserves the memory usage of the backup processor.
memAcc *mon.BoundAccount

// Aggregator that aggregates StructuredEvents emitted in the
// backupDataProcessors' trace recording.
agg *bulk.TracingAggregator
}

var (
Expand Down Expand Up @@ -153,14 +158,20 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// bps' trace recording.
ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer)

bp.cancelAndWaitForWorker = func() {
cancel()
for range bp.progCh {
}
}
log.Infof(ctx, "starting backup data")
if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{
TaskName: "backup-worker",
TaskName: "backupDataProcessor.runBackupProcessor",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc)
Expand Down Expand Up @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer

func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
}
Expand Down Expand Up @@ -387,26 +399,16 @@ func runBackupProcessor(
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
var reqSentTime time.Time
var respReceivedTime time.Time
requestSentAt := timeutil.Now()
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
reqSentTime = timeutil.Now()
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{
Span: span.span.String(),
Attempt: int32(span.attempts + 1),
Priority: header.UserPriority.String(),
ReqSentTime: reqSentTime.String(),
})

rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
}
Expand All @@ -419,9 +421,7 @@ func runBackupProcessor(
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{
RetryableError: tracing.RedactAndTruncateError(intentErr),
})
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
Expand Down Expand Up @@ -480,19 +480,12 @@ func runBackupProcessor(
completedSpans = 1
}

duration := respReceivedTime.Sub(reqSentTime)
exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{
Duration: duration.String(),
FileSummaries: make([]roachpb.RowCount, 0),
}

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}

for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
Expand Down Expand Up @@ -521,8 +514,9 @@ func runBackupProcessor(
return ctx.Err()
}
}
exportResponseTraceEvent.NumFiles = int32(len(resp.Files))
backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

// Emit the stats for the processed ExportRequest.
recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt))

default:
// No work left to do, so we can exit. Note that another worker could
Expand Down Expand Up @@ -576,6 +570,22 @@ func runBackupProcessor(
return grp.Wait()
}

// recordExportStats emits a StructuredEvent containing the stats about the
// evaluated ExportRequest.
func recordExportStats(
sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration,
) {
if sp == nil {
return
}
exportStats := backuppb.ExportStats{Duration: exportDuration}
for _, f := range resp.Files {
exportStats.NumFiles++
exportStats.DataSize += int64(len(f.SST))
}
sp.RecordStructured(&exportStats)
}

func init() {
rowexec.NewBackupDataProcessor = newBackupDataProcessor
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func distBackupPlanSpecs(
startTime, endTime hlc.Timestamp,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs")
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
user := execCtx.User()
Expand Down Expand Up @@ -158,7 +158,7 @@ func distBackup(
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup")
defer span.Finish()
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/util/bulk",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
"@io_opentelemetry_go_otel//attribute",
],
)

Expand Down
61 changes: 61 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ package backuppb

import (
"encoding/json"
"fmt"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/otel/attribute"
)

// IsIncremental returns if the BackupManifest corresponds to an incremental
Expand Down Expand Up @@ -123,6 +126,64 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler
return json.Marshal(m)
}

var _ bulk.TracingAggregatorEvent = &ExportStats{}

const (
tagNumFiles = "num_files"
tagDataSize = "data_size"
tagThroughput = "throughput"
)

// Render implements the LazyTag interface.
func (e *ExportStats) Render() []attribute.KeyValue {
const mb = 1 << 20
tags := make([]attribute.KeyValue, 0)
if e.NumFiles > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagNumFiles,
Value: attribute.Int64Value(e.NumFiles),
})
}
if e.DataSize > 0 {
dataSizeMB := float64(e.DataSize) / mb
tags = append(tags, attribute.KeyValue{
Key: tagDataSize,
Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)),
})

if e.Duration > 0 {
throughput := dataSizeMB / e.Duration.Seconds()
tags = append(tags, attribute.KeyValue{
Key: tagThroughput,
Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)),
})
}
}

return tags
}

// Identity implements the AggregatorEvent interface.
func (e *ExportStats) Identity() bulk.TracingAggregatorEvent {
return &ExportStats{}
}

// Combine implements the AggregatorEvent interface.
func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) {
otherExportStats, ok := other.(*ExportStats)
if !ok {
panic(fmt.Sprintf("`other` is not of type ExportStats: %T", other))
}
e.NumFiles += otherExportStats.NumFiles
e.DataSize += otherExportStats.DataSize
e.Duration += otherExportStats.Duration
}

// Tag implements the AggregatorEvent interface.
func (e *ExportStats) Tag() string {
return "ExportStats"
}

func init() {
protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest")
}
28 changes: 11 additions & 17 deletions pkg/ccl/backupccl/backuppb/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,16 @@ message BackupProgressTraceEvent {
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// BackupExportTraceRequestEvent is the trace event recorded when an
// ExportRequest has been sent.
message BackupExportTraceRequestEvent {
string span = 1;
int32 attempt = 2;
string priority = 3;
string req_sent_time = 4;
}

// BackupExportTraceResponseEvent is the trace event recorded when we receive a
// response from the ExportRequest.
message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false];
reserved 4 ;
string retryable_error = 5;
// ExportStats is a message containing information about each
// Export{Request,Response}.
message ExportStats {
// NumFiles is the number of SST files produced by the ExportRequest.
int64 num_files = 1;
// DataSize is the byte size of all the SST files produced by the
// ExportRequest.
int64 data_size = 2;
// Duration is the total time taken to send an ExportRequest, receive an
// ExportResponse and push the response on a channel.
int64 duration = 3 [(gogoproto.casttype) = "time.Duration"];
}

4 changes: 3 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package jobs

import (
"context"
"fmt"
"strconv"
"sync"

Expand Down Expand Up @@ -389,7 +390,8 @@ func (r *Registry) runJob(
// TODO(ajwerner): Move this writing up the trace ID down into
// stepThroughStateMachine where we're already often (and soon with
// exponential backoff, always) updating the job in that call.
ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...)
ctx, span := r.ac.Tracer.StartSpanCtx(ctx,
fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...)
span.SetTag("job-id", attribute.Int64Value(int64(job.ID())))
defer span.Finish()
if span.TraceID() != 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func evalExport(
h := cArgs.Header
reply := resp.(*roachpb.ExportResponse)

ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport")
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
Expand Down
Loading

0 comments on commit 395cf17

Please sign in to comment.