diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 1c5452119e05..388c234e0e90 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -78,7 +78,8 @@ const ( // TODO(dt): just make interceptors a singleton, not the whole client. var randomStreamClientSingleton = func() *RandomStreamClient { c := RandomStreamClient{} - c.mu.tableID = 52 + // Make the base tableID really large to prevent colliding with system table IDs. + c.mu.tableID = 5000 return &c }() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 5233730e2c44..d52c56639f61 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -37,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -334,6 +336,9 @@ func TestStreamIngestionProcessor(t *testing.T) { }) } +// getPartitionSpanToTableID maps a partiton's span to the tableID it covers in +// the source keyspace. It assumes the source used a random_stream_client, which generates keys for +// a single table span per partition. func getPartitionSpanToTableID( t *testing.T, partitions []streamclient.PartitionInfo, ) map[string]int { @@ -341,13 +346,14 @@ func getPartitionSpanToTableID( // Aggregate the table IDs which should have been ingested. for _, pa := range partitions { - pKey := roachpb.Key(pa.ID) - pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()} + require.Equal(t, 1, len(pa.Spans), "unexpected number of spans in the partition") + pSpan := pa.Spans[0] paURL, err := url.Parse(string(pa.SubscriptionToken)) require.NoError(t, err) id, err := strconv.Atoi(paURL.Host) require.NoError(t, err) pSpanToTableID[pSpan.String()] = id + t.Logf("Partition Span %s; Partition ID %d", pSpan.String(), id) } return pSpanToTableID } @@ -355,27 +361,31 @@ func getPartitionSpanToTableID( // assertEqualKVs iterates over the store in `tc` and compares the MVCC KVs // against the in-memory copy of events stored in the `streamValidator`. This // ensures that the stream ingestion processor ingested at least as much data as -// was streamed up until partitionTimestamp. +// was streamed up until partitionTimestamp. The function returns true if it +// validated at least one streamed kv. func assertEqualKVs( t *testing.T, - tc *testcluster.TestCluster, + srv serverutils.TestServerInterface, streamValidator *streamClientValidator, - tableID int, + targetSpan roachpb.Span, partitionTimestamp hlc.Timestamp, -) { - key := keys.TODOSQLCodec.TablePrefix(uint32(tableID)) - +) (foundKVs bool) { + t.Logf("target span %s; partition ts %s", targetSpan, partitionTimestamp) + if partitionTimestamp.WallTime == 0 { + // Implies this span never got a checkpoint + return foundKVs + } // Iterate over the store. - store := tc.GetFirstStoreFromServer(t, 0) + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ - LowerBound: key, - UpperBound: key.PrefixEnd(), + LowerBound: targetSpan.Key, + UpperBound: targetSpan.EndKey, }) defer it.Close() var prevKey roachpb.Key var valueTimestampTuples []roachpb.KeyValue - var err error - for it.SeekGE(storage.MVCCKey{Key: key}); ; it.Next() { + for it.SeekGE(storage.MVCCKey{Key: targetSpan.Key}); ; it.Next() { if ok, err := it.Valid(); !ok { if err != nil { t.Fatal(err) @@ -388,20 +398,20 @@ func assertEqualKVs( if partitionTimestamp.Less(it.UnsafeKey().Timestamp) { continue } - + foundKVs = true newKey := (prevKey != nil && !it.UnsafeKey().Key.Equal(prevKey)) || prevKey == nil prevKey = it.UnsafeKey().Clone().Key - + descriptiveErrorMsg := fmt.Sprintf("Key %s; Ts %s: Is new Key %t; Partition Ts %s", it.UnsafeKey().Key, it.UnsafeKey().Timestamp, newKey, partitionTimestamp) if newKey { // All value ts should have been drained at this point, otherwise there is // a mismatch between the streamed and ingested data. require.Equal(t, 0, len(valueTimestampTuples)) valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( string(it.UnsafeKey().Key), partitionTimestamp) - require.NoError(t, err) + require.NoError(t, err, descriptiveErrorMsg) } - - require.Greater(t, len(valueTimestampTuples), 0) + // Implies there exists a key in the store that was not logged by the stream validator. + require.Greater(t, len(valueTimestampTuples), 0, descriptiveErrorMsg) // Since the iterator goes from latest to older versions, we compare // starting from the end of the slice that is sorted by timestamp. latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] @@ -418,6 +428,7 @@ func assertEqualKVs( // for the next iteration. valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] } + return foundKVs } func makeTestStreamURI( @@ -447,11 +458,9 @@ func TestRandomClientGeneration(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - registry := tc.Server(0).JobRegistry().(*jobs.Registry) - db := tc.Server(0).InternalDB().(descs.DB) + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + registry := srv.JobRegistry().(*jobs.Registry) // TODO: Consider testing variations on these parameters. tenantID := roachpb.MustMakeTenantID(20) @@ -469,7 +478,6 @@ func TestRandomClientGeneration(t *testing.T) { topo, err := randomStreamClient.Plan(ctx, rps.StreamID) require.NoError(t, err) - // One system and two table data partitions. require.Equal(t, 2 /* numPartitions */, len(topo.Partitions)) initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} @@ -478,6 +486,7 @@ func TestRandomClientGeneration(t *testing.T) { // Cancel the flow after emitting 1000 checkpoint events from the client. mu := syncutil.Mutex{} cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel) + tenantRekey := execinfrapb.TenantRekey{ OldID: tenantID, NewID: roachpb.MustMakeTenantID(tenantID.ToUint64() + 10), @@ -494,12 +503,11 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient.RegisterInterception(cancelAfterCheckpoints) randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator)) - out, err := runStreamIngestionProcessor(ctx, t, registry, db, + out, err := runStreamIngestionProcessor(ctx, t, registry, srv.InternalDB().(descs.DB), topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/) require.NoError(t, err) - partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions) numResolvedEvents := 0 maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp) for { @@ -528,9 +536,9 @@ func TestRandomClientGeneration(t *testing.T) { latestResolvedTimestamp = resolvedSpan.Timestamp } - // Track the max resolved timestamp per partition. - if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok || - ts.Less(resolvedSpan.Timestamp) { + // Track the max resolved timestamp per partition. Note that resolved + // spans are mapped to the source tenant keyspace. + if maxResolvedTimestampPerPartition[resolvedSpan.Span.String()].Less(resolvedSpan.Timestamp) { maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp } numResolvedEvents++ @@ -544,13 +552,28 @@ func TestRandomClientGeneration(t *testing.T) { for _, failure := range streamValidator.failures() { t.Error(failure) } - - for pSpan, id := range partitionSpanToTableID { + foundKVs := false + ingestionCodec := keys.MakeSQLCodec(tenantRekey.NewID) + for pSpan, id := range getPartitionSpanToTableID(t, topo.Partitions) { // Scan the store for KVs ingested by this partition, and compare the MVCC // KVs against the KVEvents streamed up to the max ingested timestamp for // the partition. - assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan]) + // + // Note that target span must be rekeyed to the destination + // tenant key space. + startKey := ingestionCodec.TablePrefix(uint32(id)) + targetSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()} + if assertEqualKVs(t, srv, streamValidator, targetSpan, + maxResolvedTimestampPerPartition[pSpan]) { + foundKVs = true + } } + // Note: we only assert that KVs were found over all partitions instead of in + // each partition because it is possible for a partition to not send any + // checkpoint events. This stream ingestion processor only terminates once a + // total number of checkpoints have been reached and makes no guarantees that + // each partition gets a checkpoint. + require.True(t, foundKVs, "expected to find and assert equal kvs") require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") }