diff --git a/pkg/ccl/streamingccl/streamingest/metrics.go b/pkg/ccl/streamingccl/streamingest/metrics.go
index 851366f6c457..69cb3fe2ac8a 100644
--- a/pkg/ccl/streamingccl/streamingest/metrics.go
+++ b/pkg/ccl/streamingccl/streamingest/metrics.go
@@ -133,8 +133,8 @@ var (
 // Metrics are for production monitoring of stream ingestion jobs.
 type Metrics struct {
 	IngestedEvents       *metric.Counter
-	IngestedBytes        *metric.Counter
-	SSTBytes             *metric.Counter
+	IngestedLogicalBytes *metric.Counter
+	IngestedSSTBytes     *metric.Counter
 	Flushes              *metric.Counter
 	JobProgressUpdates   *metric.Counter
 	ResolvedEvents       *metric.Counter
@@ -156,12 +156,12 @@ func (*Metrics) MetricStruct() {}
 // MakeMetrics makes the metrics for stream ingestion job monitoring.
 func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 	m := &Metrics{
-		IngestedEvents:     metric.NewCounter(metaReplicationEventsIngested),
-		IngestedBytes:      metric.NewCounter(metaReplicationIngestedBytes),
-		SSTBytes:           metric.NewCounter(metaReplicationSSTBytes),
-		Flushes:            metric.NewCounter(metaReplicationFlushes),
-		ResolvedEvents:     metric.NewCounter(metaReplicationResolvedEventsIngested),
-		JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
+		IngestedEvents:       metric.NewCounter(metaReplicationEventsIngested),
+		IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes),
+		IngestedSSTBytes:     metric.NewCounter(metaReplicationSSTBytes),
+		Flushes:              metric.NewCounter(metaReplicationFlushes),
+		ResolvedEvents:       metric.NewCounter(metaReplicationResolvedEventsIngested),
+		JobProgressUpdates:   metric.NewCounter(metaJobProgressUpdates),
 		FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
 			Metadata: metaReplicationFlushHistNanos,
 			Duration: histogramWindow,
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index cc24ff23367e..c75c55ea74a9 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -97,16 +97,24 @@ type rangeKeyBatcher struct {
 	// Minimum timestamp in the current batch. Used for metrics purpose.
 	minTimestamp hlc.Timestamp
-	// Data size of the current batch.
-	dataSize int
+
+	// batchSummary is the BulkOpSummary for the current batch of rangekeys.
+	batchSummary kvpb.BulkOpSummary
+
+	// onFlush is the callback called after the current batch has been
+	// successfully ingested.
+	onFlush func(kvpb.BulkOpSummary)
 }
 
-func newRangeKeyBatcher(ctx context.Context, cs *cluster.Settings, db *kv.DB) *rangeKeyBatcher {
+func newRangeKeyBatcher(
+	ctx context.Context, cs *cluster.Settings, db *kv.DB, onFlush func(summary kvpb.BulkOpSummary),
+) *rangeKeyBatcher {
 	batcher := &rangeKeyBatcher{
 		db:              db,
 		minTimestamp:    hlc.MaxTimestamp,
-		dataSize:        0,
+		batchSummary:    kvpb.BulkOpSummary{},
 		rangeKeySSTFile: &storage.MemFile{},
+		onFlush:         onFlush,
 	}
 	batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter {
 		w := storage.MakeIngestionSSTWriter(ctx, cs, batcher.rangeKeySSTFile)
@@ -282,14 +290,23 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
 	evalCtx := sip.FlowCtx.EvalCtx
 	db := sip.FlowCtx.Cfg.DB
 	var err error
-	sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db.KV(), evalCtx.Settings,
-		sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
+	sip.batcher, err = bulk.MakeStreamSSTBatcher(
+		ctx, db.KV(), evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
+		sip.flowCtx.Cfg.BulkSenderLimiter, func(batchSummary kvpb.BulkOpSummary) {
+			// OnFlush updates the ingested logical and SST byte metrics.
+			sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
+			sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
+		})
 	if err != nil {
 		sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
 		return
 	}
-	sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV())
+	sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV(), func(batchSummary kvpb.BulkOpSummary) {
+		// OnFlush updates the ingested logical and SST byte metrics.
+		sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
+		sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
+	})
 
 	// Start a poller that checks if the stream ingestion job has been signaled to
 	// cutover.
@@ -732,11 +749,6 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro
 // the current size of all buffered range keys.
 func (r *rangeKeyBatcher) buffer(rangeKV storage.MVCCRangeKeyValue) {
 	r.curRangeKVBatch = append(r.curRangeKVBatch, rangeKV)
-	r.dataSize += rangeKV.RangeKey.EncodedSize() + len(rangeKV.Value)
-}
-
-func (r *rangeKeyBatcher) size() int {
-	return r.dataSize
 }
 
 // Flush all the range keys buffered so far into storage as an SST.
@@ -767,6 +779,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
 		if rangeKeyVal.RangeKey.Timestamp.Less(r.minTimestamp) {
 			r.minTimestamp = rangeKeyVal.RangeKey.Timestamp
 		}
+		r.batchSummary.DataSize += int64(rangeKeyVal.RangeKey.EncodedSize() + len(rangeKeyVal.Value))
 	}
 
 	// Finish the current batch.
@@ -778,7 +791,16 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
 		false /* disallowConflicts */, false, /* disallowShadowing */
 		hlc.Timestamp{}, nil /* stats */, false, /* ingestAsWrites */
 		r.db.Clock().Now())
-	return err
+	if err != nil {
+		return err
+	}
+	r.batchSummary.SSTDataSize += int64(len(r.rangeKeySSTFile.Data()))
+
+	if r.onFlush != nil {
+		r.onFlush(r.batchSummary)
+	}
+
+	return nil
 }
 
 // Reset all the states inside the batcher and needs to called after flush
@@ -789,7 +811,7 @@ func (r *rangeKeyBatcher) reset() {
 	}
 	r.rangeKeySSTFile.Reset()
 	r.minTimestamp = hlc.MaxTimestamp
-	r.dataSize = 0
+	r.batchSummary.Reset()
 	r.curRangeKVBatch = r.curRangeKVBatch[:0]
 }
 
@@ -798,9 +820,11 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
 	defer sp.Finish()
 
 	flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}
+
+	// First process the point KVs.
+	//
 	// Ensure that the current batch is sorted.
 	sort.Sort(sip.curKVBatch)
-	totalSize := 0
 	minBatchMVCCTimestamp := hlc.MaxTimestamp
 	for _, keyVal := range sip.curKVBatch {
 		if err := sip.batcher.AddMVCCKey(ctx, keyVal.Key, keyVal.Value); err != nil {
@@ -809,40 +833,33 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
 		if keyVal.Key.Timestamp.Less(minBatchMVCCTimestamp) {
 			minBatchMVCCTimestamp = keyVal.Key.Timestamp
 		}
-		totalSize += len(keyVal.Key.Key) + len(keyVal.Value)
 	}
-	if sip.rangeBatcher.size() > 0 {
-		totalSize += sip.rangeBatcher.size()
-		if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
-			minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
+	preFlushTime := timeutil.Now()
+	if len(sip.curKVBatch) > 0 {
+		if err := sip.batcher.Flush(ctx); err != nil {
+			return nil, errors.Wrap(err, "flushing sst batcher")
 		}
 	}
-	if len(sip.curKVBatch) > 0 || sip.rangeBatcher.size() > 0 {
-		preFlushTime := timeutil.Now()
-		defer func() {
-			sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
-			sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
-			sip.metrics.Flushes.Inc(1)
-			sip.metrics.IngestedBytes.Inc(int64(totalSize))
-			sip.metrics.SSTBytes.Inc(sip.batcher.GetSummary().SSTDataSize)
-			sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
-			sip.metrics.IngestedEvents.Inc(int64(sip.rangeBatcher.size()))
-		}()
-		if len(sip.curKVBatch) > 0 {
-			if err := sip.batcher.Flush(ctx); err != nil {
-				return nil, errors.Wrap(err, "flushing sst batcher")
-			}
+	// Now process the range KVs.
+	if len(sip.rangeBatcher.curRangeKVBatch) > 0 {
+		if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
+			minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
 		}
-		if sip.rangeBatcher.size() > 0 {
-			if err := sip.rangeBatcher.flush(ctx); err != nil {
-				return nil, errors.Wrap(err, "flushing range key sst")
-			}
+		if err := sip.rangeBatcher.flush(ctx); err != nil {
+			return nil, errors.Wrap(err, "flushing range key sst")
 		}
 	}
+	// Update the flush metrics.
+	sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
+	sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
+	sip.metrics.Flushes.Inc(1)
+	sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
+	sip.metrics.IngestedEvents.Inc(int64(len(sip.rangeBatcher.curRangeKVBatch)))
+
 	// Go through buffered checkpoint events, and put them on the channel to be
 	// emitted to the downstream frontier processor.
 	sip.frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult {
diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go
index 97858f1058ab..eb64c7a45e78 100644
--- a/pkg/kv/bulk/buffering_adder.go
+++ b/pkg/kv/bulk/buffering_adder.go
@@ -49,6 +49,11 @@ type BufferingAdder struct {
 	// currently buffered kvs.
 	curBuf kvBuf
 
+	// curBufSummary is a summary of the currently buffered kvs that is populated
+	// as and when the kvs are ingested by the underlying sink. This field is
+	// cleared after each flush of the BufferingAdder.
+	curBufSummary kvpb.BulkOpSummary
+
 	sorted bool
 
 	initialSplits int
@@ -110,8 +115,16 @@ func MakeBulkAdder(
 		sorted:         true,
 		initialSplits:  opts.InitialSplitsIfUnordered,
 		lastFlush:      timeutil.Now(),
+		curBufSummary:  kvpb.BulkOpSummary{},
 	}
 
+	// Register a callback with the underlying sink to accumulate the summary for
+	// the current buffered KVs. The curBufSummary is reset when the buffering
+	// adder, and therefore the underlying sink, has completed ingesting all the
+	// currently buffered kvs.
+	b.sink.mu.onFlush = func(batchSummary kvpb.BulkOpSummary) {
+		b.curBufSummary.Add(batchSummary)
+	}
 	b.sink.mem.Mu = &syncutil.Mutex{}
 	// At minimum a bulk adder needs enough space to store a buffer of
 	// curBufferSize, and a subsequent SST of SSTSize in-memory. If the memory
@@ -239,9 +252,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 	if b.bufferedKeys() == 0 {
 		if b.onFlush != nil {
-			b.onFlush(b.sink.GetBatchSummary())
+			b.onFlush(b.curBufSummary)
 		}
 		b.lastFlush = timeutil.Now()
+		b.curBufSummary.Reset()
 		return nil
 	}
 	if err := b.sink.Reset(ctx); err != nil {
@@ -257,7 +271,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 		before = b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
 		before.Combine(&b.sink.mu.totalStats)
 		before.Combine(&b.sink.currentStats)
-		beforeSize = b.sink.mu.totalRows.DataSize
+		beforeSize = b.sink.mu.totalBulkOpSummary.DataSize
 		b.sink.mu.Unlock()
 	}
@@ -309,7 +323,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 
 	if log.V(3) && before != nil {
 		b.sink.mu.Lock()
-		written := b.sink.mu.totalRows.DataSize - beforeSize
+		written := b.sink.mu.totalBulkOpSummary.DataSize - beforeSize
 		afterStats := b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
 		afterStats.Combine(&b.sink.mu.totalStats)
 		afterStats.Combine(&b.sink.currentStats)
@@ -349,9 +363,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 	}
 
 	if b.onFlush != nil {
-		b.onFlush(b.sink.GetBatchSummary())
+		b.onFlush(b.curBufSummary)
 	}
 	b.curBuf.Reset()
+	b.curBufSummary.Reset()
 	b.lastFlush = timeutil.Now()
 	return nil
 }
diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index 8703fbe1164e..5dd6afe6490f 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -139,7 +139,7 @@ type SSTBatcher struct {
 	disableScatters bool
 
 	// The rest of the fields accumulated state as opposed to configuration. Some,
-	// like totalRows, are accumulated _across_ batches and are not reset between
+	// like totalBulkOpSummary, are accumulated _across_ batches and are not reset between
 	// batches when Reset() is called.
 	//
 	// currentStats contain the stats since the last flush. After each flush,
 	// they are combined into totalStats.
 	currentStats bulkpb.IngestionPerformanceStats
 
+	// Summary of the rows written in the current batch.
+	//
+	// NB: It is not advisable to use this field directly to consume per batch
+	// summaries. This field is reset when the batcher is reset after each flush.
+	// Under certain conditions the batcher can internally trigger a flush and a
+	// reset while adding KVs i.e. without the caller explicitly calling Flush.
+	// Furthermore, the batcher may reset the row counter before the async flush
+	// has actually completed.
+	//
+	// If the caller requires a BulkOpSummary after each flush, they must register
+	// a callback `onFlush`.
+	batchRowCounter storage.RowCounter
+
 	// span tracks the total span into which this batcher has flushed. It is
 	// only maintained if log.V(1), so if vmodule is upped mid-ingest it may be
 	// incomplete.
@@ -172,16 +185,15 @@ type SSTBatcher struct {
 	// stores on-the-fly stats for the SST if disallowShadowingBelow is set.
 	ms enginepb.MVCCStats
 
-	// rows written in the current batch.
-	rowCounter storage.RowCounter
 	asyncAddSSTs ctxgroup.Group
 
 	mu struct {
 		syncutil.Mutex
 
-		maxWriteTS hlc.Timestamp
-		totalRows  kvpb.BulkOpSummary
+		maxWriteTS         hlc.Timestamp
+		totalBulkOpSummary kvpb.BulkOpSummary
+
 		// totalStats contain the stats over the entire lifetime of the SST Batcher.
 		// As rows accumulate, the corresponding stats initially start out in
 		// currentStats. After each flush, the contents of currentStats are combined
 		// into totalStats.
 		totalStats  bulkpb.IngestionPerformanceStats
 		lastFlush   time.Time
 		tracingSpan *tracing.Span
+
+		// onFlush is the callback called after the current batch has been
+		// successfully ingested.
+		onFlush func(summary kvpb.BulkOpSummary)
 	}
 }
@@ -228,8 +244,10 @@ func MakeStreamSSTBatcher(
 	settings *cluster.Settings,
 	mem mon.BoundAccount,
 	sendLimiter limit.ConcurrentRequestLimiter,
+	onFlush func(summary kvpb.BulkOpSummary),
 ) (*SSTBatcher, error) {
 	b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
+	b.SetOnFlush(onFlush)
 	err := b.Reset(ctx)
 	return b, err
 }
@@ -273,6 +291,13 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) {
 	b.ms.ValCount++
 }
 
+// SetOnFlush sets a callback to run after the SSTBatcher flushes.
+func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.mu.onFlush = onFlush
+}
+
 // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
 // This is only for callers that want to control the timestamp on individual
 // keys -- like RESTORE where we want the restored data to look like the backup.
@@ -322,7 +347,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
 	b.batchEndKey = append(b.batchEndKey[:0], key.Key...)
 	b.batchEndValue = append(b.batchEndValue[:0], value...)
 
-	if err := b.rowCounter.Count(key.Key); err != nil {
+	if err := b.batchRowCounter.Count(key.Key); err != nil {
 		return err
 	}
@@ -357,7 +382,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) error {
 		b.batchTS = hlc.Timestamp{}
 	}
 
-	b.rowCounter.BulkOpSummary.Reset()
+	b.batchRowCounter.BulkOpSummary.Reset()
 
 	if b.currentStats.SendWaitByStore == nil {
 		b.currentStats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration)
@@ -572,11 +597,12 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
 		b.ms.LastUpdateNanos = timeutil.Now().UnixNano()
 	}
+	// Take a copy of the fields that can be captured by the call to addSSTable
+	// below, which could occur asynchronously.
 	stats := b.ms
-	summary := b.rowCounter.BulkOpSummary
 	data := b.sstFile.Data()
 	batchTS := b.batchTS
-
+	currentBatchSummary := b.batchRowCounter.BulkOpSummary
 	res, err := b.limiter.Begin(ctx)
 	if err != nil {
 		return err
 	}
@@ -622,20 +648,37 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
 			return err
 		}
 
+		// Now that we have completed ingesting the SSTables we take a lock and
+		// update the statistics on the SSTBatcher.
 		b.mu.Lock()
 		defer b.mu.Unlock()
-		summary.DataSize += int64(size)
-		summary.SSTDataSize += int64(len(data))
+
+		// Update the statistics associated with the current batch. We do this on
+		// our captured copy of the currentBatchSummary instead of the
+		// b.batchRowCounter since the caller may have reset the batcher by the time
+		// this flush completes. This is possible in the case of an async flush.
+		currentBatchSummary.DataSize += int64(size)
+		currentBatchSummary.SSTDataSize += int64(len(data))
+
+		// Check if the caller has registered a callback to consume a per batch
+		// summary.
+		if b.mu.onFlush != nil {
+			b.mu.onFlush(currentBatchSummary)
+		}
+
 		currentBatchStatsCopy.LogicalDataSize += int64(size)
 		currentBatchStatsCopy.SSTDataSize += int64(len(data))
-		b.mu.totalRows.Add(summary)
-
 		afterFlush := timeutil.Now()
 		currentBatchStatsCopy.BatchWait += afterFlush.Sub(beforeFlush)
 		currentBatchStatsCopy.Duration = afterFlush.Sub(b.mu.lastFlush)
 		currentBatchStatsCopy.LastFlushTime = hlc.Timestamp{WallTime: b.mu.lastFlush.UnixNano()}
 		currentBatchStatsCopy.CurrentFlushTime = hlc.Timestamp{WallTime: afterFlush.UnixNano()}
+
+		// Combine the statistics of this batch into the running aggregate
+		// maintained by the SSTBatcher.
+		b.mu.totalBulkOpSummary.Add(currentBatchSummary)
 		b.mu.totalStats.Combine(currentBatchStatsCopy)
+
 		b.mu.lastFlush = afterFlush
 		if b.mu.tracingSpan != nil {
 			b.mu.tracingSpan.RecordStructured(currentBatchStatsCopy)
@@ -663,16 +706,11 @@ func (b *SSTBatcher) Close(ctx context.Context) {
 	b.mem.Close(ctx)
 }
 
-// GetBatchSummary returns this batcher's total added rows/bytes/etc.
-func (b *SSTBatcher) GetBatchSummary() kvpb.BulkOpSummary {
-	return b.rowCounter.BulkOpSummary
-}
-
 // GetSummary returns this batcher's total added rows/bytes/etc.
 func (b *SSTBatcher) GetSummary() kvpb.BulkOpSummary {
 	b.mu.Lock()
 	defer b.mu.Unlock()
-	return b.mu.totalRows
+	return b.mu.totalBulkOpSummary
 }
 
 type sstSpan struct {
diff --git a/pkg/storage/row_counter.go b/pkg/storage/row_counter.go
index 7b6c5c2a0b40..3bd445337e19 100644
--- a/pkg/storage/row_counter.go
+++ b/pkg/storage/row_counter.go
@@ -19,7 +19,7 @@ import (
 )
 
 // RowCounter is a helper that counts how many distinct rows appear in the KVs
-// that is is shown via `Count`. Note: the `DataSize` field of the BulkOpSummary
+// that is shown via `Count`. Note: the `DataSize` field of the BulkOpSummary
 // is *not* populated by this and should be set separately.
 type RowCounter struct {
 	kvpb.BulkOpSummary
diff --git a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx
index 3c02089335a5..72d0117cfbed 100644
--- a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx
+++ b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx
@@ -36,9 +36,9 @@ export default function (props: GraphDashboardProps) {
-
+
-
+