Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Response Ops][Task Manager] Add new configurable values #190934

Merged
merged 6 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ kibana_vars=(
xpack.securitySolution.packagerTaskInterval
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.capacity
xpack.task_manager.claim_strategy
xpack.task_manager.discovery.active_nodes_lookback
xpack.task_manager.discovery.interval
xpack.task_manager.kibanas_per_partition
xpack.task_manager.max_attempts
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate
Expand Down
41 changes: 41 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -22,6 +26,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -71,6 +76,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -79,6 +88,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -126,6 +136,10 @@ describe('config validation', () => {
Object {
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -134,6 +148,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -252,4 +267,30 @@ describe('config validation', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_MGET });
expect(result.poll_interval).toEqual(500);
});

// A lookback value that cannot be parsed as a duration must be rejected with
// the "valid duration string" validation message.
test('discovery active_nodes_lookback must be a valid duration', () => {
  const unparsableLookback: Record<string, unknown> = {
    discovery: { active_nodes_lookback: 'foo' },
  };
  const validate = () => {
    configSchema.validate(unparsableLookback);
  };
  expect(validate).toThrowErrorMatchingInlineSnapshot(
    `"[discovery.active_nodes_lookback]: active node lookback duration must be a valid duration string"`
  );
});

// The validator only rejects durations strictly greater than five minutes, so
// exactly five minutes (300s) is the largest accepted value. The title and the
// boundary assertion below pin that behavior.
test('discovery active_nodes_lookback must not exceed 5m', () => {
  const validateLookback = (lookback: string) =>
    configSchema.validate({ discovery: { active_nodes_lookback: lookback } });

  // Boundary: exactly five minutes is still valid.
  expect(() => validateLookback('300s')).not.toThrow();

  // One second over the limit is rejected.
  expect(() => validateLookback('301s')).toThrowErrorMatchingInlineSnapshot(
    `"[discovery.active_nodes_lookback]: active node lookback duration cannot exceed five minutes"`
  );
});
});
34 changes: 34 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to put these new discovery configs under a namespace (like `discovery.`)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 730d933

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { schema, TypeOf } from '@kbn/config-schema';
import { parseIntervalAsMillisecond } from './lib/intervals';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
Expand All @@ -32,6 +33,15 @@ export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

/** Default for `discovery.interval`, in milliseconds (10 seconds). */
export const DEFAULT_DISCOVERY_INTERVAL_MS = 10 * 1000;
/** Smallest accepted `discovery.interval`, in milliseconds (1 second). */
const MIN_DISCOVERY_INTERVAL_MS = 1 * 1000;
/** Largest accepted `discovery.interval`, in milliseconds (5 minutes). */
const MAX_DISCOVERY_INTERVAL_MS = 5 * 60 * 1000;

/** Default for `discovery.active_nodes_lookback`, expressed as a duration string. */
export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
/** Upper bound, in milliseconds, enforced on `discovery.active_nodes_lookback` (5 minutes). */
const FIVE_MIN_IN_MS = 1000 * 60 * 5;

/** Default for `kibanas_per_partition`. */
export const DEFAULT_KIBANAS_PER_PARTITION = 2;

export const taskExecutionFailureThresholdSchema = schema.object(
{
error_threshold: schema.number({
Expand Down Expand Up @@ -70,6 +80,26 @@ export const configSchema = schema.object(
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
/* Settings for Kibana node discovery */
discovery: schema.object({
/* Duration string (default DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION) for the active-node lookback window. It must parse as an interval and may not exceed five minutes. */
active_nodes_lookback: schema.string({
defaultValue: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
validate: (duration) => {
try {
const parsedDurationMs = parseIntervalAsMillisecond(duration);
// Only values strictly greater than five minutes are rejected; exactly five minutes passes.
if (parsedDurationMs > FIVE_MIN_IN_MS) {
return 'active node lookback duration cannot exceed five minutes';
}
} catch (err) {
// Any error thrown while parsing is reported as an invalid duration string.
return 'active node lookback duration must be a valid duration string';
}
// Falling through without a return value signals that the duration is valid.
},
}),
/* Discovery interval in milliseconds, bounded by MIN_DISCOVERY_INTERVAL_MS and MAX_DISCOVERY_INTERVAL_MS */
interval: schema.number({
defaultValue: DEFAULT_DISCOVERY_INTERVAL_MS,
min: MIN_DISCOVERY_INTERVAL_MS,
max: MAX_DISCOVERY_INTERVAL_MS,
}),
}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
Expand All @@ -81,6 +111,10 @@ export const configSchema = schema.object(
}),
}),
event_loop_delay: eventLoopDelaySchema,
/* Must be at least 1; defaults to DEFAULT_KIBANAS_PER_PARTITION. NOTE(review): presumably the number of Kibana nodes assigned to each task partition — confirm against the partitioning code. */
kibanas_per_partition: schema.number({
defaultValue: DEFAULT_KIBANAS_PER_PARTITION,
min: 1,
}),
/* The maximum number of times a task will be attempted before being abandoned as failed */
max_attempts: schema.number({
defaultValue: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -160,6 +165,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -280,6 +290,11 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
* 2.0.
*/
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import {
KibanaDiscoveryService,
DISCOVERY_INTERVAL,
ACTIVE_NODES_LOOK_BACK,
} from './kibana_discovery_service';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -43,6 +40,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -68,6 +69,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();
await kibanaDiscoveryService.start();
Expand All @@ -84,13 +89,21 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(1);

expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

jest.runOnlyPendingTimers();

Expand All @@ -104,6 +117,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -113,7 +130,11 @@ describe('KibanaDiscoveryService', () => {
);
expect(logger.info).not.toHaveBeenCalled();
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
});

it('reschedules when upsert fails after start', async () => {
Expand All @@ -125,6 +146,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -133,15 +158,23 @@ describe('KibanaDiscoveryService', () => {
expect(logger.info).toHaveBeenCalledWith('Kibana Discovery Service has been started');
expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(new Error('foo'));

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:foo"
Expand All @@ -158,12 +191,16 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();

expect(savedObjectsRepository.find).toHaveBeenCalledWith({
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${ACTIVE_NODES_LOOK_BACK}`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION}`,
page: 1,
perPage: 10000,
type: BACKGROUND_TASK_NODE_SO_NAME,
Expand All @@ -180,6 +217,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand Down
Loading