From 4f0508a81b15245959449109bfce8239923892bb Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Thu, 6 Apr 2023 07:56:16 -0400 Subject: [PATCH] streamingccl: deflake TestRandomClientGeneration This patch fixes 4 bugs in TestRandomClientGeneration that were responsible for the persistent flakiness and lack of coverage in this test: - the randomStreamClient no longer instantiates keys with a table prefix that collides with the job info table prefix. This collision was the original cause of the flakes reported in #99343. - getPartitionSpanToTableID() now generates a correct map from source partition key space to table ID. Previously, the key spans in the map didn't contain keys that mapped to anything logical in the cockroach key space. - assertEqualKVs() now checks for keys in the destination tenant keyspace. - assertEqualKVs() now actually asserts that kvs were found. Before, the assertion could pass if no keys were actually checked, which has been happening for months and allowed the bugs above to infest this test. Fixes #99343 Release note: None --- .../streamclient/random_stream_client.go | 3 +- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 1 - .../stream_ingestion_processor_test.go | 80 +++++++++++-------- 3 files changed, 50 insertions(+), 34 deletions(-) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 1c5452119e05..388c234e0e90 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -78,7 +78,8 @@ const ( // TODO(dt): just make interceptors a singleton, not the whole client. var randomStreamClientSingleton = func() *RandomStreamClient { c := RandomStreamClient{} - c.mu.tableID = 52 + // Make the base tableID really large to prevent colliding with system table IDs. 
+ c.mu.tableID = 5000 return &c }() diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index e87a2088245d..4ef0e5b2bccc 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -137,7 +137,6 @@ go_test( "//pkg/testutils/datapathutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/jobutils", - "//pkg/testutils/keysutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index ba82fcff69a2..a9a016d15a1c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -37,7 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/keysutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -335,6 +336,9 @@ func TestStreamIngestionProcessor(t *testing.T) { }) } +// getPartitionSpanToTableID maps a partition's span to the tableID it covers in +// the source keyspace. 
It assumes the source used a random_stream_client, which generates keys for +// a single table span per partition. func getPartitionSpanToTableID( t *testing.T, partitions []streamclient.PartitionInfo, ) map[string]int { @@ -342,13 +346,14 @@ func getPartitionSpanToTableID( // Aggregate the table IDs which should have been ingested. for _, pa := range partitions { - pKey := roachpb.Key(pa.ID) - pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()} + require.Equal(t, 1, len(pa.Spans), "unexpected number of spans in the partition") + pSpan := pa.Spans[0] paURL, err := url.Parse(string(pa.SubscriptionToken)) require.NoError(t, err) id, err := strconv.Atoi(paURL.Host) require.NoError(t, err) pSpanToTableID[pSpan.String()] = id + t.Logf("Partition Span %s; Partition ID %d", pSpan.String(), id) } return pSpanToTableID } @@ -359,24 +364,27 @@ func getPartitionSpanToTableID( // was streamed up until partitionTimestamp. func assertEqualKVs( t *testing.T, - tc *testcluster.TestCluster, + srv serverutils.TestServerInterface, streamValidator *streamClientValidator, - tableID int, + targetSpan roachpb.Span, partitionTimestamp hlc.Timestamp, -) { - key := keysutils.TestingSQLCodec.TablePrefix(uint32(tableID)) - +) (foundKVs bool) { + t.Logf("target span %s; partition ts %s", targetSpan, partitionTimestamp) + if partitionTimestamp.WallTime == 0 { + // Implies this span never got a checkpoint + return foundKVs + } // Iterate over the store. 
- store := tc.GetFirstStoreFromServer(t, 0) + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ - LowerBound: key, - UpperBound: key.PrefixEnd(), + LowerBound: targetSpan.Key, + UpperBound: targetSpan.EndKey, }) defer it.Close() var prevKey roachpb.Key var valueTimestampTuples []roachpb.KeyValue - var err error - for it.SeekGE(storage.MVCCKey{Key: key}); ; it.Next() { + for it.SeekGE(storage.MVCCKey{Key: targetSpan.Key}); ; it.Next() { if ok, err := it.Valid(); !ok { if err != nil { t.Fatal(err) @@ -389,20 +397,20 @@ func assertEqualKVs( if partitionTimestamp.Less(it.UnsafeKey().Timestamp) { continue } - + foundKVs = true newKey := (prevKey != nil && !it.UnsafeKey().Key.Equal(prevKey)) || prevKey == nil prevKey = it.UnsafeKey().Clone().Key - + descriptiveErrorMsg := fmt.Sprintf("Key %s; Ts %s: Is new Key %t; Partition Ts %s", it.UnsafeKey().Key, it.UnsafeKey().Timestamp, newKey, partitionTimestamp) if newKey { // All value ts should have been drained at this point, otherwise there is // a mismatch between the streamed and ingested data. require.Equal(t, 0, len(valueTimestampTuples)) valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( string(it.UnsafeKey().Key), partitionTimestamp) - require.NoError(t, err) + require.NoError(t, err, descriptiveErrorMsg) } - - require.Greater(t, len(valueTimestampTuples), 0) + // Implies there exists a key in the store that was not logged by the stream validator. + require.Greater(t, len(valueTimestampTuples), 0, descriptiveErrorMsg) // Since the iterator goes from latest to older versions, we compare // starting from the end of the slice that is sorted by timestamp. latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] @@ -419,6 +427,7 @@ func assertEqualKVs( // for the next iteration. 
valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] } + return foundKVs } func makeTestStreamURI( @@ -448,11 +457,9 @@ func TestRandomClientGeneration(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - registry := tc.Server(0).JobRegistry().(*jobs.Registry) - db := tc.Server(0).InternalDB().(descs.DB) + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + registry := srv.JobRegistry().(*jobs.Registry) // TODO: Consider testing variations on these parameters. tenantID := roachpb.MustMakeTenantID(20) @@ -470,7 +477,6 @@ func TestRandomClientGeneration(t *testing.T) { topo, err := randomStreamClient.Plan(ctx, rps.StreamID) require.NoError(t, err) - // One system and two table data partitions. require.Equal(t, 2 /* numPartitions */, len(topo.Partitions)) initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} @@ -479,6 +485,7 @@ func TestRandomClientGeneration(t *testing.T) { // Cancel the flow after emitting 1000 checkpoint events from the client. 
mu := syncutil.Mutex{} cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel) + tenantRekey := execinfrapb.TenantRekey{ OldID: tenantID, NewID: roachpb.MustMakeTenantID(tenantID.ToUint64() + 10), @@ -495,12 +502,11 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient.RegisterInterception(cancelAfterCheckpoints) randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator)) - out, err := runStreamIngestionProcessor(ctx, t, registry, db, + out, err := runStreamIngestionProcessor(ctx, t, registry, srv.InternalDB().(descs.DB), topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/) require.NoError(t, err) - partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions) numResolvedEvents := 0 maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp) for { @@ -529,9 +535,9 @@ func TestRandomClientGeneration(t *testing.T) { latestResolvedTimestamp = resolvedSpan.Timestamp } - // Track the max resolved timestamp per partition. - if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok || - ts.Less(resolvedSpan.Timestamp) { + // Track the max resolved timestamp per partition. Note that resolved + // spans are mapped to the source tenant keyspace. 
+ if maxResolvedTimestampPerPartition[resolvedSpan.Span.String()].Less(resolvedSpan.Timestamp) { maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp } numResolvedEvents++ @@ -545,13 +551,23 @@ func TestRandomClientGeneration(t *testing.T) { for _, failure := range streamValidator.failures() { t.Error(failure) } - - for pSpan, id := range partitionSpanToTableID { + foundKVs := false + ingestionCodec := keys.MakeSQLCodec(tenantRekey.NewID) + for pSpan, id := range getPartitionSpanToTableID(t, topo.Partitions) { // Scan the store for KVs ingested by this partition, and compare the MVCC // KVs against the KVEvents streamed up to the max ingested timestamp for // the partition. - assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan]) + // + // Note that target span must be rekeyed to the destination + // tenant key space. + startKey := ingestionCodec.TablePrefix(uint32(id)) + targetSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()} + if assertEqualKVs(t, srv, streamValidator, targetSpan, + maxResolvedTimestampPerPartition[pSpan]) { + foundKVs = true + } } + require.Equal(t, true, foundKVs, "expected to find and assert equal kvs") require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") }