diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts index d3533ac058314..e1591feee9271 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts @@ -12,68 +12,58 @@ import { TaskPoolRunResult } from '../task_pool'; import { asOk, Result } from './result_type'; import { ConcreteTaskInstance, TaskStatus } from '../task'; import { TaskManagerRunner } from '../task_running/task_runner'; -import { from, Observable } from 'rxjs'; import { ClaimOwnershipResult } from '../queries/task_claiming'; jest.mock('../task_running/task_runner'); describe('fillPool', () => { function mockFetchAvailableTasks( - tasksToMock: number[][] - ): () => Observable> { - const claimCycles: ConcreteTaskInstance[][] = tasksToMock.map((ids) => mockTaskInstances(ids)); + tasksToMock: number[] + ): () => Promise> { + const tasks: ConcreteTaskInstance[] = tasksToMock.map((id) => mockTaskInstance(id)); return () => - from( - claimCycles.map((tasks) => - asOk({ - stats: { - tasksUpdated: tasks?.length ?? 0, - tasksConflicted: 0, - tasksClaimed: 0, - }, - docs: tasks, - }) - ) + Promise.resolve( + asOk({ + stats: { + tasksUpdated: tasks?.length ?? 0, + tasksConflicted: 0, + tasksClaimed: 0, + }, + docs: tasks, + }) ); } - const mockTaskInstances = (ids: number[]): ConcreteTaskInstance[] => - ids.map((id) => ({ - id: `${id}`, - attempts: 0, - status: TaskStatus.Running, - version: '123', - runAt: new Date(0), - scheduledAt: new Date(0), + const mockTaskInstance = (id: number): ConcreteTaskInstance => ({ + id: `${id}`, + attempts: 0, + status: TaskStatus.Running, + version: '123', + runAt: new Date(0), + scheduledAt: new Date(0), + startedAt: new Date(0), + retryAt: new Date(0), + state: { startedAt: new Date(0), - retryAt: new Date(0), - state: { - startedAt: new Date(0), - }, - taskType: '', - params: {}, - ownerId: null, - })); - - test('fills task pool with all claimed tasks until fetchAvailableTasks stream closes', async () => { - const tasks = [ - [1, 2, 3], - [4, 5], - ]; + }, + taskType: '', + params: {}, + ownerId: null, + }); + + test('fills task pool with all claimed tasks', async () => { + const tasks = [1, 2, 3, 4, 5]; const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks); const converter = _.identity; await fillPool(fetchAvailableTasks, converter, run); - expect(_.flattenDeep(run.args)).toEqual(mockTaskInstances([1, 2, 3, 4, 5])); + expect(_.flattenDeep(run.args)).toEqual(tasks.map((id) => mockTaskInstance(id))); }); test('calls the converter on the records prior to running', async () => { - const tasks = [ - [1, 2, 3], - [4, 5], - ]; + const tasks = [1, 2, 3, 4, 5]; const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (instance: ConcreteTaskInstance) => @@ -91,14 +81,13 @@ describe('fillPool', () => { instance.id as unknown as TaskManagerRunner; try { - const fetchAvailableTasks = () => - new Observable>((obs) => - obs.error('fetch is not working') - ); + const fetchAvailableTasks = () => { + throw new Error('fetch is not working'); + }; await fillPool(fetchAvailableTasks, converter, run); } catch (err) { - expect(err.toString()).toBe('fetch is not working'); + expect(err.toString()).toBe('Error: fetch is not working'); expect(run.called).toBe(false); } }); @@ -109,10 +98,7 @@ describe('fillPool', () => { instance.id as unknown as TaskManagerRunner; try { - const tasks = [ - [1, 2, 3], - [4, 5], - ]; + const tasks = [1, 2, 3, 4, 5]; const fetchAvailableTasks = mockFetchAvailableTasks(tasks); await fillPool(fetchAvailableTasks, converter, run); @@ -123,10 +109,7 @@ describe('fillPool', () => { test('throws exception from converter', async () => { try { - const tasks = [ - [1, 2, 3], - [4, 5], - ]; + const tasks = [1, 2, 3, 4, 5]; const fetchAvailableTasks = mockFetchAvailableTasks(tasks); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (instance: ConcreteTaskInstance) => { diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.ts index bf1ed95da363b..42c8320e81db7 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.ts @@ -5,14 +5,12 @@ * 2.0. */ -import { Observable } from 'rxjs'; -import { concatMap, last } from 'rxjs'; import { ClaimOwnershipResult } from '../queries/task_claiming'; import { ConcreteTaskInstance } from '../task'; import { WithTaskTiming, startTaskTimer } from '../task_events'; import { TaskPoolRunResult } from '../task_pool'; import { TaskManagerRunner } from '../task_running'; -import { Result, map as mapResult, asErr, asOk } from './result_type'; +import { Result, isOk } from './result_type'; export enum FillPoolResult { Failed = 'Failed', @@ -23,17 +21,6 @@ export enum FillPoolResult { PoolFilled = 'PoolFilled', } -type FillPoolAndRunResult = Result< - { - result: TaskPoolRunResult; - stats?: ClaimOwnershipResult['stats']; - }, - { - result: FillPoolResult; - stats?: ClaimOwnershipResult['stats']; - } ->; - export type ClaimAndFillPoolResult = Partial> & { result: FillPoolResult; }; @@ -52,66 +39,44 @@ export type TimedFillPoolResult = WithTaskTiming; * @param converter - a function that converts task records to the appropriate task runner */ export async function fillPool( - fetchAvailableTasks: () => Observable>, + fetchAvailableTasks: () => Promise>, converter: (taskInstance: ConcreteTaskInstance) => TaskManagerRunner, run: (tasks: TaskManagerRunner[]) => Promise ): Promise { - return new Promise((resolve, reject) => { - const stopTaskTimer = startTaskTimer(); - const augmentTimingTo = ( - result: FillPoolResult, - stats?: ClaimOwnershipResult['stats'] - ): TimedFillPoolResult => ({ - result, - stats, - timing: stopTaskTimer(), - }); - fetchAvailableTasks() - .pipe( - // each ClaimOwnershipResult will be sequencially consumed an ran using the `run` handler - concatMap(async (res) => - mapResult>( - res, - async ({ docs, stats }) => { - if (!docs.length) { - return asOk({ result: TaskPoolRunResult.NoTaskWereRan, stats }); - } - return asOk( - await run(docs.map(converter)).then((runResult) => ({ - result: runResult, - stats, - })) - ); - }, - async (fillPoolResult) => asErr({ result: fillPoolResult }) - ) - ), - // when the final call to `run` completes, we'll complete the stream and emit the - // final accumulated result - last() - ) - .subscribe( - (claimResults) => { - resolve( - mapResult( - claimResults, - ({ result, stats }) => { - switch (result) { - case TaskPoolRunResult.RanOutOfCapacity: - return augmentTimingTo(FillPoolResult.RanOutOfCapacity, stats); - case TaskPoolRunResult.RunningAtCapacity: - return augmentTimingTo(FillPoolResult.RunningAtCapacity, stats); - case TaskPoolRunResult.NoTaskWereRan: - return augmentTimingTo(FillPoolResult.NoTasksClaimed, stats); - default: - return augmentTimingTo(FillPoolResult.PoolFilled, stats); - } - }, - ({ result, stats }) => augmentTimingTo(result, stats) - ) - ); - }, - (err) => reject(err) - ); + const stopTaskTimer = startTaskTimer(); + const augmentTimingTo = ( + result: FillPoolResult, + stats?: ClaimOwnershipResult['stats'] + ): TimedFillPoolResult => ({ + result, + stats, + timing: stopTaskTimer(), }); + + const claimResults = await fetchAvailableTasks(); + if (isOk(claimResults)) { + if (!claimResults.value.docs.length) { + return augmentTimingTo(FillPoolResult.NoTasksClaimed, claimResults.value.stats); + } + + const taskPoolRunResult = await run(claimResults.value.docs.map(converter)).then( + (runResult) => ({ + result: runResult, + stats: claimResults.value.stats, + }) + ); + + switch (taskPoolRunResult.result) { + case TaskPoolRunResult.RanOutOfCapacity: + return augmentTimingTo(FillPoolResult.RanOutOfCapacity, taskPoolRunResult.stats); + case TaskPoolRunResult.RunningAtCapacity: + return augmentTimingTo(FillPoolResult.RunningAtCapacity, taskPoolRunResult.stats); + case TaskPoolRunResult.NoTaskWereRan: + return augmentTimingTo(FillPoolResult.NoTasksClaimed, taskPoolRunResult.stats); + default: + return augmentTimingTo(FillPoolResult.PoolFilled, taskPoolRunResult.stats); + } + } + + return augmentTimingTo(claimResults.error); } diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 6e3b7416ad787..1f244f7f4c8a5 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -6,7 +6,7 @@ */ import sinon from 'sinon'; -import { Observable, of, Subject } from 'rxjs'; +import { of, Subject } from 'rxjs'; import { TaskPollingLifecycle, claimAvailableTasks, TaskLifecycleEvent } from './polling_lifecycle'; import { createInitialMiddleware } from './lib/middleware'; @@ -16,9 +16,8 @@ import { mockLogger } from './test_utils'; import { taskClaimingMock } from './queries/task_claiming.mock'; import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming'; import type { TaskClaiming as TaskClaimingClass } from './queries/task_claiming'; -import { asOk, Err, isErr, isOk, Result } from './lib/result_type'; +import { asOk, Err, isErr, isOk, Ok } from './lib/result_type'; import { FillPoolResult } from './lib/fill_pool'; -import { ElasticsearchResponseError } from './lib/identify_es_error'; import { executionContextServiceMock } from '@kbn/core/server/mocks'; import { TaskCost } from './task'; import { CLAIM_STRATEGY_MGET, DEFAULT_KIBANAS_PER_PARTITION } from './config'; @@ -40,6 +39,18 @@ jest.mock('./constants', () => ({ CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: ['report', 'quickReport'], })); +interface EsError extends Error { + name: string; + statusCode: number; + meta: { + body: { + error: { + type: string; + }; + }; + }; +} + describe('TaskPollingLifecycle', () => { let clock: sinon.SinonFakeTimers; const taskManagerLogger = mockLogger(); @@ -273,18 +284,19 @@ describe('TaskPollingLifecycle', () => { describe('claimAvailableTasks', () => { test('should claim Available Tasks when there are available workers', async () => { + const claimResult = { + docs: [], + stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, + }; const logger = mockLogger(); const taskClaiming = taskClaimingMock.create({}); taskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => - of( - asOk({ - docs: [], - stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, - }) - ) + Promise.resolve(asOk(claimResult)) ); - expect(isOk(await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)))).toBeTruthy(); + const result = await claimAvailableTasks(taskClaiming, logger); + expect(isOk(result)).toBeTruthy(); + expect((result as Ok).value).toEqual(claimResult); expect(taskClaiming.claimAvailableTasksIfCapacityIsAvailable).toHaveBeenCalledTimes(1); }); @@ -296,56 +308,54 @@ describe('TaskPollingLifecycle', () => { test('handles failure due to inline scripts being disabled', async () => { const logger = mockLogger(); const taskClaiming = taskClaimingMock.create({}); - taskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation( - () => - new Observable>((observer) => { - observer.error({ - name: 'ResponseError', - meta: { - body: { - error: { - root_cause: [ - { - type: 'illegal_argument_exception', - reason: 'cannot execute [inline] scripts', - }, - ], - type: 'search_phase_execution_exception', - reason: 'all shards failed', - phase: 'query', - grouped: true, - failed_shards: [ - { - shard: 0, - index: '.kibana_task_manager_1', - node: '24A4QbjHSK6prvtopAKLKw', - reason: { - type: 'illegal_argument_exception', - reason: 'cannot execute [inline] scripts', - }, - }, - ], - caused_by: { - type: 'illegal_argument_exception', - reason: 'cannot execute [inline] scripts', - caused_by: { - type: 'illegal_argument_exception', - reason: 'cannot execute [inline] scripts', - }, - }, + taskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => { + const error = new Error(`fail`) as EsError; + error.name = 'ResponseError'; + error.meta = { + body: { + error: { + // @ts-ignore + root_cause: [ + { + type: 'illegal_argument_exception', + reason: 'cannot execute [inline] scripts', + }, + ], + type: 'search_phase_execution_exception', + reason: 'all shards failed', + phase: 'query', + grouped: true, + failed_shards: [ + { + shard: 0, + index: '.kibana_task_manager_1', + node: '24A4QbjHSK6prvtopAKLKw', + reason: { + type: 'illegal_argument_exception', + reason: 'cannot execute [inline] scripts', }, - status: 400, + }, + ], + caused_by: { + type: 'illegal_argument_exception', + reason: 'cannot execute [inline] scripts', + caused_by: { + type: 'illegal_argument_exception', + reason: 'cannot execute [inline] scripts', }, }, - statusCode: 400, - } as ElasticsearchResponseError); - }) - ); + }, + status: 400, + }, + }; + error.statusCode = 400; + throw error; + }); - const err = await getFirstAsPromise(claimAvailableTasks(taskClaiming, logger)); + const claimErr = await claimAvailableTasks(taskClaiming, logger); - expect(isErr(err)).toBeTruthy(); - expect((err as Err).error).toEqual(FillPoolResult.Failed); + expect(isErr(claimErr)).toBeTruthy(); + expect((claimErr as Err).error).toEqual(FillPoolResult.Failed); expect(logger.warn).toHaveBeenCalledTimes(1); expect(logger.warn).toHaveBeenCalledWith( @@ -358,7 +368,7 @@ describe('TaskPollingLifecycle', () => { test('should emit event when polling is successful', async () => { clock.restore(); mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => - of( + Promise.resolve( asOk({ docs: [], stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, @@ -398,7 +408,7 @@ describe('TaskPollingLifecycle', () => { test('should set utilization to max when capacity is not fully reached but there are tasks left unclaimed', async () => { clock.restore(); mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => - of( + Promise.resolve( asOk({ docs: [], stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0, tasksLeftUnclaimed: 2 }, @@ -466,7 +476,7 @@ describe('TaskPollingLifecycle', () => { test('should emit success event when polling is successful', async () => { clock.restore(); mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => - of( + Promise.resolve( asOk({ docs: [], stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0 }, @@ -549,7 +559,7 @@ describe('TaskPollingLifecycle', () => { test('should emit failure event when polling is successful but individual task errors reported', async () => { clock.restore(); mockTaskClaiming.claimAvailableTasksIfCapacityIsAvailable.mockImplementation(() => - of( + Promise.resolve( asOk({ docs: [], stats: { tasksUpdated: 0, tasksConflicted: 0, tasksClaimed: 0, tasksErrors: 2 }, @@ -588,12 +598,6 @@ describe('TaskPollingLifecycle', () => { }); }); -function getFirstAsPromise(obs$: Observable): Promise { - return new Promise((resolve, reject) => { - obs$.subscribe(resolve, reject); - }); -} - type RetryableFunction = () => boolean; const RETRY_UNTIL_DEFAULT_COUNT = 20; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 7d8be75c2330c..0b1710ae7fa2f 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -12,7 +12,7 @@ import { tap } from 'rxjs'; import { UsageCounter } from '@kbn/usage-collection-plugin/server'; import type { Logger, ExecutionContextStart } from '@kbn/core/server'; -import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type'; +import { Result, asErr, mapErr, asOk, map, mapOk, isOk } from './lib/result_type'; import { ManagedConfiguration } from './lib/create_managed_configuration'; import { TaskManagerConfig, CLAIM_STRATEGY_UPDATE_BY_QUERY } from './config'; @@ -246,18 +246,19 @@ export class TaskPollingLifecycle implements ITaskEventEmitter => { return fillPool( // claim available tasks - () => { - return claimAvailableTasks(this.taskClaiming, this.logger).pipe( - tap( - mapOk(({ timing }: ClaimOwnershipResult) => { - if (timing) { - this.emitEvent( - asTaskManagerStatEvent('claimDuration', asOk(timing.stop - timing.start)) - ); - } - }) - ) - ); + async () => { + const result = await claimAvailableTasks(this.taskClaiming, this.logger); + + if (isOk(result) && result.value.timing) { + this.emitEvent( + asTaskManagerStatEvent( + 'claimDuration', + asOk(result.value.timing.stop - result.value.timing.start) + ) + ); + } + + return result; }, // wrap each task in a Task Runner this.createTaskRunnerForTask, @@ -352,39 +353,23 @@ export class TaskPollingLifecycle implements ITaskEventEmitter> { - return new Observable((observer) => { - taskClaiming - .claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: intervalFromNow('30s')!, - }) - .subscribe( - (claimResult) => { - observer.next(claimResult); - }, - (ex) => { - // if the `taskClaiming` stream errors out we want to catch it and see if - // we can identify the reason - // if we can - we emit an FillPoolResult error rather than erroring out the wrapping Observable - // returned by `claimAvailableTasks` - if (isEsCannotExecuteScriptError(ex)) { - logger.warn( - `Task Manager cannot operate when inline scripts are disabled in Elasticsearch` - ); - observer.next(asErr(FillPoolResult.Failed)); - observer.complete(); - } else { - const esError = identifyEsError(ex); - // as we could't identify the reason - we'll error out the wrapping Observable too - observer.error(esError.length > 0 ? esError : ex); - } - }, - () => { - observer.complete(); - } - ); - }); +): Promise> { + try { + return taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: intervalFromNow('30s')!, + }); + } catch (err) { + // if we can identify the reason for the error, emit a FillPoolResult error + if (isEsCannotExecuteScriptError(err)) { + logger.warn(`Task Manager cannot operate when inline scripts are disabled in Elasticsearch`); + return asErr(FillPoolResult.Failed); + } else { + const esError = identifyEsError(err); + // as we could't identify the reason - propagate the error + throw esError.length > 0 ? esError : err; + } + } } diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index 5116c25c38f3f..c9bca31755408 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -8,8 +8,7 @@ /* * This module contains helpers for managing the task manager storage layer. */ -import { Subject, Observable, of } from 'rxjs'; -import { map } from 'rxjs'; +import { Subject, Observable } from 'rxjs'; import { groupBy, isPlainObject } from 'lodash'; import { Logger } from '@kbn/core/server'; @@ -168,29 +167,34 @@ export class TaskClaiming { return this.events$; } - public claimAvailableTasksIfCapacityIsAvailable( + public async claimAvailableTasksIfCapacityIsAvailable( claimingOptions: Omit - ): Observable> { + ): Promise> { if (this.getAvailableCapacity()) { - const opts: TaskClaimerOpts = { - batches: this.getClaimingBatches(), - claimOwnershipUntil: claimingOptions.claimOwnershipUntil, - taskStore: this.taskStore, - events$: this.events$, - getCapacity: this.getAvailableCapacity, - unusedTypes: this.unusedTypes, - definitions: this.definitions, - taskMaxAttempts: this.taskMaxAttempts, - excludedTaskTypes: this.excludedTaskTypes, - logger: this.logger, - taskPartitioner: this.taskPartitioner, - }; - return this.taskClaimer(opts).pipe(map((claimResult) => asOk(claimResult))); + try { + const opts: TaskClaimerOpts = { + batches: this.getClaimingBatches(), + claimOwnershipUntil: claimingOptions.claimOwnershipUntil, + taskStore: this.taskStore, + events$: this.events$, + getCapacity: this.getAvailableCapacity, + unusedTypes: this.unusedTypes, + definitions: this.definitions, + taskMaxAttempts: this.taskMaxAttempts, + excludedTaskTypes: this.excludedTaskTypes, + logger: this.logger, + taskPartitioner: this.taskPartitioner, + }; + const result = await this.taskClaimer(opts); + return asOk(result); + } catch (err) { + throw err; + } } this.logger.debug( `[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.` ); - return of(asErr(FillPoolResult.NoAvailableWorkers)); + return asErr(FillPoolResult.NoAvailableWorkers); } } diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index a6e013cd8b338..4b6c8b96d6ca4 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { Subject, Observable } from 'rxjs'; +import { Subject } from 'rxjs'; import { Logger } from '@kbn/core/server'; import minimatch from 'minimatch'; @@ -45,7 +45,7 @@ export interface ClaimOwnershipResult { timing?: TaskTiming; } -export type TaskClaimerFn = (opts: TaskClaimerOpts) => Observable; +export type TaskClaimerFn = (opts: TaskClaimerOpts) => Promise; let WarnedOnInvalidClaimer = false; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 593be2d5497ec..0d3560c3bec6e 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -8,7 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { v4 as uuidv4 } from 'uuid'; -import { filter, take, toArray } from 'rxjs'; +import { filter, take } from 'rxjs'; import { CLAIM_STRATEGY_MGET, DEFAULT_KIBANAS_PER_PARTITION } from '../config'; @@ -31,7 +31,6 @@ import { TaskClaimingOpts, TASK_MANAGER_MARK_AS_CLAIMED, } from '../queries/task_claiming'; -import { Observable } from 'rxjs'; import { taskStoreMock } from '../task_store.mock'; import apm from 'elastic-apm-node'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; @@ -223,21 +222,15 @@ describe('TaskClaiming', () => { hits, }); - const resultsOrErr = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable(claimingOpts) - ); - for (const resultOrErr of resultsOrErr) { - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable(claimingOpts); + if (!isOk(resultOrErr)) { + expect(resultOrErr).toBe(undefined); } - const results = resultsOrErr.map((resultOrErr) => { - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } - return unwrap(resultOrErr) as ClaimOwnershipResult; - }); + if (!isOk(resultOrErr)) { + expect(resultOrErr).toBe(undefined); + } + const result = unwrap(resultOrErr) as ClaimOwnershipResult; expect(apm.startTransaction).toHaveBeenCalledWith( TASK_MANAGER_MARK_AS_CLAIMED, @@ -247,14 +240,14 @@ describe('TaskClaiming', () => { expect(store.msearch.mock.calls).toMatchObject({}); expect(store.getDocVersions.mock.calls).toMatchObject({}); - return results.map((result, index) => ({ + return { result, args: { - search: store.msearch.mock.calls[index][0] as SearchOpts[] & { + search: store.msearch.mock.calls[0][0] as SearchOpts[] & { query: MustNotCondition; }, }, - })); + }; } test('makes calls to APM as expected when markAvailableTasksAsClaimed throws error', async () => { @@ -287,11 +280,9 @@ describe('TaskClaiming', () => { store.msearch.mockRejectedValue(new Error('Oh no')); await expect( - getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }) - ) + taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }) ).rejects.toMatchInlineSnapshot(`[Error: Oh no]`); expect(apm.startTransaction).toHaveBeenCalledWith( @@ -369,9 +360,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -479,9 +470,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -586,9 +577,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -683,9 +674,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -771,9 +762,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -837,9 +828,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -930,9 +921,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1023,9 +1014,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1122,9 +1113,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1253,9 +1244,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1381,9 +1372,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1508,9 +1499,7 @@ describe('TaskClaiming', () => { }); await expect(() => - getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ) + taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) ).rejects.toThrowErrorMatchingInlineSnapshot(`"oh no"`); expect(apm.startTransaction).toHaveBeenCalledWith( @@ -1624,9 +1613,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1757,9 +1746,9 @@ describe('TaskClaiming', () => { taskPartitioner, }); - const [resultOrErr] = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ); + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); if (!isOk(resultOrErr)) { expect(resultOrErr).toBe(undefined); @@ -1874,9 +1863,7 @@ describe('TaskClaiming', () => { }); await expect(() => - getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) - ) + taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ claimOwnershipUntil: new Date() }) ).rejects.toThrowErrorMatchingInlineSnapshot(`"oh no"`); expect(apm.startTransaction).toHaveBeenCalledWith( @@ -1964,13 +1951,11 @@ describe('TaskClaiming', () => { claimOwnershipUntil: new Date(), }, }); - const [ - { - args: { - search: [{ query }], - }, + const { + args: { + search: [{ query }], }, - ] = claimedResults; + } = claimedResults; expect(query).toMatchInlineSnapshot(` Object { @@ -2122,13 +2107,11 @@ describe('TaskClaiming', () => { claimOwnershipUntil: new Date(), }, }); - const [ - { - args: { - search: [{ query }], - }, + const { + args: { + search: [{ query }], }, - ] = claimedResults; + } = claimedResults; expect(taskManagerLogger.warn).toHaveBeenCalledWith( 'Background task node "test" has no assigned partitions, claiming against all partitions', @@ -2404,11 +2387,9 @@ describe('TaskClaiming', () => { ) .toPromise(); - await getFirstAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }) - ); + await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); const event = await promise; expect(event).toMatchObject( @@ -2476,14 +2457,3 @@ function mockInstance(instance: Partial = {}) { instance ); } - -function getFirstAsPromise(obs$: Observable): Promise { - return new Promise((resolve, reject) => { - obs$.subscribe(resolve, reject); - }); -} -function getAllAsPromise(obs$: Observable): Promise { - return new Promise((resolve, reject) => { - obs$.pipe(toArray()).subscribe(resolve, reject); - }); -} diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 407cf6b90dd6c..4b7e5ec6b3691 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -14,7 +14,7 @@ // capacity and the cost of each task type to run import apm, { Logger } from 'elastic-apm-node'; -import { Subject, Observable } from 'rxjs'; +import { Subject } from 'rxjs'; import { createWrappedLogger } from '../lib/wrapped_logger'; import { TaskTypeDictionary } from '../task_type_dictionary'; @@ -70,24 +70,9 @@ interface OwnershipClaimingOpts { const SIZE_MULTIPLIER_FOR_TASK_FETCH = 4; -export function claimAvailableTasksMget(opts: TaskClaimerOpts): Observable { - const taskClaimOwnership$ = new Subject(); - - claimAvailableTasksApm(opts) - .then((result) => { - taskClaimOwnership$.next(result); - }) - .catch((err) => { - taskClaimOwnership$.error(err); - }) - .finally(() => { - taskClaimOwnership$.complete(); - }); - - return taskClaimOwnership$; -} - -async function claimAvailableTasksApm(opts: TaskClaimerOpts): Promise { +export async function claimAvailableTasksMget( + opts: TaskClaimerOpts +): Promise { const apmTrans = apm.startTransaction( TASK_MANAGER_MARK_AS_CLAIMED, TASK_MANAGER_TRANSACTION_TYPE diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts index 9453bced9f7ba..13e6faf2de0fd 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.test.ts @@ -7,7 +7,7 @@ import _ from 'lodash'; import { v1 as uuidv1, v4 as uuidv4 } from 'uuid'; -import { filter, take, toArray } from 'rxjs'; +import { filter, take } from 'rxjs'; import { TaskStatus, ConcreteTaskInstance, TaskPriority } from '../task'; import { SearchOpts, StoreOpts, UpdateByQueryOpts, UpdateByQuerySearchOpts } from '../task_store'; @@ -22,7 +22,6 @@ import { TaskClaimingOpts, TASK_MANAGER_MARK_AS_CLAIMED, } from '../queries/task_claiming'; -import { Observable } from 'rxjs'; import { taskStoreMock } from '../task_store.mock'; import apm from 'elastic-apm-node'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; @@ -174,21 +173,12 @@ describe('TaskClaiming', () => { versionConflicts, }); - const resultsOrErr = await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable(claimingOpts) - ); - for (const resultOrErr of resultsOrErr) { - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable(claimingOpts); + if (!isOk(resultOrErr)) { + expect(resultOrErr).toBe(undefined); } - const results = resultsOrErr.map((resultOrErr) => { - if (!isOk(resultOrErr)) { - expect(resultOrErr).toBe(undefined); - } - return unwrap(resultOrErr) as ClaimOwnershipResult; - }); + const result = unwrap(resultOrErr) as ClaimOwnershipResult; expect(apm.startTransaction).toHaveBeenCalledWith( TASK_MANAGER_MARK_AS_CLAIMED, @@ -200,18 +190,19 @@ describe('TaskClaiming', () => { max_docs: getCapacity(), }); expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: getCapacity() }); - return results.map((result, index) => ({ + return { result, + store, args: { - search: store.fetch.mock.calls[index][0] as SearchOpts & { + search: store.fetch.mock.calls[0][0] as SearchOpts & { query: MustNotCondition; }, - updateByQuery: store.updateByQuery.mock.calls[index] as [ + updateByQuery: store.updateByQuery.mock.calls[0] as [ UpdateByQuerySearchOpts, UpdateByQueryOpts ], }, - })); + }; } test('makes calls to APM as expected when markAvailableTasksAsClaimed throws error', async () => { @@ -243,11 +234,9 @@ describe('TaskClaiming', () => { store.updateByQuery.mockRejectedValue(new Error('Oh no')); await expect( - getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }) - ) + taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }) ).rejects.toMatchInlineSnapshot(`[Error: Oh no]`); expect(apm.startTransaction).toHaveBeenCalledWith( @@ -280,13 +269,11 @@ describe('TaskClaiming', () => { }, }); - const [ - { - args: { - updateByQuery: [{ query, sort }], - }, + const { + args: { + updateByQuery: [{ query, sort }], }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { definitions, }, @@ -446,7 +433,7 @@ if (doc['task.runAt'].size()!=0) { createTaskRunner: jest.fn(), }, }); - const results = await testClaimAvailableTasks({ + const { store } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, definitions, @@ -470,10 +457,9 @@ if (doc['task.runAt'].size()!=0) { }, }); - expect(results.length).toEqual(4); - - expect(results[0].args.updateByQuery[1].max_docs).toEqual(10); - expect(results[0].args.updateByQuery[0].script).toMatchObject({ + expect(store.updateByQuery).toHaveBeenCalledTimes(4); + expect(store.updateByQuery.mock.calls[0][1]?.max_docs).toEqual(10); + expect(store.updateByQuery.mock.calls[0][0]?.script).toMatchObject({ source: expect.any(String), lang: 'painless', params: { @@ -492,8 +478,8 @@ if (doc['task.runAt'].size()!=0) { }, }); - expect(results[1].args.updateByQuery[1].max_docs).toEqual(1); - expect(results[1].args.updateByQuery[0].script).toMatchObject({ + expect(store.updateByQuery.mock.calls[1][1]?.max_docs).toEqual(1); + expect(store.updateByQuery.mock.calls[1][0]?.script).toMatchObject({ source: expect.any(String), lang: 'painless', params: { @@ -507,14 +493,15 @@ if (doc['task.runAt'].size()!=0) { 'anotherLimitedToOne', 'limitedToTwo', ], + unusedTaskTypes: [], taskMaxAttempts: { limitedToOne: maxAttempts, }, }, }); - expect(results[2].args.updateByQuery[1].max_docs).toEqual(1); - expect(results[2].args.updateByQuery[0].script).toMatchObject({ + expect(store.updateByQuery.mock.calls[2][1]?.max_docs).toEqual(1); + expect(store.updateByQuery.mock.calls[2][0]?.script).toMatchObject({ source: expect.any(String), lang: 'painless', params: { @@ -534,8 +521,8 @@ if (doc['task.runAt'].size()!=0) { }, }); - expect(results[3].args.updateByQuery[1].max_docs).toEqual(2); - expect(results[3].args.updateByQuery[0].script).toMatchObject({ + expect(store.updateByQuery.mock.calls[3][1]?.max_docs).toEqual(2); + expect(store.updateByQuery.mock.calls[3][0]?.script).toMatchObject({ source: expect.any(String), lang: 'painless', params: { @@ -556,6 +543,130 @@ if (doc['task.runAt'].size()!=0) { }); }); + test('it should return tasks from all batches', async () => { + const maxAttempts = _.random(2, 43); + const definitions = new TaskTypeDictionary(mockLogger()); + const taskManagerId = uuidv1(); + definitions.registerTaskDefinitions({ + unlimited: { + title: 'unlimited', + createTaskRunner: jest.fn(), + }, + limitedToZero: { + title: 'limitedToZero', + maxConcurrency: 0, + createTaskRunner: jest.fn(), + }, + anotherUnlimited: { + title: 'anotherUnlimited', + createTaskRunner: jest.fn(), + }, + finalUnlimited: { + title: 'finalUnlimited', + createTaskRunner: jest.fn(), + }, + limitedToOne: { + title: 'limitedToOne', + maxConcurrency: 1, + createTaskRunner: jest.fn(), + }, + anotherLimitedToOne: { + title: 'anotherLimitedToOne', + maxConcurrency: 1, + createTaskRunner: jest.fn(), + }, + limitedToTwo: { + title: 'limitedToTwo', + maxConcurrency: 2, + createTaskRunner: jest.fn(), + }, + }); + const store = taskStoreMock.create({ taskManagerId }); + store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); + + // mock the return values for 4 batches + const batch1Docs = [mockInstance({ id: `task:id-1` })]; + store.fetch.mockResolvedValueOnce({ docs: batch1Docs, versionMap: new Map() }); + store.updateByQuery.mockResolvedValueOnce({ + updated: batch1Docs.length, + version_conflicts: 0, + total: batch1Docs.length, + }); + + const batch2Docs = [mockInstance({ id: `task:id-2` })]; + store.fetch.mockResolvedValueOnce({ docs: batch2Docs, versionMap: new Map() }); + store.updateByQuery.mockResolvedValueOnce({ + updated: batch2Docs.length, + version_conflicts: 1, + total: batch2Docs.length, + }); + + const batch3Docs = [mockInstance({ id: `task:id-3` }), mockInstance({ id: `task:id-4` })]; + store.fetch.mockResolvedValueOnce({ docs: batch3Docs, versionMap: new Map() }); + store.updateByQuery.mockResolvedValueOnce({ + updated: batch3Docs.length, + version_conflicts: 0, + total: batch3Docs.length, + }); + + const batch4Docs = [ + mockInstance({ id: `task:id-5` }), + mockInstance({ id: `task:id-6` }), + mockInstance({ id: `task:id-7` }), + ]; + store.fetch.mockResolvedValueOnce({ docs: batch4Docs, versionMap: new Map() }); + store.updateByQuery.mockResolvedValueOnce({ + updated: batch4Docs.length, + version_conflicts: 2, + total: batch4Docs.length, + }); + + const taskClaiming = new TaskClaiming({ + logger: taskManagerLogger, + strategy: 'default', + definitions, + taskStore: store, + maxAttempts, + getAvailableCapacity: (type) => { + switch (type) { + case 'limitedToOne': + case 'anotherLimitedToOne': + return 1; + case 'limitedToTwo': + return 2; + default: + return 10; + } + }, + taskPartitioner, + excludedTaskTypes: [], + unusedTypes: [], + }); + + const resultOrErr = await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); + + if (!isOk(resultOrErr)) { + expect(resultOrErr).toBe(undefined); + } + + const result = unwrap(resultOrErr) as ClaimOwnershipResult; + + expect(store.updateByQuery).toHaveBeenCalledTimes(4); + + // result should be an accumulation of all returned updateByQueryResults + expect(result).toEqual({ + stats: { + tasksClaimed: 7, + tasksConflicted: 3, + tasksUpdated: 7, + }, + timing: expect.any(Object), + docs: [...batch1Docs, ...batch2Docs, ...batch3Docs, ...batch4Docs], + }); + }); + test('it should reduce the available capacity from batch to batch', async () => { const maxAttempts = _.random(2, 43); const definitions = new TaskTypeDictionary(mockLogger()); @@ -576,7 +687,7 @@ if (doc['task.runAt'].size()!=0) { createTaskRunner: jest.fn(), }, }); - const results = await testClaimAvailableTasks({ + const { store } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, definitions, @@ -640,15 +751,15 @@ if (doc['task.runAt'].size()!=0) { }, }); - expect(results.length).toEqual(3); + expect(store.updateByQuery).toHaveBeenCalledTimes(3); - expect(results[0].args.updateByQuery[1].max_docs).toEqual(10); + expect(store.updateByQuery.mock.calls[0][1]?.max_docs).toEqual(10); // only capacity for 3, even though 5 are allowed - expect(results[1].args.updateByQuery[1].max_docs).toEqual(3); + expect(store.updateByQuery.mock.calls[1][1]?.max_docs).toEqual(3); // only capacity for 1, even though 2 are allowed - expect(results[2].args.updateByQuery[1].max_docs).toEqual(1); + expect(store.updateByQuery.mock.calls[2][1]?.max_docs).toEqual(1); }); test('it shuffles the types claimed in batches to ensure no type starves another', async () => { @@ -706,36 +817,34 @@ if (doc['task.runAt'].size()!=0) { }, }); - async function getUpdateByQueryScriptParams() { - return ( - await getAllAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }) - ) - ).map( - (result, index) => - ( - store.updateByQuery.mock.calls[index][0] as { - query: MustNotCondition; - size: number; - sort: string | string[]; - script: { - params: { - [claimableTaskTypes: string]: string[]; - }; - }; - } - ).script.params.claimableTaskTypes - ); + interface UBQParams { + script: { + params: { + [claimableTaskTypes: string]: string[]; + }; + }; } - const firstCycle = await getUpdateByQueryScriptParams(); + // first cycle + await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); + expect(store.updateByQuery).toHaveBeenCalledTimes(4); + const firstCycle = store.updateByQuery.mock.calls.map( + (call) => (call[0] as UBQParams).script.params.claimableTaskTypes + ); + store.updateByQuery.mockClear(); - const secondCycle = await getUpdateByQueryScriptParams(); - expect(firstCycle.length).toEqual(4); - expect(secondCycle.length).toEqual(4); + // second cycle + await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); + expect(store.updateByQuery).toHaveBeenCalledTimes(4); + const secondCycle = store.updateByQuery.mock.calls.map( + (call) => (call[0] as UBQParams).script.params.claimableTaskTypes + ); + expect(firstCycle).not.toMatchObject(secondCycle); }); @@ -765,13 +874,11 @@ if (doc['task.runAt'].size()!=0) { }, }); - const [ - { - args: { - updateByQuery: [{ query, script }], - }, + const { + args: { + updateByQuery: [{ query, script }], }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { definitions, taskManagerId, @@ -871,13 +978,11 @@ if (doc['task.runAt'].size()!=0) { ownerId: taskManagerId, retryAt: claimOwnershipUntil, }; - const [ - { - args: { - updateByQuery: [{ script }], - }, + const { + args: { + updateByQuery: [{ script }], }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, }, @@ -921,14 +1026,12 @@ if (doc['task.runAt'].size()!=0) { ownerId: taskManagerId, }), ]; - const [ - { - result: { docs }, - args: { - search: { query }, - }, + const { + result: { docs }, + args: { + search: { query }, }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, }, @@ -1022,14 +1125,12 @@ if (doc['task.runAt'].size()!=0) { ownerId: taskManagerId, }), ]; - const [ - { - result: { docs }, - args: { - search: { query }, - }, + const { + result: { docs }, + args: { + search: { query }, }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, }, @@ -1135,13 +1236,11 @@ if (doc['task.runAt'].size()!=0) { }), ]; const maxDocs = 10; - const [ - { - result: { - stats: { tasksUpdated, tasksConflicted, tasksClaimed }, - }, + const { + result: { + stats: { tasksUpdated, tasksConflicted, tasksClaimed }, }, - ] = await testClaimAvailableTasks({ + } = await testClaimAvailableTasks({ storeOpts: { taskManagerId, }, @@ -1279,11 +1378,9 @@ if (doc['task.runAt'].size()!=0) { ) .toPromise(); - await getFirstAsPromise( - taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ - claimOwnershipUntil: new Date(), - }) - ); + await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); const event = await promise; expect(event).toMatchObject( @@ -1339,14 +1436,3 @@ function mockInstance(instance: Partial = {}) { instance ); } - -function getFirstAsPromise(obs$: Observable): Promise { - return new Promise((resolve, reject) => { - obs$.subscribe(resolve, reject); - }); -} -function getAllAsPromise(obs$: Observable): Promise { - return new Promise((resolve, reject) => { - obs$.pipe(toArray()).subscribe(resolve, reject); - }); -} diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts index 807ee8ca4397f..5a4bccb43b984 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_update_by_query.ts @@ -9,8 +9,7 @@ * This module contains helpers for managing the task manager storage layer. */ import apm from 'elastic-apm-node'; -import { Subject, Observable, from, of } from 'rxjs'; -import { mergeScan } from 'rxjs'; +import { Subject } from 'rxjs'; import { groupBy, pick } from 'lodash'; import { asOk } from '../lib/result_type'; @@ -57,52 +56,47 @@ interface OwnershipClaimingOpts { taskMaxAttempts: Record; } -export function claimAvailableTasksUpdateByQuery( +export async function claimAvailableTasksUpdateByQuery( opts: TaskClaimerOpts -): Observable { +): Promise { const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts; const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts; const initialCapacity = getCapacity(); - return from(batches).pipe( - mergeScan( - (accumulatedResult, batch) => { - const stopTaskTimer = startTaskTimer(); - const capacity = Math.min( - initialCapacity - accumulatedResult.stats.tasksClaimed, - isLimited(batch) ? getCapacity(batch.tasksTypes) : getCapacity() - ); - // if we have no more capacity, short circuit here - if (capacity <= 0) { - return of(accumulatedResult); - } - return from( - executeClaimAvailableTasks({ - claimOwnershipUntil, - size: capacity, - events$, - taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes, - taskStore, - definitions, - unusedTypes, - excludedTaskTypes, - taskMaxAttempts, - }).then((result) => { - const { stats, docs } = accumulateClaimOwnershipResults(accumulatedResult, result); - stats.tasksConflicted = correctVersionConflictsForContinuation( - stats.tasksClaimed, - stats.tasksConflicted, - initialCapacity - ); - return { stats, docs, timing: stopTaskTimer() }; - }) - ); - }, - // initialise the accumulation with no results - accumulateClaimOwnershipResults(), - // only run one batch at a time - 1 - ) - ); + + let accumulatedResult = getEmptyClaimOwnershipResult(); + const stopTaskTimer = startTaskTimer(); + for (const batch of batches) { + const capacity = Math.min( + initialCapacity - accumulatedResult.stats.tasksClaimed, + isLimited(batch) ? getCapacity(batch.tasksTypes) : getCapacity() + ); + + // if we have no more capacity, short circuit here + if (capacity <= 0) { + return accumulatedResult; + } + + const result = await executeClaimAvailableTasks({ + claimOwnershipUntil, + size: capacity, + events$, + taskTypes: isLimited(batch) ? new Set([batch.tasksTypes]) : batch.tasksTypes, + taskStore, + definitions, + unusedTypes, + excludedTaskTypes, + taskMaxAttempts, + }); + + accumulatedResult = accumulateClaimOwnershipResults(accumulatedResult, result); + accumulatedResult.stats.tasksConflicted = correctVersionConflictsForContinuation( + accumulatedResult.stats.tasksClaimed, + accumulatedResult.stats.tasksConflicted, + initialCapacity + ); + } + + return { ...accumulatedResult, timing: stopTaskTimer() }; } async function executeClaimAvailableTasks( @@ -230,7 +224,7 @@ function accumulateClaimOwnershipResults( tasksConflicted: stats.tasksConflicted + prev.stats.tasksConflicted, tasksClaimed: stats.tasksClaimed + prev.stats.tasksClaimed, }, - docs, + docs: [...prev.docs, ...docs], timing, }; return res;