Skip to content

Commit

Permalink
Adding version to task manager update
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Sep 8, 2021
1 parent 721cf10 commit 18c8b9e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 31 deletions.
20 changes: 16 additions & 4 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,20 @@ export class TaskManagerRunner implements TaskRunner {
}`
);
}
if (!this.instance.task.taskType.startsWith(`alerting:`)) {
return asOk(EMPTY_RUN_RESULT);
}

this.logger.debug(`Running task ${this}`);

const apmTrans = apm.startTransaction(this.taskType, 'taskManager run', {
childOf: this.instance.task.traceparent,
});

if (this.instance.task.taskType.startsWith(`alerting:`)) {
this.logger.info(`alerting task instance ${JSON.stringify(this.instance.task)}`);
}
const taskVersion = this.instance.task.version!;
const modifiedContext = await this.beforeRun({
taskInstance: this.instance.task,
});
Expand All @@ -280,7 +288,7 @@ export class TaskManagerRunner implements TaskRunner {
);
const validatedResult = this.validateResult(result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(validatedResult, stopTaskTimer())
this.processResult(validatedResult, taskVersion, stopTaskTimer())
);
if (apmTrans) apmTrans.end('success');
return processedResult;
Expand All @@ -291,6 +299,7 @@ export class TaskManagerRunner implements TaskRunner {
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(
asErr({ error: err, state: modifiedContext.taskInstance.state }),
taskVersion,
stopTaskTimer()
)
);
Expand Down Expand Up @@ -476,7 +485,8 @@ export class TaskManagerRunner implements TaskRunner {
};

private async processResultForRecurringTask(
result: Result<SuccessfulRunResult, FailedRunResult>
result: Result<SuccessfulRunResult, FailedRunResult>,
taskVersion: string
): Promise<TaskRunResult> {
const hasTaskRunFailed = isOk(result);
const fieldUpdates: Partial<ConcreteTaskInstance> & Pick<ConcreteTaskInstance, 'status'> = flow(
Expand All @@ -493,6 +503,7 @@ export class TaskManagerRunner implements TaskRunner {
schedule: reschedule ?? schedule,
attempts,
status: TaskStatus.Idle,
version: 'WzI1NTAsMV0=', // taskVersion,
});
}
),
Expand Down Expand Up @@ -538,6 +549,7 @@ export class TaskManagerRunner implements TaskRunner {

private async processResult(
result: Result<SuccessfulRunResult, FailedRunResult>,
taskVersion: string,
taskTiming: TaskTiming
): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
const { task } = this.instance;
Expand All @@ -554,7 +566,7 @@ export class TaskManagerRunner implements TaskRunner {
? TaskPersistence.Recurring
: TaskPersistence.NonRecurring,
result: await (runAt || schedule || task.schedule
? this.processResultForRecurringTask(result)
? this.processResultForRecurringTask(result, taskVersion)
: this.processResultWhenDone()),
}),
taskTiming
Expand All @@ -568,7 +580,7 @@ export class TaskManagerRunner implements TaskRunner {
asErr({
task,
persistence: task.schedule ? TaskPersistence.Recurring : TaskPersistence.NonRecurring,
result: await this.processResultForRecurringTask(result),
result: await this.processResultForRecurringTask(result, taskVersion),
error,
}),
taskTiming
Expand Down
98 changes: 71 additions & 27 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,45 +209,79 @@ export class TaskStore {
return attrsById;
}, new Map());

const doLog = docs.some((doc) => doc.taskType.startsWith(`alerting:`));
let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
try {
({
saved_objects: updatedSavedObjects,
} = await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
docs.map((doc) => ({
type: 'task',
id: doc.id,
options: { version: doc.version },
attributes: attributesByDocId.get(doc.id)!,
})),
{
refresh: false,
}
docs.map(
(doc) => ({
type: 'task',
id: doc.id,
version: doc.version,
attributes: attributesByDocId.get(doc.id)!,
}),
{
refresh: false,
}
)
));
} catch (e) {
this.errors$.next(e);
throw e;
}

return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) =>
isSavedObjectsUpdateResponse(updatedSavedObject)
? asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
updatedSavedObject.attributes,
attributesByDocId.get(updatedSavedObject.id)!
),
})
)
: asErr({
// The SavedObjectsRepository maintains the order of the docs
// so we can rely on the index in the `docs` to match an error
// on the same index in the `bulkUpdate` result
entity: docs[index],
error: updatedSavedObject,
return updatedSavedObjects.map<BulkUpdateResult>((updatedSavedObject, index) => {
if (doLog) {
console.log(`updatedSavedObject ${JSON.stringify(updatedSavedObject)}`);
}

// Conflict errors are not thrown as errors in saved object bulk update.
// Instead they are returned as successes and the error object is inside the
// response. So we need special handling to check for that.
if (isSavedObjectsConflictError(updatedSavedObject)) {
if (doLog) {
console.log(`update failed due to conflict!`);
}
// If we return this asError, it will fail the whole task and log an error
// I don't think that's what we want. I think we want to log the conflict but
// still return the task as successful.
return asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
updatedSavedObject.attributes,
attributesByDocId.get(updatedSavedObject.id)!
),
})
);
);
} else if (isSavedObjectsUpdateResponse(updatedSavedObject)) {
if (doLog) {
console.log(`update succeeded!`);
}
return asOk(
savedObjectToConcreteTaskInstance({
...updatedSavedObject,
attributes: defaults(
updatedSavedObject.attributes,
attributesByDocId.get(updatedSavedObject.id)!
),
})
);
} else {
if (doLog) {
console.log(`update failed with error ${(updatedSavedObject as Error).message}`);
}
return asErr({
// The SavedObjectsRepository maintains the order of the docs
// so we can rely on the index in the `docs` to match an error
// on the same index in the `bulkUpdate` result
entity: docs[index],
error: updatedSavedObject,
});
}
});
}

/**
Expand Down Expand Up @@ -480,3 +514,13 @@ function isSavedObjectsUpdateResponse(
): result is SavedObjectsUpdateResponse {
return result && typeof (result as SavedObjectsUpdateResponse).id === 'string';
}

function isSavedObjectsConflictError(
result: SavedObjectsUpdateResponse | Error
): result is SavedObjectsUpdateResponse {
return (
result &&
typeof (result as SavedObjectsUpdateResponse).error === 'object' &&
(result as SavedObjectsUpdateResponse).error?.statusCode === 409
);
}

0 comments on commit 18c8b9e

Please sign in to comment.