Skip to content

Commit

Permalink
streamingccl: unskip TestTenantStreaming
Browse files Browse the repository at this point in the history
This makes a couple of changes aimed at unskipping
TestTenantStreaming:

- Fix a nil pointer access in our stream status verification
  function. We changed the name of key that this function was
  accessing. This NPE was hidden by another panic along the unclean
  shutdown path in the server.

- Lower various intervals so that this test doesn't take 90 seconds.

I've run this under stress for a few hundred iterations without error.

Release note: None
  • Loading branch information
stevendanna committed Jul 7, 2022
1 parent 03d5260 commit 5ba670c
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ import (
)

func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, stats string) {
fetchValueKey := func(j json2.JSON, key string) json2.JSON {
fetchRequiredValueKey := func(j json2.JSON, key string) json2.JSON {
val, err := j.FetchValKey(key)
require.NoError(t, err)
require.NotNilf(t, val, "expected key %q to in json %q", key, j)
return val
}

Expand All @@ -60,26 +61,25 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s
statsJSON, err := json2.ParseJSON(stats)
require.NoError(t, err)

ingestionProgress := fetchValueKey(statsJSON, "ingestion_progress")
ingestionProgress := fetchRequiredValueKey(statsJSON, "ingestion_progress")
require.Equal(t, cutoverTime.UnixNano(),
parseInt64(fetchValueKey(fetchValueKey(ingestionProgress, "cutover_time"), "wall_time").String()))
parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(ingestionProgress, "cutover_time"), "wall_time").String()))

partitionProgressIter, err := fetchValueKey(ingestionProgress, "partition_progress").ObjectIter()
partitionProgressIter, err := fetchRequiredValueKey(ingestionProgress, "partition_progress").ObjectIter()
require.NoError(t, err)
for partitionProgressIter.Next() {
require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchValueKey(fetchValueKey(
require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(
partitionProgressIter.Value(), "ingested_timestamp"), "wall_time").String()))
}

require.Equal(t, strconv.Itoa(int(streampb.StreamReplicationStatus_STREAM_INACTIVE)),
fetchValueKey(fetchValueKey(statsJSON, "stream_replication_status"), "stream_status").String())
fetchRequiredValueKey(fetchRequiredValueKey(statsJSON, "producer_status"), "stream_status").String())
}

// TestTenantStreaming tests that tenants can stream changes end-to-end.
func TestTenantStreaming(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 83697)

skip.UnderRace(t, "slow under race")

Expand Down Expand Up @@ -120,6 +120,7 @@ SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '500ms';
`,
";")...)

Expand All @@ -129,14 +130,19 @@ SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
// is required. Tracked with #76378.
// TODO(ajstorm): This may be the right course of action here as the
// replication is now being run inside a tenant.
base.TestServerArgs{DisableDefaultTestTenant: true},
base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
},
roachpb.MakeTenantID(20))
defer cleanupDest()
// destSQL refers to the system tenant as that's the one that's running the
// job.
destSQL := hDest.SysDB
destSQL.ExecMultiple(t, strings.Split(`
SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s';
SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '100ms';
SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '500ms';
SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms';
SET CLUSTER SETTING streaming.partition_progress_frequency = '100ms';
Expand Down

0 comments on commit 5ba670c

Please sign in to comment.