diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index 6cd42cda9af6a..9be3be14ea3fc 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -45,6 +45,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM - `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state. - `index` - **deprecated** The name of the index that the task_manager will use. This is deprecated, and will be removed starting in 8.0 - `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) +- `version_conflict_threshold` - The threshold percentage for workers experiencing version conflicts for shifting the polling interval - `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. - `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks - For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting` @@ -521,4 +522,4 @@ The task manager's public API is create / delete / list. Updates aren't directly Task Manager exposes runtime statistics which enable basic observability into its inner workings and makes it possible to monitor the system from external services. -Learn More: [./MONITORING](./MONITORING.MD) \ No newline at end of file +Learn More: [./MONITORING](./MONITORING.MD) diff --git a/x-pack/plugins/task_manager/server/MONITORING.md b/x-pack/plugins/task_manager/server/MONITORING.md index 64481e81c60a4..771005f28b3f4 100644 --- a/x-pack/plugins/task_manager/server/MONITORING.md +++ b/x-pack/plugins/task_manager/server/MONITORING.md @@ -177,6 +177,8 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g "polling": { /* When was the last polling cycle? */ "last_successful_poll": "2020-10-05T17:57:55.411Z", + /* When was the last time Task Manager adjusted it's polling delay? */ + "last_polling_delay": "2020-10-05T17:57:55.411Z", /* Running average of polling duration measuring the time from the scheduled polling cycle start until all claimed tasks are marked as running */ "duration": { diff --git a/x-pack/plugins/task_manager/server/config.test.ts b/x-pack/plugins/task_manager/server/config.test.ts index d2d5ac8f22a1f..d2527d066c7b6 100644 --- a/x-pack/plugins/task_manager/server/config.test.ts +++ b/x-pack/plugins/task_manager/server/config.test.ts @@ -27,6 +27,7 @@ describe('config validation', () => { }, "poll_interval": 3000, "request_capacity": 1000, + "version_conflict_threshold": 80, } `); }); @@ -74,6 +75,7 @@ describe('config validation', () => { }, "poll_interval": 3000, "request_capacity": 1000, + "version_conflict_threshold": 80, } `); }); @@ -113,6 +115,7 @@ describe('config validation', () => { }, "poll_interval": 3000, "request_capacity": 1000, + "version_conflict_threshold": 80, } `); }); diff --git a/x-pack/plugins/task_manager/server/config.ts b/x-pack/plugins/task_manager/server/config.ts index a22c4484389ae..d5c388b08b761 100644 --- a/x-pack/plugins/task_manager/server/config.ts +++ b/x-pack/plugins/task_manager/server/config.ts @@ -10,6 +10,7 @@ export const MAX_WORKERS_LIMIT = 100; export const DEFAULT_MAX_WORKERS = 10; export const DEFAULT_POLL_INTERVAL = 3000; export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10; +export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80; // Monitoring Constants // =================== @@ -76,6 +77,12 @@ export const configSchema = schema.object( // disable the task manager rather than trying to specify it with 0 workers min: 1, }), + /* The threshold percenatge for workers experiencing version conflicts for shifting the polling interval. */ + version_conflict_threshold: schema.number({ + defaultValue: DEFAULT_VERSION_CONFLICT_THRESHOLD, + min: 50, + max: 100, + }), /* The rate at which we emit fresh monitored stats. By default we'll use the poll_interval (+ a slight buffer) */ monitored_stats_required_freshness: schema.number({ defaultValue: (config?: unknown) => diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts index 01326c73bd680..fd2b8857693ae 100644 --- a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -29,6 +29,7 @@ describe('managed configuration', () => { index: 'foo', max_attempts: 9, poll_interval: 3000, + version_conflict_threshold: 80, max_poll_inactivity_cycles: 10, monitored_aggregated_stats_refresh_rate: 60000, monitored_stats_required_freshness: 4000, diff --git a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts index 6f3dcb33d5bf5..37f97422dbe15 100644 --- a/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/configuration_statistics.test.ts @@ -17,6 +17,7 @@ describe('Configuration Statistics Aggregator', () => { index: 'foo', max_attempts: 9, poll_interval: 6000000, + version_conflict_threshold: 80, monitored_stats_required_freshness: 6000000, max_poll_inactivity_cycles: 10, request_capacity: 1000, diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts index b8502dee9a8ef..8acd32c30d65d 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.test.ts @@ -21,6 +21,7 @@ describe('createMonitoringStatsStream', () => { index: 'foo', max_attempts: 9, poll_interval: 6000000, + version_conflict_threshold: 80, monitored_stats_required_freshness: 6000000, max_poll_inactivity_cycles: 10, request_capacity: 1000, diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 3933443296c4a..b881759d9103e 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { combineLatest, Observable } from 'rxjs'; +import { combineLatest, merge, Observable, of } from 'rxjs'; import { filter, startWith, map } from 'rxjs/operators'; import { JsonObject } from 'src/plugins/kibana_utils/common'; import { isNumber, mapValues } from 'lodash'; @@ -36,6 +36,7 @@ import { TaskExecutionFailureThreshold, TaskManagerConfig } from '../config'; interface FillPoolStat extends JsonObject { last_successful_poll: string; + last_polling_delay: string; duration: number[]; claim_conflicts: number[]; claim_mismatches: number[]; @@ -51,11 +52,13 @@ export interface TaskRunStat extends JsonObject { drift: number[]; load: number[]; execution: ExecutionStat; - polling: FillPoolStat | Omit; + polling: Omit & + Pick, 'last_successful_poll' | 'last_polling_delay'>; } interface FillPoolRawStat extends JsonObject { last_successful_poll: string; + last_polling_delay: string; result_frequency_percent_as_number: { [FillPoolResult.Failed]: number; [FillPoolResult.NoAvailableWorkers]: number; @@ -123,37 +126,61 @@ export function createTaskRunAggregator( const pollingDurationQueue = createRunningAveragedStat(runningAverageWindowSize); const claimConflictsQueue = createRunningAveragedStat(runningAverageWindowSize); const claimMismatchesQueue = createRunningAveragedStat(runningAverageWindowSize); - const taskPollingEvents$: Observable< - Pick - > = taskPollingLifecycle.events.pipe( - filter( - (taskEvent: TaskLifecycleEvent) => - isTaskPollingCycleEvent(taskEvent) && isOk(taskEvent.event) + const taskPollingEvents$: Observable> = combineLatest([ + // get latest polling stats + taskPollingLifecycle.events.pipe( + filter( + (taskEvent: TaskLifecycleEvent) => + isTaskPollingCycleEvent(taskEvent) && + isOk(taskEvent.event) + ), + map((taskEvent: TaskLifecycleEvent) => { + const { + result, + stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {}, + } = ((taskEvent.event as unknown) as Ok).value; + const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0); + return { + polling: { + last_successful_poll: new Date().toISOString(), + // Track how long the polling cycle took from begining until all claimed tasks were marked as running + duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(), + // Track how many version conflicts occured during polling + claim_conflicts: isNumber(tasksConflicted) + ? claimConflictsQueue(tasksConflicted) + : claimConflictsQueue(), + // Track how much of a mismatch there is between claimed and updated + claim_mismatches: + isNumber(tasksClaimed) && isNumber(tasksUpdated) + ? claimMismatchesQueue(tasksUpdated - tasksClaimed) + : claimMismatchesQueue(), + result_frequency_percent_as_number: resultFrequencyQueue(result), + }, + }; + }) ), - map((taskEvent: TaskLifecycleEvent) => { - const { - result, - stats: { tasksClaimed, tasksUpdated, tasksConflicted } = {}, - } = ((taskEvent.event as unknown) as Ok).value; - const duration = (taskEvent?.timing?.stop ?? 0) - (taskEvent?.timing?.start ?? 0); - return { - polling: { - last_successful_poll: new Date().toISOString(), - // Track how long the polling cycle took from begining until all claimed tasks were marked as running - duration: duration ? pollingDurationQueue(duration) : pollingDurationQueue(), - // Track how many version conflicts occured during polling - claim_conflicts: isNumber(tasksConflicted) - ? claimConflictsQueue(tasksConflicted) - : claimConflictsQueue(), - // Track how much of a mismatch there is between claimed and updated - claim_mismatches: - isNumber(tasksClaimed) && isNumber(tasksUpdated) - ? claimMismatchesQueue(tasksUpdated - tasksClaimed) - : claimMismatchesQueue(), - result_frequency_percent_as_number: resultFrequencyQueue(result), - }, - }; - }) + // get DateTime of latest polling delay refresh + merge( + /** + * as `combineLatest` hangs until it has its first value and we're not likely to reconfigure the delay in normal deployments, we needed some initial value. + I've used _now_ (`new Date().toISOString()`) as it made the most sense (it would be the time Kibana started), but it _could_ be confusing in the future. + */ + of(new Date().toISOString()), + taskPollingLifecycle.events.pipe( + filter( + (taskEvent: TaskLifecycleEvent) => + isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay' + ), + map(() => new Date().toISOString()) + ) + ), + ]).pipe( + map(([{ polling }, pollingDelay]) => ({ + polling: { + last_polling_delay: pollingDelay, + ...polling, + }, + })) ); return combineLatest([ @@ -234,6 +261,8 @@ export function summarizeTaskRunStat( polling: { // eslint-disable-next-line @typescript-eslint/naming-convention last_successful_poll, + // eslint-disable-next-line @typescript-eslint/naming-convention + last_polling_delay, duration: pollingDuration, result_frequency_percent_as_number: pollingResultFrequency, claim_conflicts: claimConflicts, @@ -249,6 +278,7 @@ export function summarizeTaskRunStat( value: { polling: { ...(last_successful_poll ? { last_successful_poll } : {}), + ...(last_polling_delay ? { last_polling_delay } : {}), duration: calculateRunningAverage(pollingDuration as number[]), claim_conflicts: calculateRunningAverage(claimConflicts as number[]), claim_mismatches: calculateRunningAverage(claimMismatches as number[]), diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 9a1d83f6195ab..a73ba2d2958f4 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -20,6 +20,7 @@ describe('TaskManagerPlugin', () => { index: 'foo', max_attempts: 9, poll_interval: 3000, + version_conflict_threshold: 80, max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, @@ -49,6 +50,7 @@ describe('TaskManagerPlugin', () => { index: 'foo', max_attempts: 9, poll_interval: 3000, + version_conflict_threshold: 80, max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, diff --git a/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts new file mode 100644 index 0000000000000..9f0eeedf05884 --- /dev/null +++ b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.test.ts @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import { Subject, of } from 'rxjs'; +import { fakeSchedulers } from 'rxjs-marbles/jest'; +import { sleep } from '../test_utils'; +import { asOk } from '../lib/result_type'; +import { delayOnClaimConflicts } from './delay_on_claim_conflicts'; +import { asTaskPollingCycleEvent } from '../task_events'; +import { bufferCount, take } from 'rxjs/operators'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { FillPoolResult } from '../lib/fill_pool'; + +describe('delayOnClaimConflicts', () => { + beforeEach(() => jest.useFakeTimers()); + + test( + 'initializes with a delay of 0', + fakeSchedulers(async (advance) => { + const pollInterval = 100; + const maxWorkers = 100; + const taskLifecycleEvents$ = new Subject(); + const delays = delayOnClaimConflicts( + of(maxWorkers), + of(pollInterval), + taskLifecycleEvents$, + 80, + 2 + ) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + expect(await delays).toEqual([0]); + }) + ); + + test( + 'emits a random delay whenever p50 of claim clashes exceed 80% of available max_workers', + fakeSchedulers(async (advance) => { + const pollInterval = 100; + const maxWorkers = 100; + const taskLifecycleEvents$ = new Subject(); + + const delays$ = delayOnClaimConflicts( + of(maxWorkers), + of(pollInterval), + taskLifecycleEvents$, + 80, + 2 + ) + .pipe(take(2), bufferCount(2)) + .toPromise(); + + taskLifecycleEvents$.next( + asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 80, + tasksClaimed: 0, + }, + docs: [], + }) + ) + ); + + const [initialDelay, delayAfterClash] = await delays$; + + expect(initialDelay).toEqual(0); + // randomly delay by 25% - 75% + expect(delayAfterClash).toBeGreaterThanOrEqual(pollInterval * 0.25); + expect(delayAfterClash).toBeLessThanOrEqual(pollInterval * 0.75); + }) + ); + + test( + 'doesnt emit a new delay when conflicts have reduced', + fakeSchedulers(async (advance) => { + const pollInterval = 100; + const maxWorkers = 100; + const taskLifecycleEvents$ = new Subject(); + + const handler = jest.fn(); + + delayOnClaimConflicts( + of(maxWorkers), + of(pollInterval), + taskLifecycleEvents$, + 80, + 2 + ).subscribe(handler); + + await sleep(0); + expect(handler).toHaveBeenCalledWith(0); + + taskLifecycleEvents$.next( + asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 80, + tasksClaimed: 0, + }, + docs: [], + }) + ) + ); + + await sleep(0); + expect(handler.mock.calls.length).toEqual(2); + expect(handler.mock.calls[1][0]).toBeGreaterThanOrEqual(pollInterval * 0.25); + expect(handler.mock.calls[1][0]).toBeLessThanOrEqual(pollInterval * 0.75); + + // shift average below threshold + taskLifecycleEvents$.next( + asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 70, + tasksClaimed: 0, + }, + docs: [], + }) + ) + ); + + await sleep(0); + expect(handler.mock.calls.length).toEqual(2); + + // shift average back up to threshold (70 + 90) / 2 = 80 + taskLifecycleEvents$.next( + asTaskPollingCycleEvent( + asOk({ + result: FillPoolResult.PoolFilled, + stats: { + tasksUpdated: 0, + tasksConflicted: 90, + tasksClaimed: 0, + }, + docs: [], + }) + ) + ); + + await sleep(0); + expect(handler.mock.calls.length).toEqual(3); + expect(handler.mock.calls[2][0]).toBeGreaterThanOrEqual(pollInterval * 0.25); + expect(handler.mock.calls[2][0]).toBeLessThanOrEqual(pollInterval * 0.75); + }) + ); +}); diff --git a/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.ts b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.ts new file mode 100644 index 0000000000000..46a4c0e6b87d6 --- /dev/null +++ b/x-pack/plugins/task_manager/server/polling/delay_on_claim_conflicts.ts @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the logic for polling the task manager index for new work. + */ + +import stats from 'stats-lite'; +import { isNumber, random } from 'lodash'; +import { merge, of, Observable, combineLatest } from 'rxjs'; +import { filter, map } from 'rxjs/operators'; +import { Option, none, some, isSome, Some } from 'fp-ts/lib/Option'; +import { isOk } from '../lib/result_type'; +import { ManagedConfiguration } from '../lib/create_managed_configuration'; +import { TaskLifecycleEvent } from '../polling_lifecycle'; +import { isTaskPollingCycleEvent } from '../task_events'; +import { ClaimAndFillPoolResult } from '../lib/fill_pool'; +import { createRunningAveragedStat } from '../monitoring/task_run_calcultors'; + +/** + * Emits a delay amount in ms to apply to polling whenever the task store exceeds a threshold of claim claimClashes + */ +export function delayOnClaimConflicts( + maxWorkersConfiguration$: ManagedConfiguration['maxWorkersConfiguration$'], + pollIntervalConfiguration$: ManagedConfiguration['pollIntervalConfiguration$'], + taskLifecycleEvents$: Observable, + claimClashesPercentageThreshold: number, + runningAverageWindowSize: number +): Observable { + const claimConflictQueue = createRunningAveragedStat(runningAverageWindowSize); + return merge( + of(0), + combineLatest([ + maxWorkersConfiguration$, + pollIntervalConfiguration$, + taskLifecycleEvents$.pipe( + map>((taskEvent: TaskLifecycleEvent) => + isTaskPollingCycleEvent(taskEvent) && + isOk(taskEvent.event) && + isNumber(taskEvent.event.value.stats?.tasksConflicted) + ? some(taskEvent.event.value.stats!.tasksConflicted) + : none + ), + filter>((claimClashes) => isSome(claimClashes)), + map((claimClashes: Option) => (claimClashes as Some).value) + ), + ]).pipe( + map(([maxWorkers, pollInterval, latestClaimConflicts]) => { + // add latest claimConflict count to queue + claimConflictQueue(latestClaimConflicts); + + const emitWhenExceeds = (claimClashesPercentageThreshold * maxWorkers) / 100; + if ( + // avoid calculating average if the new value isn't above the Threshold + latestClaimConflicts >= emitWhenExceeds && + // only calculate average and emit value if above or equal to Threshold + stats.percentile(claimConflictQueue(), 0.5) >= emitWhenExceeds + ) { + return some(pollInterval); + } + return none; + }), + filter>((pollInterval) => isSome(pollInterval)), + map, number>((maybePollInterval) => { + const pollInterval = (maybePollInterval as Some).value; + return random(pollInterval * 0.25, pollInterval * 0.75, false); + }) + ) + ); +} diff --git a/x-pack/plugins/task_manager/server/polling/index.ts b/x-pack/plugins/task_manager/server/polling/index.ts index 5c1f06eaeb256..dcd3f24291518 100644 --- a/x-pack/plugins/task_manager/server/polling/index.ts +++ b/x-pack/plugins/task_manager/server/polling/index.ts @@ -7,3 +7,4 @@ export { createObservableMonitor } from './observable_monitor'; export { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; export { timeoutPromiseAfter } from './timeout_promise_after'; +export { delayOnClaimConflicts } from './delay_on_claim_conflicts'; diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts index f5f1667312d79..3cd0ea34bf94a 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts @@ -27,6 +27,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, getCapacity: () => 1, work, @@ -62,6 +63,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$, + pollIntervalDelay$: of(0), bufferCapacity, getCapacity: () => 1, work, @@ -104,6 +106,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, @@ -163,6 +166,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, @@ -209,6 +213,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, @@ -254,6 +259,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, @@ -291,6 +297,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work: async (...args) => { await worker; @@ -342,6 +349,7 @@ describe('TaskPoller', () => { createTaskPoller<[string, Resolvable], string[]>({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work: async (...resolvables) => { await Promise.all(resolvables.map(([, future]) => future)); @@ -402,6 +410,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work: async (...args) => { throw new Error('failed to work'); @@ -443,6 +452,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, @@ -486,6 +496,7 @@ describe('TaskPoller', () => { createTaskPoller({ logger: loggingSystemMock.create().get(), pollInterval$: of(pollInterval), + pollIntervalDelay$: of(0), bufferCapacity, work, workTimeout: pollInterval * 5, diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts index 3d48453aa5a9a..fac0137f38ba5 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts @@ -10,7 +10,7 @@ import { performance } from 'perf_hooks'; import { after } from 'lodash'; -import { Subject, merge, interval, of, Observable } from 'rxjs'; +import { Subject, merge, of, Observable, combineLatest, timer } from 'rxjs'; import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators'; import { pipe } from 'fp-ts/lib/pipeable'; @@ -33,6 +33,7 @@ type WorkFn = (...params: T[]) => Promise; interface Opts { logger: Logger; pollInterval$: Observable; + pollIntervalDelay$: Observable; bufferCapacity: number; getCapacity: () => number; pollRequests$: Observable>; @@ -56,6 +57,7 @@ interface Opts { export function createTaskPoller({ logger, pollInterval$, + pollIntervalDelay$, getCapacity, pollRequests$, bufferCapacity, @@ -70,11 +72,21 @@ export function createTaskPoller({ // emit a polling event on demand pollRequests$, // emit a polling event on a fixed interval - pollInterval$.pipe( - switchMap((period) => { - logger.debug(`Task poller now using interval of ${period}ms`); - return interval(period); - }), + combineLatest([ + pollInterval$.pipe( + tap((period) => { + logger.debug(`Task poller now using interval of ${period}ms`); + }) + ), + pollIntervalDelay$.pipe( + tap((pollDelay) => { + logger.debug(`Task poller now delaying emission by ${pollDelay}ms`); + }) + ), + ]).pipe( + // pollDelay can only shift `timer` at the scale of `period`, so we round + // the delay to modulo the interval period + switchMap(([period, pollDelay]) => timer(period + (pollDelay % period), period)), mapTo(none) ) ).pipe( diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index bf3ff6da9fbdc..4c54033c3cb93 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -26,6 +26,7 @@ describe('TaskPollingLifecycle', () => { index: 'foo', max_attempts: 9, poll_interval: 6000000, + version_conflict_threshold: 80, max_poll_inactivity_cycles: 10, request_capacity: 1000, monitored_aggregated_stats_refresh_rate: 5000, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index a4522f350f745..1133d1c269ca1 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -25,6 +25,7 @@ import { TaskPollingCycle, asTaskPollingCycleEvent, TaskManagerStat, + asTaskManagerStatEvent, } from './task_events'; import { fillPool, FillPoolResult, TimedFillPoolResult } from './lib/fill_pool'; import { Middleware } from './lib/middleware'; @@ -42,6 +43,7 @@ import { TaskStore, OwnershipClaimingOpts, ClaimOwnershipResult } from './task_s import { identifyEsError } from './lib/identify_es_error'; import { BufferedTaskStore } from './buffered_task_store'; import { TaskTypeDictionary } from './task_type_dictionary'; +import { delayOnClaimConflicts } from './polling'; export type TaskPollingLifecycleOpts = { logger: Logger; @@ -121,6 +123,17 @@ export class TaskPollingLifecycle { poll_interval: pollInterval, } = config; + const pollIntervalDelay$ = delayOnClaimConflicts( + maxWorkersConfiguration$, + pollIntervalConfiguration$, + this.events$, + config.version_conflict_threshold, + config.monitored_stats_running_average_window + ); + pollIntervalDelay$.subscribe((delay) => { + emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay))); + }); + // the task poller that polls for work on fixed intervals and on demand const poller$: Observable< Result> @@ -129,6 +142,7 @@ export class TaskPollingLifecycle { createTaskPoller({ logger, pollInterval$: pollIntervalConfiguration$, + pollIntervalDelay$, bufferCapacity: config.request_capacity, getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index fc09738a149a2..bd8eb56587906 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -53,7 +53,7 @@ export type TaskClaim = TaskEvent; export type TaskPollingCycle = TaskEvent>; -export type TaskManagerStats = 'load'; +export type TaskManagerStats = 'load' | 'pollingDelay'; export type TaskManagerStat = TaskEvent; export type OkResultOf = EventType extends TaskEvent diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index 4c84ca1298e10..26ceeef59069f 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -40,6 +40,7 @@ interface MonitoringStats { }; polling: { last_successful_poll: string; + last_polling_delay: string; duration: Record; result_frequency_percent_as_number: Record; }; @@ -177,6 +178,7 @@ export default function ({ getService }: FtrProviderContext) { } = (await getHealth()).stats; expect(isNaN(Date.parse(polling.last_successful_poll as string))).to.eql(false); + expect(isNaN(Date.parse(polling.last_polling_delay as string))).to.eql(false); expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number');