Skip to content

Commit

Permalink
Merge #92005
Browse files Browse the repository at this point in the history
92005: jobs: deflake TestRegistryLifecycle r=ajwerner a=stevendanna

The explicit transactions in this test can hit transaction retry errors despite the test conditions all passing.

Here, we wrap the transactions we intend to commit in a retry loop using client-side retries.

It seems likely that #91563 made transaction retries more likely.

Fixes #92001

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Nov 16, 2022
2 parents 7c7622f + 5067df4 commit 522980a
Showing 1 changed file with 66 additions and 20 deletions.
86 changes: 66 additions & 20 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"sort"
"strings"
Expand Down Expand Up @@ -624,6 +625,53 @@ func TestRegistryLifecycle(t *testing.T) {
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

r, err := regexp.Compile("retry txn")
require.NoError(t, err)

executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error {
txn, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
_ = txn.Rollback()
}

}()

_, err = txn.Exec("SAVEPOINT cockroach_restart")
if err != nil {
return err
}

maxRetries := 10
retryCount := 0
for {
err = fn(txn)
if err == nil {
_, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart")
if err == nil {
return txn.Commit()
}
}

if !r.MatchString(err.Error()) {
return err
}

_, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart")
if rollbackErr != nil {
return errors.CombineErrors(rollbackErr, err)
}

retryCount++
if retryCount > maxRetries {
return errors.Wrap(err, "retries exhausted")
}
}
}

// Rollback a CANCEL.
{
txn, err := rts.outerDB.Begin()
Expand Down Expand Up @@ -661,19 +709,18 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Now pause it for reals.
{
txn, err := rts.outerDB.Begin()
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil {
return err
}
// Not committed yet, so state shouldn't have changed.
// Don't check status in txn.
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil {
t.Fatal(err)
}
// Not committed yet, so state shouldn't have changed.
// Don't check status in txn.
rts.check(t, "")
if err := txn.Commit(); err != nil {
t.Fatal(err)
}
rts.check(t, jobs.StatusPaused)
}
// Rollback a RESUME.
Expand All @@ -692,19 +739,18 @@ func TestRegistryLifecycle(t *testing.T) {
}
// Commit a RESUME.
{
txn, err := rts.outerDB.Begin()
err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error {
if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil {
return err
}
// Not committed yet, so state shouldn't have changed.
// Don't check status in txn.
rts.check(t, "")
return nil
})
if err != nil {
t.Fatal(err)
}
if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil {
t.Fatal(err)
}
// Not committed yet, so state shouldn't have changed.
// Don't check status in txn.
rts.check(t, "")
if err := txn.Commit(); err != nil {
t.Fatal(err)
}
}
rts.mu.e.ResumeStart = true
rts.check(t, jobs.StatusRunning)
Expand Down

0 comments on commit 522980a

Please sign in to comment.