Skip to content

Commit

Permalink
[Task Manager] Add partitions to tasks and assign those task partitio…
Browse files Browse the repository at this point in the history
…ns to Kibana nodes (elastic#188758)

Resolves elastic#187700
Resolves elastic#187698

## Summary

This is a feature branch PR to main. Merging the following PRs that have
already been approved, elastic#188001 and
elastic#188368

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
doakalexi and kibanamachine authored Jul 19, 2024
1 parent 2c8b2ff commit 5adf5be
Show file tree
Hide file tree
Showing 31 changed files with 959 additions and 57 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@
"moment-timezone": "^0.5.43",
"monaco-editor": "^0.44.0",
"monaco-yaml": "^5.1.0",
"murmurhash": "^2.0.1",
"mustache": "^2.3.2",
"node-diff3": "^3.1.2",
"node-fetch": "^2.6.7",
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-check-mappings-update-cli/current_fields.json
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@
"attempts",
"enabled",
"ownerId",
"partition",
"retryAt",
"runAt",
"schedule",
Expand Down
3 changes: 3 additions & 0 deletions packages/kbn-check-mappings-update-cli/current_mappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3327,6 +3327,9 @@
"ownerId": {
"type": "keyword"
},
"partition": {
"type": "integer"
},
"retryAt": {
"type": "date"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e",
"synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c",
"tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc",
"task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe",
"task": "3c89a7c918d5b896a5f8800f06e9114ad7e7aea3",
"telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd",
"threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70",
"ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,13 @@ import {
ACTIVE_NODES_LOOK_BACK,
} from './kibana_discovery_service';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import {
SavedObjectsBulkDeleteResponse,
SavedObjectsFindResponse,
SavedObjectsFindResult,
SavedObjectsUpdateResponse,
} from '@kbn/core/server';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';

const createNodeRecord = (id: string = '1', lastSeen: string = now): BackgroundTaskNode => ({
id,
last_seen: lastSeen,
});

const createFindSO = (
id: string = currentNode,
lastSeen: string = now
): SavedObjectsFindResult<BackgroundTaskNode> => ({
attributes: createNodeRecord(id, lastSeen),
id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`,
namespaces: ['default'],
references: [],
score: 1,
type: BACKGROUND_TASK_NODE_SO_NAME,
updated_at: new Date().toDateString(),
version: '1',
});

const createFindResponse = (
soList: Array<SavedObjectsFindResult<BackgroundTaskNode>>
): SavedObjectsFindResponse<BackgroundTaskNode, unknown> => ({
total: 1,
per_page: 10000,
page: 1,
saved_objects: soList,
});

describe('KibanaDiscoveryService', () => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
const logger = loggingSystemMock.createLogger();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import { SavedObjectsFindResponse, SavedObjectsFindResult } from '@kbn/core/server';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { KibanaDiscoveryService } from './kibana_discovery_service';

export const createDiscoveryServiceMock = (currentNode: string) => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
const logger = loggingSystemMock.createLogger();
const discoveryService = new KibanaDiscoveryService({
savedObjectsRepository,
logger,
currentNode,
});

for (const method of ['getActiveKibanaNodes'] as Array<keyof KibanaDiscoveryService>) {
jest.spyOn(discoveryService, method);
}

return discoveryService as jest.Mocked<KibanaDiscoveryService>;
};

export const createNodeRecord = (id: string, lastSeen: string): BackgroundTaskNode => ({
id,
last_seen: lastSeen,
});

export const createFindSO = (
id: string,
lastSeen: string
): SavedObjectsFindResult<BackgroundTaskNode> => ({
attributes: createNodeRecord(id, lastSeen),
id: `${BACKGROUND_TASK_NODE_SO_NAME}:${id}`,
namespaces: ['default'],
references: [],
score: 1,
type: BACKGROUND_TASK_NODE_SO_NAME,
updated_at: new Date().toDateString(),
version: '1',
});

export const createFindResponse = (
soList: Array<SavedObjectsFindResult<BackgroundTaskNode>>
): SavedObjectsFindResponse<BackgroundTaskNode, unknown> => ({
total: 1,
per_page: 10000,
page: 1,
saved_objects: soList,
});
144 changes: 144 additions & 0 deletions x-pack/plugins/task_manager/server/lib/assign_pod_partitions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { assignPodPartitions, getParitionMap } from './assign_pod_partitions';
describe('assignPodPartitions', () => {
test('two pods', () => {
const allPods = ['foo', 'bar'];
const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const map = getParitionMap(allPods, allPartitions);
expect(map).toMatchInlineSnapshot(`
Object {
"1": Array [
"bar",
"foo",
],
"10": Array [
"bar",
"foo",
],
"2": Array [
"bar",
"foo",
],
"3": Array [
"bar",
"foo",
],
"4": Array [
"bar",
"foo",
],
"5": Array [
"bar",
"foo",
],
"6": Array [
"bar",
"foo",
],
"7": Array [
"bar",
"foo",
],
"8": Array [
"bar",
"foo",
],
"9": Array [
"bar",
"foo",
],
}
`);
});

test('three pods', () => {
const allPods = ['foo', 'bar', 'quz'];
const allPartitions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const map = getParitionMap(allPods, allPartitions);
expect(map).toMatchInlineSnapshot(`
Object {
"1": Array [
"bar",
"foo",
],
"10": Array [
"bar",
"foo",
],
"2": Array [
"quz",
"bar",
],
"3": Array [
"foo",
"quz",
],
"4": Array [
"bar",
"foo",
],
"5": Array [
"quz",
"bar",
],
"6": Array [
"foo",
"quz",
],
"7": Array [
"bar",
"foo",
],
"8": Array [
"quz",
"bar",
],
"9": Array [
"foo",
"quz",
],
}
`);
const fooPartitions = assignPodPartitions('foo', allPods, allPartitions);
expect(fooPartitions).toMatchInlineSnapshot(`
Array [
1,
3,
4,
6,
7,
9,
10,
]
`);
const barPartitions = assignPodPartitions('bar', allPods, allPartitions);
expect(barPartitions).toMatchInlineSnapshot(`
Array [
1,
2,
4,
5,
7,
8,
10,
]
`);
const quzPartitions = assignPodPartitions('quz', allPods, allPartitions);
expect(quzPartitions).toMatchInlineSnapshot(`
Array [
2,
3,
5,
6,
8,
9,
]
`);
});
});
35 changes: 35 additions & 0 deletions x-pack/plugins/task_manager/server/lib/assign_pod_partitions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

const KIBANAS_PER_PARTITION = 2;

export function getParitionMap(podNames: string[], partitions: number[]): Record<number, string[]> {
const map: Record<number, string[]> = {};
let counter = 0;
for (const parition of partitions) {
map[parition] = [];
for (let i = 0; i < KIBANAS_PER_PARTITION; i++) {
map[parition].push(podNames.sort()[counter++ % podNames.length]);
}
}
return map;
}

export function assignPodPartitions(
podName: string,
podNames: string[],
partitions: number[]
): number[] {
const map = getParitionMap(podNames, partitions);
const podPartitions: number[] = [];
for (const partition of Object.keys(map)) {
if (map[Number(partition)].indexOf(podName) !== -1) {
podPartitions.push(Number(partition));
}
}
return podPartitions;
}
70 changes: 70 additions & 0 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
import { TaskPartitioner } from './task_partitioner';

const POD_NAME = 'test-pod';

describe('getAllPartitions()', () => {
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
test('correctly sets allPartitions in constructor', () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(taskPartitioner.getAllPartitions()).toEqual([
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,
26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48,
49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,
72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94,
95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114,
115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133,
134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152,
153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171,
172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190,
191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209,
210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228,
229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247,
248, 249, 250, 251, 252, 253, 254, 255,
]);
});
});

describe('getPodName()', () => {
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);

test('correctly sets podName in constructor', () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(taskPartitioner.getPodName()).toEqual('test-pod');
});
});

describe('getPartitions()', () => {
const lastSeen = '2024-08-10T10:00:00.000Z';
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
createFindSO(POD_NAME, lastSeen),
createFindSO('test-pod-2', lastSeen),
createFindSO('test-pod-3', lastSeen),
]);

test('correctly gets the partitons for this pod', async () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
expect(await taskPartitioner.getPartitions()).toEqual([
0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36,
37, 39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70,
72, 73, 75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103,
105, 106, 108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132,
133, 135, 136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160,
162, 163, 165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189,
190, 192, 193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217,
219, 220, 222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246,
247, 249, 250, 252, 253, 255,
]);
});
});
Loading

0 comments on commit 5adf5be

Please sign in to comment.