streamingccl: re-enabled TestRandomClientGeneration
TestRandomClientGeneration was skipped in cockroachdb#61292 as a flake. However,
in the time since then, other changes in this code broke this test
more completely. Re-enabling the test required a few unrelated
changes:

- The stream ingestion processor required a fully formed job to be
  able to poll the cutover time. Now, test code can set a
  cutoverProvider that doesn't depend on a full job record (see the
  sketch after this list).

- Request interception depended on an explicit client being
  set, but this test was instead passing the processor a randgen URI. Now we
  pass the client explicitly and also update the test code to make it
  clear that the stream URI isn't actually used for anything.

- The code was attempting to validate the number of rows using SQL. I
  haven't dug into how this was working in the past. But since we are
  connecting to the host tenant and the keys are being ingested into a
  guest tenant, we would need a connection to the guest tenant to
  validate the table data. I've simply removed this assertion since I
  don't think it was testing very much compared to the KV-level
  assertions also used in the test.

- The test code assumed that the partitions were keyed based on the
  subscription token rather than the subscription ID.
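
A minimal, self-contained sketch of the cutoverProvider pattern from the
first bullet (aside from cutoverProvider, cutoverReached, and noCutover,
which the diff below introduces, all names here are illustrative
stand-ins rather than code from this commit):

    package main

    import (
        "context"
        "fmt"
    )

    // cutoverProvider mirrors the interface added in
    // stream_ingestion_processor.go: it reports whether the cutover time
    // has been reached, without requiring a fully formed job record.
    type cutoverProvider interface {
        cutoverReached(context.Context) (bool, error)
    }

    // noCutover is the trivial test implementation: cutover never fires,
    // so ingestion keeps running until the test stops it.
    type noCutover struct{}

    func (noCutover) cutoverReached(context.Context) (bool, error) { return false, nil }

    // fakeProcessor stands in for the stream ingestion processor; tests
    // inject the provider explicitly instead of loading a job record.
    type fakeProcessor struct {
        provider cutoverProvider
    }

    func (p *fakeProcessor) pollCutover(ctx context.Context) (bool, error) {
        return p.provider.cutoverReached(ctx)
    }

    func main() {
        p := &fakeProcessor{provider: noCutover{}}
        reached, err := p.pollCutover(context.Background())
        fmt.Println(reached, err) // prints: false <nil>
    }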

It isn't clear what the original source of the flakiness was.
However, the test has run a few hundred times under stress without
issue.

Alternatively, we could just delete this test.

Fixes cockroachdb#61287

Release note: None
stevendanna committed Jun 29, 2022
1 parent 808d375 commit 71d26e7
Showing 3 changed files with 78 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -284,7 +284,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(

// Generate namespace entry.
codec := keys.MakeSQLCodec(config.tenantID)
key := catalogkeys.MakePublicObjectNameKey(codec, 50, testTable.Name)
key := catalogkeys.MakePublicObjectNameKey(codec, IngestionDatabaseID, testTable.Name)
k := rekey(config.tenantID, key)
var value roachpb.Value
value.SetInt(int64(testTable.GetID()))
65 changes: 48 additions & 17 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -97,6 +98,9 @@ type streamIngestionProcessor struct {
// consuming multiple partitions from a stream.
streamPartitionClients []streamclient.Client

// cutoverProvider indicates when the cutover time has been reached.
cutoverProvider cutoverProvider

// Checkpoint events may need to be buffered if they arrive within the same
// minimumFlushInterval.
bufferedCheckpoints map[string]hlc.Timestamp
@@ -181,10 +185,14 @@ func newStreamIngestionDataProcessor(
curBatch: make([]storage.MVCCKeyValue, 0),
bufferedCheckpoints: make(map[string]hlc.Timestamp),
maxFlushRateTimer: timeutil.NewTimer(),
cutoverCh: make(chan struct{}),
closePoller: make(chan struct{}),
rekeyer: rekeyer,
rewriteToDiffKey: spec.TenantRekey.NewID != spec.TenantRekey.OldID,
cutoverProvider: &cutoverFromJobProgress{
jobID: jobspb.JobID(spec.JobID),
registry: flowCtx.Cfg.JobRegistry,
},
cutoverCh: make(chan struct{}),
closePoller: make(chan struct{}),
rekeyer: rekeyer,
rewriteToDiffKey: spec.TenantRekey.NewID != spec.TenantRekey.OldID,
}
if err := sip.Init(sip, post, streamIngestionResultTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
@@ -352,9 +360,7 @@ func (sip *streamIngestionProcessor) checkForCutoverSignal(
ctx context.Context, stopPoller chan struct{},
) error {
sv := &sip.flowCtx.Cfg.Settings.SV
registry := sip.flowCtx.Cfg.JobRegistry
tick := time.NewTicker(cutoverSignalPollInterval.Get(sv))
jobID := sip.spec.JobID
defer tick.Stop()
for {
select {
@@ -363,20 +369,11 @@
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
j, err := registry.LoadJob(ctx, jobspb.JobID(jobID))
cutoverReached, err := sip.cutoverProvider.cutoverReached(ctx)
if err != nil {
return err
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
var ok bool
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, jobID)
}
// Job has been signaled to complete.
if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() &&
resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) {
if cutoverReached {
sip.cutoverCh <- struct{}{}
return nil
}
@@ -664,6 +661,40 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) {
return &flushedCheckpoints, sip.batcher.Reset(ctx)
}

// cutoverProvider allows us to override how we decide when the job has reached
// the cutover point in tests.
type cutoverProvider interface {
cutoverReached(context.Context) (bool, error)
}

// cutoverFromJobProgress is a cutoverProvider that decides whether the cutover
// time has been reached based on the progress stored on the job record.
type cutoverFromJobProgress struct {
registry *jobs.Registry
jobID jobspb.JobID
}

func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) {
j, err := c.registry.LoadJob(ctx, c.jobID)
if err != nil {
return false, err
}
progress := j.Progress()
var sp *jobspb.Progress_StreamIngest
var ok bool
if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
return false, errors.Newf("unknown progress type %T in stream ingestion job %d",
j.Progress().Progress, c.jobID)
}
// Job has been signaled to complete.
if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() &&
resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) {
return true, nil
}

return false, nil
}

func init() {
rowexec.NewStreamIngestionDataProcessor = newStreamIngestionDataProcessor
}
57 changes: 29 additions & 28 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -38,8 +38,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -202,8 +200,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
{ID: "1", SubscriptionToken: p1},
{ID: "2", SubscriptionToken: p2},
}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient)
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */)
require.NoError(t, err)

actualRows := make(map[string]struct{})
@@ -238,8 +236,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
{SubscriptionToken: streamclient.SubscriptionToken("1")},
{SubscriptionToken: streamclient.SubscriptionToken("2")},
}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test",
partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{})
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
partitions, startTime, nil /* interceptEvents */, tenantRekey, &errorStreamClient{}, nil /* cutoverProvider */)
require.NoError(t, err)

// Expect no rows, and just the error.
Expand All @@ -262,8 +260,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) {
processEventCh <- struct{}{}
}}
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, "randomgen://test/",
partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, streamingTestingKnob)
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
partitions, startTime, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob)
defer func() {
require.NoError(t, sip.forceClientForTests.Close())
}()
@@ -310,7 +308,7 @@ func getPartitionSpanToTableID(

// Aggregate the table IDs which should have been ingested.
for _, pa := range partitions {
pKey := roachpb.Key(pa.SubscriptionToken)
pKey := roachpb.Key(pa.ID)
pSpan := roachpb.Span{Key: pKey, EndKey: pKey.Next()}
paURL, err := url.Parse(string(pa.SubscriptionToken))
require.NoError(t, err)
@@ -401,11 +399,14 @@ func makeTestStreamURI(
"&TENANT_ID=" + strconv.Itoa(tenantID)
}

type noCutover struct{}

func (n noCutover) cutoverReached(context.Context) (bool, error) { return false, nil }

// TestRandomClientGeneration tests the ingestion processor against a random
// stream workload.
func TestRandomClientGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 61287, "flaky test")
defer log.Scope(t).Close(t)

ctx := context.Background()
@@ -414,8 +415,6 @@ func TestRandomClientGeneration(t *testing.T) {
defer tc.Stopper().Stop(ctx)
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
kvDB := tc.Server(0).DB()
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

// TODO: Consider testing variations on these parameters.
const tenantID = 20
@@ -447,8 +446,10 @@
require.NoError(t, err)
streamValidator := newStreamClientValidator(rekeyer)
validator := registerValidatorWithClient(streamValidator)
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr, topo,
startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey, nil /* mockClient */)

out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
topo, startTime, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey,
streamClient, noCutover{})
require.NoError(t, err)

partitionSpanToTableID := getPartitionSpanToTableID(t, topo)
@@ -466,6 +467,7 @@ func TestRandomClientGeneration(t *testing.T) {
if row == nil {
break
}

datum := row[0].Datum
protoBytes, ok := datum.(*tree.DBytes)
require.True(t, ok)
@@ -475,8 +477,8 @@

for _, resolvedSpan := range resolvedSpans.ResolvedSpans {
if _, ok := partitionSpanToTableID[resolvedSpan.Span.String()]; !ok {
t.Fatalf("expected resolved span %v to be either in one of the supplied partition"+
" addresses %v", resolvedSpan.Span, topo)
t.Fatalf("expected resolved span %v to be in one of the supplied partition"+
" addresses %v", resolvedSpan.Span, partitionSpanToTableID)
}

// All resolved timestamp events should be greater than the start time.
@@ -497,11 +499,6 @@
}

for pSpan, id := range partitionSpanToTableID {
numRows, err := strconv.Atoi(sqlDB.QueryStr(t, fmt.Sprintf(
`SELECT count(*) FROM defaultdb.%s%d`, streamclient.IngestionTablePrefix, id))[0][0])
require.NoError(t, err)
require.Greater(t, numRows, 0, "at least 1 row ingested expected")

// Scan the store for KVs ingested by this partition, and compare the MVCC
// KVs against the KVEvents streamed up to the max ingested timestamp for
// the partition.
@@ -515,15 +512,15 @@ func runStreamIngestionProcessor(
t *testing.T,
registry *jobs.Registry,
kvDB *kv.DB,
streamAddr string,
partitions streamclient.Topology,
startTime hlc.Timestamp,
interceptEvents []streamclient.InterceptFn,
tenantRekey execinfrapb.TenantRekey,
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
) (*distsqlutils.RowBuffer, error) {
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, streamAddr,
partitions, startTime, interceptEvents, tenantRekey, mockClient, nil /* streamingTestingKnobs */)
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
partitions, startTime, interceptEvents, tenantRekey, mockClient, cutoverProvider, nil /* streamingTestingKnobs */)
require.NoError(t, err)

sip.Run(ctx)
@@ -543,16 +540,19 @@ func getStreamIngestionProcessor(
t *testing.T,
registry *jobs.Registry,
kvDB *kv.DB,
streamAddr string,
partitions streamclient.Topology,
startTime hlc.Timestamp,
interceptEvents []streamclient.InterceptFn,
tenantRekey execinfrapb.TenantRekey,
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
) (*streamIngestionProcessor, *distsqlutils.RowBuffer, error) {
st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)
if mockClient == nil {
return nil, nil, errors.AssertionFailedf("non-nil streamclient required")
}

testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st)
defer testDiskMonitor.Stop(ctx)
@@ -573,7 +573,7 @@
post := execinfrapb.PostProcessSpec{}

var spec execinfrapb.StreamIngestionDataSpec
spec.StreamAddress = streamAddr
spec.StreamAddress = "http://unused"
spec.TenantRekey = tenantRekey
spec.PartitionIds = make([]string, len(partitions))
spec.PartitionAddresses = make([]string, len(partitions))
@@ -592,8 +592,9 @@
t.Fatal("expected the processor that's created to be a split and scatter processor")
}

if mockClient != nil {
sip.forceClientForTests = mockClient
sip.forceClientForTests = mockClient
if cutoverProvider != nil {
sip.cutoverProvider = cutoverProvider
}

if interceptable, ok := sip.forceClientForTests.(streamclient.InterceptableStreamClient); ok {