diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 8be5febb6203..f39a0aad1b71 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -1156,17 +1156,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { _ = g.Wait() }() - // Helper to read job progress - loadProgress := func() jobspb.Progress { - jobID := jobFeed.JobID() - job, err := jobRegistry.LoadJob(context.Background(), jobID) - require.NoError(t, err) - return job.Progress() - } - // Ensure initial backfill completes testutils.SucceedsSoon(t, func() error { - prog := loadProgress() + prog := loadProgress(t, jobFeed, jobRegistry) if p := prog.GetHighWater(); p != nil && !p.IsEmpty() { return nil } @@ -1210,7 +1202,7 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { } // Check if we've set a checkpoint yet - progress := loadProgress() + progress := loadProgress(t, jobFeed, jobRegistry) if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 { initialCheckpoint.Add(p.Checkpoint.Spans...) atomic.StoreInt32(&foundCheckpoint, 1) @@ -1238,7 +1230,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { if atomic.LoadInt32(&foundCheckpoint) != 0 { return nil } - return errors.New("waiting for checkpoint") + if err := jobFeed.FetchTerminalJobErr(); err != nil { + return err + } + return errors.Newf("waiting for checkpoint") }) require.NoError(t, jobFeed.Pause()) @@ -1339,16 +1334,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { }() jobFeed := testFeed.(cdctest.EnterpriseTestFeed) - loadProgress := func() jobspb.Progress { - jobID := jobFeed.JobID() - job, err := registry.LoadJob(context.Background(), jobID) - require.NoError(t, err) - return job.Progress() - } // Wait for non-nil checkpoint. testutils.SucceedsSoon(t, func() error { - progress := loadProgress() + progress := loadProgress(t, jobFeed, registry) if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 { return nil } @@ -1357,7 +1346,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Pause the job and read and verify the latest checkpoint information. require.NoError(t, jobFeed.Pause()) - progress := loadProgress() + progress := loadProgress(t, jobFeed, registry) require.NotNil(t, progress.GetChangefeed()) h := progress.GetHighWater() noHighWater := h == nil || h.IsEmpty() @@ -1385,7 +1374,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Wait for the high water mark to be non-zero. testutils.SucceedsSoon(t, func() error { - prog := loadProgress() + prog := loadProgress(t, jobFeed, registry) if p := prog.GetHighWater(); p != nil && !p.IsEmpty() { return nil } @@ -1393,7 +1382,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { }) // At this point, highwater mark should be set, and previous checkpoint should be gone. - progress = loadProgress() + progress = loadProgress(t, jobFeed, registry) require.NotNil(t, progress.GetChangefeed()) require.Equal(t, 0, len(progress.GetChangefeed().Checkpoint.Spans)) @@ -1569,14 +1558,8 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) { castedFeed, ok := testFeed.(cdctest.EnterpriseTestFeed) require.True(t, ok) - loadProgress := func() jobspb.Progress { - job, err := registry.LoadJob(context.Background(), castedFeed.JobID()) - require.NoError(t, err) - return job.Progress() - } - testutils.SucceedsSoon(t, func() error { - progress := loadProgress() + progress := loadProgress(t, castedFeed, registry) if hw := progress.GetHighWater(); hw != nil && cursor.LessEq(*hw) { return nil } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index c500f3b36a89..ff726991bd32 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -688,6 +688,19 @@ func expectNotice(t *testing.T, s serverutils.TestTenantInterface, sql string, e require.Equal(t, expected, actual) } +func loadProgress( + t *testing.T, jobFeed cdctest.EnterpriseTestFeed, jobRegistry *jobs.Registry, +) jobspb.Progress { + t.Helper() + jobID := jobFeed.JobID() + job, err := jobRegistry.LoadJob(context.Background(), jobID) + require.NoError(t, err) + if job.Status().Terminal() { + t.Errorf("tried to load progress for job %v but it has reached terminal status %s with error %s", job, job.Status(), jobFeed.FetchTerminalJobErr()) + } + return job.Progress() +} + func feed( t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{}, ) cdctest.TestFeed {