Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Support excluding certain task types from executing #111036

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `monitored_stats_running_average_window`- Dictates the size of the window used to calculate the running average of various "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_stats_required_freshness` - Dictates the _required freshness_ of critical "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_task_execution_thresholds`- Dictates the threshold of failed task executions. Learn More: [./MONITORING](./MONITORING.MD)
- `unsafe.exclude_task_types` - A list of task types to exclude from running. Supports wildcard usage, such as `namespace:*`. This configuration is experimental, unsupported, and can only be used for temporary debugging purposes because it causes Kibana to behave in unexpected ways.

## Task definitions

Expand Down
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down Expand Up @@ -93,6 +96,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down Expand Up @@ -141,6 +147,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
chrisronline marked this conversation as resolved.
Show resolved Hide resolved
}),
},
{
validate: (config) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ describe('EphemeralTaskLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
...config,
},
elasticsearchAndSOAvailability$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ describe('managed configuration', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});
logger = context.logger.get('taskManager');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ describe('Configuration Statistics Aggregator', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

const managedConfig = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ describe('createMonitoringStatsStream', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

it('returns the initial config used to configure Task Manager', async () => {
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ describe('TaskManagerPlugin', () => {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

pluginInitializerContext.env.instanceUuid = '';
Expand Down Expand Up @@ -82,6 +85,9 @@ describe('TaskManagerPlugin', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
Expand Down
9 changes: 8 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ export class TaskManagerPlugin
usageCollection,
monitoredHealth$,
this.config.ephemeral_tasks.enabled,
this.config.ephemeral_tasks.request_capacity
this.config.ephemeral_tasks.request_capacity,
this.config.unsafe.exclude_task_types
);
}

if (this.config.unsafe.exclude_task_types.length) {
this.logger.debug(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we've labelled this as unsafe/experimental, wondering if we should elevate this message to an info, or perhaps even warning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to a warning. It would make it clear what's happening without a way to ignore it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

`Excluding task types from execution: ${this.config.unsafe.exclude_task_types.join(', ')}`
);
}

Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ describe('TaskPollingLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export class TaskPollingLifecycle {
this.taskClaiming = new TaskClaiming({
taskStore,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
logger: this.logger,
getCapacity: (taskType?: string) =>
Expand Down
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/server/queries/task_claiming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ describe('TaskClaiming', () => {
new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
Expand All @@ -119,11 +120,13 @@ describe('TaskClaiming', () => {
taskClaimingOpts = {},
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const definitions = storeOpts.definitions ?? taskDefinitions;
const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId });
Expand Down Expand Up @@ -151,6 +154,7 @@ describe('TaskClaiming', () => {
logger: taskManagerLogger,
definitions,
taskStore: store,
excludedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
...taskClaimingOpts,
Expand All @@ -165,17 +169,20 @@ describe('TaskClaiming', () => {
claimingOpts,
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
claimingOpts: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10);
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
hits,
versionConflicts,
});
Expand Down Expand Up @@ -264,6 +271,11 @@ describe('TaskClaiming', () => {
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
foobar: {
title: 'foobar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});

const [
Expand All @@ -282,6 +294,7 @@ describe('TaskClaiming', () => {
claimingOpts: {
claimOwnershipUntil: new Date(),
},
excludedTaskTypes: ['foobar'],
});
expect(query).toMatchObject({
bool: {
Expand Down Expand Up @@ -1241,6 +1254,7 @@ if (doc['task.runAt'].size()!=0) {
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore,
maxAttempts: 2,
getCapacity,
Expand Down
55 changes: 37 additions & 18 deletions x-pack/plugins/task_manager/server/queries/task_claiming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* This module contains helpers for managing the task manager storage layer.
*/
import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { map, mergeScan } from 'rxjs/operators';
import { difference, partition, groupBy, mapValues, countBy, pick, isPlainObject } from 'lodash';
Expand Down Expand Up @@ -57,6 +58,7 @@ export interface TaskClaimingOpts {
definitions: TaskTypeDictionary;
taskStore: TaskStore;
maxAttempts: number;
excludedTaskTypes: string[];
getCapacity: (taskType?: string) => number;
}

Expand Down Expand Up @@ -115,6 +117,7 @@ export class TaskClaiming {
private logger: Logger;
private readonly taskClaimingBatchesByType: TaskClaimingBatches;
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];

/**
* Constructs a new TaskStore.
Expand All @@ -130,6 +133,7 @@ export class TaskClaiming {
this.logger = opts.logger;
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;

this.events$ = new Subject<TaskClaim>();
}
Expand Down Expand Up @@ -354,6 +358,16 @@ export class TaskClaiming {
};
};

private isTaskTypeExcluded(taskType: string) {
for (const excludedType of this.excludedTaskTypes) {
if (minimatch(taskType, excludedType)) {
return true;
}
}

return false;
}

private async markAvailableTasksAsClaimed({
claimOwnershipUntil,
claimTasksById,
Expand All @@ -362,9 +376,11 @@ export class TaskClaiming {
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
this.definitions.getAllTypes(),
(type) => (taskTypes.has(type) ? 'taskTypesToClaim' : 'taskTypesToSkip')
(type) =>
taskTypes.has(type) && !this.isTaskTypeExcluded(type)
? 'taskTypesToClaim'
: 'taskTypesToSkip'
);

const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
Expand All @@ -382,29 +398,32 @@ export class TaskClaiming {
sort.unshift('_score');
}

const query = matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
);
const script = updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
);

const apmTrans = apm.startTransaction(
'markAvailableTasksAsClaimed',
`taskManager markAvailableTasksAsClaimed`
);
try {
const result = await this.taskStore.updateByQuery(
{
query: matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
),
script: updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
),
query,
script,
sort,
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { MonitoredHealth } from '../routes/health';
import { TaskPersistence } from '../task_events';
import { registerTaskManagerUsageCollector } from './task_manager_usage_collector';
import { sleep } from '../test_utils';
import { TaskManagerUsage } from './types';

describe('registerTaskManagerUsageCollector', () => {
let collector: Collector<unknown>;
Expand All @@ -31,25 +32,45 @@ describe('registerTaskManagerUsageCollector', () => {
return createUsageCollectionSetupMock().makeUsageCollector(config);
});

registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10);
registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, []);

const mockHealth = getMockMonitoredHealth();
monitoringStats$.next(mockHealth);
await sleep(1001);

expect(usageCollectionMock.makeUsageCollector).toBeCalled();
const telemetry = await collector.fetch(fetchContext);
expect(telemetry).toMatchObject({
ephemeral_tasks_enabled: true,
ephemeral_request_capacity: 10,
ephemeral_stats: {
status: mockHealth.stats.ephemeral?.status,
load: mockHealth.stats.ephemeral?.value.load,
executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle,
queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks,
},
const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
expect(telemetry.ephemeral_tasks_enabled).toBe(true);
expect(telemetry.ephemeral_request_capacity).toBe(10);
expect(telemetry.ephemeral_stats).toMatchObject({
status: mockHealth.stats.ephemeral?.status,
load: mockHealth.stats.ephemeral?.value.load,
executions_per_cycle: mockHealth.stats.ephemeral?.value.executionsPerCycle,
queued_tasks: mockHealth.stats.ephemeral?.value.queuedTasks,
});
});

it('should report telemetry on the excluded task types', async () => {
const monitoringStats$ = new Subject<MonitoredHealth>();
const usageCollectionMock = createUsageCollectionSetupMock();
const fetchContext = createCollectorFetchContextWithKibanaMock();
usageCollectionMock.makeUsageCollector.mockImplementation((config) => {
collector = new Collector(logger, config);
return createUsageCollectionSetupMock().makeUsageCollector(config);
});

registerTaskManagerUsageCollector(usageCollectionMock, monitoringStats$, true, 10, [
'actions:*',
]);

const mockHealth = getMockMonitoredHealth();
monitoringStats$.next(mockHealth);
await sleep(1001);

expect(usageCollectionMock.makeUsageCollector).toBeCalled();
const telemetry: TaskManagerUsage = (await collector.fetch(fetchContext)) as TaskManagerUsage;
expect(telemetry.task_type_exclusion).toEqual(['actions:*']);
});
});

function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
Expand Down
Loading