diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
index 259fab7f24e8..982b2b0efa54 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -45,9 +45,10 @@ import (
 )
 
 func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, stats string) {
-	fetchValueKey := func(j json2.JSON, key string) json2.JSON {
+	fetchRequiredValueKey := func(j json2.JSON, key string) json2.JSON {
 		val, err := j.FetchValKey(key)
 		require.NoError(t, err)
+		require.NotNilf(t, val, "expected key %q to be in json %q", key, j)
 		return val
 	}
 
@@ -60,26 +61,25 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s
 	statsJSON, err := json2.ParseJSON(stats)
 	require.NoError(t, err)
 
-	ingestionProgress := fetchValueKey(statsJSON, "ingestion_progress")
+	ingestionProgress := fetchRequiredValueKey(statsJSON, "ingestion_progress")
 	require.Equal(t, cutoverTime.UnixNano(),
-		parseInt64(fetchValueKey(fetchValueKey(ingestionProgress, "cutover_time"), "wall_time").String()))
+		parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(ingestionProgress, "cutover_time"), "wall_time").String()))
 
-	partitionProgressIter, err := fetchValueKey(ingestionProgress, "partition_progress").ObjectIter()
+	partitionProgressIter, err := fetchRequiredValueKey(ingestionProgress, "partition_progress").ObjectIter()
 	require.NoError(t, err)
 	for partitionProgressIter.Next() {
-		require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchValueKey(fetchValueKey(
+		require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(
 			partitionProgressIter.Value(), "ingested_timestamp"), "wall_time").String()))
 	}
 
 	require.Equal(t, strconv.Itoa(int(streampb.StreamReplicationStatus_STREAM_INACTIVE)),
-		fetchValueKey(fetchValueKey(statsJSON, "stream_replication_status"), "stream_status").String())
+		fetchRequiredValueKey(fetchRequiredValueKey(statsJSON, "producer_status"), "stream_status").String())
 }
 
 // TestTenantStreaming tests that tenants can stream changes end-to-end.
 func TestTenantStreaming(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	skip.WithIssue(t, 83697)
 
 	skip.UnderRace(t, "slow under race")
 
@@ -120,6 +120,7 @@ SET CLUSTER SETTING kv.rangefeed.enabled = true;
 SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
 SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
 SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
+SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '500ms';
 `,
 		";")...)
 
@@ -129,14 +130,19 @@ SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s';
 		// is required. Tracked with #76378.
 		// TODO(ajstorm): This may be the right course of action here as the
 		// replication is now being run inside a tenant.
-		base.TestServerArgs{DisableDefaultTestTenant: true},
+		base.TestServerArgs{
+			DisableDefaultTestTenant: true,
+			Knobs: base.TestingKnobs{
+				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			},
+		},
 		roachpb.MakeTenantID(20))
 	defer cleanupDest()
 	// destSQL refers to the system tenant as that's the one that's running the
 	// job.
 	destSQL := hDest.SysDB
 	destSQL.ExecMultiple(t, strings.Split(`
-SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s';
+SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '100ms';
 SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '500ms';
 SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms';
 SET CLUSTER SETTING streaming.partition_progress_frequency = '100ms';