Skip to content

Commit

Permalink
[Task Manager] Task Partitioning (#188001)
Browse files Browse the repository at this point in the history
Resolves #187698

## Summary


This PR does the following:
- Adds a new `partition` field to the task manager index
- Assigns a partition to a task if there is not one when creating or
updating

### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios


### To verify

New tasks
- Create a rule and verify that the newly created tasks have the
`partition` field

Old tasks
- Checkout main and create a new rule, let it run
- Stop kibana
- Checkout this branch and restart kibana
- Verify that the old tasks get updated with the `partition` field

ex. the query I use to look at the ES query rule task
```
POST .kibana_task_manager*/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "task.taskType": {
              "value": "alerting:.es-query"
            }
          }
        }
      ]
    }
  }
}
```

---------

Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
doakalexi and kibanamachine authored Jul 15, 2024
1 parent d5345e4 commit 70ff4cc
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 12 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@
"moment-timezone": "^0.5.43",
"monaco-editor": "^0.44.0",
"monaco-yaml": "^5.1.0",
"murmurhash": "^2.0.1",
"mustache": "^2.3.2",
"node-fetch": "^2.6.7",
"node-forge": "^1.3.1",
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-check-mappings-update-cli/current_fields.json
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@
"attempts",
"enabled",
"ownerId",
"partition",
"retryAt",
"runAt",
"schedule",
Expand Down
3 changes: 3 additions & 0 deletions packages/kbn-check-mappings-update-cli/current_mappings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3312,6 +3312,9 @@
"ownerId": {
"type": "keyword"
},
"partition": {
"type": "integer"
},
"retryAt": {
"type": "date"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e",
"synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c",
"tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc",
"task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe",
"task": "3c89a7c918d5b896a5f8800f06e9114ad7e7aea3",
"telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd",
"threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70",
"ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24",
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/saved_objects/mappings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@ export const taskMappings: SavedObjectsTypeMappingDefinition = {
ownerId: {
type: 'keyword',
},
partition: {
type: 'integer',
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { taskSchemaV1 } from '../schemas/task';
import { taskSchemaV1, taskSchemaV2 } from '../schemas/task';

export const taskModelVersions: SavedObjectsModelVersionMap = {
'1': {
Expand All @@ -17,7 +17,22 @@ export const taskModelVersions: SavedObjectsModelVersionMap = {
},
],
schemas: {
forwardCompatibility: taskSchemaV1.extends({}, { unknowns: 'ignore' }),
create: taskSchemaV1,
},
},
'2': {
changes: [
{
type: 'mappings_addition',
addedMappings: {
partition: { type: 'integer' },
},
},
],
schemas: {
forwardCompatibility: taskSchemaV2.extends({}, { unknowns: 'ignore' }),
create: taskSchemaV2,
},
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ export const taskSchemaV1 = schema.object({
]),
version: schema.maybe(schema.string()),
});

export const taskSchemaV2 = taskSchemaV1.extends({
partition: schema.maybe(schema.number()),
});
11 changes: 11 additions & 0 deletions x-pack/plugins/task_manager/server/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ export interface TaskInstance {
* Optionally override the timeout defined in the task type for this specific task instance
*/
timeoutOverride?: string;

/*
* Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions
*/
partition?: number;
}

/**
Expand Down Expand Up @@ -426,6 +431,11 @@ export interface ConcreteTaskInstance extends TaskInstance {
* The random uuid of the Kibana instance which claimed ownership of the task last
*/
ownerId: string | null;

/*
* Used to break up tasks so each Kibana node can claim tasks on a subset of the partitions
*/
partition?: number;
}

export interface ConcreteTaskInstanceVersion {
Expand Down Expand Up @@ -460,4 +470,5 @@ export type SerializedConcreteTaskInstance = Omit<
startedAt: string | null;
retryAt: string | null;
runAt: string;
partition?: number;
};
7 changes: 7 additions & 0 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ describe('TaskStore', () => {
taskType: 'report',
user: undefined,
traceparent: 'apmTraceparent',
partition: 225,
},
{
id: 'id',
Expand All @@ -183,6 +184,7 @@ describe('TaskStore', () => {
user: undefined,
version: '123',
traceparent: 'apmTraceparent',
partition: 225,
});
});

Expand Down Expand Up @@ -490,6 +492,7 @@ describe('TaskStore', () => {
version: '123',
ownerId: null,
traceparent: 'myTraceparent',
partition: 99,
};

savedObjectsClient.update.mockImplementation(
Expand Down Expand Up @@ -532,6 +535,7 @@ describe('TaskStore', () => {
user: undefined,
ownerId: null,
traceparent: 'myTraceparent',
partition: 99,
},
{ version: '123', refresh: false }
);
Expand Down Expand Up @@ -1050,6 +1054,7 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
},
references: [],
version: '123',
Expand Down Expand Up @@ -1089,6 +1094,7 @@ describe('TaskStore', () => {
status: 'idle',
taskType: 'report',
traceparent: 'apmTraceparent',
partition: 225,
},
},
],
Expand All @@ -1113,6 +1119,7 @@ describe('TaskStore', () => {
user: undefined,
version: '123',
traceparent: 'apmTraceparent',
partition: 225,
},
]);
});
Expand Down
21 changes: 14 additions & 7 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
/*
* This module contains helpers for managing the task manager storage layer.
*/
import murmurhash from 'murmurhash';
import { v4 } from 'uuid';
import { Subject } from 'rxjs';
import { omit, defaults, get } from 'lodash';
import { SavedObjectError } from '@kbn/core-saved-objects-common';
Expand Down Expand Up @@ -40,6 +42,8 @@ import { TaskTypeDictionary } from './task_type_dictionary';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { TaskValidator } from './task_validator';

const MAX_PARTITIONS = 256;

export interface StoreOpts {
esClient: ElasticsearchClient;
index: string;
Expand Down Expand Up @@ -165,12 +169,13 @@ export class TaskStore {

let savedObject;
try {
const id = taskInstance.id || v4();
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
savedObject = await this.savedObjectsRepository.create<SerializedConcreteTaskInstance>(
'task',
taskInstanceToAttributes(validatedTaskInstance),
{ id: taskInstance.id, refresh: false }
taskInstanceToAttributes(validatedTaskInstance, id),
{ id, refresh: false }
);
if (get(taskInstance, 'schedule.interval', null) == null) {
this.adHocTaskCounter.increment();
Expand All @@ -191,13 +196,14 @@ export class TaskStore {
*/
public async bulkSchedule(taskInstances: TaskInstance[]): Promise<ConcreteTaskInstance[]> {
const objects = taskInstances.map((taskInstance) => {
const id = taskInstance.id || v4();
this.definitions.ensureHas(taskInstance.taskType);
const validatedTaskInstance =
this.taskValidator.getValidatedTaskInstanceForUpdating(taskInstance);
return {
type: 'task',
attributes: taskInstanceToAttributes(validatedTaskInstance),
id: taskInstance.id,
attributes: taskInstanceToAttributes(validatedTaskInstance, id),
id,
};
});

Expand Down Expand Up @@ -252,7 +258,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
const attributes = taskInstanceToAttributes(taskInstance);
const attributes = taskInstanceToAttributes(taskInstance, doc.id);

let updatedSavedObject;
try {
Expand Down Expand Up @@ -297,7 +303,7 @@ export class TaskStore {
const taskInstance = this.taskValidator.getValidatedTaskInstanceForUpdating(doc, {
validate: options.validate,
});
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance));
attrsById.set(doc.id, taskInstanceToAttributes(taskInstance, doc.id));
return attrsById;
}, new Map());

Expand Down Expand Up @@ -622,7 +628,7 @@ export function correctVersionConflictsForContinuation(
return maxDocs && versionConflicts + updated > maxDocs ? maxDocs - updated : versionConflicts;
}

function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInstance {
function taskInstanceToAttributes(doc: TaskInstance, id: string): SerializedConcreteTaskInstance {
return {
...omit(doc, 'id', 'version'),
params: JSON.stringify(doc.params || {}),
Expand All @@ -633,6 +639,7 @@ function taskInstanceToAttributes(doc: TaskInstance): SerializedConcreteTaskInst
retryAt: (doc.retryAt && doc.retryAt.toISOString()) || null,
runAt: (doc.runAt || new Date()).toISOString(),
status: (doc as ConcreteTaskInstance).status || 'idle',
partition: doc.partition || murmurhash.v3(id) % MAX_PARTITIONS,
} as SerializedConcreteTaskInstance;
}

Expand Down
36 changes: 33 additions & 3 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -23632,6 +23632,11 @@ murmurhash-js@^1.0.0:
resolved "https://registry.yarnpkg.com/murmurhash-js/-/murmurhash-js-1.0.0.tgz#b06278e21fc6c37fa5313732b0412bcb6ae15f51"
integrity sha1-sGJ44h/Gw3+lMTcysEEry2rhX1E=

murmurhash@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/murmurhash/-/murmurhash-2.0.1.tgz#4097720e08cf978872194ad84ea5be2dec9b610f"
integrity sha512-5vQEh3y+DG/lMPM0mCGPDnyV8chYg/g7rl6v3Gd8WMF9S429ox3Xk8qrk174kWhG767KQMqqxLD1WnGd77hiew==

mustache@^2.3.2:
version "2.3.2"
resolved "https://registry.yarnpkg.com/mustache/-/mustache-2.3.2.tgz#a6d4d9c3f91d13359ab889a812954f9230a3d0c5"
Expand Down Expand Up @@ -29353,7 +29358,7 @@ string-replace-loader@^2.2.0:
loader-utils "^1.2.3"
schema-utils "^1.0.0"

"string-width-cjs@npm:string-width@^4.2.0", "string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.0.0, string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3:
"string-width-cjs@npm:string-width@^4.2.0":
version "4.2.3"
resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010"
integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==
Expand All @@ -29371,6 +29376,15 @@ string-width@^1.0.1:
is-fullwidth-code-point "^1.0.0"
strip-ansi "^3.0.0"

"string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.0.0, string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3:
version "4.2.3"
resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010"
integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==
dependencies:
emoji-regex "^8.0.0"
is-fullwidth-code-point "^3.0.0"
strip-ansi "^6.0.1"

string-width@^5.0.1, string-width@^5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/string-width/-/string-width-5.1.2.tgz#14f8daec6d81e7221d2a357e668cab73bdbca794"
Expand Down Expand Up @@ -29481,7 +29495,7 @@ stringify-object@^3.2.1:
is-obj "^1.0.1"
is-regexp "^1.0.0"

"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1:
"strip-ansi-cjs@npm:strip-ansi@^6.0.1":
version "6.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9"
integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==
Expand All @@ -29495,6 +29509,13 @@ strip-ansi@^3.0.0, strip-ansi@^3.0.1:
dependencies:
ansi-regex "^2.0.0"

strip-ansi@^6.0.0, strip-ansi@^6.0.1:
version "6.0.1"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9"
integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==
dependencies:
ansi-regex "^5.0.1"

strip-ansi@^7.0.1, strip-ansi@^7.1.0:
version "7.1.0"
resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-7.1.0.tgz#d5b6568ca689d8561370b0707685d22434faff45"
Expand Down Expand Up @@ -32382,7 +32403,7 @@ [email protected]:
resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-6.2.1.tgz#46fc150c17d826b86a008e5a4508656777e9c343"
integrity sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw==

"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0:
"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0":
version "7.0.0"
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"
integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==
Expand All @@ -32408,6 +32429,15 @@ wrap-ansi@^6.2.0:
string-width "^4.1.0"
strip-ansi "^6.0.0"

wrap-ansi@^7.0.0:
version "7.0.0"
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"
integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==
dependencies:
ansi-styles "^4.0.0"
string-width "^4.1.0"
strip-ansi "^6.0.0"

wrap-ansi@^8.1.0:
version "8.1.0"
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-8.1.0.tgz#56dc22368ee570face1b49819975d9b9a5ead214"
Expand Down

0 comments on commit 70ff4cc

Please sign in to comment.