Merge #96904 #97248
96904: workload: support import parallelism for multi-tenant cluster r=adityamaru a=stevendanna

All nodes now produce entries in the sql_instances table, so we are able to use that table to get a node count even when running against a secondary tenant.

I've kept the original query in place to support older clusters.
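
For reference, a minimal sketch of that fallback, mirroring the getNodeCount helper added to pkg/ccl/workloadccl/fixture.go in the diff below. The query strings and the multi-tenancy error check come straight from that diff; the package clause and import paths shown here are only illustrative.

package workloadccl

import (
	"context"
	gosql "database/sql"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/util/errorutil"
)

// The gossip-based query works against the system tenant; the explicit empty
// prefix for crdb_internal avoids an error if the connected database does not exist.
const numNodesQuery = `SELECT count(node_id) FROM "".crdb_internal.gossip_liveness`

// Secondary tenants cannot read gossip_liveness, but every node now writes a row
// to system.sql_instances, so counting live instances yields a node count.
const numNodesQuerySQLInstances = `SELECT count(1) FROM system.sql_instances WHERE addr IS NOT NULL`

func getNodeCount(ctx context.Context, sqlDB *gosql.DB) (int, error) {
	var numNodes int
	err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes)
	if err == nil {
		return numNodes, nil
	}
	// Fall back to sql_instances only when the failure is the expected
	// "unsupported under multi-tenancy" error; surface anything else.
	if !strings.Contains(err.Error(), errorutil.UnsupportedWithMultiTenancyMessage) {
		return 0, err
	}
	if err := sqlDB.QueryRow(numNodesQuerySQLInstances).Scan(&numNodes); err != nil {
		return 0, err
	}
	return numNodes, nil
}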

Fixes #78968

Release note: None

97248: bulk, dashboards: fix c2c metrics and dashboards r=msbutler a=adityamaru

This change fixes a few bugs around c2c metric collection
and display:

1) This change fixes the units on the c2c replication dashboard
for the logical and SST byte metrics.

2) Previously, IngestedEvents was incremented by the number of point keys
in the current batch plus the byte size of the rangekeys in the batch. This has
been corrected to be the sum of the number of point keys and rangekeys
in the batch.

3) Previously, we were adding the running sum of the SSTDataSize to
the metric tracking SST bytes, instead of only the bytes ingested as part of
the current batch. This was incorrect and has been changed to capture only
the number of bytes ingested as part of the latest flush.

4) Callers of the SSTBatcher were making an incorrect assumption about
the per-batch BulkOpSummary exposed via the GetBatchSummary method.
This per-batch summary is reset whenever the SSTBatcher is reset, which is
usually done by the caller after a manual call to Flush. However, the
batcher itself may decide to flush and reset while keys are being added to the
current batch, thereby resetting the BulkOpSummary. So when the caller fetches
the summary after the manual flush, it contains only the partial set of results
aggregated since the last internal flush. This change warns users against
directly accessing the per-batch BulkOpSummary and instead recommends
registering an OnFlush callback that is called on every successful flush of the batcher.

The stream ingestion processor now updates its logical and SST byte metrics in the OnFlush callback, as sketched below.
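
Concretely, the wiring looks like the following excerpt, adapted from the stream_ingestion_processor.go hunk below (the explanatory comment is added here and is not part of the commit):

sip.batcher, err = bulk.MakeStreamSSTBatcher(
	ctx, db.KV(), evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
	sip.flowCtx.Cfg.BulkSenderLimiter, func(batchSummary kvpb.BulkOpSummary) {
		// Called on every successful flush, whether triggered by the caller or
		// internally by the batcher, so each batch's bytes are counted exactly once.
		sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
		sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
	})

The rangeKeyBatcher is constructed with the same callback, so range-key flushes feed the same two counters.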

Informs: #97224

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: adityamaru <[email protected]>
3 people committed Feb 21, 2023
3 parents fb740d4 + 5f66471 + 9f979ea commit dd2749a
Showing 9 changed files with 218 additions and 99 deletions.
16 changes: 8 additions & 8 deletions pkg/ccl/streamingccl/streamingest/metrics.go
@@ -133,8 +133,8 @@ var (
// Metrics are for production monitoring of stream ingestion jobs.
type Metrics struct {
IngestedEvents *metric.Counter
IngestedBytes *metric.Counter
SSTBytes *metric.Counter
IngestedLogicalBytes *metric.Counter
IngestedSSTBytes *metric.Counter
Flushes *metric.Counter
JobProgressUpdates *metric.Counter
ResolvedEvents *metric.Counter
@@ -156,12 +156,12 @@ func (*Metrics) MetricStruct() {}
// MakeMetrics makes the metrics for stream ingestion job monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedBytes: metric.NewCounter(metaReplicationIngestedBytes),
SSTBytes: metric.NewCounter(metaReplicationSSTBytes),
Flushes: metric.NewCounter(metaReplicationFlushes),
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
IngestedEvents: metric.NewCounter(metaReplicationEventsIngested),
IngestedLogicalBytes: metric.NewCounter(metaReplicationIngestedBytes),
IngestedSSTBytes: metric.NewCounter(metaReplicationSSTBytes),
Flushes: metric.NewCounter(metaReplicationFlushes),
ResolvedEvents: metric.NewCounter(metaReplicationResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaReplicationFlushHistNanos,
Duration: histogramWindow,
95 changes: 56 additions & 39 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -97,16 +97,24 @@ type rangeKeyBatcher struct {

// Minimum timestamp in the current batch. Used for metrics purpose.
minTimestamp hlc.Timestamp
// Data size of the current batch.
dataSize int

// batchSummary is the BulkOpSummary for the current batch of rangekeys.
batchSummary kvpb.BulkOpSummary

// onFlush is the callback called after the current batch has been
// successfully ingested.
onFlush func(kvpb.BulkOpSummary)
}

func newRangeKeyBatcher(ctx context.Context, cs *cluster.Settings, db *kv.DB) *rangeKeyBatcher {
func newRangeKeyBatcher(
ctx context.Context, cs *cluster.Settings, db *kv.DB, onFlush func(summary kvpb.BulkOpSummary),
) *rangeKeyBatcher {
batcher := &rangeKeyBatcher{
db: db,
minTimestamp: hlc.MaxTimestamp,
dataSize: 0,
batchSummary: kvpb.BulkOpSummary{},
rangeKeySSTFile: &storage.MemFile{},
onFlush: onFlush,
}
batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter {
w := storage.MakeIngestionSSTWriter(ctx, cs, batcher.rangeKeySSTFile)
@@ -282,14 +290,23 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db.KV(), evalCtx.Settings,
sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
sip.batcher, err = bulk.MakeStreamSSTBatcher(
ctx, db.KV(), evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
sip.flowCtx.Cfg.BulkSenderLimiter, func(batchSummary kvpb.BulkOpSummary) {
// OnFlush update the ingested logical and SST byte metrics.
sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
})
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
}

sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV())
sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db.KV(), func(batchSummary kvpb.BulkOpSummary) {
// OnFlush update the ingested logical and SST byte metrics.
sip.metrics.IngestedLogicalBytes.Inc(batchSummary.DataSize)
sip.metrics.IngestedSSTBytes.Inc(batchSummary.SSTDataSize)
})

// Start a poller that checks if the stream ingestion job has been signaled to
// cutover.
@@ -732,11 +749,6 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro
// the current size of all buffered range keys.
func (r *rangeKeyBatcher) buffer(rangeKV storage.MVCCRangeKeyValue) {
r.curRangeKVBatch = append(r.curRangeKVBatch, rangeKV)
r.dataSize += rangeKV.RangeKey.EncodedSize() + len(rangeKV.Value)
}

func (r *rangeKeyBatcher) size() int {
return r.dataSize
}

// Flush all the range keys buffered so far into storage as an SST.
@@ -767,6 +779,7 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
if rangeKeyVal.RangeKey.Timestamp.Less(r.minTimestamp) {
r.minTimestamp = rangeKeyVal.RangeKey.Timestamp
}
r.batchSummary.DataSize += int64(rangeKeyVal.RangeKey.EncodedSize() + len(rangeKeyVal.Value))
}

// Finish the current batch.
@@ -778,7 +791,16 @@ func (r *rangeKeyBatcher) flush(ctx context.Context) error {
false /* disallowConflicts */, false, /* disallowShadowing */
hlc.Timestamp{}, nil /* stats */, false, /* ingestAsWrites */
r.db.Clock().Now())
return err
if err != nil {
return err
}
r.batchSummary.SSTDataSize += int64(len(r.rangeKeySSTFile.Data()))

if r.onFlush != nil {
r.onFlush(r.batchSummary)
}

return nil
}

// Reset all the states inside the batcher and needs to called after flush
@@ -789,7 +811,7 @@ func (r *rangeKeyBatcher) reset() {
}
r.rangeKeySSTFile.Reset()
r.minTimestamp = hlc.MaxTimestamp
r.dataSize = 0
r.batchSummary.Reset()
r.curRangeKVBatch = r.curRangeKVBatch[:0]
}

@@ -798,9 +820,11 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
defer sp.Finish()

flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)}

// First process the point KVs.
//
// Ensure that the current batch is sorted.
sort.Sort(sip.curKVBatch)
totalSize := 0
minBatchMVCCTimestamp := hlc.MaxTimestamp
for _, keyVal := range sip.curKVBatch {
if err := sip.batcher.AddMVCCKey(ctx, keyVal.Key, keyVal.Value); err != nil {
@@ -809,40 +833,33 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
if keyVal.Key.Timestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = keyVal.Key.Timestamp
}
totalSize += len(keyVal.Key.Key) + len(keyVal.Value)
}

if sip.rangeBatcher.size() > 0 {
totalSize += sip.rangeBatcher.size()
if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
preFlushTime := timeutil.Now()
if len(sip.curKVBatch) > 0 {
if err := sip.batcher.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing sst batcher")
}
}

if len(sip.curKVBatch) > 0 || sip.rangeBatcher.size() > 0 {
preFlushTime := timeutil.Now()
defer func() {
sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
sip.metrics.Flushes.Inc(1)
sip.metrics.IngestedBytes.Inc(int64(totalSize))
sip.metrics.SSTBytes.Inc(sip.batcher.GetSummary().SSTDataSize)
sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
sip.metrics.IngestedEvents.Inc(int64(sip.rangeBatcher.size()))
}()
if len(sip.curKVBatch) > 0 {
if err := sip.batcher.Flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing sst batcher")
}
// Now process the range KVs.
if len(sip.rangeBatcher.curRangeKVBatch) > 0 {
if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) {
minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp
}

if sip.rangeBatcher.size() > 0 {
if err := sip.rangeBatcher.flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing range key sst")
}
if err := sip.rangeBatcher.flush(ctx); err != nil {
return nil, errors.Wrap(err, "flushing range key sst")
}
}

// Update the flush metrics.
sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds())
sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds())
sip.metrics.Flushes.Inc(1)
sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch)))
sip.metrics.IngestedEvents.Inc(int64(len(sip.rangeBatcher.curRangeKVBatch)))

// Go through buffered checkpoint events, and put them on the channel to be
// emitted to the downstream frontier processor.
sip.frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult {
1 change: 1 addition & 0 deletions pkg/ccl/workloadccl/BUILD.bazel
@@ -52,6 +52,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/envutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"//pkg/workload",
42 changes: 26 additions & 16 deletions pkg/ccl/workloadccl/fixture.go
@@ -206,10 +206,6 @@ func csvServerPaths(
return paths
}

// Specify an explicit empty prefix for crdb_internal to avoid an error if
// the database we're connected to does not exist.
const numNodesQuery = `SELECT count(node_id) FROM "".crdb_internal.gossip_liveness`

// MakeFixture regenerates a fixture, storing it to GCS. It is expected that the
// generator will have had Configure called on it.
//
@@ -330,6 +326,29 @@ func (l ImportDataLoader) InitialDataLoad(
return bytes, nil
}

// Specify an explicit empty prefix for crdb_internal to avoid an error if
// the database we're connected to does not exist.
const numNodesQuery = `SELECT count(node_id) FROM "".crdb_internal.gossip_liveness`
const numNodesQuerySQLInstances = `SELECT count(1) FROM system.sql_instances WHERE addr IS NOT NULL`

func getNodeCount(ctx context.Context, sqlDB *gosql.DB) (int, error) {
var numNodes int
if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil {
// If the query is unsupported because we're in
// multi-tenant mode, use the sql_instances table.
if !strings.Contains(err.Error(), errorutil.UnsupportedWithMultiTenancyMessage) {
return 0, err

}
} else {
return numNodes, nil
}
if err := sqlDB.QueryRow(numNodesQuerySQLInstances).Scan(&numNodes); err != nil {
return 0, err
}
return numNodes, nil
}

// ImportFixture works like MakeFixture, but instead of stopping halfway or
// writing a backup to cloud storage, it finishes ingesting the data.
// It also includes the option to inject pre-calculated table statistics if
@@ -349,18 +368,9 @@ func ImportFixture(
)
}

var numNodes int
if err := sqlDB.QueryRow(numNodesQuery).Scan(&numNodes); err != nil {
if strings.Contains(err.Error(), errorutil.UnsupportedWithMultiTenancyMessage) {
// If the query is unsupported because we're in multi-tenant mode. Assume
// that the cluster has 1 node for the purposes of running CSV servers.
// Tenants won't use DistSQL to parallelize IMPORT across SQL pods. Doing
// something better here is tracked in:
// https://github.com/cockroachdb/cockroach/issues/78968
numNodes = 1
} else {
return 0, err
}
numNodes, err := getNodeCount(ctx, sqlDB)
if err != nil {
return 0, err
}

var bytesAtomic int64
54 changes: 46 additions & 8 deletions pkg/ccl/workloadccl/fixture_test.go
@@ -11,8 +11,10 @@ package workloadccl_test
import (
"context"
"fmt"
"math"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

@@ -24,8 +26,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/spf13/pflag"
@@ -83,6 +87,8 @@ func (g fixtureTestGen) Tables() []workload.Table {

func TestFixture(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

// This test is brittle and requires manual intervention to run.
@@ -165,6 +171,7 @@

func TestImportFixture(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

@@ -175,12 +182,7 @@
stats.DefaultRefreshInterval = time.Millisecond
stats.DefaultAsOfTime = 10 * time.Millisecond

s, db, _ := serverutils.StartServer(t,
// Need to disable the test tenant until we have a fix for #75449.
base.TestServerArgs{
DisableDefaultTestTenant: true,
},
)
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)

@@ -220,15 +222,15 @@

func TestImportFixtureCSVServer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
ts := httptest.NewServer(workload.CSVMux(workload.Registered()))
defer ts.Close()

s, db, _ := serverutils.StartServer(t,
base.TestServerArgs{
UseDatabase: `d`,
// Test fails within a test tenant due to #75449.
DisableDefaultTestTenant: true,
},
)
defer s.Stopper().Stop(ctx)
@@ -250,3 +252,39 @@
sqlDB.CheckQueryResults(t,
`SELECT count(*) FROM d.fx`, [][]string{{strconv.Itoa(fixtureTestGenRows)}})
}

func TestImportFixtureNodeCount(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

const (
nodes = 3
filesPerNode = 1
)

tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(db)

gen := makeTestWorkload()
flag := fmt.Sprintf("--val=%d", timeutil.Now().UnixNano())
require.NoError(t, gen.Flags().Parse([]string{flag}))

sqlDB.Exec(t, "CREATE DATABASE ingest")
_, err := workloadccl.ImportFixture(
ctx, db, gen, "ingest", filesPerNode, false, /* injectStats */
``, /* csvServer */
)
require.NoError(t, err)

var desc string
sqlDB.QueryRow(t, "SELECT description FROM crdb_internal.jobs WHERE job_type = 'IMPORT'").Scan(&desc)

expectedFiles := math.Ceil(float64(fixtureTestGenRows) / float64(nodes))
actualFiles := strings.Count(desc, "workload://")
require.Equal(t, int(expectedFiles), actualFiles)
}