
Commit

wip
adityamaru committed May 26, 2022
1 parent e4c7ca5 commit 9544f19
Showing 10 changed files with 249 additions and 62 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -112,6 +112,7 @@ go_library(
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/bulk",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
19 changes: 0 additions & 19 deletions pkg/ccl/backupccl/backup.proto
@@ -193,22 +193,3 @@ message BackupProgressTraceEvent {
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// BackupExportTraceRequestEvent is the trace event recorded when an
// ExportRequest has been sent.
message BackupExportTraceRequestEvent {
string span = 1;
int32 attempt = 2;
string priority = 3;
string req_sent_time = 4;
}

// BackupExportTraceResponseEvent is the trace event recorded when we receive a
// response from the ExportRequest.
message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false];
reserved 4 ;
string retryable_error = 5;
}

36 changes: 9 additions & 27 deletions pkg/ccl/backupccl/backup_processor.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -170,8 +171,14 @@ func newBackupDataProcessor(

// Start is part of the RowSource interface.
func (bp *backupDataProcessor) Start(ctx context.Context) {
agg := bulk.MakeAggregator()
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx = bp.StartInternal(ctx, backupProcessorName, agg)

// Initialize the aggregator with the tracing span that will live as long as
// this processor is running.
agg.Init(tracing.SpanFromContext(ctx))

ctx, cancel := context.WithCancel(ctx)
bp.cancelAndWaitForWorker = func() {
cancel()
@@ -254,7 +261,6 @@ func runBackupProcessor(
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
memAcc *mon.BoundAccount,
) error {
backupProcessorSpan := tracing.SpanFromContext(ctx)
clusterSettings := flowCtx.Cfg.Settings

totalSpans := len(spec.Spans) + len(spec.IntroducedSpans)
@@ -411,37 +417,23 @@
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
var reqSentTime time.Time
var respReceivedTime time.Time
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
reqSentTime = timeutil.Now()
backupProcessorSpan.RecordStructured(&BackupExportTraceRequestEvent{
Span: span.span.String(),
Attempt: int32(span.attempts + 1),
Priority: header.UserPriority.String(),
ReqSentTime: reqSentTime.String(),
})

rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
}
return nil
})
if exportRequestErr != nil {
if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
span.lastTried = timeutil.Now()
span.attempts++
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
backupProcessorSpan.RecordStructured(&BackupExportTraceResponseEvent{
RetryableError: tracing.RedactAndTruncateError(intentErr),
})
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
@@ -500,20 +492,12 @@
completedSpans = 1
}

duration := respReceivedTime.Sub(reqSentTime)
exportResponseTraceEvent := &BackupExportTraceResponseEvent{
Duration: duration.String(),
FileSummaries: make([]roachpb.RowCount, 0),
}

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}

for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
// to store the metadata we need, but there's no actual File
@@ -541,8 +525,6 @@
return ctx.Err()
}
}
exportResponseTraceEvent.NumFiles = int32(len(resp.Files))
backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

default:
// No work left to do, so we can exit. Note that another worker could
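
The bulk.MakeAggregator and agg.Init calls above come from the new pkg/util/bulk package, whose aggregator.go is listed in the BUILD.bazel at the end of this diff but is not itself shown on this page. The sketch below is a self-contained illustration of the pattern, not the actual implementation: only the AggregatorEvent contract (Combine/Tag, implemented by ExportStats in api.go below) and the MakeAggregator/Init call sites are visible in this commit, so the Aggregator internals and the Notify hook here are assumptions.

// Self-contained sketch of the event-aggregation pattern; names and internals
// are illustrative assumptions, not the real pkg/util/bulk API.
package main

import (
	"fmt"
	"sync"
)

// AggregatorEvent is the assumed contract an event (such as
// roachpb.ExportStats) satisfies so it can be merged with earlier events.
type AggregatorEvent interface {
	// Tag names the bucket this event aggregates under.
	Tag() string
	// Combine folds another event of the same concrete type into the receiver.
	Combine(other AggregatorEvent)
}

// Aggregator keeps one running, combined event per tag. In the processor it
// would be registered as a tracing event listener so each structured event
// recorded on the span is folded in as it arrives.
type Aggregator struct {
	mu     sync.Mutex
	byName map[string]AggregatorEvent
}

func MakeAggregator() *Aggregator {
	return &Aggregator{byName: make(map[string]AggregatorEvent)}
}

// Notify folds a structured event into the running aggregate for its tag.
func (a *Aggregator) Notify(ev AggregatorEvent) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if existing, ok := a.byName[ev.Tag()]; ok {
		existing.Combine(ev)
		return
	}
	a.byName[ev.Tag()] = ev
}

// exportStats is a stand-in for roachpb.ExportStats.
type exportStats struct {
	NumFiles, DataSize int64
}

func (e *exportStats) Tag() string { return "ExportStats" }

func (e *exportStats) Combine(other AggregatorEvent) {
	o := other.(*exportStats)
	e.NumFiles += o.NumFiles
	e.DataSize += o.DataSize
}

func main() {
	agg := MakeAggregator()
	agg.Notify(&exportStats{NumFiles: 1, DataSize: 1 << 20})
	agg.Notify(&exportStats{NumFiles: 2, DataSize: 3 << 20})
	fmt.Printf("%+v\n", agg.byName["ExportStats"]) // &{NumFiles:3 DataSize:4194304}
}

The processor registers the aggregator as a tracing event listener via the StartInternal change in processorsbase.go below, so per-request events such as ExportStats can be rolled up into one running summary instead of accumulating individually in the span recording.
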
34 changes: 25 additions & 9 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
)

// SSTTargetSizeSetting is the cluster setting name for the
@@ -105,14 +104,6 @@ func evalExport(
ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
if cArgs.EvalCtx.NodeID() == h.GatewayNodeID {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on gateway node %d", cArgs.EvalCtx.NodeID())
} else {
evalExportTrace.Value = fmt.Sprintf("evaluating Export on remote node %d", cArgs.EvalCtx.NodeID())
}
evalExportSpan.RecordStructured(&evalExportTrace)

// Tables marked to be excluded from backup are expected to be configured
// with a short GC TTL. Additionally, backup excludes such tables from being
// protected from GC when writing ProtectedTimestamp records. The
@@ -270,5 +261,30 @@ func evalExport(
}
}

// If we have a trace, emit the export stats corresponding to this
// ExportRequest.
sp := tracing.SpanFromContext(ctx)
recordExportStats(sp, h.GatewayNodeID, cArgs.EvalCtx.NodeID(), reply)

return result.Result{}, nil
}

// recordExportStats emits a StructuredEvent containing the stats about the
// evaluated ExportRequest.
func recordExportStats(
sp *tracing.Span, sendNode, evalNode roachpb.NodeID, resp *roachpb.ExportResponse,
) {
if sp == nil {
return
}
exportStats := roachpb.ExportStats{
SentPerNode: map[roachpb.NodeID]int32{sendNode: 1},
EvaluatedPerNode: map[roachpb.NodeID]int32{evalNode: 1},
}
for _, f := range resp.Files {
file := f
exportStats.NumFiles++
exportStats.DataSize += int64(len(file.SST))
}
sp.RecordStructured(&exportStats)
}
2 changes: 2 additions & 0 deletions pkg/roachpb/BUILD.bazel
@@ -41,6 +41,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bulk",
"//pkg/util/caller",
"//pkg/util/duration",
"//pkg/util/encoding",
@@ -60,6 +61,7 @@ go_library(
"@com_github_gogo_protobuf//proto",
"@com_github_golang_mock//gomock", # keep
"@io_etcd_go_etcd_raft_v3//raftpb",
"@io_opentelemetry_go_otel//attribute",
"@org_golang_google_grpc//metadata", # keep
],
)
68 changes: 68 additions & 0 deletions pkg/roachpb/api.go
@@ -14,13 +14,15 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/dustin/go-humanize"
"go.opentelemetry.io/otel/attribute"
)

//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient
@@ -1800,3 +1802,69 @@ const (
// with the SpecificTenantOverrides precedence..
AllTenantsOverrides
)

var _ bulk.AggregatorEvent = &ExportStats{}

// Combine implements the AggregatorEvent interface.
func (e *ExportStats) Combine(other bulk.AggregatorEvent) {
// Lazily initialize the maps so that combining into a zero-value
// ExportStats does not write to a nil map.
if e.SentPerNode == nil {
e.SentPerNode = make(map[NodeID]int32)
}
if e.EvaluatedPerNode == nil {
e.EvaluatedPerNode = make(map[NodeID]int32)
}

otherExportStats, ok := other.(*ExportStats)
if !ok {
panic("`other` is not of type ExportStats")
}
e.NumFiles += otherExportStats.NumFiles
e.DataSize += otherExportStats.DataSize
for nodeID, count := range otherExportStats.SentPerNode {
e.SentPerNode[nodeID] += count
}
for nodeID, count := range otherExportStats.EvaluatedPerNode {
e.EvaluatedPerNode[nodeID] += count
}
}

// Tag implements the AggregatorEvent interface.
func (e *ExportStats) Tag() string {
return "ExportStats"
}

const (
tagNumFiles = "export_num_files"
tagDataSize = "export_data_size"
tagRequestSent = "export_sent_per_node"
tagRequestEval = "export_eval_per_node"
)

// Render implements the AggregatorEvent interface.
func (e *ExportStats) Render() []attribute.KeyValue {
tags := make([]attribute.KeyValue, 0)
if e.NumFiles > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagNumFiles,
Value: attribute.Int64Value(e.NumFiles),
})
}
if e.DataSize > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagDataSize,
Value: attribute.Int64Value(e.DataSize),
})
}
if len(e.SentPerNode) > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagRequestSent,
Value: attribute.StringValue(fmt.Sprintf("%v", e.SentPerNode)),
})
}
if len(e.EvaluatedPerNode) > 0 {
tags = append(tags, attribute.KeyValue{
Key: tagRequestEval,
Value: attribute.StringValue(fmt.Sprintf("%v", e.EvaluatedPerNode)),
})
}

return tags
}
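
To make the Combine and Render semantics above concrete, a small illustrative snippet (assuming a file somewhere inside the repository that can import pkg/roachpb; it is not part of this commit) could look like this:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	// Stats from two ExportRequests, both sent by node 1 and evaluated on
	// nodes 2 and 3 respectively.
	a := &roachpb.ExportStats{
		NumFiles:         1,
		DataSize:         1 << 20,
		SentPerNode:      map[roachpb.NodeID]int32{1: 1},
		EvaluatedPerNode: map[roachpb.NodeID]int32{2: 1},
	}
	b := &roachpb.ExportStats{
		NumFiles:         2,
		DataSize:         3 << 20,
		SentPerNode:      map[roachpb.NodeID]int32{1: 1},
		EvaluatedPerNode: map[roachpb.NodeID]int32{3: 1},
	}

	// Fold b into a: 3 files, 4 MiB, SentPerNode {1:2},
	// EvaluatedPerNode {2:1 3:1}.
	a.Combine(b)
	fmt.Println(a.NumFiles, a.DataSize, a.SentPerNode, a.EvaluatedPerNode)

	// Render emits only the non-zero stats as span attributes
	// (export_num_files, export_data_size, ...).
	for _, kv := range a.Render() {
		fmt.Println(kv.Key, kv.Value.Emit())
	}
}

Combining per-request events this way is what turns the single-count maps recorded by recordExportStats in cmd_export.go into per-node totals for the whole backup processor.
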
25 changes: 25 additions & 0 deletions pkg/roachpb/api.proto
@@ -3020,3 +3020,28 @@ message ScanStats {
uint64 point_count = 9;
uint64 points_covered_by_range_tombstones = 10;
}

// ExportStats is a message containing information about each
// Export{Request,Response}.
message ExportStats {
// NumFiles is the number of SST files produced by the ExportRequest.
int64 num_files = 1;
// DataSize is the byte size of all the SST files produced by the
// ExportRequest.
int64 data_size = 2;


// SentPerNode is a mapping from NodeID to the number of ExportRequests sent
// by the node for evaluation.
map <int32, int32> sent_per_node = 3 [
(gogoproto.nullable) = false,
(gogoproto.castkey) = "NodeID"
];

// EvaluatedPerNode is a mapping from NodeID to the number of ExportRequests
// evaluated by the node.
map <int32, int32> evaluated_per_node = 4 [
(gogoproto.nullable) = false,
(gogoproto.castkey) = "NodeID"
];
}
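
For reference, the gogoproto options above (nullable = false, castkey = "NodeID") mean the generated Go type carries plain map fields keyed by NodeID, which is the shape the api.go methods earlier in this diff rely on. The generated code is elided; the sketch below only approximates it and defines a local NodeID stand-in so it compiles on its own:

package roachpbsketch

// NodeID stands in for roachpb.NodeID (an int32-based ID type).
type NodeID int32

// ExportStats approximates the struct protoc-gen-gogo generates for the
// message above: castkey turns the int32 map keys into NodeID.
type ExportStats struct {
	NumFiles         int64
	DataSize         int64
	SentPerNode      map[NodeID]int32
	EvaluatedPerNode map[NodeID]int32
}
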
24 changes: 17 additions & 7 deletions pkg/sql/execinfra/processorsbase.go
@@ -808,13 +808,18 @@ func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata) {

// ProcessorSpan creates a child span for a processor (if we are doing any
// tracing). The returned span needs to be finished using tracing.FinishSpan.
func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
func ProcessorSpan(
ctx context.Context, name string, tracingEventListeners ...tracing.EventListener,
) (context.Context, *tracing.Span) {
sp := tracing.SpanFromContext(ctx)
if sp == nil {
return ctx, nil
}
return sp.Tracer().StartSpanCtx(ctx, name,
tracing.WithParent(sp), tracing.WithDetachedRecording())
spanOpts := []tracing.SpanOption{tracing.WithParent(sp), tracing.WithDetachedRecording()}
if tracingEventListeners != nil {
spanOpts = append(spanOpts, tracing.WithEventListeners(tracingEventListeners))
}
return sp.Tracer().StartSpanCtx(ctx, name, spanOpts...)
}

// StartInternal prepares the ProcessorBase for execution. It returns the
@@ -826,8 +831,10 @@ func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.
// < inputs >.Start(ctx) // if there are any inputs-RowSources to pb
// < other initialization >
// so that the caller doesn't mistakenly use old ctx object.
func (pb *ProcessorBaseNoHelper) StartInternal(ctx context.Context, name string) context.Context {
return pb.startImpl(ctx, true /* createSpan */, name)
func (pb *ProcessorBaseNoHelper) StartInternal(
ctx context.Context, name string, tracingEventListeners ...tracing.EventListener,
) context.Context {
return pb.startImpl(ctx, true /* createSpan */, name, tracingEventListeners...)
}

// StartInternalNoSpan does the same as StartInternal except that it does not
@@ -840,11 +847,14 @@ func (pb *ProcessorBaseNoHelper) StartInternalNoSpan(ctx context.Context) contex
}

func (pb *ProcessorBaseNoHelper) startImpl(
ctx context.Context, createSpan bool, spanName string,
ctx context.Context,
createSpan bool,
spanName string,
tracingEventListeners ...tracing.EventListener,
) context.Context {
pb.origCtx = ctx
if createSpan {
pb.Ctx, pb.span = ProcessorSpan(ctx, spanName)
pb.Ctx, pb.span = ProcessorSpan(ctx, spanName, tracingEventListeners...)
if pb.span != nil && pb.span.IsVerbose() {
pb.span.SetTag(execinfrapb.FlowIDTagKey, attribute.StringValue(pb.FlowCtx.ID.String()))
pb.span.SetTag(execinfrapb.ProcessorIDTagKey, attribute.IntValue(int(pb.ProcessorID)))
12 changes: 12 additions & 0 deletions pkg/util/bulk/BUILD.bazel
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "bulk",
srcs = ["aggregator.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/syncutil",
"//pkg/util/tracing",
],
)