Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecote committed Nov 2, 2022
1 parent 3f62042 commit 96d8748
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 755 deletions.
1 change: 0 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ export class TaskManagerPlugin
taskStore,
middleware: this.middleware,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
definitions: this.definitions,
taskManagerId: taskStore.taskManagerId,
});

Expand Down
6 changes: 2 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ describe('TaskPollingLifecycle', () => {
)
);

expect(
isOk(await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger)))
).toBeTruthy();
expect(isOk(await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)))).toBeTruthy();

expect(taskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -266,7 +264,7 @@ describe('TaskPollingLifecycle', () => {
})
);

const err = await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger));
const err = await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger));

expect(isErr(err)).toBeTruthy();
expect((err as Err<FillPoolResult>).error).toEqual(FillPoolResult.Failed);
Expand Down
8 changes: 1 addition & 7 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ export class TaskPollingLifecycle {
return fillPool(
// claim available tasks
() => {
return claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.taskClaiming,
this.logger
).pipe(
return claimAvailableTasks(this.taskClaiming, this.logger).pipe(
tap(
mapOk(({ timing }: ClaimOwnershipResult) => {
if (timing) {
Expand Down Expand Up @@ -313,15 +309,13 @@ export class TaskPollingLifecycle {
}

export function claimAvailableTasks(
claimTasksById: string[],
taskClaiming: TaskClaiming,
logger: Logger
): Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
return new Observable((observer) => {
taskClaiming
.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
})
.subscribe(
(claimResult) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -62,7 +61,6 @@ describe('mark_available_tasks_as_claimed', () => {
),
script: updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: claimTasksById || [],
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down Expand Up @@ -140,7 +138,7 @@ if (doc['task.runAt'].size()!=0) {
script: {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
Expand All @@ -152,15 +150,6 @@ if (doc['task.runAt'].size()!=0) {
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -173,7 +162,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand All @@ -187,79 +175,6 @@ if (doc['task.runAt'].size()!=0) {
});

describe(`script`, () => {
test('it supports claiming specific tasks by id', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

const claimTasksById = [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
];

expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
})
).toMatchObject({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
lang: 'painless',
params: {
now: 0,
fieldUpdates,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
},
});
});

test('it marks the update as a noop if the type is skipped', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -271,7 +186,6 @@ if (doc['task.runAt'].size()!=0) {
expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
};
claimTasksById: string[];
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
Expand All @@ -129,7 +128,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {

export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand All @@ -148,13 +146,11 @@ export const updateFieldsAndMarkAsFailed = ({
return {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
${setScheduledAtAndMarkAsClaimed}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -164,7 +160,6 @@ export const updateFieldsAndMarkAsFailed = ({
params: {
now: new Date().getTime(),
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand Down
12 changes: 0 additions & 12 deletions x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,3 @@ export function filterDownBy(...filter: estypes.QueryDslQueryContainer[]) {
},
};
}

export function asPinnedQuery(
ids: estypes.QueryDslPinnedQuery['ids'],
organic: estypes.QueryDslPinnedQuery['organic']
): Pick<estypes.QueryDslQueryContainer, 'pinned'> {
return {
pinned: {
ids,
organic,
},
};
}
Loading

0 comments on commit 96d8748

Please sign in to comment.