From 3d635bb2db7e1945f67f797648394c76de85d110 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 9 Feb 2023 17:52:56 +0000 Subject: [PATCH 1/4] workload: support import parallelism for multi-tenant cluster All nodes now produce entries in the sql_instances table, so we are able to use that table to get a node count even when running against a secondary tenant. I've kept the original query in place to support older clusters. Fixes #78968 Release note: None --- pkg/ccl/workloadccl/BUILD.bazel | 1 + pkg/ccl/workloadccl/fixture.go | 42 ++++++++++++++++++----------- pkg/ccl/workloadccl/fixture_test.go | 40 +++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/pkg/ccl/workloadccl/BUILD.bazel b/pkg/ccl/workloadccl/BUILD.bazel index 67437276d689..766bbbe819d3 100644 --- a/pkg/ccl/workloadccl/BUILD.bazel +++ b/pkg/ccl/workloadccl/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util/envutil", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/timeutil", "//pkg/workload", diff --git a/pkg/ccl/workloadccl/fixture.go b/pkg/ccl/workloadccl/fixture.go index 45211aeb55ce..e8998722702f 100644 --- a/pkg/ccl/workloadccl/fixture.go +++ b/pkg/ccl/workloadccl/fixture.go @@ -206,10 +206,6 @@ func csvServerPaths( return paths } -// Specify an explicit empty prefix for crdb_internal to avoid an error if -// the database we're connected to does not exist. -const numNodesQuery = `SELECT count(node_id) FROM "".crdb_internal.gossip_liveness` - // MakeFixture regenerates a fixture, storing it to GCS. It is expected that the // generator will have had Configure called on it. // @@ -330,6 +326,29 @@ func (l ImportDataLoader) InitialDataLoad( return bytes, nil } +// Specify an explicit empty prefix for crdb_internal to avoid an error if +// the database we're connected to does not exist. +const numNodesQuery = `SELECT count(node_id) FROM "".crdb_internal.gossip_liveness` +const numNodesQuerySQLInstances = `SELECT count(1) FROM system.sql_instances WHERE addr IS NOT NULL` + +func getNodeCount(ctx context.Context, sqlDB *gosql.DB) (int, error) { + var numNodes int + if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil { + // If the query is unsupported because we're in + // multi-tenant mode, use the sql_instances table. + if !strings.Contains(err.Error(), errorutil.UnsupportedWithMultiTenancyMessage) { + return 0, err + + } + } else { + return numNodes, nil + } + if err := sqlDB.QueryRow(numNodesQuerySQLInstances).Scan(&numNodes); err != nil { + return 0, err + } + return numNodes, nil +} + // ImportFixture works like MakeFixture, but instead of stopping halfway or // writing a backup to cloud storage, it finishes ingesting the data. // It also includes the option to inject pre-calculated table statistics if @@ -349,18 +368,9 @@ func ImportFixture( ) } - var numNodes int - if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil { - if strings.Contains(err.Error(), errorutil.UnsupportedWithMultiTenancyMessage) { - // If the query is unsupported because we're in multi-tenant mode. Assume - // that the cluster has 1 node for the purposes of running CSV servers. - // Tenants won't use DistSQL to parallelize IMPORT across SQL pods. 
Doing - // something better here is tracked in: - // https://github.com/cockroachdb/cockroach/issues/78968 - numNodes = 1 - } else { - return 0, err - } + numNodes, err := getNodeCount(ctx, sqlDB) + if err != nil { + return 0, err } var bytesAtomic int64 diff --git a/pkg/ccl/workloadccl/fixture_test.go b/pkg/ccl/workloadccl/fixture_test.go index e87b9d35a9c8..c2812363ce9f 100644 --- a/pkg/ccl/workloadccl/fixture_test.go +++ b/pkg/ccl/workloadccl/fixture_test.go @@ -11,8 +11,10 @@ package workloadccl_test import ( "context" "fmt" + "math" "net/http/httptest" "strconv" + "strings" "testing" "time" @@ -24,8 +26,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/spf13/pflag" @@ -250,3 +254,39 @@ func TestImportFixtureCSVServer(t *testing.T) { sqlDB.CheckQueryResults(t, `SELECT count(*) FROM d.fx`, [][]string{{strconv.Itoa(fixtureTestGenRows)}}) } + +func TestImportFixtureNodeCount(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + const ( + nodes = 3 + filesPerNode = 1 + ) + + tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(db) + + gen := makeTestWorkload() + flag := fmt.Sprintf("--val=%d", timeutil.Now().UnixNano()) + require.NoError(t, gen.Flags().Parse([]string{flag})) + + sqlDB.Exec(t, "CREATE DATABASE ingest") + _, err := workloadccl.ImportFixture( + ctx, db, gen, "ingest", filesPerNode, false, /* injectStats */ + ``, /* csvServer */ + ) + require.NoError(t, err) + + var desc string + sqlDB.QueryRow(t, "SELECT description FROM crdb_internal.jobs WHERE job_type = 'IMPORT'").Scan(&desc) + + expectedFiles := math.Ceil(float64(fixtureTestGenRows) / float64(nodes)) + actualFiles := strings.Count(desc, "workload://") + require.Equal(t, int(expectedFiles), actualFiles) +} From 50bfa4ac861fbfa7ac4da344b8331ac4d8933fa9 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Mon, 20 Feb 2023 13:17:43 +0000 Subject: [PATCH 2/4] workloadccl: add missing log scope calls Release note: None --- pkg/ccl/workloadccl/fixture_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/ccl/workloadccl/fixture_test.go b/pkg/ccl/workloadccl/fixture_test.go index c2812363ce9f..c5f1e6a9bc34 100644 --- a/pkg/ccl/workloadccl/fixture_test.go +++ b/pkg/ccl/workloadccl/fixture_test.go @@ -87,6 +87,8 @@ func (g fixtureTestGen) Tables() []workload.Table { func TestFixture(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() // This test is brittle and requires manual intervention to run. 
@@ -169,6 +171,7 @@ func TestFixture(t *testing.T) {
 
 func TestImportFixture(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
 
@@ -224,6 +227,8 @@ func TestImportFixture(t *testing.T) {
 
 func TestImportFixtureCSVServer(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
 	ctx := context.Background()
 	ts := httptest.NewServer(workload.CSVMux(workload.Registered()))
 	defer ts.Close()

From 5f66471ca3232de0540326abddab8585a79ff931 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Mon, 20 Feb 2023 13:20:20 +0000
Subject: [PATCH 3/4] workloadccl: enable tenant testing

I've run these under stress for a few hundred iterations and haven't
encountered a problem.

Release note: none
---
 pkg/ccl/workloadccl/fixture_test.go | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/pkg/ccl/workloadccl/fixture_test.go b/pkg/ccl/workloadccl/fixture_test.go
index c5f1e6a9bc34..51faf9af9a4c 100644
--- a/pkg/ccl/workloadccl/fixture_test.go
+++ b/pkg/ccl/workloadccl/fixture_test.go
@@ -182,12 +182,7 @@ func TestImportFixture(t *testing.T) {
 	stats.DefaultRefreshInterval = time.Millisecond
 	stats.DefaultAsOfTime = 10 * time.Millisecond
 
-	s, db, _ := serverutils.StartServer(t,
-		// Need to disable the test tenant until we have a fix for #75449.
-		base.TestServerArgs{
-			DisableDefaultTestTenant: true,
-		},
-	)
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
 	defer s.Stopper().Stop(ctx)
 
 	sqlDB := sqlutils.MakeSQLRunner(db)
@@ -236,8 +231,6 @@ func TestImportFixtureCSVServer(t *testing.T) {
 	s, db, _ := serverutils.StartServer(t,
 		base.TestServerArgs{
 			UseDatabase: `d`,
-			// Test fails within a test tenant due to #75449.
-			DisableDefaultTestTenant: true,
 		},
 	)
 	defer s.Stopper().Stop(ctx)

From 9f979eab8ee14ca3aa7053c8f35805e4f9a22898 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Thu, 16 Feb 2023 12:57:08 -0500
Subject: [PATCH 4/4] bulk, dashboards: fix c2c metrics and dashboards

This change fixes a few bugs around c2c metric collection and display:

1) This change fixes the unit on the c2c replication dashboard for the
logical and SST byte metrics.

2) Previously, IngestedEvents was computed as the number of point keys
in the current batch plus the byte size of the rangekeys in the batch.
This has been corrected to the sum of the number of point keys and
rangekeys in the batch.

3) Previously, we were adding the running sum of the SSTDataSize to the
metric tracking SST bytes, instead of only the bytes ingested as part
of the current batch. This was incorrect and has been changed to
capture only the number of bytes ingested as part of the latest flush.

4) Callers of the SSTBatcher were making an incorrect assumption about
the per-batch BulkOpSummary exposed via the GetBatchSummary method.
This per-batch summary is reset whenever the SSTBatcher is reset. That
reset is usually performed by the caller after a manual call to Flush.
However, the batcher itself may decide to flush and reset while adding
keys to the current batch, thereby resetting the BulkOpSummary. So when
the caller fetched the summary after the manual flush, it would only
contain the partial set of results aggregated since the last internal
flush. This change warns users against directly accessing the per-batch
BulkOpSummary and instead has them register an OnFlush callback that is
called on every successful flush of the batcher. The stream ingestion
processor now updates its logical and SST byte metrics in this OnFlush
callback.
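As a rough illustration (the variable names here are placeholders, not
an exact call site from this patch), a caller that wants per-flush
summaries now passes a callback to the batcher instead of reading
GetBatchSummary after Flush:

    batcher, err := bulk.MakeStreamSSTBatcher(
        ctx, kvDB, settings, memAcc, limiter,
        func(summary kvpb.BulkOpSummary) {
            // Invoked once per successful flush, with only that batch's totals.
            metrics.IngestedLogicalBytes.Inc(summary.DataSize)
            metrics.IngestedSSTBytes.Inc(summary.SSTDataSize)
        })
    if err != nil {
        return err
    }

GetBatchSummary itself is removed; GetSummary still reports the
lifetime totals accumulated across all flushes.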
Informs: #97224 Release note: None --- pkg/ccl/streamingccl/streamingest/metrics.go | 16 ++-- .../stream_ingestion_processor.go | 95 +++++++++++-------- pkg/kv/bulk/buffering_adder.go | 23 ++++- pkg/kv/bulk/sst_batcher.go | 76 +++++++++++---- pkg/storage/row_counter.go | 2 +- .../dashboards/crossClusterReplication.tsx | 8 +- 6 files changed, 145 insertions(+), 75 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/metrics.go b/pkg/ccl/streamingccl/streamingest/metrics.go index 851366f6c457..69cb3fe2ac8a 100644 --- a/pkg/ccl/streamingccl/streamingest/metrics.go +++ b/pkg/ccl/streamingccl/streamingest/metrics.go @@ -133,8 +133,8 @@ var ( // Metrics are for production monitoring of stream ingestion jobs. type Metrics struct { IngestedEvents *metric.Counter - IngestedBytes *metric.Counter - SSTBytes *metric.Counter + IngestedLogicalBytes *metric.Counter + IngestedSSTBytes *metric.Counter Flushes *metric.Counter JobProgressUpdates *metric.Counter ResolvedEvents *metric.Counter @@ -156,12 +156,12 @@ func (*Metrics) MetricStruct() {} // MakeMetrics makes the metrics for stream ingestion job monitoring. func MakeMetrics(histogramWindow time.Duration) metric.Struct { m := &Metrics{ - IngestedEvents: metric.NewCounter(metaReplicationEventsIngested), - IngestedBytes: metric.NewCounter(metaReplicationIngestedBytes), - SSTBytes: metric.NewCounter(metaReplicationSSTBytes), - Flushes: metric.NewCounter(metaReplicationFlushes), - ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested), - JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates), + IngestedEvents: metric.NewCounter(metaReplicationEventsIngested), + IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes), + IngestedSSTBytes: metric.NewCounter(metaReplicationSSTBytes), + Flushes: metric.NewCounter(metaReplicationFlushes), + ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested), + JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates), FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{ Metadata: metaReplicationFlushHistNanos, Duration: histogramWindow, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index cc24ff23367e..c75c55ea74a9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -97,16 +97,24 @@ type rangeKeyBatcher struct { // Minimum timestamp in the current batch. Used for metrics purpose. minTimestamp hlc.Timestamp - // Data size of the current batch. - dataSize int + + // batchSummary is the BulkOpSummary for the current batch of rangekeys. + batchSummary kvpb.BulkOpSummary + + // onFlush is the callback called after the current batch has been + // successfully ingested. 
+ onFlush func(kvpb.BulkOpSummary) } -func newRangeKeyBatcher(ctx context.Context, cs *cluster.Settings, db *kv.DB) *rangeKeyBatcher { +func newRangeKeyBatcher( + ctx context.Context, cs *cluster.Settings, db *kv.DB, onFlush func(summary kvpb.BulkOpSummary), +) *rangeKeyBatcher { batcher := &rangeKeyBatcher{ db: db, minTimestamp: hlc.MaxTimestamp, - dataSize: 0, + batchSummary: kvpb.BulkOpSummary{}, rangeKeySSTFile: &storage.MemFile{}, + onFlush: onFlush, } batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter { w := storage.MakeIngestionSSTWriter(ctx, cs, batcher.rangeKeySSTFile) @@ -282,14 +290,23 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { evalCtx := sip.FlowCtx.EvalCtx db := sip.FlowCtx.Cfg.DB var err error - sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db.KV(), evalCtx.Settings, - sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter) + sip.batcher, err = bulk.MakeStreamSSTBatcher( + ctx, db.KV(), evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), + sip.flowCtx.Cfg.BulkSenderLimiter, func(batchSummary kvpb.BulkOpSummary) { + // OnFlush update the ingested logical and SST byte metrics. + sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize) + sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize) + }) if err != nil { sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher")) return } - sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV()) + sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV(), func(batchSummary kvpb.BulkOpSummary) { + // OnFlush update the ingested logical and SST byte metrics. + sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize) + sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize) + }) // Start a poller that checks if the stream ingestion job has been signaled to // cutover. @@ -732,11 +749,6 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro // the current size of all buffered range keys. func (r *rangeKeyBatcher) buffer(rangeKV storage.MVCCRangeKeyValue) { r.curRangeKVBatch = append(r.curRangeKVBatch, rangeKV) - r.dataSize += rangeKV.RangeKey.EncodedSize() + len(rangeKV.Value) -} - -func (r *rangeKeyBatcher) size() int { - return r.dataSize } // Flush all the range keys buffered so far into storage as an SST. @@ -767,6 +779,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error { if rangeKeyVal.RangeKey.Timestamp.Less(r.minTimestamp) { r.minTimestamp = rangeKeyVal.RangeKey.Timestamp } + r.batchSummary.DataSize += int64(rangeKeyVal.RangeKey.EncodedSize() + len(rangeKeyVal.Value)) } // Finish the current batch. 
@@ -778,7 +791,16 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error { false /* disallowConflicts */, false, /* disallowShadowing */ hlc.Timestamp{}, nil /* stats */, false, /* ingestAsWrites */ r.db.Clock().Now()) - return err + if err != nil { + return err + } + r.batchSummary.SSTDataSize += int64(len(r.rangeKeySSTFile.Data())) + + if r.onFlush != nil { + r.onFlush(r.batchSummary) + } + + return nil } // Reset all the states inside the batcher and needs to called after flush @@ -789,7 +811,7 @@ func (r *rangeKeyBatcher) reset() { } r.rangeKeySSTFile.Reset() r.minTimestamp = hlc.MaxTimestamp - r.dataSize = 0 + r.batchSummary.Reset() r.curRangeKVBatch = r.curRangeKVBatch[:0] } @@ -798,9 +820,11 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} + + // First process the point KVs. + // // Ensure that the current batch is sorted. sort.Sort(sip.curKVBatch) - totalSize := 0 minBatchMVCCTimestamp := hlc.MaxTimestamp for _, keyVal := range sip.curKVBatch { if err := sip.batcher.AddMVCCKey(ctx, keyVal.Key, keyVal.Value); err != nil { @@ -809,40 +833,33 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { if keyVal.Key.Timestamp.Less(minBatchMVCCTimestamp) { minBatchMVCCTimestamp = keyVal.Key.Timestamp } - totalSize += len(keyVal.Key.Key) + len(keyVal.Value) } - if sip.rangeBatcher.size() > 0 { - totalSize += sip.rangeBatcher.size() - if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) { - minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp + preFlushTime := timeutil.Now() + if len(sip.curKVBatch) > 0 { + if err := sip.batcher.Flush(ctx); err != nil { + return nil, errors.Wrap(err, "flushing sst batcher") } } - if len(sip.curKVBatch) > 0 || sip.rangeBatcher.size() > 0 { - preFlushTime := timeutil.Now() - defer func() { - sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds()) - sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds()) - sip.metrics.Flushes.Inc(1) - sip.metrics.IngestedBytes.Inc(int64(totalSize)) - sip.metrics.SSTBytes.Inc(sip.batcher.GetSummary().SSTDataSize) - sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch))) - sip.metrics.IngestedEvents.Inc(int64(sip.rangeBatcher.size())) - }() - if len(sip.curKVBatch) > 0 { - if err := sip.batcher.Flush(ctx); err != nil { - return nil, errors.Wrap(err, "flushing sst batcher") - } + // Now process the range KVs. + if len(sip.rangeBatcher.curRangeKVBatch) > 0 { + if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) { + minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp } - if sip.rangeBatcher.size() > 0 { - if err := sip.rangeBatcher.flush(ctx); err != nil { - return nil, errors.Wrap(err, "flushing range key sst") - } + if err := sip.rangeBatcher.flush(ctx); err != nil { + return nil, errors.Wrap(err, "flushing range key sst") } } + // Update the flush metrics. + sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds()) + sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds()) + sip.metrics.Flushes.Inc(1) + sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch))) + sip.metrics.IngestedEvents.Inc(int64(len(sip.rangeBatcher.curRangeKVBatch))) + // Go through buffered checkpoint events, and put them on the channel to be // emitted to the downstream frontier processor. 
sip.frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult { diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go index 97858f1058ab..eb64c7a45e78 100644 --- a/pkg/kv/bulk/buffering_adder.go +++ b/pkg/kv/bulk/buffering_adder.go @@ -49,6 +49,11 @@ type BufferingAdder struct { // currently buffered kvs. curBuf kvBuf + // curBufSummary is a summary of the currently buffered kvs that is populated + // as and when the kvs are ingested by the underlying sink. This field is + // cleared after each flush of the BufferingAdder. + curBufSummary kvpb.BulkOpSummary + sorted bool initialSplits int @@ -110,8 +115,16 @@ func MakeBulkAdder( sorted: true, initialSplits: opts.InitialSplitsIfUnordered, lastFlush: timeutil.Now(), + curBufSummary: kvpb.BulkOpSummary{}, } + // Register a callback with the underlying sink to accumulate the summary for + // the current buffered KVs. The curBufSummary is reset when the buffering + // adder, and therefore the underlying sink, has completed ingested all the + // currently buffered kvs. + b.sink.mu.onFlush = func(batchSummary kvpb.BulkOpSummary) { + b.curBufSummary.Add(batchSummary) + } b.sink.mem.Mu = &syncutil.Mutex{} // At minimum a bulk adder needs enough space to store a buffer of // curBufferSize, and a subsequent SST of SSTSize in-memory. If the memory @@ -239,9 +252,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { if b.bufferedKeys() == 0 { if b.onFlush != nil { - b.onFlush(b.sink.GetBatchSummary()) + b.onFlush(b.curBufSummary) } b.lastFlush = timeutil.Now() + b.curBufSummary.Reset() return nil } if err := b.sink.Reset(ctx); err != nil { @@ -257,7 +271,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { before = b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats) before.Combine(&b.sink.mu.totalStats) before.Combine(&b.sink.currentStats) - beforeSize = b.sink.mu.totalRows.DataSize + beforeSize = b.sink.mu.totalBulkOpSummary.DataSize b.sink.mu.Unlock() } @@ -309,7 +323,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { if log.V(3) && before != nil { b.sink.mu.Lock() - written := b.sink.mu.totalRows.DataSize - beforeSize + written := b.sink.mu.totalBulkOpSummary.DataSize - beforeSize afterStats := b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats) afterStats.Combine(&b.sink.mu.totalStats) afterStats.Combine(&b.sink.currentStats) @@ -349,9 +363,10 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error { } if b.onFlush != nil { - b.onFlush(b.sink.GetBatchSummary()) + b.onFlush(b.curBufSummary) } b.curBuf.Reset() + b.curBufSummary.Reset() b.lastFlush = timeutil.Now() return nil } diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 8703fbe1164e..5dd6afe6490f 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -139,7 +139,7 @@ type SSTBatcher struct { disableScatters bool // The rest of the fields accumulated state as opposed to configuration. Some, - // like totalRows, are accumulated _across_ batches and are not reset between + // like totalBulkOpSummary, are accumulated _across_ batches and are not reset between // batches when Reset() is called. // // currentStats contain the stats since the last flush. After each flush, @@ -147,6 +147,19 @@ type SSTBatcher struct { // totalStats. currentStats bulkpb.IngestionPerformanceStats + // Summary of the rows written in the current batch. 
+ // + // NB: It is not advisable to use this field directly to consume per batch + // summaries. This field is reset when the batcher is reset after each flush. + // Under certain conditions the batcher can internally trigger a flush and a + // reset while adding KVs i.e. without the caller explicilty calling Flush. + // Furthermore, the batcher may reset the row counter before the async flush + // has actually completed. + // + // If the caller requires a BulkOpSummary after each flush, they must register + // a callback `onFlush`. + batchRowCounter storage.RowCounter + // span tracks the total span into which this batcher has flushed. It is // only maintained if log.V(1), so if vmodule is upped mid-ingest it may be // incomplete. @@ -172,16 +185,15 @@ type SSTBatcher struct { // stores on-the-fly stats for the SST if disallowShadowingBelow is set. ms enginepb.MVCCStats - // rows written in the current batch. - rowCounter storage.RowCounter asyncAddSSTs ctxgroup.Group mu struct { syncutil.Mutex - maxWriteTS hlc.Timestamp - totalRows kvpb.BulkOpSummary + maxWriteTS hlc.Timestamp + totalBulkOpSummary kvpb.BulkOpSummary + // totalStats contain the stats over the entire lifetime of the SST Batcher. // As rows accumulate, the corresponding stats initially start out in // currentStats. After each flush, the contents of currentStats are combined @@ -189,6 +201,10 @@ type SSTBatcher struct { totalStats bulkpb.IngestionPerformanceStats lastFlush time.Time tracingSpan *tracing.Span + + // onFlush is the callback called after the current batch has been + // successfully ingested. + onFlush func(summary kvpb.BulkOpSummary) } } @@ -228,8 +244,10 @@ func MakeStreamSSTBatcher( settings *cluster.Settings, mem mon.BoundAccount, sendLimiter limit.ConcurrentRequestLimiter, + onFlush func(summary kvpb.BulkOpSummary), ) (*SSTBatcher, error) { b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter} + b.SetOnFlush(onFlush) err := b.Reset(ctx) return b, err } @@ -273,6 +291,13 @@ func (b *SSTBatcher) updateMVCCStats(key storage.MVCCKey, value []byte) { b.ms.ValCount++ } +// SetOnFlush sets a callback to run after the SSTBatcher flushes. +func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.onFlush = onFlush +} + // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). // This is only for callers that want to control the timestamp on individual // keys -- like RESTORE where we want the restored data to look like the backup. @@ -322,7 +347,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value b.batchEndKey = append(b.batchEndKey[:0], key.Key...) b.batchEndValue = append(b.batchEndValue[:0], value...) - if err := b.rowCounter.Count(key.Key); err != nil { + if err := b.batchRowCounter.Count(key.Key); err != nil { return err } @@ -357,7 +382,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) error { b.batchTS = hlc.Timestamp{} } - b.rowCounter.BulkOpSummary.Reset() + b.batchRowCounter.BulkOpSummary.Reset() if b.currentStats.SendWaitByStore == nil { b.currentStats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration) @@ -572,11 +597,12 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error { b.ms.LastUpdateNanos = timeutil.Now().UnixNano() } + // Take a copy of the fields that can be captured by the call to addSSTable + // below, that could occur asynchronously. 
stats := b.ms - summary := b.rowCounter.BulkOpSummary data := b.sstFile.Data() batchTS := b.batchTS - + currentBatchSummary := b.batchRowCounter.BulkOpSummary res, err := b.limiter.Begin(ctx) if err != nil { return err @@ -622,20 +648,37 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error { return err } + // Now that we have completed ingesting the SSTables we take a lock and + // update the statistics on the SSTBatcher. b.mu.Lock() defer b.mu.Unlock() - summary.DataSize += int64(size) - summary.SSTDataSize += int64(len(data)) + + // Update the statistics associated with the current batch. We do this on + // our captured copy of the currentBatchSummary instead of the + // b.batchRowCounter since the caller may have reset the batcher by the time + // this flush completes. This is possible in the case of an async flush. + currentBatchSummary.DataSize += int64(size) + currentBatchSummary.SSTDataSize += int64(len(data)) + + // Check if the caller has registered a callback to consume a per batch + // summary. + if b.mu.onFlush != nil { + b.mu.onFlush(currentBatchSummary) + } + currentBatchStatsCopy.LogicalDataSize += int64(size) currentBatchStatsCopy.SSTDataSize += int64(len(data)) - b.mu.totalRows.Add(summary) - afterFlush := timeutil.Now() currentBatchStatsCopy.BatchWait += afterFlush.Sub(beforeFlush) currentBatchStatsCopy.Duration = afterFlush.Sub(b.mu.lastFlush) currentBatchStatsCopy.LastFlushTime = hlc.Timestamp{WallTime: b.mu.lastFlush.UnixNano()} currentBatchStatsCopy.CurrentFlushTime = hlc.Timestamp{WallTime: afterFlush.UnixNano()} + + // Combine the statistics of this batch into the running aggregate + // maintained by the SSTBatcher. + b.mu.totalBulkOpSummary.Add(currentBatchSummary) b.mu.totalStats.Combine(currentBatchStatsCopy) + b.mu.lastFlush = afterFlush if b.mu.tracingSpan != nil { b.mu.tracingSpan.RecordStructured(currentBatchStatsCopy) @@ -663,16 +706,11 @@ func (b *SSTBatcher) Close(ctx context.Context) { b.mem.Close(ctx) } -// GetBatchSummary returns this batcher's total added rows/bytes/etc. -func (b *SSTBatcher) GetBatchSummary() kvpb.BulkOpSummary { - return b.rowCounter.BulkOpSummary -} - // GetSummary returns this batcher's total added rows/bytes/etc. func (b *SSTBatcher) GetSummary() kvpb.BulkOpSummary { b.mu.Lock() defer b.mu.Unlock() - return b.mu.totalRows + return b.mu.totalBulkOpSummary } type sstSpan struct { diff --git a/pkg/storage/row_counter.go b/pkg/storage/row_counter.go index 7b6c5c2a0b40..3bd445337e19 100644 --- a/pkg/storage/row_counter.go +++ b/pkg/storage/row_counter.go @@ -19,7 +19,7 @@ import ( ) // RowCounter is a helper that counts how many distinct rows appear in the KVs -// that is is shown via `Count`. Note: the `DataSize` field of the BulkOpSummary +// that is shown via `Count`. Note: the `DataSize` field of the BulkOpSummary // is *not* populated by this and should be set separately. type RowCounter struct { kvpb.BulkOpSummary diff --git a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx index 3c02089335a5..72d0117cfbed 100644 --- a/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx +++ b/pkg/ui/workspaces/db-console/src/views/cluster/containers/nodeGraphs/dashboards/crossClusterReplication.tsx @@ -36,9 +36,9 @@ export default function (props: GraphDashboardProps) { - + - +