Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactively disable Task Manager lifecycle when core services become unavailable #81779

Merged
merged 10 commits into from
Oct 29, 2020
10 changes: 9 additions & 1 deletion x-pack/plugins/task_manager/server/monitoring/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,20 @@ export {
export function createMonitoringStats(
taskPollingLifecycle: TaskPollingLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
): Observable<MonitoringStats> {
return createMonitoringStatsStream(
createAggregators(taskPollingLifecycle, taskStore, config, managedConfig, logger),
createAggregators(
taskPollingLifecycle,
taskStore,
elasticsearchAndSOAvailability$,
config,
managedConfig,
logger
),
config
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface RawMonitoringStats {
export function createAggregators(
taskPollingLifecycle: TaskPollingLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
Expand All @@ -72,6 +73,7 @@ export function createAggregators(
createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window),
createWorkloadAggregator(
taskStore,
elasticsearchAndSOAvailability$,
config.monitored_aggregated_stats_refresh_rate,
config.poll_interval,
logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
import { AggregationResultOf } from '../../../apm/typings/elasticsearch/aggregations';
import { times } from 'lodash';
import { taskStoreMock } from '../task_store.mock';
import { of, Subject } from 'rxjs';
import { sleep } from '../test_utils';

type MockESResult = ESSearchResponse<
ConcreteTaskInstance,
Expand Down Expand Up @@ -75,6 +77,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -231,6 +234,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand All @@ -252,12 +256,51 @@ describe('Workload Statistics Aggregator', () => {
});
});

// Verifies that the aggregator does not query the task store while the
// availability stream reports `false`, and that it produces a workload
// summary once the stream reports `true`.
test('skips summary of the workload when services are unavailable', async () => {
  const taskStore = taskStoreMock.create({});
  taskStore.aggregate.mockResolvedValue(mockAggregatedResult());

  const availability$ = new Subject<boolean>();

  const workloadAggregator = createWorkloadAggregator(
    taskStore,
    availability$,
    10,
    3000,
    loggingSystemMock.create().get()
  );

  // Subscribe before anything is pushed onto `availability$`, so none of the
  // emissions below are missed. Awaiting this promise in the test body (rather
  // than asserting inside an async Promise executor / subscribe callback)
  // makes any failed expectation reject the test directly instead of leaving
  // it to hang until the Jest timeout.
  const firstResult = workloadAggregator.pipe(first()).toPromise();

  availability$.next(false);

  // While unavailable, repeated refresh ticks must not reach the task store.
  await sleep(10);
  expect(taskStore.aggregate).not.toHaveBeenCalled();
  await sleep(10);
  expect(taskStore.aggregate).not.toHaveBeenCalled();

  availability$.next(true);

  const result = await firstResult;
  expect(result.key).toEqual('workload');
  expect(result.value).toMatchObject({
    count: 4,
    task_types: {
      actions_telemetry: { count: 2, status: { idle: 2 } },
      alerting_telemetry: { count: 1, status: { idle: 1 } },
      session_cleanup: { count: 1, status: { idle: 1 } },
    },
  });
});

test('returns a count of the overdue workload', async () => {
const taskStore = taskStoreMock.create({});
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand All @@ -280,6 +323,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -307,6 +351,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
60 * 1000,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -344,6 +389,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
15 * 60 * 1000,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -392,7 +438,7 @@ describe('Workload Statistics Aggregator', () => {
})
);
const logger = loggingSystemMock.create().get();
const workloadAggregator = createWorkloadAggregator(taskStore, 10, 3000, logger);
const workloadAggregator = createWorkloadAggregator(taskStore, of(true), 10, 3000, logger);

return new Promise((resolve, reject) => {
workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { timer } from 'rxjs';
import { mergeMap, map, catchError } from 'rxjs/operators';
import { combineLatest, Observable, timer } from 'rxjs';
import { mergeMap, map, filter, catchError } from 'rxjs/operators';
import { Logger } from 'src/core/server';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { keyBy, mapValues } from 'lodash';
Expand Down Expand Up @@ -94,6 +94,7 @@ const MAX_SHCEDULE_DENSITY_BUCKETS = 50;

export function createWorkloadAggregator(
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
refreshInterval: number,
pollInterval: number,
logger: Logger
Expand All @@ -105,7 +106,8 @@ export function createWorkloadAggregator(
MAX_SHCEDULE_DENSITY_BUCKETS
);

return timer(0, refreshInterval).pipe(
return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe(
filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable),
mergeMap(() =>
taskStore.aggregate({
aggs: {
Expand Down
100 changes: 99 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { TaskManagerPlugin } from './plugin';
import { TaskManagerPlugin, getElasticsearchAndSOAvailability } from './plugin';
import { coreMock } from '../../../../src/core/server/mocks';
import { TaskManagerConfig } from './config';
import { Subject } from 'rxjs';
import { bufferCount, take } from 'rxjs/operators';
import { CoreStatus, ServiceStatusLevels } from 'src/core/server';

describe('TaskManagerPlugin', () => {
describe('setup', () => {
Expand Down Expand Up @@ -88,4 +91,99 @@ describe('TaskManagerPlugin', () => {
);
});
});

describe('getElasticsearchAndSOAvailability', () => {
  // Subscribes to the derived availability stream immediately and resolves
  // with the first `count` values it emits, gathered into one array.
  const collectAvailability = (core$: Subject<CoreStatus>, count: number) =>
    getElasticsearchAndSOAvailability(core$).pipe(take(count), bufferCount(count)).toPromise();

  // Pushes one mocked core status per entry onto `core$`, in the given order.
  const publish = (
    core$: Subject<CoreStatus>,
    ...availabilities: Array<{ elasticsearch: boolean; savedObjects: boolean }>
  ) => {
    for (const availability of availabilities) {
      core$.next(mockCoreStatusAvailability(availability));
    }
  };

  test('returns true when both services are available', async () => {
    const core$ = new Subject<CoreStatus>();
    const availability = collectAvailability(core$, 1);

    publish(core$, { elasticsearch: true, savedObjects: true });

    expect(await availability).toEqual([true]);
  });

  test('returns false when both services are unavailable', async () => {
    const core$ = new Subject<CoreStatus>();
    const availability = collectAvailability(core$, 1);

    publish(core$, { elasticsearch: false, savedObjects: false });

    expect(await availability).toEqual([false]);
  });

  test('returns false when one service is unavailable but the other is available', async () => {
    const core$ = new Subject<CoreStatus>();
    const availability = collectAvailability(core$, 1);

    publish(core$, { elasticsearch: true, savedObjects: false });

    expect(await availability).toEqual([false]);
  });

  test('shift back and forth between values as status changes', async () => {
    const core$ = new Subject<CoreStatus>();
    const availability = collectAvailability(core$, 3);

    publish(
      core$,
      { elasticsearch: true, savedObjects: false },
      { elasticsearch: true, savedObjects: true },
      { elasticsearch: false, savedObjects: false }
    );

    expect(await availability).toEqual([false, true, false]);
  });

  test(`skips values when the status hasn't changed`, async () => {
    const core$ = new Subject<CoreStatus>();
    const availability = collectAvailability(core$, 3);

    publish(
      core$,
      { elasticsearch: true, savedObjects: false },
      // still false, so shouldn't emit a second time
      { elasticsearch: false, savedObjects: true },
      { elasticsearch: true, savedObjects: true },
      // shouldn't emit as already true
      { elasticsearch: true, savedObjects: true },
      { elasticsearch: false, savedObjects: false }
    );

    expect(await availability).toEqual([false, true, false]);
  });
});
});

/**
 * Builds a minimal core status fixture for the tests above.
 *
 * Each boolean flag selects `ServiceStatusLevels.available` (true) or
 * `ServiceStatusLevels.unavailable` (false) for the matching service; the
 * summary is always an empty string.
 */
function mockCoreStatusAvailability({
  elasticsearch,
  savedObjects,
}: {
  elasticsearch: boolean;
  savedObjects: boolean;
}) {
  // Both services share the same fixture shape, differing only in level.
  const statusFor = (isAvailable: boolean) => ({
    level: isAvailable ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable,
    summary: '',
  });

  return {
    elasticsearch: statusFor(elasticsearch),
    savedObjects: statusFor(savedObjects),
  };
}
48 changes: 32 additions & 16 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server';
import { combineLatest, Subject } from 'rxjs';
import { first, map } from 'rxjs/operators';
import { combineLatest, Observable, Subject } from 'rxjs';
import { first, map, distinctUntilChanged } from 'rxjs/operators';
import {
PluginInitializerContext,
Plugin,
CoreSetup,
Logger,
CoreStart,
ServiceStatusLevels,
CoreStatus,
} from '../../../../src/core/server';
import { TaskDefinition } from './task';
import { TaskPollingLifecycle } from './polling_lifecycle';
import { TaskManagerConfig } from './config';
Expand Down Expand Up @@ -37,6 +45,7 @@ export class TaskManagerPlugin
private logger: Logger;
private definitions: TaskTypeDictionary;
private middleware: Middleware = createInitialMiddleware();
private elasticsearchAndSOAvailability$?: Observable<boolean>;
private monitoringStats$ = new Subject<MonitoringStats>();

constructor(private readonly initContext: PluginInitializerContext) {
Expand All @@ -51,6 +60,8 @@ export class TaskManagerPlugin
.pipe(first())
.toPromise();

this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$);

setupSavedObjects(core.savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;

Expand Down Expand Up @@ -115,19 +126,20 @@ export class TaskManagerPlugin
startingPollInterval: this.config!.poll_interval,
});

const taskPollingLifecycle = new TaskPollingLifecycle({
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
taskStore,
middleware: this.middleware,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
...managedConfiguration,
});
this.taskPollingLifecycle = taskPollingLifecycle;

createMonitoringStats(
taskPollingLifecycle,
this.taskPollingLifecycle,
taskStore,
this.elasticsearchAndSOAvailability$!,
this.config!,
managedConfiguration,
this.logger
Expand All @@ -137,12 +149,9 @@ export class TaskManagerPlugin
logger: this.logger,
taskStore,
middleware: this.middleware,
taskPollingLifecycle,
taskPollingLifecycle: this.taskPollingLifecycle,
});

// start polling for work
taskPollingLifecycle.start();

return {
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
get: (id: string) => taskStore.get(id),
Expand All @@ -153,12 +162,6 @@ export class TaskManagerPlugin
};
}

public stop() {
if (this.taskPollingLifecycle) {
this.taskPollingLifecycle.stop();
}
}

/**
* Ensures task manager hasn't started
*
Expand All @@ -171,3 +174,16 @@ export class TaskManagerPlugin
}
}
}

/**
 * Reduces the core status stream to a single boolean availability flag.
 *
 * @param core$ stream of core service statuses
 * @returns a stream that emits `true` only when both the `elasticsearch` and
 *   `savedObjects` levels equal `ServiceStatusLevels.available`, and `false`
 *   otherwise. Consecutive identical values are suppressed, so subscribers
 *   are only notified when the flag actually flips.
 */
export function getElasticsearchAndSOAvailability(
  core$: Observable<CoreStatus>
): Observable<boolean> {
  const isAvailable = (service: CoreStatus['elasticsearch' | 'savedObjects']): boolean =>
    service.level === ServiceStatusLevels.available;

  const availability$ = core$.pipe(
    // `savedObjects` is only inspected when `elasticsearch` is available.
    map((status) => isAvailable(status.elasticsearch) && isAvailable(status.savedObjects))
  );

  return availability$.pipe(distinctUntilChanged());
}
Loading