Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
106438: changefeedccl: surface errors when job fails in alter changefeed tests r=[miretskiy] a=HonoreDB

Alter changefeed tests use SucceedsSoon around checking various things to do with a job's progress, which leads to an unhelpful hardcoded error message if the job has actually failed entirely. This commit extracts the logic into a test helper and makes it surface job errors.

Fixes: cockroachdb#102760

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Jul 10, 2023
2 parents 3a52371 + 70c51c0 commit 71ff6f9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
39 changes: 11 additions & 28 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,17 +1156,9 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
_ = g.Wait()
}()

// Helper to read job progress
loadProgress := func() jobspb.Progress {
jobID := jobFeed.JobID()
job, err := jobRegistry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
return job.Progress()
}

// Ensure initial backfill completes
testutils.SucceedsSoon(t, func() error {
prog := loadProgress()
prog := loadProgress(t, jobFeed, jobRegistry)
if p := prog.GetHighWater(); p != nil && !p.IsEmpty() {
return nil
}
Expand Down Expand Up @@ -1210,7 +1202,7 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
}

// Check if we've set a checkpoint yet
progress := loadProgress()
progress := loadProgress(t, jobFeed, jobRegistry)
if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 {
initialCheckpoint.Add(p.Checkpoint.Spans...)
atomic.StoreInt32(&foundCheckpoint, 1)
Expand Down Expand Up @@ -1238,7 +1230,10 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
if atomic.LoadInt32(&foundCheckpoint) != 0 {
return nil
}
return errors.New("waiting for checkpoint")
if err := jobFeed.FetchTerminalJobErr(); err != nil {
return err
}
return errors.Newf("waiting for checkpoint")
})

require.NoError(t, jobFeed.Pause())
Expand Down Expand Up @@ -1339,16 +1334,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
}()

jobFeed := testFeed.(cdctest.EnterpriseTestFeed)
loadProgress := func() jobspb.Progress {
jobID := jobFeed.JobID()
job, err := registry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
return job.Progress()
}

// Wait for non-nil checkpoint.
testutils.SucceedsSoon(t, func() error {
progress := loadProgress()
progress := loadProgress(t, jobFeed, registry)
if p := progress.GetChangefeed(); p != nil && p.Checkpoint != nil && len(p.Checkpoint.Spans) > 0 {
return nil
}
Expand All @@ -1357,7 +1346,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

// Pause the job and read and verify the latest checkpoint information.
require.NoError(t, jobFeed.Pause())
progress := loadProgress()
progress := loadProgress(t, jobFeed, registry)
require.NotNil(t, progress.GetChangefeed())
h := progress.GetHighWater()
noHighWater := h == nil || h.IsEmpty()
Expand Down Expand Up @@ -1385,15 +1374,15 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

// Wait for the high water mark to be non-zero.
testutils.SucceedsSoon(t, func() error {
prog := loadProgress()
prog := loadProgress(t, jobFeed, registry)
if p := prog.GetHighWater(); p != nil && !p.IsEmpty() {
return nil
}
return errors.New("waiting for highwater")
})

// At this point, highwater mark should be set, and previous checkpoint should be gone.
progress = loadProgress()
progress = loadProgress(t, jobFeed, registry)
require.NotNil(t, progress.GetChangefeed())
require.Equal(t, 0, len(progress.GetChangefeed().Checkpoint.Spans))

Expand Down Expand Up @@ -1569,14 +1558,8 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) {
castedFeed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

loadProgress := func() jobspb.Progress {
job, err := registry.LoadJob(context.Background(), castedFeed.JobID())
require.NoError(t, err)
return job.Progress()
}

testutils.SucceedsSoon(t, func() error {
progress := loadProgress()
progress := loadProgress(t, castedFeed, registry)
if hw := progress.GetHighWater(); hw != nil && cursor.LessEq(*hw) {
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,19 @@ func expectNotice(t *testing.T, s serverutils.TestTenantInterface, sql string, e
require.Equal(t, expected, actual)
}

func loadProgress(
t *testing.T, jobFeed cdctest.EnterpriseTestFeed, jobRegistry *jobs.Registry,
) jobspb.Progress {
t.Helper()
jobID := jobFeed.JobID()
job, err := jobRegistry.LoadJob(context.Background(), jobID)
require.NoError(t, err)
if job.Status().Terminal() {
t.Errorf("tried to load progress for job %v but it has reached terminal status %s with error %s", job, job.Status(), jobFeed.FetchTerminalJobErr())
}
return job.Progress()
}

func feed(
t testing.TB, f cdctest.TestFeedFactory, create string, args ...interface{},
) cdctest.TestFeed {
Expand Down

0 comments on commit 71ff6f9

Please sign in to comment.