From 55e1a174e2065e43255abfa59bdd73093e645e64 Mon Sep 17 00:00:00 2001 From: Ying Date: Fri, 6 Sep 2024 15:15:33 -0400 Subject: [PATCH 1/3] Reapply "[Response Ops][Task Manager] Setting task status directly to `running` in `mget` claim strategy (#191669)" This reverts commit 689f227606dd6e96d9e468aa0b1721a43f364719. --- .../server/lib/get_retry_at.test.ts | 89 ++++++++ .../task_manager/server/lib/get_retry_at.ts | 79 +++++++ .../task_manager/server/polling_lifecycle.ts | 1 + .../task_claimers/strategy_mget.test.ts | 200 ++++++++++++------ .../server/task_claimers/strategy_mget.ts | 13 +- .../server/task_running/task_runner.test.ts | 54 +++-- .../server/task_running/task_runner.ts | 82 ++----- .../task_manager/task_management.ts | 15 -- 8 files changed, 367 insertions(+), 166 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/lib/get_retry_at.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/get_retry_at.ts diff --git a/x-pack/plugins/task_manager/server/lib/get_retry_at.test.ts b/x-pack/plugins/task_manager/server/lib/get_retry_at.test.ts new file mode 100644 index 0000000000000..b777a7d7c81ff --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/get_retry_at.test.ts @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import sinon from 'sinon'; +import { calculateDelayBasedOnAttempts, getRetryDate } from './get_retry_at'; +import { createRetryableError } from '../task_running'; + +let fakeTimer: sinon.SinonFakeTimers; + +describe('calculateDelayBasedOnAttempts', () => { + it('returns 30s on the first attempt', () => { + expect(calculateDelayBasedOnAttempts(1)).toBe(30000); + }); + + it('returns delay with jitter', () => { + const delay = calculateDelayBasedOnAttempts(5); + // with jitter should be random between 0 and 40 min (inclusive) + expect(delay).toBeGreaterThanOrEqual(0); + expect(delay).toBeLessThanOrEqual(2400000); + }); + + it('returns delay capped at 1 hour', () => { + const delay = calculateDelayBasedOnAttempts(10); + // with jitter should be random between 0 and 1 hr (inclusive) + expect(delay).toBeGreaterThanOrEqual(0); + expect(delay).toBeLessThanOrEqual(60 * 60 * 1000); + }); +}); + +describe('getRetryDate', () => { + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(new Date('2021-01-01T12:00:00.000Z')); + }); + + afterAll(() => fakeTimer.restore()); + + it('returns retry date based on number of attempts if error is not retryable', () => { + expect(getRetryDate({ error: new Error('foo'), attempts: 1 })).toEqual( + new Date('2021-01-01T12:00:30.000Z') + ); + }); + + it('returns retry date based on number of attempts and add duration if error is not retryable', () => { + expect(getRetryDate({ error: new Error('foo'), attempts: 1, addDuration: '5m' })).toEqual( + new Date('2021-01-01T12:05:30.000Z') + ); + }); + + it('returns retry date for retryable error with retry date', () => { + expect( + getRetryDate({ + error: createRetryableError(new Error('foo'), new Date('2021-02-01T12:00:00.000Z')), + attempts: 1, + }) + ).toEqual(new Date('2021-02-01T12:00:00.000Z')); + }); + + it('returns retry date based on number of attempts for retryable error with retry=true', () => { + expect( + getRetryDate({ + error: createRetryableError(new Error('foo'), true), + attempts: 1, + }) + ).toEqual(new Date('2021-01-01T12:00:30.000Z')); + }); + + it('returns retry date based on number of attempts and add duration for retryable error with retry=true', () => { + expect( + getRetryDate({ + error: createRetryableError(new Error('foo'), true), + attempts: 1, + addDuration: '5m', + }) + ).toEqual(new Date('2021-01-01T12:05:30.000Z')); + }); + + it('returns undefined for retryable error with retry=false', () => { + expect( + getRetryDate({ + error: createRetryableError(new Error('foo'), false), + attempts: 1, + }) + ).toBeUndefined(); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/get_retry_at.ts b/x-pack/plugins/task_manager/server/lib/get_retry_at.ts new file mode 100644 index 0000000000000..278ba18642d06 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/get_retry_at.ts @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { random } from 'lodash'; +import { ConcreteTaskInstance, DEFAULT_TIMEOUT, TaskDefinition } from '../task'; +import { isRetryableError } from '../task_running'; +import { intervalFromDate, maxIntervalFromDate } from './intervals'; + +export function getRetryAt( + task: ConcreteTaskInstance, + taskDefinition: TaskDefinition | undefined +): Date | undefined { + const taskTimeout = getTimeout(task, taskDefinition); + if (task.schedule) { + return maxIntervalFromDate(new Date(), task.schedule.interval, taskTimeout); + } + + return getRetryDate({ + attempts: task.attempts + 1, + // Fake an error. This allows retry logic when tasks keep timing out + // and lets us set a proper "retryAt" value each time. + error: new Error('Task timeout'), + addDuration: taskTimeout, + }); +} + +export function getRetryDate({ + error, + attempts, + addDuration, +}: { + error: Error; + attempts: number; + addDuration?: string; +}): Date | undefined { + const retry: boolean | Date = isRetryableError(error) ?? true; + + let result; + if (retry instanceof Date) { + result = retry; + } else if (retry === true) { + result = new Date(Date.now() + calculateDelayBasedOnAttempts(attempts)); + } + + // Add a duration to the result + if (addDuration && result) { + result = intervalFromDate(result, addDuration)!; + } + return result; +} + +export function calculateDelayBasedOnAttempts(attempts: number) { + // Return 30s for the first retry attempt + if (attempts === 1) { + return 30 * 1000; + } else { + const defaultBackoffPerFailure = 5 * 60 * 1000; + const maxDelay = 60 * 60 * 1000; + // For each remaining attempt return an exponential delay with jitter that is capped at 1 hour. + // We adjust the attempts by 2 to ensure that delay starts at 5m for the second retry attempt + // and increases exponentially from there. + return random(Math.min(maxDelay, defaultBackoffPerFailure * Math.pow(2, attempts - 2))); + } +} + +export function getTimeout( + task: ConcreteTaskInstance, + taskDefinition: TaskDefinition | undefined +): string { + if (task.schedule) { + return taskDefinition?.timeout ?? DEFAULT_TIMEOUT; + } + + return task.timeoutOverride ? task.timeoutOverride : taskDefinition?.timeout ?? DEFAULT_TIMEOUT; +} diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index a6cc78bee0785..b8d41391f1411 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -222,6 +222,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter ({ ], })); +let fakeTimer: sinon.SinonFakeTimers; const taskManagerLogger = mockLogger(); beforeEach(() => jest.clearAllMocks()); @@ -110,6 +112,12 @@ const taskPartitioner = new TaskPartitioner({ // needs more tests in the similar to the `strategy_default.test.ts` test suite describe('TaskClaiming', () => { + beforeAll(() => { + fakeTimer = sinon.useFakeTimers(); + }); + + afterAll(() => fakeTimer.restore()); + beforeEach(() => { jest.clearAllMocks(); jest @@ -399,21 +407,27 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[0].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -492,9 +506,11 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -599,9 +615,11 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -699,9 +717,11 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -847,15 +867,19 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -933,15 +957,19 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1019,15 +1047,19 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1118,27 +1150,35 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[4], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1236,27 +1276,35 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[0].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[3], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[3].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1331,27 +1379,35 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[0].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[3], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[3].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1442,27 +1498,35 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[0].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[3], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[3].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } @@ -1535,27 +1599,35 @@ describe('TaskClaiming', () => { [ { ...fetchedTasks[0], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[0].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[1], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[1].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[2], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[2].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, { ...fetchedTasks[3], + attempts: 1, ownerId: 'test-test', - retryAt: fetchedTasks[3].runAt, - status: 'claiming', + retryAt: new Date('1970-01-01T00:05:30.000Z'), + status: 'running', + startedAt: new Date('1970-01-01T00:00:00.000Z'), }, ], { validate: false, excludeLargeFields: true } diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index a1595643f1743..c0193917f0889 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -18,6 +18,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import apm, { Logger } from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; +import { omit } from 'lodash'; import { TaskTypeDictionary } from '../task_type_dictionary'; import { TaskClaimerOpts, @@ -46,6 +47,7 @@ import { TaskStore, SearchOpts } from '../task_store'; import { isOk, asOk } from '../lib/result_type'; import { selectTasksByCapacity } from './lib/task_selector_by_capacity'; import { TaskPartitioner } from '../lib/task_partitioner'; +import { getRetryAt } from '../lib/get_retry_at'; interface OwnershipClaimingOpts { claimOwnershipUntil: Date; @@ -187,16 +189,21 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { expect(instance.enabled).not.toBeDefined(); }); + test('skips marking task as running for mget claim strategy', async () => { + const { runner, store } = await pendingStageSetup({ + instance: { + schedule: { + interval: '10m', + }, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + strategy: CLAIM_STRATEGY_MGET, + }); + const result = await runner.markTaskAsRunning(); + + expect(result).toBe(true); + expect(apm.startTransaction).not.toHaveBeenCalled(); + expect(mockApmTrans.end).not.toHaveBeenCalled(); + + expect(runner.id).toEqual('foo'); + expect(runner.taskType).toEqual('bar'); + expect(runner.toString()).toEqual('bar "foo"'); + + expect(store.update).not.toHaveBeenCalled(); + }); + describe('TaskEvents', () => { test('emits TaskEvent when a task is marked as running', async () => { const id = _.random(1, 20).toString(); @@ -2344,26 +2374,6 @@ describe('TaskManagerRunner', () => { `Error encountered when running onTaskRemoved() hook for testbar2 "foo": Fail` ); }); - - describe('calculateDelay', () => { - it('returns 30s on the first attempt', () => { - expect(calculateDelay(1)).toBe(30000); - }); - - it('returns delay with jitter', () => { - const delay = calculateDelay(5); - // with jitter should be random between 0 and 40 min (inclusive) - expect(delay).toBeGreaterThanOrEqual(0); - expect(delay).toBeLessThanOrEqual(2400000); - }); - - it('returns delay capped at 1 hour', () => { - const delay = calculateDelay(10); - // with jitter should be random between 0 and 1 hr (inclusive) - expect(delay).toBeGreaterThanOrEqual(0); - expect(delay).toBeLessThanOrEqual(60 * 60 * 1000); - }); - }); }); interface TestOpts { @@ -2371,6 +2381,7 @@ describe('TaskManagerRunner', () => { definitions?: TaskDefinitionRegistry; onTaskEvent?: jest.Mock<(event: TaskEvent) => void>; allowReadingInvalidState?: boolean; + strategy?: string; } function withAnyTiming(taskRun: TaskRun) { @@ -2447,6 +2458,7 @@ describe('TaskManagerRunner', () => { warn_threshold: 5000, }, allowReadingInvalidState: opts.allowReadingInvalidState || false, + strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY, }); if (stage === TaskRunningStage.READY_TO_RUN) { diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index bfcabed9f6e45..24bad009c825a 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -14,7 +14,7 @@ import apm from 'elastic-apm-node'; import { v4 as uuidv4 } from 'uuid'; import { withSpan } from '@kbn/apm-utils'; -import { defaults, flow, identity, omit, random } from 'lodash'; +import { defaults, flow, identity, omit } from 'lodash'; import { ExecutionContextStart, Logger, SavedObjectsErrorHelpers } from '@kbn/core/server'; import { UsageCounter } from '@kbn/usage-collection-plugin/server'; import { Middleware } from '../lib/middleware'; @@ -40,7 +40,7 @@ import { TaskTiming, TaskManagerStat, } from '../task_events'; -import { intervalFromDate, maxIntervalFromDate } from '../lib/intervals'; +import { intervalFromDate } from '../lib/intervals'; import { CancelFunction, CancellableTask, @@ -51,12 +51,12 @@ import { SuccessfulRunResult, TaskDefinition, TaskStatus, - DEFAULT_TIMEOUT, } from '../task'; import { TaskTypeDictionary } from '../task_type_dictionary'; -import { isRetryableError, isUnrecoverableError } from './errors'; -import type { EventLoopDelayConfig } from '../config'; +import { isUnrecoverableError } from './errors'; +import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config'; import { TaskValidator } from '../task_validator'; +import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at'; export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} }; @@ -109,6 +109,7 @@ type Opts = { usageCounter?: UsageCounter; eventLoopDelayConfig: EventLoopDelayConfig; allowReadingInvalidState: boolean; + strategy: string; } & Pick; export enum TaskRunResult { @@ -160,6 +161,7 @@ export class TaskManagerRunner implements TaskRunner { private usageCounter?: UsageCounter; private eventLoopDelayConfig: EventLoopDelayConfig; private readonly taskValidator: TaskValidator; + private readonly claimStrategy: string; /** * Creates an instance of TaskManagerRunner. @@ -184,6 +186,7 @@ export class TaskManagerRunner implements TaskRunner { usageCounter, eventLoopDelayConfig, allowReadingInvalidState, + strategy, }: Opts) { this.instance = asPending(sanitizeInstance(instance)); this.definitions = definitions; @@ -202,6 +205,7 @@ export class TaskManagerRunner implements TaskRunner { definitions: this.definitions, allowReadingInvalidState, }); + this.claimStrategy = strategy; } /** @@ -266,14 +270,7 @@ export class TaskManagerRunner implements TaskRunner { * defined by the task type unless this is an ad-hoc task that specifies an override */ public get timeout() { - if (this.instance.task.schedule) { - // recurring tasks should use timeout in task type - return this.definition?.timeout ?? DEFAULT_TIMEOUT; - } - - return this.instance.task.timeoutOverride - ? this.instance.task.timeoutOverride - : this.definition?.timeout ?? DEFAULT_TIMEOUT; + return getTimeout(this.instance.task, this.definition); } /** @@ -442,6 +439,13 @@ export class TaskManagerRunner implements TaskRunner { ); } + // mget claim strategy sets the task to `running` during the claim cycle + // so this update to mark the task as running is unnecessary + if (this.claimStrategy === CLAIM_STRATEGY_MGET) { + this.instance = asReadyToRun(this.instance.task as ConcreteTaskInstanceWithStartedAt); + return true; + } + const apmTrans = apm.startTransaction( TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING, TASK_MANAGER_TRANSACTION_TYPE @@ -475,16 +479,7 @@ export class TaskManagerRunner implements TaskRunner { status: TaskStatus.Running, startedAt: now, attempts, - retryAt: - (this.instance.task.schedule - ? maxIntervalFromDate(now, this.instance.task.schedule.interval, this.timeout) - : this.getRetryDelay({ - attempts, - // Fake an error. This allows retry logic when tasks keep timing out - // and lets us set a proper "retryAt" value each time. - error: new Error('Task timeout'), - addDuration: this.timeout, - })) ?? null, + retryAt: getRetryAt(taskInstance, this.definition) ?? null, // This is a safe conversion as we're setting the startAt above }, { validate: false } @@ -595,7 +590,7 @@ export class TaskManagerRunner implements TaskRunner { ? { schedule } : // when result.error is truthy, then we're retrying because it failed { - runAt: this.getRetryDelay({ + runAt: getRetryDate({ attempts, error, }), @@ -800,31 +795,6 @@ export class TaskManagerRunner implements TaskRunner { return result; } - private getRetryDelay({ - error, - attempts, - addDuration, - }: { - error: Error; - attempts: number; - addDuration?: string; - }): Date | undefined { - const retry: boolean | Date = isRetryableError(error) ?? true; - - let result; - if (retry instanceof Date) { - result = retry; - } else if (retry === true) { - result = new Date(Date.now() + calculateDelay(attempts)); - } - - // Add a duration to the result - if (addDuration && result) { - result = intervalFromDate(result, addDuration)!; - } - return result; - } - private getMaxAttempts() { return this.definition?.maxAttempts ?? this.defaultMaxAttempts; } @@ -883,20 +853,6 @@ export function asRan(task: InstanceOf): RanTask }; } -export function calculateDelay(attempts: number) { - // Return 30s for the first retry attempt - if (attempts === 1) { - return 30 * 1000; - } else { - const defaultBackoffPerFailure = 5 * 60 * 1000; - const maxDelay = 60 * 60 * 1000; - // For each remaining attempt return an exponential delay with jitter that is capped at 1 hour. - // We adjust the attempts by 2 to ensure that delay starts at 5m for the second retry attempt - // and increases exponentially from there. - return random(Math.min(maxDelay, defaultBackoffPerFailure * Math.pow(2, attempts - 2))); - } -} - export function getTaskDelayInSeconds(scheduledAt: Date) { const now = new Date(); return (now.valueOf() - scheduledAt.valueOf()) / 1000; diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts index c2d5e0edccf64..6323cef329ed6 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_management.ts @@ -553,21 +553,6 @@ export default function ({ getService }: FtrProviderContext) { await releaseTasksWaitingForEventToComplete('releaseSecondWaveOfTasks'); }); - it('should increment attempts when task fails on markAsRunning', async () => { - const originalTask = await scheduleTask({ - taskType: 'sampleTask', - params: { throwOnMarkAsRunning: true }, - }); - - expect(originalTask.attempts).to.eql(0); - - // Wait for task manager to attempt running the task a second time - await retry.try(async () => { - const task = await currentTask(originalTask.id); - expect(task.attempts).to.eql(2); - }); - }); - it('should return a task run error result when trying to run a non-existent task', async () => { // runSoon should fail const failedRunSoonResult = await runTaskSoon({ From 2115bc2deacbb95d3d530642752453bcbb42a920 Mon Sep 17 00:00:00 2001 From: Ying Date: Fri, 6 Sep 2024 15:17:27 -0400 Subject: [PATCH 2/3] Re-ordering tests --- .../task_manager_claimer_mget/test_suites/task_manager/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts index 83005f2d55342..d94ceaee7da42 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts @@ -9,6 +9,7 @@ import { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('task_manager with mget task claimer', function taskManagerSuite() { + loadTestFile(require.resolve('./task_partitions')); loadTestFile(require.resolve('./task_priority')); loadTestFile(require.resolve('./background_task_utilization_route')); loadTestFile(require.resolve('./metrics_route')); @@ -16,7 +17,6 @@ export default function ({ loadTestFile }: FtrProviderContext) { loadTestFile(require.resolve('./task_management')); loadTestFile(require.resolve('./task_management_scheduled_at')); loadTestFile(require.resolve('./task_management_removed_types')); - loadTestFile(require.resolve('./task_partitions')); loadTestFile(require.resolve('./migrations')); }); From 1646ae91707470c71070d87a675b67f79b7c4aa6 Mon Sep 17 00:00:00 2001 From: Ying Date: Fri, 6 Sep 2024 17:05:45 -0400 Subject: [PATCH 3/3] Changing execution id comparison --- .../plugins/task_manager/server/task_running/task_runner.ts | 4 +++- .../test_suites/task_manager/index.ts | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 24bad009c825a..002fcfec1a41e 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -227,7 +227,9 @@ export class TaskManagerRunner implements TaskRunner { * @param id */ public isSameTask(executionId: string) { - return executionId.startsWith(this.id); + const executionIdParts = executionId.split('::'); + const executionIdCompare = executionIdParts.length > 0 ? executionIdParts[0] : executionId; + return executionIdCompare === this.id; } /** diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts index d94ceaee7da42..83005f2d55342 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts @@ -9,7 +9,6 @@ import { FtrProviderContext } from '../../ftr_provider_context'; export default function ({ loadTestFile }: FtrProviderContext) { describe('task_manager with mget task claimer', function taskManagerSuite() { - loadTestFile(require.resolve('./task_partitions')); loadTestFile(require.resolve('./task_priority')); loadTestFile(require.resolve('./background_task_utilization_route')); loadTestFile(require.resolve('./metrics_route')); @@ -17,6 +16,7 @@ export default function ({ loadTestFile }: FtrProviderContext) { loadTestFile(require.resolve('./task_management')); loadTestFile(require.resolve('./task_management_scheduled_at')); loadTestFile(require.resolve('./task_management_removed_types')); + loadTestFile(require.resolve('./task_partitions')); loadTestFile(require.resolve('./migrations')); });