Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Response Ops][Task Manager] Add new configurable values #190934

Merged
merged 6 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,11 @@ kibana_vars=(
xpack.securitySolution.packagerTaskInterval
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.active_nodes_lookback
xpack.task_manager.capacity
xpack.task_manager.claim_strategy
xpack.task_manager.discovery_interval
xpack.task_manager.kibanas_per_partition
xpack.task_manager.max_attempts
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate
Expand Down
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -22,6 +24,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -69,8 +72,10 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -79,6 +84,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down Expand Up @@ -124,8 +130,10 @@ describe('config validation', () => {
};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand All @@ -134,6 +142,7 @@ describe('config validation', () => {
"monitor": true,
"warn_threshold": 5000,
},
"kibanas_per_partition": 2,
"max_attempts": 3,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
Expand Down
18 changes: 18 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would putting these new discovery configs under a name space (like discovery.) be better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 730d933

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10;

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_S = 30;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe duration format like 1s 1h. we already have validateDurationSchema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 730d933


export const DEFAULT_KIBANAS_PER_PARTITION = 2;

export const taskExecutionFailureThresholdSchema = schema.object(
{
error_threshold: schema.number({
Expand Down Expand Up @@ -67,9 +73,17 @@ const requestTimeoutsConfig = schema.object({

export const configSchema = schema.object(
{
active_nodes_lookback: schema.number({
defaultValue: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
min: 10,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we should set a max as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would 5 minutes make sense? If a user sets 5m, it would take 5m for a node that crashed to disappear from the list of active nodes. Perhaps its a sufficient high bound that we'd want to support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 730d933

}),
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
discovery_interval: schema.number({
defaultValue: DEFAULT_DISCOVERY_INTERVAL_MS,
min: DEFAULT_DISCOVERY_INTERVAL_MS,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we should set a max as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see 1 or 5 minutes as the max, we could allow a smaller min, maybe 1s or so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 730d933 to set max of 5 minutes

}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
Expand All @@ -81,6 +95,10 @@ export const configSchema = schema.object(
}),
}),
event_loop_delay: eventLoopDelaySchema,
kibanas_per_partition: schema.number({
defaultValue: DEFAULT_KIBANAS_PER_PARTITION,
min: 1,
}),
/* The maximum number of times a task will be attempted before being abandoned as failed */
max_attempts: schema.number({
defaultValue: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
active_nodes_lookback: 30,
discovery_interval: 10000,
kibanas_per_partition: 2,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -160,6 +163,9 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down Expand Up @@ -280,6 +286,9 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
poll_interval: 3000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
* 2.0.
*/
import { savedObjectsRepositoryMock, loggingSystemMock } from '@kbn/core/server/mocks';
import {
KibanaDiscoveryService,
DISCOVERY_INTERVAL,
ACTIVE_NODES_LOOK_BACK,
} from './kibana_discovery_service';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_S, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -43,6 +40,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});
await kibanaDiscoveryService.start();

Expand All @@ -68,6 +67,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});
await kibanaDiscoveryService.start();
await kibanaDiscoveryService.start();
Expand All @@ -84,13 +85,19 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});
await kibanaDiscoveryService.start();

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(1);

expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

jest.runOnlyPendingTimers();

Expand All @@ -104,6 +111,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});
await kibanaDiscoveryService.start();

Expand All @@ -113,7 +122,11 @@ describe('KibanaDiscoveryService', () => {
);
expect(logger.info).not.toHaveBeenCalled();
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
});

it('reschedules when upsert fails after start', async () => {
Expand All @@ -125,6 +138,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});
await kibanaDiscoveryService.start();

Expand All @@ -133,15 +148,23 @@ describe('KibanaDiscoveryService', () => {
expect(logger.info).toHaveBeenCalledWith('Kibana Discovery Service has been started');
expect(kibanaDiscoveryService.isStarted()).toBe(true);
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
1,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);

savedObjectsRepository.update.mockRejectedValueOnce(new Error('foo'));

await jest.advanceTimersByTimeAsync(15000);

expect(savedObjectsRepository.update).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenCalledTimes(2);
expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), DISCOVERY_INTERVAL);
expect(setTimeout).toHaveBeenNthCalledWith(
2,
expect.any(Function),
DEFAULT_DISCOVERY_INTERVAL_MS
);
expect(logger.error).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
"Kibana Discovery Service couldn't update this node's last_seen timestamp. id: current-node-id, last_seen: 2024-08-10T10:00:10.000Z, error:foo"
Expand All @@ -158,12 +181,14 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});

const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();

expect(savedObjectsRepository.find).toHaveBeenCalledWith({
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${ACTIVE_NODES_LOOK_BACK}`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${DEFAULT_ACTIVE_NODES_LOOK_BACK_S}s`,
page: 1,
perPage: 10000,
type: BACKGROUND_TASK_NODE_SO_NAME,
Expand All @@ -180,6 +205,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand All @@ -201,6 +228,8 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';

interface DiscoveryServiceParams {
activeNodesLookBack: number;
currentNode: string;
discoveryInterval: number;
savedObjectsRepository: ISavedObjectsRepository;
logger: Logger;
}
Expand All @@ -21,16 +23,23 @@ interface DiscoveryServiceUpsertParams {
lastSeen: string;
}

export const DISCOVERY_INTERVAL = 1000 * 10;
export const ACTIVE_NODES_LOOK_BACK = '30s';

export class KibanaDiscoveryService {
private activeNodesLookBack: number;
private currentNode: string;
private discoveryInterval: number;
private started = false;
private savedObjectsRepository: ISavedObjectsRepository;
private logger: Logger;

constructor({ currentNode, savedObjectsRepository, logger }: DiscoveryServiceParams) {
constructor({
activeNodesLookBack,
currentNode,
discoveryInterval,
savedObjectsRepository,
logger,
}: DiscoveryServiceParams) {
this.activeNodesLookBack = activeNodesLookBack;
this.discoveryInterval = discoveryInterval;
this.savedObjectsRepository = savedObjectsRepository;
this.logger = logger;
this.currentNode = currentNode;
Expand Down Expand Up @@ -60,7 +69,7 @@ export class KibanaDiscoveryService {
} catch (e) {
if (!this.started) {
this.logger.error(
`Kibana Discovery Service couldn't be started and will be retried in ${DISCOVERY_INTERVAL}ms, error:${e.message}`
`Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}`
);
} else {
this.logger.error(
Expand All @@ -70,7 +79,7 @@ export class KibanaDiscoveryService {
} finally {
setTimeout(
async () => await this.scheduleUpsertCurrentNode(),
DISCOVERY_INTERVAL - (Date.now() - lastSeenDate.getTime())
this.discoveryInterval - (Date.now() - lastSeenDate.getTime())
);
}
}
Expand All @@ -93,7 +102,7 @@ export class KibanaDiscoveryService {
type: BACKGROUND_TASK_NODE_SO_NAME,
perPage: 10000,
page: 1,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${ACTIVE_NODES_LOOK_BACK}`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}s`,
});

return activeNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { SavedObjectsFindResponse, SavedObjectsFindResult } from '@kbn/core/serv
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_S, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

export const createDiscoveryServiceMock = (currentNode: string) => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
Expand All @@ -18,6 +19,8 @@ export const createDiscoveryServiceMock = (currentNode: string) => {
savedObjectsRepository,
logger,
currentNode,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
});

for (const method of ['getActiveKibanaNodes'] as Array<keyof KibanaDiscoveryService>) {
Expand Down
Loading