Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup unused code for claiming tasks by id #144408

Merged
merged 3 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ export class TaskManagerPlugin
taskStore,
middleware: this.middleware,
ephemeralTaskLifecycle: this.ephemeralTaskLifecycle,
definitions: this.definitions,
taskManagerId: taskStore.taskManagerId,
});

Expand Down
6 changes: 2 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ describe('TaskPollingLifecycle', () => {
)
);

expect(
isOk(await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger)))
).toBeTruthy();
expect(isOk(await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)))).toBeTruthy();

expect(taskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1);
});
Expand Down Expand Up @@ -266,7 +264,7 @@ describe('TaskPollingLifecycle', () => {
})
);

const err = await getFirstAsPromise(claimAvailableTasks([], taskClaiming, logger));
const err = await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger));

expect(isErr(err)).toBeTruthy();
expect((err as Err<FillPoolResult>).error).toEqual(FillPoolResult.Failed);
Expand Down
8 changes: 1 addition & 7 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ export class TaskPollingLifecycle {
return fillPool(
// claim available tasks
() => {
return claimAvailableTasks(
tasksToClaim.splice(0, this.pool.availableWorkers),
this.taskClaiming,
this.logger
).pipe(
return claimAvailableTasks(this.taskClaiming, this.logger).pipe(
tap(
mapOk(({ timing }: ClaimOwnershipResult) => {
if (timing) {
Expand Down Expand Up @@ -313,15 +309,13 @@ export class TaskPollingLifecycle {
}

export function claimAvailableTasks(
claimTasksById: string[],
taskClaiming: TaskClaiming,
logger: Logger
): Observable<Result<ClaimOwnershipResult, FillPoolResult>> {
return new Observable((observer) => {
taskClaiming
.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: intervalFromNow('30s')!,
claimTasksById,
})
.subscribe(
(claimResult) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -62,7 +61,6 @@ describe('mark_available_tasks_as_claimed', () => {
),
script: updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: claimTasksById || [],
claimableTaskTypes: definitions.getAllTypes(),
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down Expand Up @@ -140,7 +138,7 @@ if (doc['task.runAt'].size()!=0) {
script: {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
Expand All @@ -152,15 +150,6 @@ if (doc['task.runAt'].size()!=0) {
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -173,7 +162,6 @@ if (doc['task.runAt'].size()!=0) {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
claimableTaskTypes: ['sampleTask', 'otherTask'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand All @@ -187,79 +175,6 @@ if (doc['task.runAt'].size()!=0) {
});

describe(`script`, () => {
test('it supports claiming specific tasks by id', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

const claimTasksById = [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
];

expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById,
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
})
).toMatchObject({
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
}
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
ctx.op = "noop";
}`,
lang: 'painless',
params: {
now: 0,
fieldUpdates,
claimTasksById: [
'33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
taskMaxAttempts: {
foo: 5,
bar: 2,
},
},
});
});

test('it marks the update as a noop if the type is skipped', async () => {
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
Expand All @@ -271,7 +186,6 @@ if (doc['task.runAt'].size()!=0) {
expect(
updateFieldsAndMarkAsFailed({
fieldUpdates,
claimTasksById: [],
claimableTaskTypes: ['foo', 'bar'],
skippedTaskTypes: [],
unusedTaskTypes: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
};
claimTasksById: string[];
claimableTaskTypes: string[];
skippedTaskTypes: string[];
unusedTaskTypes: string[];
Expand All @@ -129,7 +128,6 @@ export interface UpdateFieldsAndMarkAsFailedOpts {

export const updateFieldsAndMarkAsFailed = ({
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand All @@ -148,13 +146,11 @@ export const updateFieldsAndMarkAsFailed = ({
return {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
${setScheduledAtAndMarkAsClaimed}
} else {
ctx._source.task.status = "failed";
}
} else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) {
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {
Expand All @@ -164,7 +160,6 @@ export const updateFieldsAndMarkAsFailed = ({
params: {
now: new Date().getTime(),
fieldUpdates,
claimTasksById,
claimableTaskTypes,
skippedTaskTypes,
unusedTaskTypes,
Expand Down
12 changes: 0 additions & 12 deletions x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,3 @@ export function filterDownBy(...filter: estypes.QueryDslQueryContainer[]) {
},
};
}

export function asPinnedQuery(
ids: estypes.QueryDslPinnedQuery['ids'],
organic: estypes.QueryDslPinnedQuery['organic']
): Pick<estypes.QueryDslQueryContainer, 'pinned'> {
return {
pinned: {
ids,
organic,
},
};
}
Loading