Skip to content

Commit

Permalink
[Monitoring] Fix inaccuracies in logstash pipeline listing metrics (#…
Browse files Browse the repository at this point in the history
…55868) (#56176)

* Change how we fetch pipeline listing metrics to match what other charts show

* Fix tests

* Fix tests

Co-authored-by: Elastic Machine <[email protected]>

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
chrisronline and elasticmachine authored Jan 28, 2020
1 parent 029e37b commit 001dcb9
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
import { getLogTypes } from '../logs';
import { isInCodePath } from './is_in_code_path';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';

/**
* Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
Expand All @@ -53,6 +54,8 @@ export async function getClustersFromRequest(
filebeatIndexPattern,
} = indexPatterns;

const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;

let clusters = [];
Expand Down Expand Up @@ -158,25 +161,27 @@ export async function getClustersFromRequest(
});

// add logstash data
const logstashes = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
? await getLogstashForClusters(req, lsIndexPattern, clusters)
: [];

const clusterPipelineNodesCount = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
? await getPipelines(req, lsIndexPattern, null, ['logstash_cluster_pipeline_nodes_count'])
: [];

// add the logstash data to each cluster
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) {
logstash.stats = {};
}

set(clusters[clusterIndex], 'logstash', logstash.stats);
});
if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
'logstash_cluster_pipeline_nodes_count',
]);
// add the logstash data to each cluster
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (
logstash.clusterUuid === req.params.clusterUuid &&
clusterPipelineNodesCount.length === 0
) {
logstash.stats = {};
}

set(clusters[clusterIndex], 'logstash', logstash.stats);
});
}

// add beats data
const beatsByCluster = isInCodePath(codePaths, [CODE_PATH_BEATS])
Expand All @@ -199,7 +204,6 @@ export async function getClustersFromRequest(
// check ccr configuration
const isCcrEnabled = await checkCcrEnabled(req, esIndexPattern);

const config = req.server.config();
const kibanaUuid = config.get('server.uuid');

return getClustersSummary(req.server, clusters, kibanaUuid, isCcrEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
*/

import expect from '@kbn/expect';
import { handleGetPipelinesResponse, processPipelinesAPIResponse } from '../get_pipelines';
import { processPipelinesAPIResponse } from '../get_pipelines';

describe('processPipelinesAPIResponse', () => {
let response;
beforeEach(() => {
response = {
pipelines: [
{
id: 1,
metrics: {
throughput_for_cluster: {
data: [
Expand All @@ -22,8 +23,8 @@ describe('processPipelinesAPIResponse', () => {
},
nodes_count_for_cluster: {
data: [
[1513721903, 3],
[1513722162, 2],
[1513721903, { 1: 5 }],
[1513722162, { 1: 10 }],
],
},
},
Expand All @@ -32,96 +33,27 @@ describe('processPipelinesAPIResponse', () => {
};
});

it('normalizes the metric keys', () => {
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
processedResponse => {
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
response.pipelines[0].metrics.throughput_for_cluster
);
expect(processedResponse.pipelines[0].metrics.nodesCount).to.eql(
response.pipelines[0].metrics.nodes_count_for_cluster
);
}
it('normalizes the metric keys', async () => {
const processedResponse = await processPipelinesAPIResponse(
response,
'throughput_for_cluster',
'nodes_count_for_cluster'
);
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
response.pipelines[0].metrics.throughput_for_cluster
);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][0]).to.eql(1513721903);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][1]).to.eql(5);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][0]).to.eql(1513722162);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][1]).to.eql(10);
});

it('computes the latest metrics', () => {
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
processedResponse => {
expect(processedResponse.pipelines[0].latestThroughput).to.eql(23);
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(2);
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(10);
}
);
});
});

describe('get_pipelines', () => {
let fetchPipelinesWithMetricsResult;

describe('fetchPipelinesWithMetrics result contains no pipelines', () => {
beforeEach(() => {
fetchPipelinesWithMetricsResult = {
logstash_pipeline_throughput: [
{
data: [],
},
],
logstash_pipeline_nodes_count: [
{
data: [],
},
],
};
});

it('returns an empty array', () => {
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
expect(result).to.eql([]);
});
});

describe('fetchPipelinesWithMetrics result contains pipelines', () => {
beforeEach(() => {
fetchPipelinesWithMetricsResult = {
logstash_pipeline_throughput: [
{
data: [[1513123151000, { apache_logs: 231, logstash_tweets: 34 }]],
},
],
logstash_pipeline_nodes_count: [
{
data: [[1513123151000, { apache_logs: 3, logstash_tweets: 1 }]],
},
],
};
});

it('returns the correct structure for a non-empty response', () => {
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
expect(result).to.eql([
{
id: 'apache_logs',
metrics: {
logstash_pipeline_throughput: {
data: [[1513123151000, 231]],
},
logstash_pipeline_nodes_count: {
data: [[1513123151000, 3]],
},
},
},
{
id: 'logstash_tweets',
metrics: {
logstash_pipeline_throughput: {
data: [[1513123151000, 34]],
},
logstash_pipeline_nodes_count: {
data: [[1513123151000, 1]],
},
},
},
]);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import { get } from 'lodash';
import { filter } from '../pagination/filter';
import { getLogstashPipelineIds } from './get_pipeline_ids';
import { handleGetPipelinesResponse } from './get_pipelines';
import { sortPipelines } from './sort_pipelines';
import { paginate } from '../pagination/paginate';
import { getMetrics } from '../details/get_metrics';
Expand Down Expand Up @@ -51,19 +50,33 @@ export async function getPaginatedPipelines(
// the necessary sort - we only need the last bucket of data so we
// fetch the last two buckets of data (to ensure we have a single full bucekt),
// then return the value from that last bucket
const metricSeriesData = await getMetrics(
req,
lsIndexPattern,
metricSet,
[],
{ pageOfPipelines: pipelines },
2
);
const pipelineAggregationsData = handleGetPipelinesResponse(
metricSeriesData,
pipelines.map(p => p.id)
const metricSeriesData = Object.values(
await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(
req,
lsIndexPattern,
metricSet,
[],
{
pipeline,
},
2
);

resolve({
id: pipeline.id,
metrics: Object.keys(data).reduce((accum, metricName) => {
accum[metricName] = data[metricName][0];
return accum;
}, {}),
});
});
})
)
);
for (const pipelineAggregationData of pipelineAggregationsData) {
for (const pipelineAggregationData of metricSeriesData) {
for (const pipeline of pipelines) {
if (pipelineAggregationData.id === pipeline.id) {
for (const metric of metricSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { get, uniq } from 'lodash';
import { get } from 'lodash';
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';

Expand All @@ -26,7 +26,7 @@ export async function getLogstashPipelineIds(
index: logstashIndexPattern,
size: 0,
ignoreUnavailable: true,
filterPath: ['aggregations.nested_context.composite_data.buckets'],
filterPath: ['aggregations.nest.id.buckets'],
body: {
query: createQuery({
start,
Expand All @@ -36,37 +36,28 @@ export async function getLogstashPipelineIds(
filters,
}),
aggs: {
nested_context: {
nest: {
nested: {
path: 'logstash_stats.pipelines',
},
aggs: {
composite_data: {
composite: {
id: {
terms: {
field: 'logstash_stats.pipelines.id',
size,
sources: [
{
id: {
terms: {
field: 'logstash_stats.pipelines.id',
},
},
},
{
hash: {
terms: {
field: 'logstash_stats.pipelines.hash',
},
},
},
{
ephemeral_id: {
},
aggs: {
unnest: {
reverse_nested: {},
aggs: {
nodes: {
terms: {
field: 'logstash_stats.pipelines.ephemeral_id',
field: 'logstash_stats.logstash.uuid',
size,
},
},
},
],
},
},
},
},
Expand All @@ -77,8 +68,8 @@ export async function getLogstashPipelineIds(

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const response = await callWithRequest(req, 'search', params);
const data = get(response, 'aggregations.nested_context.composite_data.buckets', []).map(
bucket => bucket.key
);
return uniq(data, item => item.id);
return get(response, 'aggregations.nest.id.buckets', []).map(bucket => ({
id: bucket.key,
nodeIds: get(bucket, 'unnest.nodes.buckets', []).map(item => item.key),
}));
}
Loading

0 comments on commit 001dcb9

Please sign in to comment.