Skip to content

Commit

Permalink
[Response Ops][Task Manager] Add new configurable values (#190934)
Browse files Browse the repository at this point in the history
Resolves #190734

## Summary

Making the following configurable:
- discovery interval for new discovery service
- lookback time when querying for active nodes in discovery service
- kibanas per partition for new partitioning functionality
  • Loading branch information
ymao1 authored Aug 26, 2024
1 parent 252c728 commit 1ee45da
Show file tree
Hide file tree
Showing 21 changed files with 321 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ kibana_vars=(
xpack.securitySolution.packagerTaskInterval
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.capacity
xpack.task_manager.claim_strategy
xpack.task_manager.discovery.active_nodes_lookback
xpack.task_manager.discovery.interval
xpack.task_manager.kibanas_per_partition
xpack.task_manager.max_attempts
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate
Expand Down
41 changes: 41 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -22,6 +26,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -71,6 +76,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -79,6 +88,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -126,6 +136,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -134,6 +148,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -252,4 +267,30 @@ describe('config validation', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_MGET });
expect(result.poll_interval).toEqual(500);
});

test('discovery active_nodes_lookback must be a valid duration', () => {
const config: Record<string, unknown> = {
discovery: {
active_nodes_lookback: 'foo',
},
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[discovery.active_nodes_lookback]: active node lookback duration must be a valid duration string"`
);
});

test('discovery active_nodes_lookback must be less than 5m', () => {
const config: Record<string, unknown> = {
discovery: {
active_nodes_lookback: '301s',
},
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[discovery.active_nodes_lookback]: active node lookback duration cannot exceed five minutes"`
);
});
});
34 changes: 34 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { schema, TypeOf } from '@kbn/config-schema';
import { parseIntervalAsMillisecond } from './lib/intervals';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
Expand All @@ -32,6 +33,15 @@ export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds
const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second
const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
const FIVE_MIN_IN_MS = 5 * 60 * 1000;

export const DEFAULT_KIBANAS_PER_PARTITION = 2;

export const taskExecutionFailureThresholdSchema = schema.object(
{
error_threshold: schema.number({
Expand Down Expand Up @@ -70,6 +80,26 @@ export const configSchema = schema.object(
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
discovery: schema.object({
active_nodes_lookback: schema.string({
defaultValue: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
validate: (duration) => {
try {
const parsedDurationMs = parseIntervalAsMillisecond(duration);
if (parsedDurationMs > FIVE_MIN_IN_MS) {
return 'active node lookback duration cannot exceed five minutes';
}
} catch (err) {
return 'active node lookback duration must be a valid duration string';
}
},
}),
interval: schema.number({
defaultValue: DEFAULT_DISCOVERY_INTERVAL_MS,
min: MIN_DISCOVERY_INTERVAL_MS,
max: MAX_DISCOVERY_INTERVAL_MS,
}),
}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
Expand All @@ -81,6 +111,10 @@ export const configSchema = schema.object(
}),
}),
event_loop_delay: eventLoopDelaySchema,
kibanas_per_partition: schema.number({
defaultValue: DEFAULT_KIBANAS_PER_PARTITION,
min: 1,
}),
/* The maximum number of times a task will be attempted before being abandoned as failed */
max_attempts: schema.number({
defaultValue: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -160,6 +165,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -280,6 +290,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
* 2.0.
*/
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import {
KibanaDiscoveryService,
DISCOVERY_INTERVAL,
ACTIVE_NODES_LOOK_BACK,
} from './kibana_discovery_service';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -43,6 +40,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -68,6 +69,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();
await kibanaDiscoveryService.start();
Expand All @@ -84,13 +89,21 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(1);

expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

jest.runOnlyPendingTimers();

Expand All @@ -104,6 +117,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -113,7 +130,11 @@ describe('KibanaDiscoveryService', () => {
);
expect(logger.info).not.toHaveBeenCalled();
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
});

it('reschedules when upsert fails after start', async () => {
Expand All @@ -125,6 +146,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -133,15 +158,23 @@ describe('KibanaDiscoveryService', () => {
expect(logger.info).toHaveBeenCalledWith('Kibana Discovery Service has been started');
expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(new Error('foo'));

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:foo"
Expand All @@ -158,12 +191,16 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();

expect(savedObjectsRepository.find).toHaveBeenCalledWith({
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${ACTIVE_NODES_LOOK_BACK}`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION}`,
page: 1,
perPage: 10000,
type: BACKGROUND_TASK_NODE_SO_NAME,
Expand All @@ -180,6 +217,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand Down
Loading

0 comments on commit 1ee45da

Please sign in to comment.