diff --git a/internal/workerutil/dbworker/store/helpers_test.go b/internal/workerutil/dbworker/store/helpers_test.go index 7778b9f018a1..76358cae2176 100644 --- a/internal/workerutil/dbworker/store/helpers_test.go +++ b/internal/workerutil/dbworker/store/helpers_test.go @@ -79,6 +79,34 @@ func testScanFirstRecordView(rows *sql.Rows, queryErr error) (v workerutil.Recor return nil, false, nil } +type TestRecordRetry struct { + ID int + State string + NumResets int +} + +func (v TestRecordRetry) RecordID() int { + return v.ID +} + +func testScanFirstRecordRetry(rows *sql.Rows, queryErr error) (v workerutil.Record, exists bool, err error) { + if queryErr != nil { + return nil, false, queryErr + } + defer func() { err = basestore.CloseRows(rows, err) }() + + if rows.Next() { + var record TestRecordRetry + if err := rows.Scan(&record.ID, &record.State, &record.NumResets); err != nil { + return nil, false, err + } + + return record, true, nil + } + + return nil, false, nil +} + func setupStoreTest(t *testing.T) { if testing.Short() { t.Skip() @@ -158,6 +186,26 @@ func assertDequeueRecordViewResult(t *testing.T, expectedID, expectedNewField in } } +func assertDequeueRecordRetryResult(t *testing.T, expectedID, expectedNumResets int, record interface{}, tx Store, ok bool, err error) { + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if !ok { + t.Fatalf("expected a dequeueable record") + } + defer func() { _ = tx.Done(nil) }() + + if val := record.(TestRecordRetry).ID; val != expectedID { + t.Errorf("unexpected id. want=%d have=%d", expectedID, val) + } + if val := record.(TestRecordRetry).State; val != "processing" { + t.Errorf("unexpected state. want=%s have=%s", "processing", val) + } + if val := record.(TestRecordRetry).NumResets; val != expectedNumResets { + t.Errorf("unexpected num resets. want=%d have=%d", expectedNumResets, val) + } +} + func testNow() time.Time { return time.Now().UTC().Truncate(time.Second) } diff --git a/internal/workerutil/dbworker/store/store.go b/internal/workerutil/dbworker/store/store.go index 4102b0fd3b4c..6a2f08193873 100644 --- a/internal/workerutil/dbworker/store/store.go +++ b/internal/workerutil/dbworker/store/store.go @@ -131,6 +131,14 @@ type StoreOptions struct { // be moved into the errored state rather than queued on its next reset to prevent an infinite retry // cycle of the same input. MaxNumResets int + + // RetryAfter determines whether the store dequeues jobs that have errored + // more than RetryAfter ago. + // If RetryAfter is a non-zero duration, the store dequeues records where + // - the state is 'errored' + // - the failed attempts counter hasn't reached MaxNumResets + // - the finished_at timestamp was more than RetryAfter ago + RetryAfter time.Duration } // RecordScanFn is a function that interprets row values as a particular record. This function should @@ -229,13 +237,26 @@ func (s *store) dequeue(ctx context.Context, conditions []*sqlf.Query, independe txCtx = context.Background() } - query := s.formatQuery( - selectCandidateQuery, - quote(s.options.ViewName), - makeConditionSuffix(conditions), - s.options.OrderByExpression, - quote(s.options.TableName), - ) + var query *sqlf.Query + if s.options.RetryAfter != 0 { + query = s.formatQuery( + selectCandidateRetryQuery, + quote(s.options.ViewName), + int(s.options.RetryAfter/time.Second), + s.options.MaxNumResets, + makeConditionSuffix(conditions), + s.options.OrderByExpression, + quote(s.options.TableName), + ) + } else { + query = s.formatQuery( + selectCandidateQuery, + quote(s.options.ViewName), + makeConditionSuffix(conditions), + s.options.OrderByExpression, + quote(s.options.TableName), + ) + } for { // First, we try to select an eligible record outside of a transaction. This will skip @@ -324,6 +345,34 @@ WHERE {id} IN (SELECT {id} FROM candidate) RETURNING {id} ` +const selectCandidateRetryQuery = ` +-- source: internal/workerutil/store.go:Dequeue +WITH candidate AS ( + SELECT {id} FROM %s + WHERE + ( + ({state} = 'queued' AND + ({process_after} IS NULL OR {process_after} <= NOW())) + OR + ({state} = 'errored' AND + NOW() - {finished_at} > (%s * '1 second'::interval) AND + {num_resets} < %s) + ) + %s + ORDER BY %s + FOR UPDATE SKIP LOCKED + LIMIT 1 +) +UPDATE %s +SET + {state} = 'processing', + {started_at} = NOW(), + {finished_at} = NULL, + {num_resets} = (CASE WHEN ({state} = 'errored') THEN ({num_resets} + 1) ELSE {num_resets} END) +WHERE {id} IN (SELECT {id} FROM candidate) +RETURNING {id} +` + const lockQuery = ` -- source: internal/workerutil/store.go:Dequeue SELECT 1 FROM %s diff --git a/internal/workerutil/dbworker/store/store_test.go b/internal/workerutil/dbworker/store/store_test.go index 568c88b6e846..df9eceabf9df 100644 --- a/internal/workerutil/dbworker/store/store_test.go +++ b/internal/workerutil/dbworker/store/store_test.go @@ -175,6 +175,46 @@ func TestStoreDequeueConcurrent(t *testing.T) { } } +func TestStoreDequeueRetryAfter(t *testing.T) { + setupStoreTest(t) + + if _, err := dbconn.Global.Exec(` + INSERT INTO workerutil_test (id, state, finished_at, failure_message, num_resets, uploaded_at) + VALUES + (1, 'errored', NOW() - '6 minute'::interval, 'error', 3, NOW() - '2 minutes'::interval), + (2, 'errored', NOW() - '4 minute'::interval, 'error', 0, NOW() - '3 minutes'::interval), + (3, 'errored', NOW() - '6 minute'::interval, 'error', 5, NOW() - '4 minutes'::interval), + (4, 'queued', NULL, NULL, 0, NOW() - '1 minutes'::interval) + `); err != nil { + t.Fatalf("unexpected error inserting records: %s", err) + } + + options := StoreOptions{ + TableName: defaultTestStoreOptions.TableName, + StalledMaxAge: defaultTestStoreOptions.StalledMaxAge, + + Scan: testScanFirstRecordRetry, + ColumnExpressions: []*sqlf.Query{ + sqlf.Sprintf("w.id"), + sqlf.Sprintf("w.state"), + sqlf.Sprintf("w.num_resets"), + }, + OrderByExpression: sqlf.Sprintf("w.uploaded_at"), + MaxNumResets: 5, + RetryAfter: 5 * time.Minute, + } + + store := testStore(options) + + // Dequeue errored record + record1, tx, ok, err := store.Dequeue(context.Background(), nil) + assertDequeueRecordRetryResult(t, 1, 4, record1, tx, ok, err) + + // Dequeue non-errored record + record2, tx, ok, err := store.Dequeue(context.Background(), nil) + assertDequeueRecordRetryResult(t, 4, 0, record2, tx, ok, err) +} + func TestStoreRequeue(t *testing.T) { setupStoreTest(t)