Skip to content

Commit

Permalink
Skipping marking task as running if claim strategy is mget
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Aug 28, 2024
1 parent e9f3b6d commit 1a733dd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
allowReadingInvalidState: this.config.allow_reading_invalid_state,
strategy: this.config.claim_strategy,
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';

const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
Expand Down Expand Up @@ -768,6 +769,36 @@ describe('TaskManagerRunner', () => {
expect(instance.enabled).not.toBeDefined();
});

test('skips marking task as running for mget claim strategy', async () => {
const { runner, store } = await pendingStageSetup({
instance: {
schedule: {
interval: '10m',
},
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
strategy: CLAIM_STRATEGY_MGET,
});
const result = await runner.markTaskAsRunning();

expect(result).toBe(true);
expect(apm.startTransaction).not.toHaveBeenCalled();
expect(mockApmTrans.end).not.toHaveBeenCalled();

expect(runner.id).toEqual('foo');
expect(runner.taskType).toEqual('bar');
expect(runner.toString()).toEqual('bar "foo"');

expect(store.update).not.toHaveBeenCalled();
});

describe('TaskEvents', () => {
test('emits TaskEvent when a task is marked as running', async () => {
const id = _.random(1, 20).toString();
Expand Down Expand Up @@ -2259,6 +2290,7 @@ describe('TaskManagerRunner', () => {
definitions?: TaskDefinitionRegistry;
onTaskEvent?: jest.Mock<(event: TaskEvent<unknown, unknown>) => void>;
allowReadingInvalidState?: boolean;
strategy?: string;
}

function withAnyTiming(taskRun: TaskRun) {
Expand Down Expand Up @@ -2335,6 +2367,7 @@ describe('TaskManagerRunner', () => {
warn_threshold: 5000,
},
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
});

if (stage === TaskRunningStage.READY_TO_RUN) {
Expand Down
13 changes: 12 additions & 1 deletion x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';

Expand Down Expand Up @@ -109,6 +109,7 @@ type Opts = {
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
allowReadingInvalidState: boolean;
strategy: string;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;

export enum TaskRunResult {
Expand Down Expand Up @@ -160,6 +161,7 @@ export class TaskManagerRunner implements TaskRunner {
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -184,6 +186,7 @@ export class TaskManagerRunner implements TaskRunner {
usageCounter,
eventLoopDelayConfig,
allowReadingInvalidState,
strategy,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
Expand All @@ -202,6 +205,7 @@ export class TaskManagerRunner implements TaskRunner {
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
}

/**
Expand Down Expand Up @@ -435,6 +439,13 @@ export class TaskManagerRunner implements TaskRunner {
);
}

// mget claim strategy sets the task to `running` during the claim cycle
// so this update to mark the task as running is unnecessary
if (this.claimStrategy === CLAIM_STRATEGY_MGET) {
this.instance = asReadyToRun(this.instance.task as ConcreteTaskInstanceWithStartedAt);
return true;
}

const apmTrans = apm.startTransaction(
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
TASK_MANAGER_TRANSACTION_TYPE
Expand Down

0 comments on commit 1a733dd

Please sign in to comment.