Skip to content

Commit

Permalink
[Alerting] Shift polling interval by random amount when Task Manager …
Browse files Browse the repository at this point in the history
…experiences consistent claim version conflicts (#88020)

This PR Introduces a `pollingDelay` which is applied to the polling interval whenever the average percentage of tasks experiencing a version conflict is higher than a preconfigured threshold (default to 80%).
  • Loading branch information
gmmorris authored Jan 12, 2021
1 parent 14d96da commit 5e4402c
Show file tree
Hide file tree
Showing 18 changed files with 361 additions and 40 deletions.
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - **deprecated** The name of the index that the task_manager will use. This is deprecated, and will be removed starting in 8.0
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `version_conflict_threshold` - The threshold percentage for workers experiencing version conflicts for shifting the polling interval
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.
- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks
- For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting`
Expand Down Expand Up @@ -521,4 +522,4 @@ The task manager's public API is create / delete / list. Updates aren't directly

Task Manager exposes runtime statistics which enable basic observability into its inner workings and makes it possible to monitor the system from external services.

Learn More: [./MONITORING](./MONITORING.MD)
Learn More: [./MONITORING](./MONITORING.MD)
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g
"polling": {
/* When was the last polling cycle? */
"last_successful_poll": "2020-10-05T17:57:55.411Z",
/* When was the last time Task Manager adjusted it's polling delay? */
"last_polling_delay": "2020-10-05T17:57:55.411Z",
/* Running average of polling duration measuring the time from the scheduled polling cycle
start until all claimed tasks are marked as running */
"duration": {
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down Expand Up @@ -74,6 +75,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down Expand Up @@ -113,6 +115,7 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"version_conflict_threshold": 80,
}
`);
});
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;

// Monitoring Constants
// ===================
Expand Down Expand Up @@ -76,6 +77,12 @@ export const configSchema = schema.object(
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
/* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */
version_conflict_threshold: schema.number({
defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD,
min: 50,
max: 100,
}),
/* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */
monitored_stats_required_freshness: schema.number({
defaultValue: (config?: unknown) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ describe('managed configuration', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_required_freshness: 4000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ describe('Configuration Statistics Aggregator', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('createMonitoringStatsStream', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { combineLatest, Observable } from 'rxjs';
import { combineLatest, merge, Observable, of } from 'rxjs';
import { filter, startWith, map } from 'rxjs/operators';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { isNumber, mapValues } from 'lodash';
Expand Down Expand Up @@ -36,6 +36,7 @@ import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config';

interface FillPoolStat extends JsonObject {
last_successful_poll: string;
last_polling_delay: string;
duration: number[];
claim_conflicts: number[];
claim_mismatches: number[];
Expand All @@ -51,11 +52,13 @@ export interface TaskRunStat extends JsonObject {
drift: number[];
load: number[];
execution: ExecutionStat;
polling: FillPoolStat | Omit<FillPoolStat, 'last_successful_poll'>;
polling: Omit<FillPoolStat, 'last_successful_poll' | 'last_polling_delay'> &
Pick<Partial<FillPoolStat>, 'last_successful_poll' | 'last_polling_delay'>;
}

interface FillPoolRawStat extends JsonObject {
last_successful_poll: string;
last_polling_delay: string;
result_frequency_percent_as_number: {
[FillPoolResult.Failed]: number;
[FillPoolResult.NoAvailableWorkers]: number;
Expand Down Expand Up @@ -123,37 +126,61 @@ export function createTaskRunAggregator(
const pollingDurationQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimConflictsQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const claimMismatchesQueue = createRunningAveragedStat<number>(runningAverageWindowSize);
const taskPollingEvents$: Observable<
Pick<TaskRunStat, 'polling'>
> = taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskPollingCycleEvent(taskEvent) && isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
const taskPollingEvents$: Observable<Pick<TaskRunStat, 'polling'>> = combineLatest([
// get latest polling stats
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskPollingCycleEvent(taskEvent) &&
isOk<ClaimAndFillPoolResult, unknown>(taskEvent.event)
),
map((taskEvent: TaskLifecycleEvent) => {
const {
result,
stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {},
} = ((taskEvent.event as unknown) as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
last_successful_poll: new Date().toISOString(),
// Track how long the polling cycle took from begining until all claimed tasks were marked as running
duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(),
// Track how many version conflicts occured during polling
claim_conflicts: isNumber(tasksConflicted)
? claimConflictsQueue(tasksConflicted)
: claimConflictsQueue(),
// Track how much of a mismatch there is between claimed and updated
claim_mismatches:
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
})
),
map((taskEvent: TaskLifecycleEvent) => {
const {
result,
stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {},
} = ((taskEvent.event as unknown) as Ok<ClaimAndFillPoolResult>).value;
const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0);
return {
polling: {
last_successful_poll: new Date().toISOString(),
// Track how long the polling cycle took from begining until all claimed tasks were marked as running
duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(),
// Track how many version conflicts occured during polling
claim_conflicts: isNumber(tasksConflicted)
? claimConflictsQueue(tasksConflicted)
: claimConflictsQueue(),
// Track how much of a mismatch there is between claimed and updated
claim_mismatches:
isNumber(tasksClaimed) && isNumber(tasksUpdated)
? claimMismatchesQueue(tasksUpdated - tasksClaimed)
: claimMismatchesQueue(),
result_frequency_percent_as_number: resultFrequencyQueue(result),
},
};
})
// get DateTime of latest polling delay refresh
merge(
/**
* as `combineLatest` hangs until it has its first value and we're not likely to reconfigure the delay in normal deployments, we needed some initial value.
I've used _now_ (`new Date().toISOString()`) as it made the most sense (it would be the time Kibana started), but it _could_ be confusing in the future.
*/
of(new Date().toISOString()),
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay'
),
map(() => new Date().toISOString())
)
),
]).pipe(
map(([{ polling }, pollingDelay]) => ({
polling: {
last_polling_delay: pollingDelay,
...polling,
},
}))
);

return combineLatest([
Expand Down Expand Up @@ -234,6 +261,8 @@ export function summarizeTaskRunStat(
polling: {
// eslint-disable-next-line @typescript-eslint/naming-convention
last_successful_poll,
// eslint-disable-next-line @typescript-eslint/naming-convention
last_polling_delay,
duration: pollingDuration,
result_frequency_percent_as_number: pollingResultFrequency,
claim_conflicts: claimConflicts,
Expand All @@ -249,6 +278,7 @@ export function summarizeTaskRunStat(
value: {
polling: {
...(last_successful_poll ? { last_successful_poll } : {}),
...(last_polling_delay ? { last_polling_delay } : {}),
duration: calculateRunningAverage(pollingDuration as number[]),
claim_conflicts: calculateRunningAverage(claimConflicts as number[]),
claim_mismatches: calculateRunningAverage(claimMismatches as number[]),
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ describe('TaskManagerPlugin', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down Expand Up @@ -49,6 +50,7 @@ describe('TaskManagerPlugin', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Loading

0 comments on commit 5e4402c

Please sign in to comment.