Skip to content

Commit

Permalink
[ResponseOps] implement task claiming strategy mget (#180485)
Browse files Browse the repository at this point in the history
resolves: #181325

## Summary

Adds a new task claiming strategy, `unsafe_mget`, which can be used instead of
the default strategy, `default`. To enable it, add the following to your
`kibana.yml`:

    xpack.task_manager.claim_strategy: 'unsafe_mget'
  • Loading branch information
pmuellr authored Jun 13, 2024
1 parent 2bb0c75 commit f016398
Show file tree
Hide file tree
Showing 45 changed files with 5,106 additions and 116 deletions.
1 change: 1 addition & 0 deletions .buildkite/ftr_configs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ enabled:
- x-pack/test/spaces_api_integration/security_and_spaces/config_trial.ts
- x-pack/test/spaces_api_integration/security_and_spaces/copy_to_space_config_trial.ts
- x-pack/test/spaces_api_integration/spaces_only/config.ts
- x-pack/test/task_manager_claimer_mget/config.ts
- x-pack/test/ui_capabilities/security_and_spaces/config.ts
- x-pack/test/ui_capabilities/spaces_only/config.ts
- x-pack/test/upgrade_assistant_integration/config.js
Expand Down
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ x-pack/plugins/runtime_fields @elastic/kibana-management
packages/kbn-safer-lodash-set @elastic/kibana-security
x-pack/test/security_api_integration/plugins/saml_provider @elastic/kibana-security
x-pack/test/plugin_api_integration/plugins/sample_task_plugin @elastic/response-ops
x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget @elastic/response-ops
test/plugin_functional/plugins/saved_object_export_transforms @elastic/kibana-core
test/plugin_functional/plugins/saved_object_import_warnings @elastic/kibana-core
x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin @elastic/kibana-security
Expand Down Expand Up @@ -1326,6 +1327,7 @@ x-pack/plugins/cloud_integrations/cloud_full_story/server/config.ts @elastic/kib
/x-pack/test/alerting_api_integration/observability @elastic/obs-ux-management-team
/x-pack/test/plugin_api_integration/test_suites/task_manager/ @elastic/response-ops
/x-pack/test/functional_with_es_ssl/apps/triggers_actions_ui/ @elastic/response-ops
/x-pack/test/task_manager_claimer_mget/ @elastic/response-ops
/docs/user/alerting/ @elastic/response-ops
/docs/management/connectors/ @elastic/response-ops
/x-pack/test/cases_api_integration/ @elastic/response-ops
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@
"@kbn/safer-lodash-set": "link:packages/kbn-safer-lodash-set",
"@kbn/saml-provider-plugin": "link:x-pack/test/security_api_integration/plugins/saml_provider",
"@kbn/sample-task-plugin": "link:x-pack/test/plugin_api_integration/plugins/sample_task_plugin",
"@kbn/sample-task-plugin-mget": "link:x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget",
"@kbn/saved-object-export-transforms-plugin": "link:test/plugin_functional/plugins/saved_object_export_transforms",
"@kbn/saved-object-import-warnings-plugin": "link:test/plugin_functional/plugins/saved_object_import_warnings",
"@kbn/saved-object-test-plugin": "link:x-pack/test/saved_object_api_integration/common/plugins/saved_object_test_plugin",
Expand Down
2 changes: 2 additions & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,8 @@
"@kbn/saml-provider-plugin/*": ["x-pack/test/security_api_integration/plugins/saml_provider/*"],
"@kbn/sample-task-plugin": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin"],
"@kbn/sample-task-plugin/*": ["x-pack/test/plugin_api_integration/plugins/sample_task_plugin/*"],
"@kbn/sample-task-plugin-mget": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget"],
"@kbn/sample-task-plugin-mget/*": ["x-pack/test/task_manager_claimer_mget/plugins/sample_task_plugin_mget/*"],
"@kbn/saved-object-export-transforms-plugin": ["test/plugin_functional/plugins/saved_object_export_transforms"],
"@kbn/saved-object-export-transforms-plugin/*": ["test/plugin_functional/plugins/saved_object_export_transforms/*"],
"@kbn/saved-object-import-warnings-plugin": ["test/plugin_functional/plugins/saved_object_import_warnings"],
Expand Down
9 changes: 2 additions & 7 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,7 @@ describe('config validation', () => {
}).not.toThrowError();
});

test('the claim strategy is validated', () => {
const config = { claim_strategy: 'invalid-strategy' };
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"The claim strategy is invalid: Unknown task claiming strategy (invalid-strategy)"`
);
test('any claim strategy is valid', () => {
configSchema.validate({ claim_strategy: 'anything!' });
});
});
7 changes: 1 addition & 6 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import { schema, TypeOf } from '@kbn/config-schema';
import { getTaskClaimer } from './task_claimers';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
Expand All @@ -27,6 +26,7 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;

export const CLAIM_STRATEGY_DEFAULT = 'default';
export const CLAIM_STRATEGY_MGET = 'unsafe_mget';

export const taskExecutionFailureThresholdSchema = schema.object(
{
Expand Down Expand Up @@ -165,11 +165,6 @@ export const configSchema = schema.object(
) {
return `The specified monitored_stats_required_freshness (${config.monitored_stats_required_freshness}) is invalid, as it is below the poll_interval (${config.poll_interval})`;
}
try {
getTaskClaimer(config.claim_strategy);
} catch (err) {
return `The claim strategy is invalid: ${err.message}`;
}
},
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/** Number of event log documents requested per search (used as the search `size`). */
const DOCS_TO_FETCH = 10000;

// Get the event logs from multiple clusters, focusing on rule runs
// as they test recurring activity easily, and augmenting with other
// bits, producing a single .ndjson file for all clusters.
//
// main() is async, so its promise must not be left floating: report any
// failure (for example an invalid URL argument) on stderr and make the
// process exit with a non-zero status. process.exitCode is used rather than
// process.exit() so output that is already queued is still flushed.
main().catch((err) => {
  console.error(`Unexpected failure: ${err?.stack || err}`);
  process.exitCode = 1;
});

/**
 * Script entry point.
 *
 * Reads Elasticsearch URLs from the command line, fetches the rule-run event
 * log documents from each of them in parallel, augments every document with
 * timing and origin fields, assigns a simulated worker id per document, and
 * prints each document as one JSON line on stdout.
 *
 * With no URL arguments, prints the usage text instead.
 *
 * @returns { Promise<void> }
 */
async function main() {
  const urls = process.argv.slice(2);
  if (urls.length === 0) return help();

  // Parse every URL once. `origin` never includes the user:password part,
  // which makes it safe to log and to store in the output documents.
  const parsedUrls = urls.map((url) => new URL(url));
  const urlNoCreds = parsedUrls.map((url) => url.origin || 'unknown');
  const urlHosts = parsedUrls.map((url) => (url.host || 'unknown').split('.')[0]);

  // get the event logs; allSettled so one failing cluster does not hide the others
  const docResults = await Promise.allSettled(urls.map(getRuleRunEventDocs));

  /** @type { any[][] } */
  const serverDocs = [];

  // log errors, and add urls to event logs
  docResults.forEach((docResult, i) => {
    if (docResult.status === 'rejected') {
      // Log the credential-free origin: the raw argument may embed a password.
      console.error(`Failed to get docs from ${urlNoCreds[i]}: ${docResult.reason}`);
      return;
    }

    for (const doc of docResult.value) {
      if (!doc.kibana) doc.kibana = {};
      // add/remove some bits - remove to save space
      doc.kibana.url = urlNoCreds[i];
      doc.kibana.host = urlHosts[i];
      delete doc.kibana.saved_objects;
      delete doc.kibana.space_ids;

      if (!doc.event) doc.event = {};
      if (doc.event.start) doc.event.startMs = new Date(doc.event.start).valueOf();
      if (doc.event.end) doc.event.endMs = new Date(doc.event.end).valueOf();
      if (doc.event.endMs && doc.event.startMs) {
        doc.event.durationMs = doc.event.endMs - doc.event.startMs;
      }
    }
    serverDocs.push(docResult.value);
  });

  // Docs without a usable start time sort as 0, so the comparator never
  // returns NaN (which would leave the sort order unspecified).
  /** @type { (doc: any) => number } */
  const startOf = (doc) => (Number.isFinite(doc.event.startMs) ? doc.event.startMs : 0);

  // for each server's docs, apply a worker id
  for (const docs of serverDocs) {
    // sort ascending by start time
    docs.sort((a, b) => startOf(a) - startOf(b));

    assignWorkerIds(docs);

    for (const doc of docs) {
      console.log(JSON.stringify(doc));
    }
  }
}

/**
 * A simulated worker slot. It is either free (`nextEnd` is undefined) or
 * claimed until the time stored in `nextEnd`; `lastEnd` remembers when the
 * most recently released claim ended.
 */
class Worker {
  /** @param { string } id */
  constructor(id) {
    this.id = id;
    /** @type { number | undefined } end time of the current claim, if any */
    this.nextEnd = undefined;
    /** @type { number | undefined } end time of the last released claim, if any */
    this.lastEnd = undefined;
  }

  /**
   * Releases the current claim once `currentDate` has reached its end time.
   * Does nothing while the claim is still running or when there is no claim.
   *
   * @type { (currentDate: number) => void }
   */
  update(currentDate) {
    const claimFinished = currentDate >= this.nextEnd;
    if (!claimFinished) return;

    [this.lastEnd, this.nextEnd] = [this.nextEnd, undefined];
  }

  /**
   * Reports whether the worker currently holds no claim.
   *
   * @type { () => boolean }
   */
  isAvailable() {
    return typeof this.nextEnd === 'undefined';
  }

  /**
   * Marks the worker as claimed until `end`.
   *
   * @type { (end: number) => void }
   */
  claimTill(end) {
    this.nextEnd = end;
  }
}

/**
 * Tracks the simulated workers of every Kibana server seen in the event log
 * documents, and hands out a short sequential id per server uuid.
 */
class Workers {
  constructor() {
    /** @type { Map<string, Worker[]> } workers keyed by server uuid */
    this.workersByServer = new Map();

    /** @type { Map<string, string> } server uuid -> short sequential id ("1", "2", ...) */
    this.serverMap = new Map();
  }

  /**
   * Returns the short id registered for the document's `kibana.server_uuid`,
   * or 'unknown' when that server has not been registered yet.
   *
   * @type { (doc: any) => string }
   */
  getServerId(doc) {
    const { server_uuid: serverUuid } = doc?.kibana || {};
    return this.serverMap.get(serverUuid) || 'unknown';
  }

  /**
   * Finds a free worker on the document's server (creating one when all are
   * busy) and claims it until the document's `event.endMs`. The server is
   * registered with the next sequential short id on first sight.
   *
   * @type { (doc: any) => Worker }
   */
  getAvailableWorker(doc) {
    const { startMs, endMs } = doc?.event || {};
    const { server_uuid: serverUuid } = doc?.kibana || {};
    if (!this.serverMap.has(serverUuid)) {
      this.serverMap.set(serverUuid, `${this.serverMap.size + 1}`);
    }

    const workers = this.getWorkersForServer(serverUuid);

    for (const worker of workers) {
      // release the worker first if its claim ended by the time this doc started
      worker.update(startMs);
      if (worker.isAvailable()) {
        worker.claimTill(endMs);
        return worker;
      }
    }

    // Every existing worker is busy: add one. The id is passed as a string,
    // matching the constructor's documented parameter type.
    const worker = new Worker(String(workers.length + 1));
    worker.claimTill(endMs);
    workers.push(worker);

    return worker;
  }

  /**
   * Returns the worker list for a server, creating and storing an empty list
   * the first time the server is seen.
   *
   * @type { (serverUuid: string | undefined) => Worker[] }
   */
  getWorkersForServer(serverUuid) {
    let workers = this.workersByServer.get(serverUuid);
    if (workers !== undefined) return workers;

    workers = [];
    this.workersByServer.set(serverUuid, workers);
    return workers;
  }
}

/**
 * Walks the documents in the given order and, for each one, stores on it:
 * - `kibana.worker`: "<serverId>-<workerId>", both zero-padded to 3 characters
 * - `event.preIdleMs`: time between the worker's previous claim ending and
 *   this document starting, or 0 when the worker has no previous end time
 *
 * @type { (docs: any[]) => void }
 */
function assignWorkerIds(docs) {
  const pool = new Workers();

  docs.forEach((doc) => {
    const assigned = pool.getAvailableWorker(doc);
    const serverPart = pool.getServerId(doc).padStart(3, '0');
    const workerPart = `${assigned.id}`.padStart(3, '0');

    doc.kibana.worker = `${serverPart}-${workerPart}`;

    if (assigned.lastEnd) {
      doc.event.preIdleMs = doc.event.startMs - assigned.lastEnd;
    } else {
      doc.event.preIdleMs = 0;
    }
  });
}

/**
 * Decodes a percent-encoded credential taken from a parsed URL. Falls back to
 * the raw value when it is not valid percent-encoding (e.g. a lone '%').
 *
 * @type { (value: string) => string }
 */
function decodeUrlCredential(value) {
  try {
    return decodeURIComponent(value);
  } catch (err) {
    return value;
  }
}

/**
 * Searches the Kibana event log indices of the cluster at `url` and returns
 * the `_source` of every hit.
 *
 * Credentials embedded in the URL are sent as an Authorization header; they
 * are never part of the request URL or of error messages.
 *
 * Rejects when the HTTP request fails, when the response status is not ok,
 * or when the response body has no `hits.hits` array.
 *
 * @type { (url: string) => Promise<any[]>}
 */
async function getRuleRunEventDocs(url) {
  const parsedUrl = new URL(url);
  const indices = `.kibana-event-log,.kibana-event-log-ds`;
  const options = `expand_wildcards=all&ignore_unavailable=true`;
  const searchUrl = `${parsedUrl.origin}/${indices}/_search?${options}`;
  const query = getQuery();

  // URL.username / URL.password are percent-encoded ('=', '@', ':', '/' and
  // others become %XX), so decode them before building the auth header.
  // Otherwise base64 API keys ending in '=' and passwords with such
  // characters are sent in their encoded form.
  const authHeader = getAuthHeader(
    decodeUrlCredential(parsedUrl.username),
    decodeUrlCredential(parsedUrl.password)
  );
  const headers = {
    'Content-Type': 'application/json',
    ...(authHeader ? { Authorization: authHeader } : {}),
  };
  const fetchResult = await fetch(searchUrl, {
    method: 'POST',
    headers,
    body: JSON.stringify(query),
  });

  if (!fetchResult.ok) {
    const text = await fetchResult.text();
    throw new Error(`Failed to fetch from ${searchUrl}: ${fetchResult.statusText}\n${text}`);
  }

  const result = await fetchResult.json();
  const hits = result?.hits?.hits;
  if (!Array.isArray(hits)) {
    throw new Error(`Unexpected search response from ${searchUrl}: missing hits.hits`);
  }

  return hits.map((hit) => hit._source);
}

/**
 * Builds the value of an HTTP Authorization header.
 *
 * - returns undefined unless both username and password are non-empty
 * - a username of "apikey" (any letter case) yields `ApiKey <password>`
 * - anything else yields `Basic <base64 of "username:password">`
 *
 * @type { (username: string, password: string) => string | undefined }
 */
function getAuthHeader(username, password) {
  const haveBoth = Boolean(username) && Boolean(password);
  if (!haveBoth) return undefined;

  const isApiKey = username.toUpperCase() === 'APIKEY';
  if (isApiKey) return `ApiKey ${password}`;

  const token = Buffer.from([username, password].join(':')).toString('base64');
  return 'Basic ' + token;
}

/**
 * Builds the search request body that selects events whose `event.provider`
 * is 'alerting' and whose `event.action` is 'execute', newest first.
 *
 * `size` is the maximum number of documents to return; it defaults to
 * DOCS_TO_FETCH, so calling with no argument behaves as before.
 *
 * @type { (size?: number) => any}
 */
function getQuery(size = DOCS_TO_FETCH) {
  return {
    size,
    query: {
      bool: {
        filter: [
          { term: { 'event.provider': 'alerting' } },
          { term: { 'event.action': 'execute' } },
        ],
      },
    },
    sort: [{ '@timestamp': { order: 'desc' } }],
  };
}

/**
 * Prints the usage text to stderr, listing every field the script adds to
 * or removes from the event log documents.
 *
 * @type { () => void }
 */
function help() {
  // The durationMs line used to repeat the endMs description; it is the
  // difference between endMs and startMs.
  console.error(`
usage: [this-command] <es-url1> <es-url2> ... <es-urlN>
Will fetch rule execution event logs from each url, and augment them:
- adds event.startMs - event.start as an epoch number
- adds event.endMs - event.end as an epoch number
- adds event.durationMs - event.endMs minus event.startMs, in milliseconds
- adds event.preIdleMs - time worker was idle before this
- adds kibana.url - the URL passed in (which is actually ES)
- adds kibana.host - just the host name from that URL
- adds kibana.worker - worker in form of nodeId-workerId (unique only by url)
- deletes kibana.saved_objects - not needed and confusing
- deletes kibana.space_ids - not needed
The output is a single .ndjson file with all the docs.
`);
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ export function createTaskRunAggregator(
(taskEvent: TaskLifecycleEvent) =>
isTaskManagerStatEvent(taskEvent) && taskEvent.id === 'pollingDelay'
),
map(() => new Date().toISOString())
map(() => new Date().toISOString()),
startWith(new Date().toISOString())
),
// get the average ratio of polled tasks by their persistency
taskPollingLifecycle.events.pipe(
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ interface Opts<H> {
logger: Logger;
initialPollInterval: number;
pollInterval$: Observable<number>;
pollIntervalDelay$: Observable<number>;
pollIntervalDelay$?: Observable<number>;
getCapacity: () => number;
work: WorkFn<H>;
}
Expand Down Expand Up @@ -99,10 +99,12 @@ export function createTaskPoller<T, H>({
pollInterval = interval;
logger.debug(`Task poller now using interval of ${interval}ms`);
});
pollIntervalDelay$.subscribe((delay) => {
pollIntervalDelay = delay;
logger.debug(`Task poller now delaying emission by ${delay}ms`);
});
if (pollIntervalDelay$) {
pollIntervalDelay$.subscribe((delay) => {
pollIntervalDelay = delay;
logger.debug(`Task poller now delaying emission by ${delay}ms`);
});
}
hasSubscribed = true;
}

Expand Down
21 changes: 12 additions & 9 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import type { Logger, ExecutionContextStart } from '@kbn/core/server';

import { Result, asErr, mapErr, asOk, map, mapOk } from './lib/result_type';
import { ManagedConfiguration } from './lib/create_managed_configuration';
import { TaskManagerConfig } from './config';
import { TaskManagerConfig, CLAIM_STRATEGY_DEFAULT } from './config';

import {
TaskMarkRunning,
Expand Down Expand Up @@ -154,15 +154,18 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
// pipe taskClaiming events into the lifecycle event stream
this.taskClaiming.events.subscribe(emitEvent);

const { poll_interval: pollInterval } = config;
const { poll_interval: pollInterval, claim_strategy: claimStrategy } = config;

const pollIntervalDelay$ = delayOnClaimConflicts(
maxWorkersConfiguration$,
pollIntervalConfiguration$,
this.events$,
config.version_conflict_threshold,
config.monitored_stats_running_average_window
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
let pollIntervalDelay$: Observable<number> | undefined;
if (claimStrategy === CLAIM_STRATEGY_DEFAULT) {
pollIntervalDelay$ = delayOnClaimConflicts(
maxWorkersConfiguration$,
pollIntervalConfiguration$,
this.events$,
config.version_conflict_threshold,
config.monitored_stats_running_average_window
).pipe(tap((delay) => emitEvent(asTaskManagerStatEvent('pollingDelay', asOk(delay)))));
}

const poller = createTaskPoller<string, TimedFillPoolResult>({
logger,
Expand Down
Loading

0 comments on commit f016398

Please sign in to comment.