Skip to content

Commit

Permalink
Fix delayed status API updates in alerting and task_manager (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joshdover authored Jun 14, 2021
1 parent 277212d commit 0e7d4fe
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 33 deletions.
9 changes: 9 additions & 0 deletions test/functional/apps/bundles/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ export default function ({ getService }) {

let buildNum;
before(async () => {
// Wait for status to become green
let status;
const start = Date.now();
do {
const resp = await supertest.get('/api/status');
status = resp.status;
// Stop polling once status stabilizes OR once 40s has passed
} while (status !== 200 && Date.now() - start < 40_000);

const resp = await supertest.get('/api/status').expect(200);
buildNum = resp.body.version.build_number;
});
Expand Down
11 changes: 11 additions & 0 deletions test/server_integration/http/platform/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');

describe('kibana server cache-control', () => {
before(async () => {
// Wait for status to become green
let status;
const start = Date.now();
do {
const resp = await supertest.get('/api/status');
status = resp.status;
// Stop polling once status stabilizes OR once 40s has passed
} while (status !== 200 && Date.now() - start < 40_000);
});

it('properly marks responses as private, with directives to disable caching', async () => {
await supertest
.get('/api/status')
Expand Down
20 changes: 8 additions & 12 deletions x-pack/plugins/alerting/server/health/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(
mockTaskManager,
Expand All @@ -77,24 +76,22 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
pollInterval
).subscribe();

// shouldn't fire before poll interval passes
// should fire before poll interval passes
// should fire once each poll interval
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(2);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(3);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(4);
});

it('should retry on error', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(new Error('Failure'));
const retryDelay = 10;
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(
mockTaskManager,
Expand All @@ -114,28 +111,27 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
retryDelay
).subscribe();

jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(2);

// Retry on failure
let numTimesCalled = 1;
for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 1);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 2);
}

// Once we've exceeded max retries, should not try again
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1);

// Once another poll interval passes, should call fn again
await tick();
jest.advanceTimersByTime(pollInterval - MAX_RETRY_ATTEMPTS * retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 2);
});

it('should return healthy status when health status is "ok"', async () => {
Expand Down
13 changes: 12 additions & 1 deletion x-pack/plugins/alerting/server/health/get_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { i18n } from '@kbn/i18n';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { catchError, mergeMap, retryWhen, startWith, switchMap } from 'rxjs/operators';
import {
Logger,
SavedObjectsServiceStart,
Expand Down Expand Up @@ -121,6 +121,17 @@ export const getHealthStatusStream = (
retryDelay?: number
): Observable<ServiceStatus<unknown>> =>
interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
// Emit an initial check
startWith(
getHealthServiceStatusWithRetryAndErrorHandling(
taskManager,
logger,
savedObjects,
config,
retryDelay
)
),
// On each interval do a new check
switchMap(() =>
getHealthServiceStatusWithRetryAndErrorHandling(
taskManager,
Expand Down
31 changes: 19 additions & 12 deletions x-pack/plugins/alerting/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import type { PublicMethodsOf } from '@kbn/utility-types';
import { first, map, share } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { BehaviorSubject, Observable } from 'rxjs';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { combineLatest } from 'rxjs';
import { SecurityPluginSetup, SecurityPluginStart } from '../../security/server';
Expand All @@ -34,6 +34,7 @@ import {
StatusServiceSetup,
ServiceStatus,
SavedObjectsBulkGetObject,
ServiceStatusLevels,
} from '../../../../src/core/server';
import type { AlertingRequestHandlerContext } from './types';
import { defineRoutes } from './routes';
Expand Down Expand Up @@ -226,17 +227,23 @@ export class AlertingPlugin {
this.config
);

const serviceStatus$ = new BehaviorSubject<ServiceStatus>({
level: ServiceStatusLevels.unavailable,
summary: 'Alerting is initializing',
});
core.status.set(serviceStatus$);

core.getStartServices().then(async ([coreStart, startPlugins]) => {
core.status.set(
combineLatest([
core.status.derivedStatus$,
getHealthStatusStream(
startPlugins.taskManager,
this.logger,
coreStart.savedObjects,
this.config
),
]).pipe(
combineLatest([
core.status.derivedStatus$,
getHealthStatusStream(
startPlugins.taskManager,
this.logger,
coreStart.savedObjects,
this.config
),
])
.pipe(
map(([derivedStatus, healthStatus]) => {
if (healthStatus.level > derivedStatus.level) {
return healthStatus as ServiceStatus;
Expand All @@ -246,7 +253,7 @@ export class AlertingPlugin {
}),
share()
)
);
.subscribe(serviceStatus$);
});

initializeAlertingHealth(this.logger, plugins.taskManager, core.getStartServices());
Expand Down
14 changes: 6 additions & 8 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,13 @@ export class TaskManagerPlugin
this.config!
);

core.getStartServices().then(async () => {
core.status.set(
combineLatest([core.status.derivedStatus$, serviceStatus$]).pipe(
map(([derivedStatus, serviceStatus]) =>
serviceStatus.level > derivedStatus.level ? serviceStatus : derivedStatus
)
core.status.set(
combineLatest([core.status.derivedStatus$, serviceStatus$]).pipe(
map(([derivedStatus, serviceStatus]) =>
serviceStatus.level > derivedStatus.level ? serviceStatus : derivedStatus
)
);
});
)
);

return {
index: this.config.index,
Expand Down

0 comments on commit 0e7d4fe

Please sign in to comment.