Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
RFC: Add RetryAfter to dbworker.StoreOptions (#13457)
Browse files Browse the repository at this point in the history
* Add RetryAfter to dbworker.StoreOptions

In campaigns we need the ability to retry jobs multiple times. (See
https://github.com/sourcegraph/sourcegraph/issues/12700#issuecomment-671798531
for additional context.)

This is what I think is the easiest-to-understand and simplest solution.

I did have another solution that involved a PreDequeue hook (which
returned the custom conditions you see here now) and a boolean in the
StoreOptions to switch between AND'ing and OR'ing the custom conditions
into the selectCandidateQuery.

This felt a bit hacky. It was less code, but also easier to miss and
misunderstand.

What do you think of this?

* Add tests for RetryAfter in dbworker.Store
  • Loading branch information
mrnugget authored Sep 1, 2020
1 parent 1e23b23 commit bdfd096
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 7 deletions.
48 changes: 48 additions & 0 deletions internal/workerutil/dbworker/store/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,34 @@ func testScanFirstRecordView(rows *sql.Rows, queryErr error) (v workerutil.Recor
return nil, false, nil
}

// TestRecordRetry is the record shape used by the retry tests: it carries the
// three columns scanned by testScanFirstRecordRetry.
type TestRecordRetry struct {
	// ID is the value returned by RecordID.
	ID int
	// State holds the scanned state column (e.g. "processing").
	State string
	// NumResets holds the scanned num_resets column.
	NumResets int
}

// RecordID returns the record's ID field.
func (r TestRecordRetry) RecordID() int {
	return r.ID
}

// testScanFirstRecordRetry reads at most one row from rows and returns it as a
// TestRecordRetry (id, state, num_resets — in that column order).
//
// If queryErr is non-nil it is returned immediately and rows is not touched.
// Otherwise rows is always closed via basestore.CloseRows before returning, and
// the error from closing is folded into the returned err. exists is false when
// the result set is empty or when scanning fails.
func testScanFirstRecordRetry(rows *sql.Rows, queryErr error) (v workerutil.Record, exists bool, err error) {
	if queryErr != nil {
		return nil, false, queryErr
	}
	// The deferred close rewrites the named result err, so close failures
	// surface to the caller even on the success path.
	defer func() { err = basestore.CloseRows(rows, err) }()

	if !rows.Next() {
		return nil, false, nil
	}

	var retry TestRecordRetry
	if scanErr := rows.Scan(&retry.ID, &retry.State, &retry.NumResets); scanErr != nil {
		return nil, false, scanErr
	}
	return retry, true, nil
}

func setupStoreTest(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -158,6 +186,26 @@ func assertDequeueRecordViewResult(t *testing.T, expectedID, expectedNewField in
}
}

// assertDequeueRecordRetryResult checks the values returned by a Dequeue call
// that is expected to yield a TestRecordRetry.
//
// The test is aborted if err is non-nil or ok is false. Otherwise the record
// must be a TestRecordRetry (a different dynamic type panics) whose ID equals
// expectedID, whose State is "processing", and whose NumResets equals
// expectedNumResets; mismatches are reported with t.Errorf. tx.Done(nil) is
// invoked when the helper returns and its result is deliberately discarded.
func assertDequeueRecordRetryResult(t *testing.T, expectedID, expectedNumResets int, record interface{}, tx Store, ok bool, err error) {
	switch {
	case err != nil:
		t.Fatalf("unexpected error: %s", err)
	case !ok:
		t.Fatalf("expected a dequeueable record")
	}
	// Only reached when a record was dequeued; ignore the Done error because
	// this is test cleanup.
	defer func() { _ = tx.Done(nil) }()

	const wantState = "processing"

	got := record.(TestRecordRetry)
	if got.ID != expectedID {
		t.Errorf("unexpected id. want=%d have=%d", expectedID, got.ID)
	}
	if got.State != wantState {
		t.Errorf("unexpected state. want=%s have=%s", wantState, got.State)
	}
	if got.NumResets != expectedNumResets {
		t.Errorf("unexpected num resets. want=%d have=%d", expectedNumResets, got.NumResets)
	}
}

// testNow returns the current time in UTC with sub-second precision dropped.
func testNow() time.Time {
	now := time.Now().UTC()
	return now.Truncate(time.Second)
}
63 changes: 56 additions & 7 deletions internal/workerutil/dbworker/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ type StoreOptions struct {
// be moved into the errored state rather than queued on its next reset to prevent an infinite retry
// cycle of the same input.
MaxNumResets int

// RetryAfter determines whether the store dequeues jobs that have errored
// more than RetryAfter ago.
// If RetryAfter is a non-zero duration, the store dequeues records where
// - the state is 'errored'
// - the failed attempts counter hasn't reached MaxNumResets
// - the finished_at timestamp was more than RetryAfter ago
RetryAfter time.Duration
}

// RecordScanFn is a function that interprets row values as a particular record. This function should
Expand Down Expand Up @@ -229,13 +237,26 @@ func (s *store) dequeue(ctx context.Context, conditions []*sqlf.Query, independe
txCtx = context.Background()
}

query := s.formatQuery(
selectCandidateQuery,
quote(s.options.ViewName),
makeConditionSuffix(conditions),
s.options.OrderByExpression,
quote(s.options.TableName),
)
var query *sqlf.Query
if s.options.RetryAfter != 0 {
query = s.formatQuery(
selectCandidateRetryQuery,
quote(s.options.ViewName),
int(s.options.RetryAfter/time.Second),
s.options.MaxNumResets,
makeConditionSuffix(conditions),
s.options.OrderByExpression,
quote(s.options.TableName),
)
} else {
query = s.formatQuery(
selectCandidateQuery,
quote(s.options.ViewName),
makeConditionSuffix(conditions),
s.options.OrderByExpression,
quote(s.options.TableName),
)
}

for {
// First, we try to select an eligible record outside of a transaction. This will skip
Expand Down Expand Up @@ -324,6 +345,34 @@ WHERE {id} IN (SELECT {id} FROM candidate)
RETURNING {id}
`

// selectCandidateRetryQuery is the candidate-selection query that dequeue uses
// when StoreOptions.RetryAfter is non-zero. In addition to 'queued' rows whose
// process_after is unset or has passed, it also selects 'errored' rows whose
// finished_at lies more than the given number of seconds in the past and whose
// num_resets is below the given maximum.
//
// dequeue fills the %s placeholders, in order, with: the quoted view name, the
// retry interval in whole seconds, the maximum number of resets, the extra
// condition suffix, the ORDER BY expression, and the quoted table name.
//
// The selected row (at most one, skipping rows locked by other transactions) is
// moved to 'processing', started_at is set to NOW(), finished_at is cleared,
// and num_resets is incremented only if the row was 'errored' before the
// update (the CASE sees the pre-update state, as UPDATE SET expressions
// evaluate against the old row in PostgreSQL).
//
// NOTE(review): the {id}, {state}, ... tokens are presumably rewritten to the
// configured column names by s.formatQuery — confirm against formatQuery.
const selectCandidateRetryQuery = `
-- source: internal/workerutil/store.go:Dequeue
WITH candidate AS (
SELECT {id} FROM %s
WHERE
(
({state} = 'queued' AND
({process_after} IS NULL OR {process_after} <= NOW()))
OR
({state} = 'errored' AND
NOW() - {finished_at} > (%s * '1 second'::interval) AND
{num_resets} < %s)
)
%s
ORDER BY %s
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE %s
SET
{state} = 'processing',
{started_at} = NOW(),
{finished_at} = NULL,
{num_resets} = (CASE WHEN ({state} = 'errored') THEN ({num_resets} + 1) ELSE {num_resets} END)
WHERE {id} IN (SELECT {id} FROM candidate)
RETURNING {id}
`

const lockQuery = `
-- source: internal/workerutil/store.go:Dequeue
SELECT 1 FROM %s
Expand Down
40 changes: 40 additions & 0 deletions internal/workerutil/dbworker/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,46 @@ func TestStoreDequeueConcurrent(t *testing.T) {
}
}

// TestStoreDequeueRetryAfter exercises Dequeue on a store configured with
// RetryAfter = 5 minutes and MaxNumResets = 5, ordered by uploaded_at.
//
// Fixture rows:
//   - id 1: errored 6 minutes ago with 3 resets
//   - id 2: errored 4 minutes ago with 0 resets
//   - id 3: errored 6 minutes ago with 5 resets
//   - id 4: queued with 0 resets
//
// The test expects record 1 to be dequeued first (with num_resets bumped to 4)
// followed by record 4 (num_resets still 0).
func TestStoreDequeueRetryAfter(t *testing.T) {
	setupStoreTest(t)

	if _, err := dbconn.Global.Exec(`
INSERT INTO workerutil_test (id, state, finished_at, failure_message, num_resets, uploaded_at)
VALUES
(1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval),
(2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval),
(3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval),
(4, 'queued', NULL, NULL, 0, NOW() - '1 minutes'::interval)
`); err != nil {
		t.Fatalf("unexpected error inserting records: %s", err)
	}

	retryStore := testStore(StoreOptions{
		TableName:     defaultTestStoreOptions.TableName,
		StalledMaxAge: defaultTestStoreOptions.StalledMaxAge,

		Scan: testScanFirstRecordRetry,
		ColumnExpressions: []*sqlf.Query{
			sqlf.Sprintf("w.id"),
			sqlf.Sprintf("w.state"),
			sqlf.Sprintf("w.num_resets"),
		},
		OrderByExpression: sqlf.Sprintf("w.uploaded_at"),
		MaxNumResets:      5,
		RetryAfter:        5 * time.Minute,
	})

	// Expected dequeue order: the errored record first, then the queued one.
	expectations := []struct {
		id        int
		numResets int
	}{
		{id: 1, numResets: 4}, // errored record
		{id: 4, numResets: 0}, // non-errored record
	}

	for _, want := range expectations {
		record, tx, ok, err := retryStore.Dequeue(context.Background(), nil)
		assertDequeueRecordRetryResult(t, want.id, want.numResets, record, tx, ok, err)
	}
}

func TestStoreRequeue(t *testing.T) {
setupStoreTest(t)

Expand Down

0 comments on commit bdfd096

Please sign in to comment.