bulk, dashboards: fix c2c metrics and dashboards
This change fixes a few bugs around c2c metric collection
and display:

1) This change fixes the units on the c2c replication dashboard
for the logical and SST byte metrics.

2) Previously, IngestedEvents was computed as the number of point keys
in the current batch plus the byte size of the rangekeys in the batch. This has
been corrected to be the sum of the number of point keys and the number of
rangekeys in the batch.

3) Previously, we were adding the running sum of SSTDataSize to
the metric tracking SST bytes, instead of only the bytes ingested as part of
the current batch. This has been corrected to capture only the number of bytes
that were ingested as part of the latest flush.

4) Callers of the SSTBatcher were making an incorrect assumption about
the per-batch BulkOpSummary exposed via the GetBatchSummary method.
This per-batch summary is reset whenever the SSTBatcher is reset, which the
caller usually does after a manual call to Flush. However, the batcher itself
may decide to flush and reset while keys are being added to the current batch,
thereby resetting the BulkOpSummary. So when the caller fetches the summary
after its manual flush, it only contains the partial set of results that had
been aggregated since the last internal flush. This change warns users against
directly accessing the per-batch BulkOpSummary and instead recommends registering
an OnFlush callback that is invoked on every successful flush of the batcher
(see the sketch below).

The stream ingestion processor now updates its logical and SST byte metrics in
its OnFlush callback.
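To illustrate the pitfall, here is a minimal, self-contained sketch. The types
and names (Summary, toyBatcher, Add, Flush) are illustrative stand-ins, not the
real kvpb or bulk package APIs: it shows why reading the per-batch summary after
a manual flush under-counts once the batcher has flushed internally, and how an
accumulating OnFlush callback avoids that.

package main

import "fmt"

// Summary is a stand-in for kvpb.BulkOpSummary in this sketch.
type Summary struct{ DataSize int64 }

func (s *Summary) Add(o Summary) { s.DataSize += o.DataSize }
func (s *Summary) Reset()        { *s = Summary{} }

// toyBatcher mimics an SSTBatcher that may flush, and reset its
// per-batch summary, on its own once the batch grows past a limit.
type toyBatcher struct {
	batch   Summary
	limit   int64
	onFlush func(Summary) // invoked on every successful flush
}

func (b *toyBatcher) Add(size int64) {
	b.batch.DataSize += size
	if b.batch.DataSize >= b.limit {
		b.flush()
		b.batch.Reset() // an internal flush also resets the per-batch summary
	}
}

// Flush is the manual flush performed by the caller.
func (b *toyBatcher) Flush() { b.flush() }

func (b *toyBatcher) flush() {
	if b.onFlush != nil {
		b.onFlush(b.batch)
	}
}

// GetBatchSummary only reflects what was added since the last reset.
func (b *toyBatcher) GetBatchSummary() Summary { return b.batch }

func main() {
	var total Summary
	b := &toyBatcher{limit: 10, onFlush: func(s Summary) { total.Add(s) }}
	for i := 0; i < 5; i++ {
		b.Add(4) // crossing the limit triggers an internal flush and reset
	}
	b.Flush()
	// The per-batch summary covers only the bytes added since the last
	// internal flush; the OnFlush-accumulated total covers all 20 bytes.
	fmt.Println(b.GetBatchSummary().DataSize, total.DataSize) // prints: 8 20
}

In the real code, the stream ingestion processor passes such a callback to
bulk.MakeStreamSSTBatcher and to newRangeKeyBatcher, as shown in the diff below.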

Informs: #97224

Release note: None
adityamaru authored and stevendanna committed Feb 20, 2023
1 parent a66ad8b commit 9f979ea
Showing 6 changed files with 145 additions and 75 deletions.
16 changes: 8 additions & 8 deletions pkg/ccl/streamingccl/streamingest/metrics.go
@@ -133,8 +133,8 @@ var (
// Metrics are for production monitoring of stream ingestion jobs.
type Metrics struct {
IngestedEvents *metric.Counter
IngestedBytes *metric.Counter
SSTBytes *metric.Counter
IngestedLogicalBytes *metric.Counter
IngestedSSTBytes *metric.Counter
Flushes *metric.Counter
JobProgressUpdates *metric.Counter
ResolvedEvents *metric.Counter
@@ -156,12 +156,12 @@ func (*Metrics) MetricStruct() {}
// MakeMetrics makes the metrics for stream ingestion job monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedBytes: metric.NewCounter(metaReplicationIngestedBytes),
SSTBytes: metric.NewCounter(metaReplicationSSTBytes),
Flushes: metric.NewCounter(metaReplicationFlushes),
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes),
IngestedSSTBytes: metric.NewCounter(metaReplicationSSTBytes),
Flushes: metric.NewCounter(metaReplicationFlushes),
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaReplicationFlushHistNanos,
Duration: histogramWindow,
95 changes: 56 additions & 39 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -97,16 +97,24 @@ type rangeKeyBatcher struct {

// Minimum timestamp in the current batch. Used for metrics purpose.
minTimestamp hlc.Timestamp
// Data size of the current batch.
dataSize int

// batchSummary is the BulkOpSummary for the current batch of rangekeys.
batchSummary kvpb.BulkOpSummary

// onFlush is the callback called after the current batch has been
// successfully ingested.
onFlush func(kvpb.BulkOpSummary)
}

func newRangeKeyBatcher(ctx context.Context, cs *cluster.Settings, db *kv.DB) *rangeKeyBatcher {
func newRangeKeyBatcher(
ctx context.Context, cs *cluster.Settings, db *kv.DB, onFlush func(summary kvpb.BulkOpSummary),
) *rangeKeyBatcher {
batcher := &rangeKeyBatcher{
db: db,
minTimestamp: hlc.MaxTimestamp,
dataSize: 0,
batchSummary: kvpb.BulkOpSummary{},
rangeKeySSTFile: &storage.MemFile{},
onFlush: onFlush,
}
batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter {
w := storage.MakeIngestionSSTWriter(ctx, cs, batcher.rangeKeySSTFile)
@@ -282,14 +290,23 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db.KV(), evalCtx.Settings,
sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
sip.batcher, err = bulk.MakeStreamSSTBatcher(
ctx, db.KV(), evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
sip.flowCtx.Cfg.BulkSenderLimiter, func(batchSummary kvpb.BulkOpSummary) {
// OnFlush updates the ingested logical and SST byte metrics.
sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
})
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
}

sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV())
sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV(), func(batchSummary kvpb.BulkOpSummary) {
// OnFlush updates the ingested logical and SST byte metrics.
sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
})

// Start a poller that checks if the stream ingestion job has been signaled to
// cutover.
@@ -732,11 +749,6 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
// the current size of all buffered range keys.
func (r *rangeKeyBatcher) buffer(rangeKV storage.MVCCRangeKeyValue) {
r.curRangeKVBatch = append(r.curRangeKVBatch, rangeKV)
r.dataSize += rangeKV.RangeKey.EncodedSize() + len(rangeKV.Value)
}

func (r *rangeKeyBatcher) size() int {
return r.dataSize
}

// Flush all the range keys buffered so far into storage as an SST.
@@ -767,6 +779,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
if rangeKeyVal.RangeKey.Timestamp.Less(r.minTimestamp) {
r.minTimestamp = rangeKeyVal.RangeKey.Timestamp
}
r.batchSummary.DataSize += int64(rangeKeyVal.RangeKey.EncodedSize() + len(rangeKeyVal.Value))
}

// Finish the current batch.
@@ -778,7 +791,16 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
false /* disallowConflicts */, false, /* disallowShadowing */
hlc.Timestamp{}, nil /* stats */, false, /* ingestAsWrites */
r.db.Clock().Now())
return err
if err != nil {
return err
}
r.batchSummary.SSTDataSize += int64(len(r.rangeKeySSTFile.Data()))

if r.onFlush != nil {
r.onFlush(r.batchSummary)
}

return nil
}

// Reset all the states inside the batcher and needs to be called after flush
@@ -789,7 +811,7 @@ func (r *rangeKeyBatcher) reset() {
}
r.rangeKeySSTFile.Reset()
r.minTimestamp = hlc.MaxTimestamp
r.dataSize = 0
r.batchSummary.Reset()
r.curRangeKVBatch = r.curRangeKVBatch[:0]
}

@@ -798,9 +820,11 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
defer sp.Finish()

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}

// First process the point KVs.
//
// Ensure that the current batch is sorted.
sort.Sort(sip.curKVBatch)
totalSize := 0
minBatchMVCCTimestamp := hlc.MaxTimestamp
for _, keyVal := range sip.curKVBatch {
if err := sip.batcher.AddMVCCKey(ctx, keyVal.Key, keyVal.Value); err != nil {
@@ -809,40 +833,33 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
if keyVal.Key.Timestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = keyVal.Key.Timestamp
}
totalSize += len(keyVal.Key.Key) + len(keyVal.Value)
}

if sip.rangeBatcher.size() > 0 {
totalSize += sip.rangeBatcher.size()
if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
preFlushTime := timeutil.Now()
if len(sip.curKVBatch) > 0 {
if err := sip.batcher.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing sst batcher")
}
}

if len(sip.curKVBatch) > 0 || sip.rangeBatcher.size() > 0 {
preFlushTime := timeutil.Now()
defer func() {
sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
sip.metrics.Flushes.Inc(1)
sip.metrics.IngestedBytes.Inc(int64(totalSize))
sip.metrics.SSTBytes.Inc(sip.batcher.GetSummary().SSTDataSize)
sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
sip.metrics.IngestedEvents.Inc(int64(sip.rangeBatcher.size()))
}()
if len(sip.curKVBatch) > 0 {
if err := sip.batcher.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing sst batcher")
}
// Now process the range KVs.
if len(sip.rangeBatcher.curRangeKVBatch) > 0 {
if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
}

if sip.rangeBatcher.size() > 0 {
if err := sip.rangeBatcher.flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing range key sst")
}
if err := sip.rangeBatcher.flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing range key sst")
}
}

// Update the flush metrics.
sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
sip.metrics.Flushes.Inc(1)
sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
sip.metrics.IngestedEvents.Inc(int64(len(sip.rangeBatcher.curRangeKVBatch)))

// Go through buffered checkpoint events, and put them on the channel to be
// emitted to the downstream frontier processor.
sip.frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult {
23 changes: 19 additions & 4 deletions pkg/kv/bulk/buffering_adder.go
@@ -49,6 +49,11 @@ type BufferingAdder struct {
// currently buffered kvs.
curBuf kvBuf

// curBufSummary is a summary of the currently buffered kvs that is populated
// as and when the kvs are ingested by the underlying sink. This field is
// cleared after each flush of the BufferingAdder.
curBufSummary kvpb.BulkOpSummary

sorted bool

initialSplits int
@@ -110,8 +115,16 @@ func MakeBulkAdder(
sorted: true,
initialSplits: opts.InitialSplitsIfUnordered,
lastFlush: timeutil.Now(),
curBufSummary: kvpb.BulkOpSummary{},
}

// Register a callback with the underlying sink to accumulate the summary for
// the current buffered KVs. The curBufSummary is reset when the buffering
// adder, and therefore the underlying sink, has completed ingesting all the
// currently buffered kvs.
b.sink.mu.onFlush = func(batchSummary kvpb.BulkOpSummary) {
b.curBufSummary.Add(batchSummary)
}
b.sink.mem.Mu = &syncutil.Mutex{}
// At minimum a bulk adder needs enough space to store a buffer of
// curBufferSize, and a subsequent SST of SSTSize in-memory. If the memory
@@ -239,9 +252,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

if b.bufferedKeys() == 0 {
if b.onFlush != nil {
b.onFlush(b.sink.GetBatchSummary())
b.onFlush(b.curBufSummary)
}
b.lastFlush = timeutil.Now()
b.curBufSummary.Reset()
return nil
}
if err := b.sink.Reset(ctx); err != nil {
Expand All @@ -257,7 +271,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
before = b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
before.Combine(&b.sink.mu.totalStats)
before.Combine(&b.sink.currentStats)
beforeSize = b.sink.mu.totalRows.DataSize
beforeSize = b.sink.mu.totalBulkOpSummary.DataSize
b.sink.mu.Unlock()
}

@@ -309,7 +323,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

if log.V(3) && before != nil {
b.sink.mu.Lock()
written := b.sink.mu.totalRows.DataSize - beforeSize
written := b.sink.mu.totalBulkOpSummary.DataSize - beforeSize
afterStats := b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
afterStats.Combine(&b.sink.mu.totalStats)
afterStats.Combine(&b.sink.currentStats)
@@ -349,9 +363,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}

if b.onFlush != nil {
b.onFlush(b.sink.GetBatchSummary())
b.onFlush(b.curBufSummary)
}
b.curBuf.Reset()
b.curBufSummary.Reset()
b.lastFlush = timeutil.Now()
return nil
}
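For context, here is a minimal sketch of the accumulate-then-report pattern the
BufferingAdder uses above. The types are toys, not the actual BufferingAdder or
SSTBatcher code: every sink flush folds its per-batch summary into a running
curBufSummary, which is reported through the adder's own onFlush and reset once
the whole buffer has been ingested.

package main

import "fmt"

// Summary is a stand-in for kvpb.BulkOpSummary in this sketch.
type Summary struct{ DataSize int64 }

// adder mimics the BufferingAdder: it accumulates the summary of every
// flush performed by its sink into curBufSummary, reports the combined
// summary through its own onFlush once the whole buffer has been
// ingested, and then resets it for the next buffer.
type adder struct {
	curBufSummary Summary
	onFlush       func(Summary)
}

// sinkFlushed is what the adder registers as the sink's flush callback.
func (a *adder) sinkFlushed(batch Summary) {
	a.curBufSummary.DataSize += batch.DataSize
}

// doFlush runs after the sink has ingested the entire buffer, possibly
// across several internal flushes.
func (a *adder) doFlush() {
	if a.onFlush != nil {
		a.onFlush(a.curBufSummary)
	}
	a.curBufSummary = Summary{} // reset for the next buffer
}

func main() {
	a := &adder{onFlush: func(s Summary) { fmt.Println("buffer ingested:", s.DataSize) }}
	// The sink flushed twice while ingesting a single buffer.
	a.sinkFlushed(Summary{DataSize: 12})
	a.sinkFlushed(Summary{DataSize: 8})
	a.doFlush() // prints: buffer ingested: 20
}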