Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Aug 26, 2024
1 parent f987bda commit 730d933
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,10 @@ kibana_vars=(
xpack.securitySolution.packagerTaskInterval
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.active_nodes_lookback
xpack.task_manager.capacity
xpack.task_manager.claim_strategy
xpack.task_manager.discovery_interval
xpack.task_manager.discovery.active_nodes_lookback
xpack.task_manager.discovery.interval
xpack.task_manager.kibanas_per_partition
xpack.task_manager.max_attempts
xpack.task_manager.max_workers
Expand Down
44 changes: 38 additions & 6 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -72,10 +74,12 @@ describe('config validation', () => {
const config: Record<string, unknown> = {};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -130,10 +134,12 @@ describe('config validation', () => {
};
expect(configSchema.validate(config)).toMatchInlineSnapshot(`
Object {
"active_nodes_lookback": 30,
"allow_reading_invalid_state": true,
"claim_strategy": "update_by_query",
"discovery_interval": 10000,
"discovery": Object {
"active_nodes_lookback": "30s",
"interval": 10000,
},
"ephemeral_tasks": Object {
"enabled": false,
"request_capacity": 10,
Expand Down Expand Up @@ -261,4 +267,30 @@ describe('config validation', () => {
const result = configSchema.validate({ claim_strategy: CLAIM_STRATEGY_MGET });
expect(result.poll_interval).toEqual(500);
});

test('discovery active_nodes_lookback must be a valid duration', () => {
const config: Record<string, unknown> = {
discovery: {
active_nodes_lookback: 'foo',
},
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[discovery.active_nodes_lookback]: active node lookback duration must be a valid duration string"`
);
});

test('discovery active_nodes_lookback must be less than 5m', () => {
const config: Record<string, unknown> = {
discovery: {
active_nodes_lookback: '301s',
},
};
expect(() => {
configSchema.validate(config);
}).toThrowErrorMatchingInlineSnapshot(
`"[discovery.active_nodes_lookback]: active node lookback duration cannot exceed five minutes"`
);
});
});
34 changes: 25 additions & 9 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { schema, TypeOf } from '@kbn/config-schema';
import { parseIntervalAsMillisecond } from './lib/intervals';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
Expand All @@ -32,9 +33,12 @@ export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;
export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10;
export const DEFAULT_DISCOVERY_INTERVAL_MS = 1000 * 10; // 10 seconds
const MIN_DISCOVERY_INTERVAL_MS = 1000; // 1 second
const MAX_DISCOVERY_INTERVAL_MS = 1000 * 60 * 5; // 5 minutes

export const DEFAULT_ACTIVE_NODES_LOOK_BACK_S = 30;
export const DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION = '30s';
const FIVE_MIN_IN_MS = 5 * 60 * 1000;

export const DEFAULT_KIBANAS_PER_PARTITION = 2;

Expand Down Expand Up @@ -73,16 +77,28 @@ const requestTimeoutsConfig = schema.object({

export const configSchema = schema.object(
{
active_nodes_lookback: schema.number({
defaultValue: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
min: 10,
}),
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
discovery_interval: schema.number({
defaultValue: DEFAULT_DISCOVERY_INTERVAL_MS,
min: DEFAULT_DISCOVERY_INTERVAL_MS,
discovery: schema.object({
active_nodes_lookback: schema.string({
defaultValue: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
validate: (duration) => {
try {
const parsedDurationMs = parseIntervalAsMillisecond(duration);
if (parsedDurationMs > FIVE_MIN_IN_MS) {
return 'active node lookback duration cannot exceed five minutes';
}
} catch (err) {
return 'active node lookback duration must be a valid duration string';
}
},
}),
interval: schema.number({
defaultValue: DEFAULT_DISCOVERY_INTERVAL_MS,
min: MIN_DISCOVERY_INTERVAL_MS,
max: MAX_DISCOVERY_INTERVAL_MS,
}),
}),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
active_nodes_lookback: 30,
discovery_interval: 10000,
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
max_attempts: 9,
poll_interval: 6000000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
Expand Down Expand Up @@ -163,8 +165,10 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
Expand Down Expand Up @@ -286,8 +290,10 @@ describe('managed configuration', () => {
clock = sinon.useFakeTimers();

const context = coreMock.createPluginInitializerContext<TaskManagerConfig>({
active_nodes_lookback: 30,
discovery_interval: 10000,
discovery: {
active_nodes_lookback: '30s',
interval: 10000,
},
kibanas_per_partition: 2,
capacity: 10,
max_attempts: 9,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { SavedObjectsBulkDeleteResponse, SavedObjectsUpdateResponse } from '@kbn/core/server';

import { createFindResponse, createFindSO } from './mock_kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_S, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

const currentNode = 'current-node-id';
const now = '2024-08-10T10:00:00.000Z';
Expand Down Expand Up @@ -40,8 +40,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -67,8 +69,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();
await kibanaDiscoveryService.start();
Expand All @@ -85,8 +89,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -111,8 +117,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand All @@ -138,8 +146,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});
await kibanaDiscoveryService.start();

Expand Down Expand Up @@ -181,14 +191,16 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

const activeNodes = await kibanaDiscoveryService.getActiveKibanaNodes();

expect(savedObjectsRepository.find).toHaveBeenCalledWith({
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${DEFAULT_ACTIVE_NODES_LOOK_BACK_S}s`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION}`,
page: 1,
perPage: 10000,
type: BACKGROUND_TASK_NODE_SO_NAME,
Expand All @@ -205,8 +217,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand All @@ -228,8 +242,10 @@ describe('KibanaDiscoveryService', () => {
savedObjectsRepository,
logger,
currentNode,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

await kibanaDiscoveryService.deleteCurrentNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import type { ISavedObjectsRepository } from '@kbn/core/server';
import { Logger } from '@kbn/core/server';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { TaskManagerConfig } from '../config';

interface DiscoveryServiceParams {
activeNodesLookBack: number;
config: TaskManagerConfig['discovery'];
currentNode: string;
discoveryInterval: number;
savedObjectsRepository: ISavedObjectsRepository;
logger: Logger;
}
Expand All @@ -24,22 +24,16 @@ interface DiscoveryServiceUpsertParams {
}

export class KibanaDiscoveryService {
private activeNodesLookBack: number;
private readonly activeNodesLookBack: string;
private readonly discoveryInterval: number;
private currentNode: string;
private discoveryInterval: number;
private started = false;
private savedObjectsRepository: ISavedObjectsRepository;
private logger: Logger;

constructor({
activeNodesLookBack,
currentNode,
discoveryInterval,
savedObjectsRepository,
logger,
}: DiscoveryServiceParams) {
this.activeNodesLookBack = activeNodesLookBack;
this.discoveryInterval = discoveryInterval;
constructor({ config, currentNode, savedObjectsRepository, logger }: DiscoveryServiceParams) {
this.activeNodesLookBack = config.active_nodes_lookback;
this.discoveryInterval = config.interval;
this.savedObjectsRepository = savedObjectsRepository;
this.logger = logger;
this.currentNode = currentNode;
Expand Down Expand Up @@ -102,7 +96,7 @@ export class KibanaDiscoveryService {
type: BACKGROUND_TASK_NODE_SO_NAME,
perPage: 10000,
page: 1,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}s`,
filter: `${BACKGROUND_TASK_NODE_SO_NAME}.attributes.last_seen > now-${this.activeNodesLookBack}`,
});

return activeNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { SavedObjectsFindResponse, SavedObjectsFindResult } from '@kbn/core/serv
import { BackgroundTaskNode } from '../saved_objects/schemas/background_task_node';
import { BACKGROUND_TASK_NODE_SO_NAME } from '../saved_objects';
import { KibanaDiscoveryService } from './kibana_discovery_service';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_S, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';
import { DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION, DEFAULT_DISCOVERY_INTERVAL_MS } from '../config';

export const createDiscoveryServiceMock = (currentNode: string) => {
const savedObjectsRepository = savedObjectsRepositoryMock.create();
Expand All @@ -19,8 +19,10 @@ export const createDiscoveryServiceMock = (currentNode: string) => {
savedObjectsRepository,
logger,
currentNode,
activeNodesLookBack: DEFAULT_ACTIVE_NODES_LOOK_BACK_S,
discoveryInterval: DEFAULT_DISCOVERY_INTERVAL_MS,
config: {
active_nodes_lookback: DEFAULT_ACTIVE_NODES_LOOK_BACK_DURATION,
interval: DEFAULT_DISCOVERY_INTERVAL_MS,
},
});

for (const method of ['getActiveKibanaNodes'] as Array<keyof KibanaDiscoveryService>) {
Expand Down
Loading

0 comments on commit 730d933

Please sign in to comment.