Skip to content

Commit

Permalink
[Task Manager] Cleans up polling shift mechanism (#88210) (#88969)
Browse files Browse the repository at this point in the history
Cleanup work
1. Replaced naive initialisation of `last_polling_delay`
2. Changed values in `delayOnClaimConflicts` unit tests to make them less confusing (it was easy to mistake the worker count for the percentage of workers)
3. Added comment explaining the usage of modulo
  • Loading branch information
gmmorris authored Jan 21, 2021
1 parent 7d0de97 commit 0598147
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import sinon from 'sinon';
import { take, tap, bufferCount, skip, map } from 'rxjs/operators';

import { ConcreteTaskInstance, TaskStatus } from '../task';
import { asTaskRunEvent, asTaskPollingCycleEvent, TaskTiming } from '../task_events';
import {
asTaskRunEvent,
asTaskPollingCycleEvent,
TaskTiming,
asTaskManagerStatEvent,
} from '../task_events';
import { asOk } from '../lib/result_type';
import { TaskLifecycleEvent } from '../polling_lifecycle';
import { TaskRunResult } from '../task_running';
Expand Down Expand Up @@ -530,6 +535,7 @@ describe('Task Run Statistics', () => {
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
events$.next(asTaskManagerStatEvent('pollingDelay', asOk(0)));
events$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing }))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { combineLatest, merge, Observable, of } from 'rxjs';
import { combineLatest, Observable } from 'rxjs';
import { filter, startWith, map } from 'rxjs/operators';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { isNumber, mapValues } from 'lodash';
Expand Down Expand Up @@ -160,19 +160,12 @@ export function createTaskRunAggregator(
})
),
// get DateTime of latest polling delay refresh
merge(
/**
* as `combineLatest` hangs until it has its first value and we're not likely to reconfigure the delay in normal deployments, we needed some initial value.
I've used _now_ (`new Date().toISOString()`) as it made the most sense (it would be the time Kibana started), but it _could_ be confusing in the future.
*/
of(new Date().toISOString()),
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay'
),
map(() => new Date().toISOString())
)
taskPollingLifecycle.events.pipe(
filter(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay'
),
map(() => new Date().toISOString())
),
]).pipe(
map(([{ polling }, pollingDelay]) => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ describe('delayOnClaimConflicts', () => {

test(
'initializes with a delay of 0',
fakeSchedulers(async (advance) => {
fakeSchedulers(async () => {
const pollInterval = 100;
const maxWorkers = 100;
const maxWorkers = 10;
const taskLifecycleEvents$ = new Subject<TaskLifecycleEvent>();
const delays = delayOnClaimConflicts(
of(maxWorkers),
Expand All @@ -40,9 +40,9 @@ describe('delayOnClaimConflicts', () => {

test(
'emits a random delay whenever p50 of claim clashes exceed 80% of available max_workers',
fakeSchedulers(async (advance) => {
fakeSchedulers(async () => {
const pollInterval = 100;
const maxWorkers = 100;
const maxWorkers = 10;
const taskLifecycleEvents$ = new Subject<TaskLifecycleEvent>();

const delays$ = delayOnClaimConflicts(
Expand All @@ -61,7 +61,7 @@ describe('delayOnClaimConflicts', () => {
result: FillPoolResult.PoolFilled,
stats: {
tasksUpdated: 0,
tasksConflicted: 80,
tasksConflicted: 8,
tasksClaimed: 0,
},
docs: [],
Expand All @@ -80,9 +80,9 @@ describe('delayOnClaimConflicts', () => {

test(
'doesnt emit a new delay when conflicts have reduced',
fakeSchedulers(async (advance) => {
fakeSchedulers(async () => {
const pollInterval = 100;
const maxWorkers = 100;
const maxWorkers = 10;
const taskLifecycleEvents$ = new Subject<TaskLifecycleEvent>();

const handler = jest.fn();
Expand All @@ -104,7 +104,7 @@ describe('delayOnClaimConflicts', () => {
result: FillPoolResult.PoolFilled,
stats: {
tasksUpdated: 0,
tasksConflicted: 80,
tasksConflicted: 8,
tasksClaimed: 0,
},
docs: [],
Expand All @@ -124,7 +124,7 @@ describe('delayOnClaimConflicts', () => {
result: FillPoolResult.PoolFilled,
stats: {
tasksUpdated: 0,
tasksConflicted: 70,
tasksConflicted: 7,
tasksClaimed: 0,
},
docs: [],
Expand All @@ -135,14 +135,14 @@ describe('delayOnClaimConflicts', () => {
await sleep(0);
expect(handler.mock.calls.length).toEqual(2);

// shift average back up to threshold (70 + 90) / 2 = 80
// shift average back up to threshold: (7 + 9) / 2 = 8, which is 80% of maxWorkers
taskLifecycleEvents$.next(
asTaskPollingCycleEvent(
asOk({
result: FillPoolResult.PoolFilled,
stats: {
tasksUpdated: 0,
tasksConflicted: 90,
tasksConflicted: 9,
tasksClaimed: 0,
},
docs: [],
Expand Down
5 changes: 3 additions & 2 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ export function createTaskPoller<T, H>({
})
),
]).pipe(
// pollDelay can only shift `timer` at the scale of `period`, so we round
// the delay to modulo the interval period
// We don't have control over `pollDelay` in the poller, and a change to `delayOnClaimConflicts` could accidentally cause us to pause Task Manager
// polling for a far longer duration than we intended.
// Since the goal is to shift it within the range of `period`, we use modulo as a safeguard to ensure this doesn't happen.
switchMap(([period, pollDelay]) => timer(period + (pollDelay % period), period)),
mapTo(none)
)
Expand Down
5 changes: 1 addition & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ export class TaskPollingLifecycle {
this.events$,
config.version_conflict_threshold,
config.monitored_stats_running_average_window
);
pollIntervalDelay$.subscribe((delay) => {
emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)));
});
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));

// the task poller that polls for work on fixed intervals and on demand
const poller$: Observable<
Expand Down

0 comments on commit 0598147

Please sign in to comment.