Skip to content

Commit

Permalink
[Task Manager] Fixes error when we claim new tasks beyond capacity (e…
Browse files Browse the repository at this point in the history
…lastic#48384)

Fixes an issue where we would try and claim new tasks even when there are no available workers
  • Loading branch information
gmmorris authored Oct 16, 2019
1 parent 5676ac0 commit 323d71e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 17 deletions.
27 changes: 26 additions & 1 deletion x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { TaskManager } from './task_manager';
import { TaskManager, claimAvailableTasks } from './task_manager';
import { SavedObjectsClientMock } from 'src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema } from 'src/core/server';
import { mockLogger } from './test_utils';
Expand Down Expand Up @@ -66,6 +66,7 @@ describe('TaskManager', () => {
const promise = client.schedule(task);
client.start();
await promise;

expect(savedObjectsClient.create).toHaveBeenCalled();
});

Expand Down Expand Up @@ -164,4 +165,28 @@ describe('TaskManager', () => {
/Cannot add middleware after the task manager is initialized/i
);
});

describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));

const availableWorkers = 1;

claimAvailableTasks(claim, availableWorkers, logger);

expect(claim).toHaveBeenCalledTimes(1);
});

test('shouldnt claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));

const availableWorkers = 0;

claimAvailableTasks(claim, availableWorkers, logger);

expect(claim).not.toHaveBeenCalled();
});
});
});
58 changes: 42 additions & 16 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import {
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { FetchOpts, FetchResult, TaskStore } from './task_store';
import {
FetchOpts,
FetchResult,
TaskStore,
OwnershipClaimingOpts,
ClaimOwnershipResult,
} from './task_store';

export interface TaskManagerOpts {
logger: Logger;
Expand Down Expand Up @@ -103,7 +109,17 @@ export class TaskManager {
const poller = new TaskPoller({
logger: this.logger,
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work: (): Promise<void> => fillPool(pool.run, () => this.claimAvailableTasks(), createRunner),
work: (): Promise<void> =>
fillPool(
pool.run,
() =>
claimAvailableTasks(
this.store.claimAvailableTasks.bind(this.store),
this.pool.availableWorkers,
this.logger
),
createRunner
),
});

this.pool = pool;
Expand Down Expand Up @@ -135,20 +151,6 @@ export class TaskManager {
startPoller();
}

private async claimAvailableTasks() {
const { docs, claimedTasks } = await this.store.claimAvailableTasks({
size: this.pool.availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});

if (docs.length !== claimedTasks) {
this.logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
);
}
return docs;
}

private async waitUntilStarted() {
if (!this.isStarted) {
await new Promise(resolve => {
Expand Down Expand Up @@ -247,3 +249,27 @@ export class TaskManager {
}
}
}

export async function claimAvailableTasks(
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
if (availableWorkers > 0) {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
);
}
return docs;
}
logger.info(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers. If this happens often, consider adjusting the "xpack.task_manager.max_workers" configuration.`
);
return [];
}

0 comments on commit 323d71e

Please sign in to comment.