changed to p metrics
gmmorris committed Oct 13, 2020
1 parent 4f37202 commit 9d92055
Showing 7 changed files with 128 additions and 88 deletions.
@@ -15,10 +15,14 @@ import {

 describe('calculateRunningAverage', () => {
   test('calculates the running average and median of a window of values', async () => {
-    expect(calculateRunningAverage([2, 2, 4, 6, 6])).toEqual({
-      mean: 4,
-      median: 4,
-    });
+    expect(calculateRunningAverage([2, 2, 4, 6, 6])).toMatchInlineSnapshot(`
+      Object {
+        "p50": 4,
+        "p90": 6,
+        "p95": 6,
+        "p99": 6,
+      }
+    `);
   });
 });

@@ -9,14 +9,18 @@ import { JsonObject } from 'src/plugins/kibana_utils/common';
 import { isUndefined, countBy, mapValues } from 'lodash';
 
 export interface AveragedStat extends JsonObject {
-  mean: number;
-  median: number;
+  p50: number;
+  p90: number;
+  p95: number;
+  p99: number;
 }
 
 export function calculateRunningAverage(values: number[]): AveragedStat {
   return {
-    mean: Math.round(stats.mean(values)),
-    median: stats.median(values),
+    p50: stats.percentile(values, 0.5),
+    p90: stats.percentile(values, 0.9),
+    p95: stats.percentile(values, 0.95),
+    p99: stats.percentile(values, 0.99),
   };
 }
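For orientation, here is a minimal nearest-rank sketch of what a percentile over a window of values computes; it is a stand-in for the stats.percentile helper used above, which may interpolate differently, so treat it as an approximation rather than the library's implementation:

// Hypothetical stand-in for stats.percentile (nearest-rank; the real helper may interpolate).
function percentileSketch(values: number[], p: number): number {
  if (values.length === 0) {
    return 0;
  }
  // Sort a copy so the caller's running window is left untouched.
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil(p * sorted.length) - 1;
  return sorted[Math.max(0, rank)];
}

// percentileSketch([2, 2, 4, 6, 6], 0.5) === 4 and percentileSketch([2, 2, 4, 6, 6], 0.9) === 6,
// which lines up with the p50/p90 values asserted in the inline snapshot above.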

@@ -49,8 +49,10 @@ describe('Task Run Statistics', () => {
     window: number[]
   ) {
     expect(taskStat.value.drift).toMatchObject({
-      mean: Math.round(stats.mean(window)),
-      median: stats.median(window),
+      p50: stats.percentile(window, 0.5),
+      p90: stats.percentile(window, 0.9),
+      p95: stats.percentile(window, 0.95),
+      p99: stats.percentile(window, 0.99),
     });
   }

@@ -111,8 +113,10 @@ describe('Task Run Statistics', () => {
   ) {
     for (const [type, window] of Object.entries(windows)) {
       expect(taskStat.value.execution.duration[type]).toMatchObject({
-        mean: Math.round(stats.mean(window)),
-        median: stats.median(window),
+        p50: stats.percentile(window, 0.5),
+        p90: stats.percentile(window, 0.9),
+        p95: stats.percentile(window, 0.95),
+        p99: stats.percentile(window, 0.99),
       });
     }
   }
@@ -5,7 +5,7 @@
  */
 
 import { timer } from 'rxjs';
-import { concatMap, map, filter, catchError } from 'rxjs/operators';
+import { mergeMap, map, filter, catchError } from 'rxjs/operators';
 import { Logger } from 'src/core/server';
 import { JsonObject } from 'src/plugins/kibana_utils/common';
 import { keyBy, mapValues } from 'lodash';
@@ -112,7 +112,7 @@ export function createWorkloadAggregator(
     // To avoid errors due to ES not being ready, we'll wait until Start
     // to begin polling for the workload
     filter(() => taskManager.isStarted),
-    concatMap(() =>
+    mergeMap(() =>
       taskManager.aggregate<WorkloadAggregation>({
         aggs: {
           taskType: {
@@ -136,7 +136,10 @@
             range: {
               field: 'task.runAt',
               ranges: [
-                { from: `now`, to: `now+${asInterval(scheduleDensityBuckets * pollInterval)}` },
+                {
+                  from: `now`,
+                  to: `now+${asInterval(scheduleDensityBuckets * pollInterval)}`,
+                },
               ],
             },
             aggs: {
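The switch from concatMap to mergeMap above changes how overlapping polls behave: concatMap queues each aggregation until the previous one completes, while mergeMap subscribes to each one as the timer fires. A small self-contained sketch of that polling shape, using a hypothetical aggregateWorkload stand-in instead of the real taskManager.aggregate call:

import { timer } from 'rxjs';
import { filter, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for taskManager.aggregate; the real call queries Elasticsearch.
const aggregateWorkload = async (): Promise<{ count: number }> => ({ count: 0 });
const isStarted = true; // assumed flag mirroring taskManager.isStarted

timer(0, 10_000)
  .pipe(
    // Wait until the dependency reports it has started before polling.
    filter(() => isStarted),
    // mergeMap starts a new aggregation on every tick, even if the previous
    // one is still in flight; concatMap would queue it instead.
    mergeMap(() => aggregateWorkload())
  )
  .subscribe((workload) => console.log(workload.count));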
63 changes: 41 additions & 22 deletions x-pack/plugins/task_manager/server/routes/health.test.ts
@@ -81,24 +81,30 @@ describe('healthRoute', () => {
   it('returns an error status if the overall stats have not been updated within the required hot freshness', async () => {
     const router = httpServiceMock.createRouter();
 
-    const mockStat = mockHealthStats({
-      lastUpdate: new Date(Date.now() - 1500).toISOString(),
-    });
+    const stats$ = new Subject<MonitoringStats>();
 
     const serviceStatus$ = healthRoute(
       router,
-      Promise.resolve(of(mockStat)),
+      Promise.resolve(stats$),
       mockLogger(),
       uuid.v4(),
       1000,
       60000
     );
 
+    const serviceStatus = getLatest(serviceStatus$);
+
     const [, handler] = router.get.mock.calls[0];
 
     const [context, req, res] = mockHandlerArguments({}, {}, ['ok', 'internalError']);
 
-    await sleep(2000);
+    await sleep(0);
 
+    stats$.next(
+      mockHealthStats({
+        lastUpdate: new Date(Date.now() - 1500).toISOString(),
+      })
+    );
+
     expect(await handler(context, req, res)).toMatchObject({
       body: {
@@ -127,7 +133,7 @@ describe('healthRoute', () => {
       },
     });
 
-    expect(await getLatest(serviceStatus$)).toMatchObject({
+    expect(await serviceStatus).toMatchObject({
       level: ServiceStatusLevels.unavailable,
       summary: 'Task Manager is unavailable',
       meta: {
@@ -160,15 +166,22 @@ describe('healthRoute', () => {
   it('returns an error status if the workload stats have not been updated within the required cold freshness', async () => {
     const router = httpServiceMock.createRouter();
 
+    const stats$ = new Subject<MonitoringStats>();
+
+    healthRoute(router, Promise.resolve(stats$), mockLogger(), uuid.v4(), 5000, 60000);
+
+    await sleep(0);
+
     const lastUpdateOfWorkload = new Date(Date.now() - 120000).toISOString();
-    const mockStat = mockHealthStats({
-      stats: {
-        workload: {
-          timestamp: lastUpdateOfWorkload,
+    stats$.next(
+      mockHealthStats({
+        stats: {
+          workload: {
+            timestamp: lastUpdateOfWorkload,
+          },
         },
-      },
-    });
-    healthRoute(router, Promise.resolve(of(mockStat)), mockLogger(), uuid.v4(), 5000, 60000);
+      })
+    );
 
     const [, handler] = router.get.mock.calls[0];
 
@@ -207,19 +220,25 @@ describe('healthRoute', () => {
   it('returns an error status if the poller hasn't polled within the required hot freshness', async () => {
     const router = httpServiceMock.createRouter();
 
+    const stats$ = new Subject<MonitoringStats>();
+    healthRoute(router, Promise.resolve(stats$), mockLogger(), uuid.v4(), 1000, 60000);
+
+    await sleep(0);
+
     const lastSuccessfulPoll = new Date(Date.now() - 2000).toISOString();
-    const mockStat = mockHealthStats({
-      stats: {
-        runtime: {
-          value: {
-            polling: {
-              lastSuccessfulPoll,
+    stats$.next(
+      mockHealthStats({
+        stats: {
+          runtime: {
+            value: {
+              polling: {
+                lastSuccessfulPoll,
+              },
             },
           },
         },
-      },
-    });
-    healthRoute(router, Promise.resolve(of(mockStat)), mockLogger(), uuid.v4(), 1000, 60000);
+      })
+    );
 
     const [, handler] = router.get.mock.calls[0];
 
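The pattern repeated in these tests — construct a Subject, hand it to healthRoute, await sleep(0), then push a stat — exists so the route subscribes before the first emission arrives. A distilled sketch of that idea, using hypothetical names rather than the real test helpers:

import { Subject } from 'rxjs';

// Hypothetical minimal version of the Subject-driven test setup.
interface FakeStats {
  lastUpdate: string;
}

const stats$ = new Subject<FakeStats>();

// The code under test would subscribe here (healthRoute does this internally).
stats$.subscribe((stat) => console.log('observed', stat.lastUpdate));

// Only after the subscription exists does the test push a value, so the
// emission is observed rather than lost (Subjects do not replay to late subscribers).
stats$.next({ lastUpdate: new Date().toISOString() });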
88 changes: 45 additions & 43 deletions x-pack/plugins/task_manager/server/routes/health.ts
@@ -11,8 +11,8 @@ import {
   IKibanaResponse,
   KibanaResponseFactory,
 } from 'kibana/server';
-import { Observable, from } from 'rxjs';
-import { take, mergeMap, map } from 'rxjs/operators';
+import { Observable, from, Subject } from 'rxjs';
+import { take, mergeMap, tap, map } from 'rxjs/operators';
 import { throttleTime } from 'rxjs/operators';
 import { isString } from 'lodash';
 import { JsonValue } from 'src/plugins/kibana_utils/common';
@@ -43,7 +43,6 @@ export function healthRoute(
   function calculateStatus(monitoredStats: MonitoringStats): MonitoredHealth {
     const now = Date.now();
     const timestamp = new Date(now).toISOString();
-
     const summarizedStats = summarizeMonitoringStats(monitoredStats);
 
     /**
@@ -60,21 +59,30 @@
     return { id: taskManagerId, timestamp, status: healthStatus, ...summarizedStats };
   }
 
-  // Only calculate the summarized stats (calculates all running averages and evaluates state)
-  // when needed by throttling down to the requiredHotStatsFreshness
-  const throttledMonitoredStats$ = from(monitoringStats).pipe(
-    mergeMap((monitoringStats$) =>
-      monitoringStats$.pipe(
-        throttleTime(requiredHotStatsFreshness),
-        map((stats) => calculateStatus(stats))
-      )
-    )
-  );
+  const serviceStatus$: Subject<ServiceStatus> = new Subject<ServiceStatus>();
+
+  /* keep track of last health summary, as we'll return that to the next call to _health */
+  let lastMonitoredStats: MonitoringStats | null = null;
 
   /* Log Task Manager stats as a Debug log line at a fixed interval */
-  throttledMonitoredStats$.subscribe((stats) => {
-    logger.debug(JSON.stringify(stats));
-  });
+  from(monitoringStats)
+    .pipe(
+      mergeMap((monitoringStats$) =>
+        monitoringStats$.pipe(
+          throttleTime(requiredHotStatsFreshness),
+          tap((stats) => {
+            lastMonitoredStats = stats;
+          }),
+          // Only calculate the summarized stats (calculates all running averages and evaluates state)
+          // when needed by throttling down to the requiredHotStatsFreshness
+          map((stats) => withServiceStatus(calculateStatus(stats)))
+        )
+      )
+    )
+    .subscribe(([monitoredHealth, serviceStatus]) => {
+      serviceStatus$.next(serviceStatus);
+      logger.debug(JSON.stringify(monitoredHealth));
+    });
 
   router.get(
     {
@@ -87,32 +95,32 @@
       res: KibanaResponseFactory
     ): Promise<IKibanaResponse> {
       return res.ok({
-        body: calculateStatus(await getLatestStats(await monitoringStats)),
+        body: lastMonitoredStats
+          ? calculateStatus(lastMonitoredStats)
+          : { id: taskManagerId, timestamp: new Date().toISOString(), status: HealthStatus.Error },
       });
     }
   );
-
-  return asServiceStatus(throttledMonitoredStats$);
+  return serviceStatus$;
 }
 
-export function asServiceStatus(
-  monitoredHealth$: Observable<MonitoredHealth>
-): Observable<ServiceStatus> {
-  return monitoredHealth$.pipe(
-    map((monitoredHealth) => {
-      const level =
-        monitoredHealth.status === HealthStatus.OK
-          ? ServiceStatusLevels.available
-          : monitoredHealth.status === HealthStatus.Warning
-          ? ServiceStatusLevels.degraded
-          : ServiceStatusLevels.unavailable;
-      return {
-        level,
-        summary: LEVEL_SUMMARY[level.toString()],
-        meta: monitoredHealth,
-      };
-    })
-  );
+export function withServiceStatus(
+  monitoredHealth: MonitoredHealth
+): [MonitoredHealth, ServiceStatus] {
+  const level =
+    monitoredHealth.status === HealthStatus.OK
+      ? ServiceStatusLevels.available
+      : monitoredHealth.status === HealthStatus.Warning
+      ? ServiceStatusLevels.degraded
+      : ServiceStatusLevels.unavailable;
+  return [
+    monitoredHealth,
+    {
+      level,
+      summary: LEVEL_SUMMARY[level.toString()],
+      meta: monitoredHealth,
+    },
+  ];
 }
 
 /**
@@ -156,9 +164,3 @@ function getOldestTimestamp(...timestamps: Array<JsonValue | undefined>): number
     .filter((timestamp) => !isNaN(timestamp));
   return validTimestamps.length ? Math.min(...validTimestamps) : 0;
 }
-
-async function getLatestStats(monitoringStats$: Observable<MonitoringStats>) {
-  return new Promise<MonitoringStats>((resolve) =>
-    monitoringStats$.pipe(take(1)).subscribe((stats) => resolve(stats))
-  );
-}
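The comment in the hunk above explains that summarization is throttled down to requiredHotStatsFreshness. A standalone sketch (not the plugin's code) of how throttleTime achieves that — downstream work runs at most once per throttle window even though the source keeps emitting:

import { interval } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

const requiredHotStatsFreshness = 5000; // assumed value for the demo

// The source ticks every second, but the (expensive) mapping below runs at
// most once per 5s window because throttleTime drops the in-between emissions.
interval(1000)
  .pipe(
    throttleTime(requiredHotStatsFreshness),
    map((tick) => ({ tick, summarizedAt: new Date().toISOString() }))
  )
  .subscribe((summary) => console.log(summary));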
@@ -26,7 +26,7 @@ interface MonitoringStats {
         taskTypes: Record<string, object>;
         schedule: Array<[string, number]>;
         overdue: number;
-        scheduleDensity: number[];
+        estimatedScheduleDensity: number[];
       };
     };
     runtime: {
@@ -148,11 +148,11 @@ export default function ({ getService }: FtrProviderContext) {

       expect(typeof workload.overdue).to.eql('number');
 
-      expect(Array.isArray(workload.scheduleDensity)).to.eql(true);
+      expect(Array.isArray(workload.estimatedScheduleDensity)).to.eql(true);
 
       // test run with the default poll_interval of 3s and a monitored_aggregated_stats_refresh_rate of 5s,
-      // so we expect the scheduleDensity to span a minute (which means 20 buckets, as 60s / 3s = 20)
-      expect(workload.scheduleDensity.length).to.eql(20);
+      // so we expect the estimatedScheduleDensity to span a minute (which means 20 buckets, as 60s / 3s = 20)
+      expect(workload.estimatedScheduleDensity.length).to.eql(20);
     });
 
     it('should return the task manager runtime stats', async () => {
Expand All @@ -172,11 +172,15 @@ export default function ({ getService }: FtrProviderContext) {
       expect(typeof polling.resultFrequency.RanOutOfCapacity).to.eql('number');
       expect(typeof polling.resultFrequency.PoolFilled).to.eql('number');
 
-      expect(typeof drift.mean).to.eql('number');
-      expect(typeof drift.median).to.eql('number');
+      expect(typeof drift.p50).to.eql('number');
+      expect(typeof drift.p90).to.eql('number');
+      expect(typeof drift.p95).to.eql('number');
+      expect(typeof drift.p99).to.eql('number');
 
-      expect(typeof execution.duration.sampleTask.mean).to.eql('number');
-      expect(typeof execution.duration.sampleTask.median).to.eql('number');
+      expect(typeof execution.duration.sampleTask.p50).to.eql('number');
+      expect(typeof execution.duration.sampleTask.p90).to.eql('number');
+      expect(typeof execution.duration.sampleTask.p95).to.eql('number');
+      expect(typeof execution.duration.sampleTask.p99).to.eql('number');
 
       expect(typeof execution.resultFrequency.sampleTask.Success).to.eql('number');
       expect(typeof execution.resultFrequency.sampleTask.RetryScheduled).to.eql('number');
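Read together, these assertions imply a runtime-stats payload shaped roughly like the sketch below; the values are invented for illustration and the real response carries more fields than the ones asserted here:

// Illustrative only: field values are made up, and any nesting beyond what the
// assertions above check (drift.*, execution.duration.*, execution.resultFrequency.*) is assumed.
const exampleRuntimeStats = {
  polling: {
    resultFrequency: { RanOutOfCapacity: 0, PoolFilled: 60 },
  },
  drift: { p50: 1500, p90: 2400, p95: 2600, p99: 3000 },
  execution: {
    duration: { sampleTask: { p50: 400, p90: 700, p95: 800, p99: 950 } },
    resultFrequency: { sampleTask: { Success: 100, RetryScheduled: 0 } },
  },
};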
