diff --git a/package.json b/package.json index 0b99d219a968e..004f97b766c01 100644 --- a/package.json +++ b/package.json @@ -1100,6 +1100,7 @@ "moment-timezone": "^0.5.43", "monaco-editor": "^0.44.0", "monaco-yaml": "^5.1.0", + "murmurhash": "^2.0.1", "mustache": "^2.3.2", "node-diff3": "^3.1.2", "node-fetch": "^2.6.7", diff --git a/packages/kbn-check-mappings-update-cli/current_fields.json b/packages/kbn-check-mappings-update-cli/current_fields.json index 1d62104fac177..4cec00e68dc42 100644 --- a/packages/kbn-check-mappings-update-cli/current_fields.json +++ b/packages/kbn-check-mappings-update-cli/current_fields.json @@ -1005,6 +1005,7 @@ "attempts", "enabled", "ownerId", + "partition", "retryAt", "runAt", "schedule", diff --git a/packages/kbn-check-mappings-update-cli/current_mappings.json b/packages/kbn-check-mappings-update-cli/current_mappings.json index 93633a1e05824..9242f775f7094 100644 --- a/packages/kbn-check-mappings-update-cli/current_mappings.json +++ b/packages/kbn-check-mappings-update-cli/current_mappings.json @@ -3327,6 +3327,9 @@ "ownerId": { "type": "keyword" }, + "partition": { + "type": "integer" + }, "retryAt": { "type": "date" }, diff --git a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts index 856a4d6b2f834..02ad163c43931 100644 --- a/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts +++ b/src/core/server/integration_tests/ci_checks/saved_objects/check_registered_types.test.ts @@ -161,7 +161,7 @@ describe('checking migration metadata changes on all registered SO types', () => "synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e", "synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c", "tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc", - "task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe", + "task": "3c89a7c918d5b896a5f8800f06e9114ad7e7aea3", "telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd", "threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70", "ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24", diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts index c82618d34bf8a..921402d94e9b0 100644 --- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts +++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.test.ts @@ -11,46 +11,13 @@ import { ACTIVE_NODES_LOOK_BACK, } from './kibana_discovery_service'; import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects'; -import { - SavedObjectsBulkDeleteResponse, - SavedObjectsFindResponse, - SavedObjectsFindResult, - SavedObjectsUpdateResponse, -} from '@kbn/core/server'; +import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server'; -import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node'; +import { createFindResponse, createFindSO } from './mock_kibana_discovery_service'; const currentNode = 'current-node-id'; const now = '2024-08-10T10:00:00.000Z'; -const createNodeRecord = (id: string = '1', lastSeen: string = now): BackgroundTaskNode => ({ - id, - last_seen: lastSeen, -}); - -const createFindSO = ( - id: string = currentNode, - lastSeen: string = now -): SavedObjectsFindResult => ({ - attributes: createNodeRecord(id, lastSeen), - id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`, - namespaces: ['default'], - references: [], - score: 1, - type: BACKGROUND_TASK_NODE_SO_NAME, - updated_at: new Date().toDateString(), - version: '1', -}); - -const createFindResponse = ( - soList: Array> -): SavedObjectsFindResponse => ({ - total: 1, - per_page: 10000, - page: 1, - saved_objects: soList, -}); - describe('KibanaDiscoveryService', () => { const savedObjectsRepository = savedObjectsRepositoryMock.create(); const logger = loggingSystemMock.createLogger(); diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/mock_kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/mock_kibana_discovery_service.ts new file mode 100644 index 0000000000000..39b98d29a149e --- /dev/null +++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/mock_kibana_discovery_service.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import { SavedObjectsFindResponse, SavedObjectsFindResult } from '@kbn/core/server'; +import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node'; +import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects'; +import { KibanaDiscoveryService } from './kibana_discovery_service'; + +export const createDiscoveryServiceMock = (currentNode: string) => { + const savedObjectsRepository = savedObjectsRepositoryMock.create(); + const logger = loggingSystemMock.createLogger(); + const discoveryService = new KibanaDiscoveryService({ + savedObjectsRepository, + logger, + currentNode, + }); + + for (const method of ['getActiveKibanaNodes'] as Array) { + jest.spyOn(discoveryService, method); + } + + return discoveryService as jest.Mocked; +}; + +export const createNodeRecord = (id: string, lastSeen: string): BackgroundTaskNode => ({ + id, + last_seen: lastSeen, +}); + +export const createFindSO = ( + id: string, + lastSeen: string +): SavedObjectsFindResult => ({ + attributes: createNodeRecord(id, lastSeen), + id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`, + namespaces: ['default'], + references: [], + score: 1, + type: BACKGROUND_TASK_NODE_SO_NAME, + updated_at: new Date().toDateString(), + version: '1', +}); + +export const createFindResponse = ( + soList: Array> +): SavedObjectsFindResponse => ({ + total: 1, + per_page: 10000, + page: 1, + saved_objects: soList, +}); diff --git a/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.test.ts b/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.test.ts new file mode 100644 index 0000000000000..f7bb5413fc0cd --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.test.ts @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { assignPodPartitions, getParitionMap } from './assign_pod_partitions'; +describe('assignPodPartitions', () => { + test('two pods', () => { + const allPods = ['foo', 'bar']; + const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const map = getParitionMap(allPods, allPartitions); + expect(map).toMatchInlineSnapshot(` + Object { + "1": Array [ + "bar", + "foo", + ], + "10": Array [ + "bar", + "foo", + ], + "2": Array [ + "bar", + "foo", + ], + "3": Array [ + "bar", + "foo", + ], + "4": Array [ + "bar", + "foo", + ], + "5": Array [ + "bar", + "foo", + ], + "6": Array [ + "bar", + "foo", + ], + "7": Array [ + "bar", + "foo", + ], + "8": Array [ + "bar", + "foo", + ], + "9": Array [ + "bar", + "foo", + ], + } + `); + }); + + test('three pods', () => { + const allPods = ['foo', 'bar', 'quz']; + const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + const map = getParitionMap(allPods, allPartitions); + expect(map).toMatchInlineSnapshot(` + Object { + "1": Array [ + "bar", + "foo", + ], + "10": Array [ + "bar", + "foo", + ], + "2": Array [ + "quz", + "bar", + ], + "3": Array [ + "foo", + "quz", + ], + "4": Array [ + "bar", + "foo", + ], + "5": Array [ + "quz", + "bar", + ], + "6": Array [ + "foo", + "quz", + ], + "7": Array [ + "bar", + "foo", + ], + "8": Array [ + "quz", + "bar", + ], + "9": Array [ + "foo", + "quz", + ], + } + `); + const fooPartitions = assignPodPartitions('foo', allPods, allPartitions); + expect(fooPartitions).toMatchInlineSnapshot(` + Array [ + 1, + 3, + 4, + 6, + 7, + 9, + 10, + ] + `); + const barPartitions = assignPodPartitions('bar', allPods, allPartitions); + expect(barPartitions).toMatchInlineSnapshot(` + Array [ + 1, + 2, + 4, + 5, + 7, + 8, + 10, + ] + `); + const quzPartitions = assignPodPartitions('quz', allPods, allPartitions); + expect(quzPartitions).toMatchInlineSnapshot(` + Array [ + 2, + 3, + 5, + 6, + 8, + 9, + ] + `); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.ts b/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.ts new file mode 100644 index 0000000000000..c2d5554b01f1b --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/assign_pod_partitions.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +const KIBANAS_PER_PARTITION = 2; + +export function getParitionMap(podNames: string[], partitions: number[]): Record { + const map: Record = {}; + let counter = 0; + for (const parition of partitions) { + map[parition] = []; + for (let i = 0; i < KIBANAS_PER_PARTITION; i++) { + map[parition].push(podNames.sort()[counter++ % podNames.length]); + } + } + return map; +} + +export function assignPodPartitions( + podName: string, + podNames: string[], + partitions: number[] +): number[] { + const map = getParitionMap(podNames, partitions); + const podPartitions: number[] = []; + for (const partition of Object.keys(map)) { + if (map[Number(partition)].indexOf(podName) !== -1) { + podPartitions.push(Number(partition)); + } + } + return podPartitions; +} diff --git a/x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts b/x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts new file mode 100644 index 0000000000000..4f1ebc60aa249 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + createDiscoveryServiceMock, + createFindSO, +} from '../kibana_discovery_service/mock_kibana_discovery_service'; +import { TaskPartitioner } from './task_partitioner'; + +const POD_NAME = 'test-pod'; + +describe('getAllPartitions()', () => { + const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME); + test('correctly sets allPartitions in constructor', () => { + const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock); + expect(taskPartitioner.getAllPartitions()).toEqual([ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, + 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, + 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, + 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, + 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, + 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, + 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, + 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, + 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, + 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, + 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, + 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, + 248, 249, 250, 251, 252, 253, 254, 255, + ]); + }); +}); + +describe('getPodName()', () => { + const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME); + + test('correctly sets podName in constructor', () => { + const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock); + expect(taskPartitioner.getPodName()).toEqual('test-pod'); + }); +}); + +describe('getPartitions()', () => { + const lastSeen = '2024-08-10T10:00:00.000Z'; + const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME); + discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([ + createFindSO(POD_NAME, lastSeen), + createFindSO('test-pod-2', lastSeen), + createFindSO('test-pod-3', lastSeen), + ]); + + test('correctly gets the partitons for this pod', async () => { + const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock); + expect(await taskPartitioner.getPartitions()).toEqual([ + 0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36, + 37, 39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70, + 72, 73, 75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103, + 105, 106, 108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132, + 133, 135, 136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160, + 162, 163, 165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189, + 190, 192, 193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217, + 219, 220, 222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246, + 247, 249, 250, 252, 253, 255, + ]); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/task_partitioner.ts b/x-pack/plugins/task_manager/server/lib/task_partitioner.ts new file mode 100644 index 0000000000000..8ff696391a826 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/task_partitioner.ts @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaDiscoveryService } from '../kibana_discovery_service'; +import { assignPodPartitions } from './assign_pod_partitions'; + +function range(start: number, end: number) { + const nums: number[] = []; + for (let i = start; i < end; ++i) { + nums.push(i); + } + return nums; +} + +export const MAX_PARTITIONS = 256; + +export class TaskPartitioner { + private readonly allPartitions: number[]; + private readonly podName: string; + private kibanaDiscoveryService: KibanaDiscoveryService; + + constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) { + this.allPartitions = range(0, MAX_PARTITIONS); + this.podName = podName; + this.kibanaDiscoveryService = kibanaDiscoveryService; + } + + getAllPartitions(): number[] { + return this.allPartitions; + } + + getPodName(): string { + return this.podName; + } + + async getPartitions(): Promise { + const allPodNames = await this.getAllPodNames(); + const podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions); + return podPartitions; + } + + private async getAllPodNames(): Promise { + const nodes = await this.kibanaDiscoveryService.getActiveKibanaNodes(); + return nodes.map((node) => node.attributes.id); + } +} diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index e931c58b579ac..1926b48b31ea6 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -42,6 +42,7 @@ import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { setupIntervalLogging } from './lib/log_health_metrics'; import { metricsStream, Metrics } from './metrics'; import { TaskManagerMetricsCollector } from './metrics/task_metrics_collector'; +import { TaskPartitioner } from './lib/task_partitioner'; export interface TaskManagerSetupContract { /** @@ -281,6 +282,8 @@ export class TaskManagerPlugin taskTypes: new Set(this.definitions.getAllTypes()), excludedTypes: new Set(this.config.unsafe.exclude_task_types), }); + + const taskPartitioner = new TaskPartitioner(this.taskManagerId!, this.kibanaDiscoveryService); this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, @@ -292,6 +295,7 @@ export class TaskManagerPlugin middleware: this.middleware, elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!, ...managedConfiguration, + taskPartitioner, }); this.ephemeralTaskLifecycle = new EphemeralTaskLifecycle({ diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 942922b84f407..baf45cb65ea1e 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -20,6 +20,8 @@ import { asOk, Err, isErr, isOk, Result } from './lib/result_type'; import { FillPoolResult } from './lib/fill_pool'; import { ElasticsearchResponseError } from './lib/identify_es_error'; import { executionContextServiceMock } from '@kbn/core/server/mocks'; +import { TaskPartitioner } from './lib/task_partitioner'; +import { KibanaDiscoveryService } from './kibana_discovery_service'; const executionContext = executionContextServiceMock.createSetupContract(); let mockTaskClaiming = taskClaimingMock.create({}); @@ -91,6 +93,7 @@ describe('TaskPollingLifecycle', () => { maxWorkersConfiguration$: of(100), pollIntervalConfiguration$: of(100), executionContext, + taskPartitioner: new TaskPartitioner('test', {} as KibanaDiscoveryService), }; beforeEach(() => { diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 35fc48423f710..3b9c5621da0b9 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -43,6 +43,7 @@ import { TaskTypeDictionary } from './task_type_dictionary'; import { delayOnClaimConflicts } from './polling'; import { TaskClaiming } from './queries/task_claiming'; import { ClaimOwnershipResult } from './task_claimers'; +import { TaskPartitioner } from './lib/task_partitioner'; export interface ITaskEventEmitter { get events(): Observable; @@ -58,6 +59,7 @@ export type TaskPollingLifecycleOpts = { elasticsearchAndSOAvailability$: Observable; executionContext: ExecutionContextStart; usageCounter?: UsageCounter; + taskPartitioner: TaskPartitioner; } & ManagedConfiguration; export type TaskLifecycleEvent = @@ -109,6 +111,7 @@ export class TaskPollingLifecycle implements ITaskEventEmitter { + expect(tasksWithPartitions([1, 2, 3])).toMatchInlineSnapshot(` + Object { + "bool": Object { + "filter": Array [ + Object { + "bool": Object { + "should": Array [ + Object { + "terms": Object { + "task.partition": Array [ + 1, + 2, + 3, + ], + }, + }, + Object { + "bool": Object { + "must_not": Array [ + Object { + "exists": Object { + "field": "task.partition", + }, + }, + ], + }, + }, + ], + }, + }, + ], + }, + } + `); + }); }); diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index 0c241aeef14b8..107a3f4466637 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -233,3 +233,34 @@ export const OneOfTaskTypes = (field: string, types: string[]): MustCondition => }, }; }; + +export function tasksWithPartitions(partitions: number[]): estypes.QueryDslQueryContainer { + return { + bool: { + filter: [ + { + bool: { + should: [ + { + terms: { + 'task.partition': partitions, + }, + }, + { + bool: { + must_not: [ + { + exists: { + field: 'task.partition', + }, + }, + ], + }, + }, + ], + }, + }, + ], + }, + }; +} diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index 33e5a0074319d..bc4adb71dd4a1 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -10,6 +10,8 @@ import { mockLogger } from '../test_utils'; import { TaskClaiming } from './task_claiming'; import { taskStoreMock } from '../task_store.mock'; import apm from 'elastic-apm-node'; +import { TaskPartitioner } from '../lib/task_partitioner'; +import { KibanaDiscoveryService } from '../kibana_discovery_service'; jest.mock('../constants', () => ({ CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [ @@ -23,6 +25,7 @@ jest.mock('../constants', () => ({ })); const taskManagerLogger = mockLogger(); +const taskPartitioner = new TaskPartitioner('test', {} as KibanaDiscoveryService); beforeEach(() => jest.clearAllMocks()); @@ -78,6 +81,7 @@ describe('TaskClaiming', () => { taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getCapacity: () => 10, + taskPartitioner, }); expect(taskManagerLogger.warn).toHaveBeenCalledWith( @@ -127,6 +131,7 @@ describe('TaskClaiming', () => { taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getCapacity: () => 10, + taskPartitioner, }); expect(taskManagerLogger.info).toHaveBeenCalledTimes(2); diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index ffd053656d72d..188f47b0d2d2f 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -27,6 +27,7 @@ import { ClaimOwnershipResult, getTaskClaimer, } from '../task_claimers'; +import { TaskPartitioner } from '../lib/task_partitioner'; export type { ClaimOwnershipResult } from '../task_claimers'; export interface TaskClaimingOpts { @@ -38,6 +39,7 @@ export interface TaskClaimingOpts { maxAttempts: number; excludedTaskTypes: string[]; getCapacity: (taskType?: string) => number; + taskPartitioner: TaskPartitioner; } export interface OwnershipClaimingOpts { @@ -92,6 +94,7 @@ export class TaskClaiming { private readonly excludedTaskTypes: string[]; private readonly unusedTypes: string[]; private readonly taskClaimer: TaskClaimerFn; + private readonly taskPartitioner: TaskPartitioner; /** * Constructs a new TaskStore. @@ -111,6 +114,7 @@ export class TaskClaiming { this.unusedTypes = opts.unusedTypes; this.taskClaimer = getTaskClaimer(this.logger, opts.strategy); this.events$ = new Subject(); + this.taskPartitioner = opts.taskPartitioner; this.logger.info(`using task claiming strategy: ${opts.strategy}`); } @@ -178,6 +182,7 @@ export class TaskClaiming { taskMaxAttempts: this.taskMaxAttempts, excludedTaskTypes: this.excludedTaskTypes, logger: this.logger, + taskPartitioner: this.taskPartitioner, }; return this.taskClaimer(opts).pipe(map((claimResult) => asOk(claimResult))); } diff --git a/x-pack/plugins/task_manager/server/saved_objects/mappings.ts b/x-pack/plugins/task_manager/server/saved_objects/mappings.ts index c0dc563a85157..8ad641b56a58f 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/mappings.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/mappings.ts @@ -62,6 +62,9 @@ export const taskMappings: SavedObjectsTypeMappingDefinition = { ownerId: { type: 'keyword', }, + partition: { + type: 'integer', + }, }, }; diff --git a/x-pack/plugins/task_manager/server/saved_objects/model_versions/task_model_versions.ts b/x-pack/plugins/task_manager/server/saved_objects/model_versions/task_model_versions.ts index d76d26e56c23e..775b3ea2f8cad 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/model_versions/task_model_versions.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/model_versions/task_model_versions.ts @@ -6,7 +6,7 @@ */ import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server'; -import { taskSchemaV1 } from '../schemas/task'; +import { taskSchemaV1, taskSchemaV2 } from '../schemas/task'; export const taskModelVersions: SavedObjectsModelVersionMap = { '1': { @@ -17,7 +17,22 @@ export const taskModelVersions: SavedObjectsModelVersionMap = { }, ], schemas: { + forwardCompatibility: taskSchemaV1.extends({}, { unknowns: 'ignore' }), create: taskSchemaV1, }, }, + '2': { + changes: [ + { + type: 'mappings_addition', + addedMappings: { + partition: { type: 'integer' }, + }, + }, + ], + schemas: { + forwardCompatibility: taskSchemaV2.extends({}, { unknowns: 'ignore' }), + create: taskSchemaV2, + }, + }, }; diff --git a/x-pack/plugins/task_manager/server/saved_objects/schemas/task.ts b/x-pack/plugins/task_manager/server/saved_objects/schemas/task.ts index ae2e28d1d9beb..2a6ee5c92198c 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/schemas/task.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/schemas/task.ts @@ -38,3 +38,7 @@ export const taskSchemaV1 = schema.object({ ]), version: schema.maybe(schema.string()), }); + +export const taskSchemaV2 = taskSchemaV1.extends({ + partition: schema.maybe(schema.number()), +}); diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 054b8f4686388..fae99bb8f1f5b 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -328,6 +328,11 @@ export interface TaskInstance { * Optionally override the timeout defined in the task type for this specific task instance */ timeoutOverride?: string; + + /* + * Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions + */ + partition?: number; } /** @@ -426,6 +431,11 @@ export interface ConcreteTaskInstance extends TaskInstance { * The random uuid of the Kibana instance which claimed ownership of the task last */ ownerId: string | null; + + /* + * Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions + */ + partition?: number; } export interface ConcreteTaskInstanceVersion { @@ -460,4 +470,5 @@ export type SerializedConcreteTaskInstance = Omit< startedAt: string | null; retryAt: string | null; runAt: string; + partition?: number; }; diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index 927d4c762f625..1caa6e2addb0f 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -16,6 +16,7 @@ import { ConcreteTaskInstance } from '../task'; import { claimAvailableTasksDefault } from './strategy_default'; import { claimAvailableTasksMget } from './strategy_mget'; import { CLAIM_STRATEGY_DEFAULT, CLAIM_STRATEGY_MGET } from '../config'; +import { TaskPartitioner } from '../lib/task_partitioner'; export interface TaskClaimerOpts { getCapacity: (taskType?: string | undefined) => number; @@ -28,6 +29,7 @@ export interface TaskClaimerOpts { excludedTaskTypes: string[]; taskMaxAttempts: Record; logger: Logger; + taskPartitioner: TaskPartitioner; } export interface ClaimOwnershipResult { diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_default.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_default.test.ts index e07038960d371..8aa206bbe1872 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_default.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_default.test.ts @@ -28,6 +28,8 @@ import apm from 'elastic-apm-node'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { ClaimOwnershipResult } from '.'; import { FillPoolResult } from '../lib/fill_pool'; +import { TaskPartitioner } from '../lib/task_partitioner'; +import { KibanaDiscoveryService } from '../kibana_discovery_service'; jest.mock('../constants', () => ({ CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [ @@ -41,6 +43,7 @@ jest.mock('../constants', () => ({ })); const taskManagerLogger = mockLogger(); +const taskPartitioner = new TaskPartitioner('test', {} as KibanaDiscoveryService); beforeEach(() => jest.clearAllMocks()); @@ -131,6 +134,7 @@ describe('TaskClaiming', () => { unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getCapacity: taskClaimingOpts.getCapacity ?? (() => 10), + taskPartitioner, ...taskClaimingOpts, }); @@ -1251,6 +1255,7 @@ if (doc['task.runAt'].size()!=0) { taskStore, maxAttempts: 2, getCapacity, + taskPartitioner, }); return { taskManagerId, runAt, taskClaiming }; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 0306f9dda3da8..b58ea02893c10 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -16,7 +16,7 @@ import { ConcreteTaskInstanceVersion, TaskPriority, } from '../task'; -import { StoreOpts } from '../task_store'; +import { SearchOpts, StoreOpts } from '../task_store'; import { asTaskClaimEvent, TaskEvent } from '../task_events'; import { asOk, isOk, unwrap } from '../lib/result_type'; import { TaskTypeDictionary } from '../task_type_dictionary'; @@ -33,6 +33,16 @@ import apm from 'elastic-apm-node'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { ClaimOwnershipResult } from '.'; import { FillPoolResult } from '../lib/fill_pool'; +import { TaskPartitioner } from '../lib/task_partitioner'; +import type { MustNotCondition } from '../queries/query_clauses'; +import { + createDiscoveryServiceMock, + createFindSO, +} from '../kibana_discovery_service/mock_kibana_discovery_service'; + +jest.mock('../lib/assign_pod_partitions', () => ({ + assignPodPartitions: jest.fn().mockReturnValue([1, 3]), +})); jest.mock('../constants', () => ({ CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [ @@ -80,6 +90,15 @@ const mockApmTrans = { end: jest.fn(), }; +const discoveryServiceMock = createDiscoveryServiceMock('test'); +const lastSeen = '2024-08-10T10:00:00.000Z'; +discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([ + createFindSO('test', lastSeen), + createFindSO('test-pod-2', lastSeen), + createFindSO('test-pod-3', lastSeen), +]); +const taskPartitioner = new TaskPartitioner('test', discoveryServiceMock); + // needs more tests in the similar to the `strategy_default.test.ts` test suite describe('TaskClaiming', () => { beforeEach(() => { @@ -138,6 +157,7 @@ describe('TaskClaiming', () => { unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getCapacity: taskClaimingOpts.getCapacity ?? (() => 10), + taskPartitioner, ...taskClaimingOpts, }); @@ -183,17 +203,13 @@ describe('TaskClaiming', () => { return unwrap(resultOrErr) as ClaimOwnershipResult; }); - expect(apm.startTransaction).toHaveBeenCalledWith( - TASK_MANAGER_MARK_AS_CLAIMED, - TASK_MANAGER_TRANSACTION_TYPE - ); - expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - - expect(store.fetch.mock.calls).toMatchObject({}); - expect(store.getDocVersions.mock.calls).toMatchObject({}); return results.map((result, index) => ({ result, - args: {}, + args: { + search: store.fetch.mock.calls[index][0] as SearchOpts & { + query: MustNotCondition; + }, + }, })); } @@ -272,6 +288,151 @@ describe('TaskClaiming', () => { }); expect(result).toMatchObject({}); }); + + test('it should filter for specific partitions and tasks without partitions', async () => { + const taskManagerId = uuidv4(); + const [ + { + args: { + search: { query }, + }, + }, + ] = await testClaimAvailableTasks({ + storeOpts: { + taskManagerId, + }, + taskClaimingOpts: {}, + claimingOpts: { + claimOwnershipUntil: new Date(), + }, + }); + + expect(query).toMatchInlineSnapshot(` + Object { + "bool": Object { + "filter": Array [ + Object { + "bool": Object { + "should": Array [ + Object { + "terms": Object { + "task.partition": Array [ + 1, + 3, + ], + }, + }, + Object { + "bool": Object { + "must_not": Array [ + Object { + "exists": Object { + "field": "task.partition", + }, + }, + ], + }, + }, + ], + }, + }, + ], + "must": Array [ + Object { + "bool": Object { + "must": Array [ + Object { + "term": Object { + "task.enabled": true, + }, + }, + ], + }, + }, + Object { + "bool": Object { + "must": Array [ + Object { + "terms": Object { + "task.taskType": Array [ + "report", + "dernstraight", + "yawn", + ], + }, + }, + ], + }, + }, + Object { + "bool": Object { + "should": Array [ + Object { + "bool": Object { + "must": Array [ + Object { + "term": Object { + "task.status": "idle", + }, + }, + Object { + "range": Object { + "task.runAt": Object { + "lte": "now", + }, + }, + }, + ], + }, + }, + Object { + "bool": Object { + "must": Array [ + Object { + "bool": Object { + "should": Array [ + Object { + "term": Object { + "task.status": "running", + }, + }, + Object { + "term": Object { + "task.status": "claiming", + }, + }, + ], + }, + }, + Object { + "range": Object { + "task.retryAt": Object { + "lte": "now", + }, + }, + }, + ], + }, + }, + ], + }, + }, + Object { + "bool": Object { + "must_not": Array [ + Object { + "term": Object { + "task.status": "unrecognized", + }, + }, + ], + }, + }, + ], + }, + } + `); + }); }); describe('task events', () => { @@ -373,6 +534,7 @@ describe('TaskClaiming', () => { taskStore, maxAttempts: 2, getCapacity, + taskPartitioner, }); return { taskManagerId, runAt, taskClaiming }; diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index 07d18a39a1dbc..362c38166339f 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -36,10 +36,12 @@ import { EnabledTask, OneOfTaskTypes, RecognizedTask, + tasksWithPartitions, } from '../queries/mark_available_tasks_as_claimed'; import { TaskStore, SearchOpts } from '../task_store'; import { isOk, asOk } from '../lib/result_type'; +import { TaskPartitioner } from '../lib/task_partitioner'; interface OwnershipClaimingOpts { claimOwnershipUntil: Date; @@ -51,6 +53,7 @@ interface OwnershipClaimingOpts { events$: Subject; definitions: TaskTypeDictionary; taskMaxAttempts: Record; + taskPartitioner: TaskPartitioner; } const SIZE_MULTIPLIER_FOR_TASK_FETCH = 4; @@ -89,7 +92,7 @@ async function claimAvailableTasksApm(opts: TaskClaimerOpts): Promise { - const { getCapacity, claimOwnershipUntil, batches, events$, taskStore } = opts; + const { getCapacity, claimOwnershipUntil, batches, events$, taskStore, taskPartitioner } = opts; const { definitions, unusedTypes, excludedTaskTypes, taskMaxAttempts } = opts; const { logger } = opts; const loggerTag = claimAvailableTasksMget.name; @@ -111,6 +114,7 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { const searchedTypes = Array.from(taskTypes) .concat(Array.from(removedTypes)) @@ -283,9 +287,14 @@ async function searchAvailableTasks({ // must have a status that isn't 'unrecognized' RecognizedTask ); + const partitions = await taskPartitioner.getPartitions(); const sort: NonNullable = getClaimSort(definitions); - const query = matchesClauses(queryForScheduledTasks, filterDownBy(InactiveTasks)); + const query = matchesClauses( + queryForScheduledTasks, + filterDownBy(InactiveTasks), + tasksWithPartitions(partitions) + ); return await taskStore.fetch({ query, diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 925dc4d1a4c69..afde0ae91ea55 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -160,6 +160,7 @@ describe('TaskStore', () => { taskType: 'report', user: undefined, traceparent: 'apmTraceparent', + partition: 225, }, { id: 'id', @@ -183,6 +184,7 @@ describe('TaskStore', () => { user: undefined, version: '123', traceparent: 'apmTraceparent', + partition: 225, }); }); @@ -490,6 +492,7 @@ describe('TaskStore', () => { version: '123', ownerId: null, traceparent: 'myTraceparent', + partition: 99, }; savedObjectsClient.update.mockImplementation( @@ -532,6 +535,7 @@ describe('TaskStore', () => { user: undefined, ownerId: null, traceparent: 'myTraceparent', + partition: 99, }, { version: '123', refresh: false } ); @@ -1050,6 +1054,7 @@ describe('TaskStore', () => { status: 'idle', taskType: 'report', traceparent: 'apmTraceparent', + partition: 225, }, references: [], version: '123', @@ -1089,6 +1094,7 @@ describe('TaskStore', () => { status: 'idle', taskType: 'report', traceparent: 'apmTraceparent', + partition: 225, }, }, ], @@ -1113,6 +1119,7 @@ describe('TaskStore', () => { user: undefined, version: '123', traceparent: 'apmTraceparent', + partition: 225, }, ]); }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index b922d10ee5cf1..e0ad3dfae149a 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -8,6 +8,8 @@ /* * This module contains helpers for managing the task manager storage layer. */ +import murmurhash from 'murmurhash'; +import { v4 } from 'uuid'; import { Subject } from 'rxjs'; import { omit, defaults, get } from 'lodash'; import { SavedObjectError } from '@kbn/core-saved-objects-common'; @@ -39,6 +41,7 @@ import { import { TaskTypeDictionary } from './task_type_dictionary'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { TaskValidator } from './task_validator'; +import { MAX_PARTITIONS } from './lib/task_partitioner'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -165,12 +168,13 @@ export class TaskStore { let savedObject; try { + const id = taskInstance.id || v4(); const validatedTaskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance); savedObject = await this.savedObjectsRepository.create( 'task', - taskInstanceToAttributes(validatedTaskInstance), - { id: taskInstance.id, refresh: false } + taskInstanceToAttributes(validatedTaskInstance, id), + { id, refresh: false } ); if (get(taskInstance, 'schedule.interval', null) == null) { this.adHocTaskCounter.increment(); @@ -191,13 +195,14 @@ export class TaskStore { */ public async bulkSchedule(taskInstances: TaskInstance[]): Promise { const objects = taskInstances.map((taskInstance) => { + const id = taskInstance.id || v4(); this.definitions.ensureHas(taskInstance.taskType); const validatedTaskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance); return { type: 'task', - attributes: taskInstanceToAttributes(validatedTaskInstance), - id: taskInstance.id, + attributes: taskInstanceToAttributes(validatedTaskInstance, id), + id, }; }); @@ -252,7 +257,7 @@ export class TaskStore { const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, { validate: options.validate, }); - const attributes = taskInstanceToAttributes(taskInstance); + const attributes = taskInstanceToAttributes(taskInstance, doc.id); let updatedSavedObject; try { @@ -297,7 +302,7 @@ export class TaskStore { const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, { validate: options.validate, }); - attrsById.set(doc.id, taskInstanceToAttributes(taskInstance)); + attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id)); return attrsById; }, new Map()); @@ -622,7 +627,7 @@ export function correctVersionConflictsForContinuation( return maxDocs && versionConflicts + updated > maxDocs ? maxDocs - updated : versionConflicts; } -function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInstance { +function taskInstanceToAttributes(doc: TaskInstance, id: string): SerializedConcreteTaskInstance { return { ...omit(doc, 'id', 'version'), params: JSON.stringify(doc.params || {}), @@ -633,6 +638,7 @@ function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInst retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null, runAt: (doc.runAt || new Date()).toISOString(), status: (doc as ConcreteTaskInstance).status || 'idle', + partition: doc.partition || murmurhash.v3(id) % MAX_PARTITIONS, } as SerializedConcreteTaskInstance; } diff --git a/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts b/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts index 3273fe855ad31..1c346584beaf2 100644 --- a/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts +++ b/x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/server/init_routes.ts @@ -17,6 +17,7 @@ import { } from '@kbn/core/server'; import { EventEmitter } from 'events'; import { TaskManagerStartContract } from '@kbn/task-manager-plugin/server'; +import { BACKGROUND_TASK_NODE_SO_NAME } from '@kbn/task-manager-plugin/server/saved_objects'; const scope = 'testing'; const taskManagerQuery = { @@ -401,4 +402,40 @@ export function initRoutes( } } ); + + router.post( + { + path: `/api/update_kibana_node`, + validate: { + body: schema.object({ + id: schema.string(), + lastSeen: schema.string(), + }), + }, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + const { id, lastSeen } = req.body; + + const client = (await context.core).savedObjects.getClient({ + includedHiddenTypes: [BACKGROUND_TASK_NODE_SO_NAME], + }); + const node = await client.update( + BACKGROUND_TASK_NODE_SO_NAME, + id, + { + id, + last_seen: lastSeen, + }, + { upsert: { id, last_seen: lastSeen }, refresh: false, retryOnConflict: 3 } + ); + + return res.ok({ + body: node, + }); + } + ); } diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts index 66ba9a0108afc..83005f2d55342 100644 --- a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/index.ts @@ -16,6 +16,7 @@ export default function ({ loadTestFile }: FtrProviderContext) { loadTestFile(require.resolve('./task_management')); loadTestFile(require.resolve('./task_management_scheduled_at')); loadTestFile(require.resolve('./task_management_removed_types')); + loadTestFile(require.resolve('./task_partitions')); loadTestFile(require.resolve('./migrations')); }); diff --git a/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_partitions.ts b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_partitions.ts new file mode 100644 index 0000000000000..bede1c52625b3 --- /dev/null +++ b/x-pack/test/task_manager_claimer_mget/test_suites/task_manager/task_partitions.ts @@ -0,0 +1,218 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server'; +import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings'; +import { asyncForEach } from '@kbn/std'; +import { FtrProviderContext } from '../../ftr_provider_context'; + +const { properties: taskManagerIndexMapping } = TaskManagerMapping; + +export interface RawDoc { + _id: string; + _source: any; + _type?: string; +} +export interface SearchResults { + hits: { + hits: RawDoc[]; + }; +} + +type DeprecatedConcreteTaskInstance = Omit & { + interval: string; +}; + +type SerializedConcreteTaskInstance = Omit< + ConcreteTaskInstance, + 'state' | 'params' | 'scheduledAt' | 'startedAt' | 'retryAt' | 'runAt' +> & { + state: State; + params: Params; + scheduledAt: string; + startedAt: string | null; + retryAt: string | null; + runAt: string; +}; + +export default function ({ getService }: FtrProviderContext) { + const es = getService('es'); + const retry = getService('retry'); + const supertest = getService('supertest'); + + const testHistoryIndex = '.kibana_task_manager_test_result'; + const testNode1 = 'y-test-node'; + const testNode2 = 'z-test-node'; + + function scheduleTask( + task: Partial + ): Promise { + return supertest + .post('/api/sample_tasks/schedule') + .set('kbn-xsrf', 'xxx') + .send({ task }) + .expect(200) + .then((response: { body: SerializedConcreteTaskInstance }) => response.body); + } + + function currentTasks(): Promise<{ + docs: Array>; + }> { + return supertest + .get('/api/sample_tasks') + .expect(200) + .then((response) => response.body); + } + + function updateKibanaNodes() { + const lastSeen = new Date().toISOString(); + return Promise.all([ + supertest + .post('/api/update_kibana_node') + .set('kbn-xsrf', 'xxx') + .send({ id: testNode1, lastSeen }) + .expect(200), + supertest + .post('/api/update_kibana_node') + .set('kbn-xsrf', 'xxx') + .send({ id: testNode2, lastSeen }) + .expect(200), + ]); + } + + async function historyDocs({ + taskId, + taskType, + }: { + taskId?: string; + taskType?: string; + }): Promise { + const filter: any[] = [{ term: { type: 'task' } }]; + if (taskId) { + filter.push({ term: { taskId } }); + } + if (taskType) { + filter.push({ term: { taskType } }); + } + return es + .search({ + index: testHistoryIndex, + body: { + query: { + bool: { + filter, + }, + }, + }, + }) + .then((result) => (result as unknown as SearchResults).hits.hits); + } + + describe('task partitions', () => { + beforeEach(async () => { + const exists = await es.indices.exists({ index: testHistoryIndex }); + if (exists) { + await es.deleteByQuery({ + index: testHistoryIndex, + refresh: true, + body: { query: { term: { type: 'task' } } }, + }); + } else { + await es.indices.create({ + index: testHistoryIndex, + body: { + mappings: { + properties: { + type: { + type: 'keyword', + }, + taskId: { + type: 'keyword', + }, + params: taskManagerIndexMapping.params, + state: taskManagerIndexMapping.state, + runAt: taskManagerIndexMapping.runAt, + } as Record, + }, + }, + }); + } + }); + + afterEach(async () => { + await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200); + await es.deleteByQuery({ + index: '.kibana_task_manager', + refresh: true, + body: { query: { terms: { id: [testNode1, testNode2] } } }, + }); + }); + + it('should tasks with partitions assigned to this kibana node', async () => { + const partitions: Record = { + '0': 127, + '1': 147, + '2': 23, + }; + + const tasksToSchedule = []; + for (let i = 0; i < 3; i++) { + tasksToSchedule.push( + scheduleTask({ + id: `${i}`, + taskType: 'sampleTask', + schedule: { interval: `1d` }, + params: {}, + }) + ); + } + const scheduledTasks = await Promise.all(tasksToSchedule); + + let tasks: any[] = []; + await retry.try(async () => { + tasks = (await currentTasks()).docs; + expect(tasks.length).to.eql(3); + }); + + const taskIds = tasks.map((task) => task.id); + await asyncForEach(scheduledTasks, async (scheduledTask) => { + expect(taskIds).to.contain(scheduledTask.id); + expect(scheduledTask.partition).to.eql(partitions[scheduledTask.id]); + + let taskRanOnThisNode: boolean = false; + let counter = 0; + await retry.try(async () => { + await updateKibanaNodes(); + + const doc: RawDoc[] = await historyDocs({ taskId: scheduledTask.id }); + if (doc.length === 1) { + taskRanOnThisNode = true; + return; + } + + // we don't want the test to time out, so we check + // 20 times and then return + if (scheduledTask.id === '2' && counter > 20) { + return; + } + counter++; + + throw new Error(`The task ID: ${scheduledTask.id} has not run yet`); + }); + + // taskId 2 should not run on this kibana node + if (scheduledTask.id === '2') { + expect(taskRanOnThisNode).to.be(false); + } else { + expect(taskRanOnThisNode).to.be(true); + } + }); + }); + }); +} diff --git a/yarn.lock b/yarn.lock index 6a28104d98d00..a7c594c619838 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23573,6 +23573,11 @@ murmurhash-js@^1.0.0: resolved "https://registry.yarnpkg.com/murmurhash-js/-/murmurhash-js-1.0.0.tgz#b06278e21fc6c37fa5313732b0412bcb6ae15f51" integrity sha1-sGJ44h/Gw3+lMTcysEEry2rhX1E= +murmurhash@^2.0.1: + version "2.0.1" + resolved "https://registry.yarnpkg.com/murmurhash/-/murmurhash-2.0.1.tgz#4097720e08cf978872194ad84ea5be2dec9b610f" + integrity sha512-5vQEh3y+DG/lMPM0mCGPDnyV8chYg/g7rl6v3Gd8WMF9S429ox3Xk8qrk174kWhG767KQMqqxLD1WnGd77hiew== + mustache@^2.3.2: version "2.3.2" resolved "https://registry.yarnpkg.com/mustache/-/mustache-2.3.2.tgz#a6d4d9c3f91d13359ab889a812954f9230a3d0c5"