Skip to content

Commit

Permalink
Merge pull request #101218 from msbutler/backport23.1-100813
Browse files Browse the repository at this point in the history
release-23.1: streamingccl: deflake TestRandomClientGeneration
  • Loading branch information
msbutler authored Apr 11, 2023
2 parents 115b02c + b8788ca commit bff8f7e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 33 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ const (
// TODO(dt): just make interceptors a singleton, not the whole client.
var randomStreamClientSingleton = func() *RandomStreamClient {
c := RandomStreamClient{}
c.mu.tableID = 52
// Make the base tableID really large to prevent colliding with system table IDs.
c.mu.tableID = 5000
return &c
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -334,48 +336,56 @@ func TestStreamIngestionProcessor(t *testing.T) {
})
}

// getPartitionSpanToTableID maps a partition's span to the tableID it covers in
// the source keyspace. It assumes the source used a random_stream_client, which generates keys for
// a single table span per partition.
func getPartitionSpanToTableID(
t *testing.T, partitions []streamclient.PartitionInfo,
) map[string]int {
pSpanToTableID := make(map[string]int)

// Aggregate the table IDs which should have been ingested.
for _, pa := range partitions {
pKey := roachpb.Key(pa.ID)
pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()}
require.Equal(t, 1, len(pa.Spans), "unexpected number of spans in the partition")
pSpan := pa.Spans[0]
paURL, err := url.Parse(string(pa.SubscriptionToken))
require.NoError(t, err)
id, err := strconv.Atoi(paURL.Host)
require.NoError(t, err)
pSpanToTableID[pSpan.String()] = id
t.Logf("Partition Span %s; Partition ID %d", pSpan.String(), id)
}
return pSpanToTableID
}

// assertEqualKVs iterates over the store in `tc` and compares the MVCC KVs
// against the in-memory copy of events stored in the `streamValidator`. This
// ensures that the stream ingestion processor ingested at least as much data as
// was streamed up until partitionTimestamp.
// was streamed up until partitionTimestamp. The function returns true if it
// validated at least one streamed kv.
func assertEqualKVs(
t *testing.T,
tc *testcluster.TestCluster,
srv serverutils.TestServerInterface,
streamValidator *streamClientValidator,
tableID int,
targetSpan roachpb.Span,
partitionTimestamp hlc.Timestamp,
) {
key := keys.TODOSQLCodec.TablePrefix(uint32(tableID))

) (foundKVs bool) {
t.Logf("target span %s; partition ts %s", targetSpan, partitionTimestamp)
if partitionTimestamp.WallTime == 0 {
// A zero wall time implies this span never got a checkpoint.
return foundKVs
}
// Iterate over the store.
store := tc.GetFirstStoreFromServer(t, 0)
store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
LowerBound: key,
UpperBound: key.PrefixEnd(),
LowerBound: targetSpan.Key,
UpperBound: targetSpan.EndKey,
})
defer it.Close()
var prevKey roachpb.Key
var valueTimestampTuples []roachpb.KeyValue
var err error
for it.SeekGE(storage.MVCCKey{Key: key}); ; it.Next() {
for it.SeekGE(storage.MVCCKey{Key: targetSpan.Key}); ; it.Next() {
if ok, err := it.Valid(); !ok {
if err != nil {
t.Fatal(err)
Expand All @@ -388,20 +398,20 @@ func assertEqualKVs(
if partitionTimestamp.Less(it.UnsafeKey().Timestamp) {
continue
}

foundKVs = true
newKey := (prevKey != nil && !it.UnsafeKey().Key.Equal(prevKey)) || prevKey == nil
prevKey = it.UnsafeKey().Clone().Key

descriptiveErrorMsg := fmt.Sprintf("Key %s; Ts %s: Is new Key %t; Partition Ts %s", it.UnsafeKey().Key, it.UnsafeKey().Timestamp, newKey, partitionTimestamp)
if newKey {
// All value ts should have been drained at this point, otherwise there is
// a mismatch between the streamed and ingested data.
require.Equal(t, 0, len(valueTimestampTuples))
valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp(
string(it.UnsafeKey().Key), partitionTimestamp)
require.NoError(t, err)
require.NoError(t, err, descriptiveErrorMsg)
}

require.Greater(t, len(valueTimestampTuples), 0)
// Implies there exists a key in the store that was not logged by the stream validator.
require.Greater(t, len(valueTimestampTuples), 0, descriptiveErrorMsg)
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
Expand All @@ -418,6 +428,7 @@ func assertEqualKVs(
// for the next iteration.
valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1]
}
return foundKVs
}

func makeTestStreamURI(
Expand Down Expand Up @@ -447,11 +458,9 @@ func TestRandomClientGeneration(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
db := tc.Server(0).InternalDB().(descs.DB)
srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)
registry := srv.JobRegistry().(*jobs.Registry)

// TODO: Consider testing variations on these parameters.
tenantID := roachpb.MustMakeTenantID(20)
Expand All @@ -469,7 +478,6 @@ func TestRandomClientGeneration(t *testing.T) {

topo, err := randomStreamClient.Plan(ctx, rps.StreamID)
require.NoError(t, err)
// One system and two table data partitions.
require.Equal(t, 2 /* numPartitions */, len(topo.Partitions))

initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
Expand All @@ -478,6 +486,7 @@ func TestRandomClientGeneration(t *testing.T) {
// Cancel the flow after emitting 1000 checkpoint events from the client.
mu := syncutil.Mutex{}
cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel)

tenantRekey := execinfrapb.TenantRekey{
OldID: tenantID,
NewID: roachpb.MustMakeTenantID(tenantID.ToUint64() + 10),
Expand All @@ -494,12 +503,11 @@ func TestRandomClientGeneration(t *testing.T) {
randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))

out, err := runStreamIngestionProcessor(ctx, t, registry, db,
out, err := runStreamIngestionProcessor(ctx, t, registry, srv.InternalDB().(descs.DB),
topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey,
randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
require.NoError(t, err)

partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions)
numResolvedEvents := 0
maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp)
for {
Expand Down Expand Up @@ -528,9 +536,9 @@ func TestRandomClientGeneration(t *testing.T) {
latestResolvedTimestamp = resolvedSpan.Timestamp
}

// Track the max resolved timestamp per partition.
if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok ||
ts.Less(resolvedSpan.Timestamp) {
// Track the max resolved timestamp per partition. Note that resolved
// spans are mapped to the source tenant keyspace.
if maxResolvedTimestampPerPartition[resolvedSpan.Span.String()].Less(resolvedSpan.Timestamp) {
maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp
}
numResolvedEvents++
Expand All @@ -544,13 +552,28 @@ func TestRandomClientGeneration(t *testing.T) {
for _, failure := range streamValidator.failures() {
t.Error(failure)
}

for pSpan, id := range partitionSpanToTableID {
foundKVs := false
ingestionCodec := keys.MakeSQLCodec(tenantRekey.NewID)
for pSpan, id := range getPartitionSpanToTableID(t, topo.Partitions) {
// Scan the store for KVs ingested by this partition, and compare the MVCC
// KVs against the KVEvents streamed up to the max ingested timestamp for
// the partition.
assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan])
//
// Note that target span must be rekeyed to the destination
// tenant key space.
startKey := ingestionCodec.TablePrefix(uint32(id))
targetSpan := roachpb.Span{Key: startKey, EndKey: startKey.PrefixEnd()}
if assertEqualKVs(t, srv, streamValidator, targetSpan,
maxResolvedTimestampPerPartition[pSpan]) {
foundKVs = true
}
}
// Note: we only assert that KVs were found over all partitions instead of in
// each partition because it is possible for a partition to not send any
// checkpoint events. This stream ingestion processor only terminates once a
// total number of checkpoints has been reached and makes no guarantees that
// each partition gets a checkpoint.
require.True(t, foundKVs, "expected to find and assert equal kvs")
require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected")
}

Expand Down

0 comments on commit bff8f7e

Please sign in to comment.