diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index 21ea72cbbb00d..625776db3250d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -11,7 +11,12 @@ import sinon from 'sinon'; import { take, tap, bufferCount, skip, map } from 'rxjs/operators'; import { ConcreteTaskInstance, TaskStatus } from '../task'; -import { asTaskRunEvent, asTaskPollingCycleEvent, TaskTiming } from '../task_events'; +import { + asTaskRunEvent, + asTaskPollingCycleEvent, + TaskTiming, + asTaskManagerStatEvent, +} from '../task_events'; import { asOk } from '../lib/result_type'; import { TaskLifecycleEvent } from '../polling_lifecycle'; import { TaskRunResult } from '../task_running'; @@ -530,6 +535,7 @@ describe('Task Run Statistics', () => { events$.next( asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) ); + events$.next(asTaskManagerStatEvent('pollingDelay', asOk(0))); events$.next( asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed, timing })) ); diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index b881759d9103e..82fe0ec813435 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { combineLatest, merge, Observable, of } from 'rxjs'; +import { combineLatest, Observable } from 'rxjs'; import { filter, startWith, map } from 'rxjs/operators'; import { JsonObject } from 'src/plugins/kibana_utils/common'; import { isNumber, mapValues } from 'lodash'; @@ -160,19 +160,12 @@ export function createTaskRunAggregator( }) ), // get DateTime of latest polling delay refresh - merge( - /** - * as `combineLatest` hangs until it has its first value and we're not likely to reconfigure the delay in normal deployments, we needed some initial value. - I've used _now_ (`new Date().toISOString()`) as it made the most sense (it would be the time Kibana started), but it _could_ be confusing in the future. - */ - of(new Date().toISOString()), - taskPollingLifecycle.events.pipe( - filter( - (taskEvent: TaskLifecycleEvent) => - isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay' - ), - map(() => new Date().toISOString()) - ) + taskPollingLifecycle.events.pipe( + filter( + (taskEvent: TaskLifecycleEvent) => + isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay' + ), + map(() => new Date().toISOString()) ), ]).pipe( map(([{ polling }, pollingDelay]) => ({ diff --git a/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts index 9f0eeedf05884..6b57f3470aecc 100644 --- a/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts +++ b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts @@ -20,9 +20,9 @@ describe('delayOnClaimConflicts', () => { test( 'initializes with a delay of 0', - fakeSchedulers(async (advance) => { + fakeSchedulers(async () => { const pollInterval = 100; - const maxWorkers = 100; + const maxWorkers = 10; const taskLifecycleEvents$ = new Subject(); const delays = delayOnClaimConflicts( of(maxWorkers), @@ -40,9 +40,9 @@ describe('delayOnClaimConflicts', () => { test( 'emits a random delay whenever p50 of claim clashes exceed 80% of available max_workers', - fakeSchedulers(async (advance) => { + fakeSchedulers(async () => { const pollInterval = 100; - const maxWorkers = 100; + const maxWorkers = 10; const taskLifecycleEvents$ = new Subject(); const delays$ = delayOnClaimConflicts( @@ -61,7 +61,7 @@ describe('delayOnClaimConflicts', () => { result: FillPoolResult.PoolFilled, stats: { tasksUpdated: 0, - tasksConflicted: 80, + tasksConflicted: 8, tasksClaimed: 0, }, docs: [], @@ -80,9 +80,9 @@ describe('delayOnClaimConflicts', () => { test( 'doesnt emit a new delay when conflicts have reduced', - fakeSchedulers(async (advance) => { + fakeSchedulers(async () => { const pollInterval = 100; - const maxWorkers = 100; + const maxWorkers = 10; const taskLifecycleEvents$ = new Subject(); const handler = jest.fn(); @@ -104,7 +104,7 @@ describe('delayOnClaimConflicts', () => { result: FillPoolResult.PoolFilled, stats: { tasksUpdated: 0, - tasksConflicted: 80, + tasksConflicted: 8, tasksClaimed: 0, }, docs: [], @@ -124,7 +124,7 @@ describe('delayOnClaimConflicts', () => { result: FillPoolResult.PoolFilled, stats: { tasksUpdated: 0, - tasksConflicted: 70, + tasksConflicted: 7, tasksClaimed: 0, }, docs: [], @@ -135,14 +135,14 @@ describe('delayOnClaimConflicts', () => { await sleep(0); expect(handler.mock.calls.length).toEqual(2); - // shift average back up to threshold (70 + 90) / 2 = 80 + // shift average back up to threshold (7 + 9) / 2 = 80% of maxWorkers taskLifecycleEvents$.next( asTaskPollingCycleEvent( asOk({ result: FillPoolResult.PoolFilled, stats: { tasksUpdated: 0, - tasksConflicted: 90, + tasksConflicted: 9, tasksClaimed: 0, }, docs: [], diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts index fac0137f38ba5..076dc8306cd91 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts @@ -84,8 +84,9 @@ export function createTaskPoller({ }) ), ]).pipe( - // pollDelay can only shift `timer` at the scale of `period`, so we round - // the delay to modulo the interval period + // We don't have control over `pollDelay` in the poller, and a change to `delayOnClaimConflicts` could accidentally cause us to pause Task Manager + // polling for a far longer duration that we intended. + // Since the goal is to shift it within the range of `period`, we use modulo as a safe guard to ensure this doesn't happen. switchMap(([period, pollDelay]) => timer(period + (pollDelay % period), period)), mapTo(none) ) diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 1133d1c269ca1..d698686a21664 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -129,10 +129,7 @@ export class TaskPollingLifecycle { this.events$, config.version_conflict_threshold, config.monitored_stats_running_average_window - ); - pollIntervalDelay$.subscribe((delay) => { - emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay))); - }); + ).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay))))); // the task poller that polls for work on fixed intervals and on demand const poller$: Observable<