Skip to content

Commit

Permalink
[Response Ops][Task Manager] Ensure mget claim errors are correctly…
Browse files Browse the repository at this point in the history
… reflected in task claim metrics (#191309)

Resolves #190082

## Summary

This PR ensures that any errors during the `mget` task claim process are
accurately reflected in the task manager metrics.
* Removed try/catch statements within the `mget` claim function so any
errors updating/getting the task docs get bubbled up to the polling
lifecycle code. This ensures that these errors get properly reporting
using existing mechanisms
* Reporting any errors inside the `mget` task claim process where
individual documents may fail to update even if other bulk operations
succeed.

## Verify

1. Verify that errors thrown within the `mget` claim process are
reflected in the metrics. To do this, you can throw an error in each of
the following functions used during the claim cycle:
  * `taskStore.msearch`
  * `taskStore.getDocVersions`
  * `taskStore.bulkUpdate`
  * `taskStore.bulkGet`

2. Verify that if `taskStore.bulkUpdate` or `taskStore.bulkGet` return
successfully but contain errors within the response, they are reflected
as task claim failures in the metrics.

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
ymao1 and elasticmachine authored Sep 3, 2024
1 parent 4bf2f97 commit ad816b0
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 118 deletions.
125 changes: 125 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { TaskCost } from './task';
import { CLAIM_STRATEGY_MGET, DEFAULT_KIBANAS_PER_PARTITION } from './config';
import { TaskPartitioner } from './lib/task_partitioner';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { TaskEventType } from './task_events';

const executionContext = executionContextServiceMock.createSetupContract();
let mockTaskClaiming = taskClaimingMock.create({});
Expand Down Expand Up @@ -435,6 +436,130 @@ describe('TaskPollingLifecycle', () => {
});
});
});

describe('pollingLifecycleEvents events', () => {
test('should emit success event when polling is successful', async () => {
clock.restore();
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() =>
of(
asOk({
docs: [],
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 },
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);
});

const pollingCycleEvent = emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);

expect(pollingCycleEvent!.event).toEqual({
tag: 'ok',
value: {
result: 'NoTasksClaimed',
stats: {
tasksUpdated: 0,
tasksConflicted: 0,
tasksClaimed: 0,
},
},
});
});

test('should emit failure event when polling error occurs', async () => {
clock.restore();
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => {
throw new Error('booo');
});
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);
});

const pollingCycleEvent = emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);

expect(pollingCycleEvent!.event).toEqual({
tag: 'err',
error: new Error(`Failed to poll for work: Error: booo`),
});
});

test('should emit failure event when polling is successful but individual task errors reported', async () => {
clock.restore();
mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() =>
of(
asOk({
docs: [],
stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0, tasksErrors: 2 },
})
)
);
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
const taskPollingLifecycle = new TaskPollingLifecycle({
...taskManagerOpts,
elasticsearchAndSOAvailability$,
});

const emittedEvents: TaskLifecycleEvent[] = [];

taskPollingLifecycle.events.subscribe((event: TaskLifecycleEvent) =>
emittedEvents.push(event)
);

elasticsearchAndSOAvailability$.next(true);
expect(mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalled();
await retryUntil('pollingCycleEvent emitted', () => {
return !!emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);
});

const pollingCycleEvent = emittedEvents.find(
(event: TaskLifecycleEvent) => event.type === TaskEventType.TASK_POLLING_CYCLE
);

expect(pollingCycleEvent!.event).toEqual({
tag: 'err',
error: new Error(`Partially failed to poll for work: some tasks could not be claimed.`),
});
});
});
});

function getFirstAsPromise<T>(obs$: Observable<T>): Promise<T> {
Expand Down
18 changes: 16 additions & 2 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { Subject, Observable } from 'rxjs';
import { pipe } from 'fp-ts/lib/pipeable';
import { map as mapOptional } from 'fp-ts/lib/Option';
import { map as mapOptional, none } from 'fp-ts/lib/Option';
import { tap } from 'rxjs';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import type { Logger, ExecutionContextStart } from '@kbn/core/server';
Expand Down Expand Up @@ -312,7 +312,21 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
this.emitEvent(
map(
result,
({ timing, ...event }) => asTaskPollingCycleEvent<string>(asOk(event), timing),
({ timing, ...event }) => {
const anyTaskErrors = event.stats?.tasksErrors ?? 0;
if (anyTaskErrors > 0) {
return asTaskPollingCycleEvent<string>(
asErr(
new PollingError<string>(
'Partially failed to poll for work: some tasks could not be claimed.',
PollingErrorType.WorkError,
none
)
)
);
}
return asTaskPollingCycleEvent<string>(asOk(event), timing);
},
(event) => asTaskPollingCycleEvent<string>(asErr(event))
)
);
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface ClaimOwnershipResult {
tasksConflicted: number;
tasksClaimed: number;
tasksLeftUnclaimed?: number;
tasksErrors?: number;
};
docs: ConcreteTaskInstance[];
timing?: TaskTiming;
Expand Down
Loading

0 comments on commit ad816b0

Please sign in to comment.