Stack monitoring - logstash 8.0 metricbeat standalone compatibility (#122177)

* use node_stats metricset filter

* fix pipeline dropdown redirection

* add get_pipeline metricbeat aggregation

* add get_pipeline_vertex metricbeat aggregation

* update logstash metricbeat archived data

* merge_pipeline_versions

* type parameter

* use datastream for mb functional tests

* fix metricbeat apis tests

* lint

* lint

* lint

* fix multicluster api test

* lint

* fix singlecluster_green_gold_mb tests

* lint

* add test case

Co-authored-by: Kibana Machine <[email protected]>
klacabane and kibanamachine authored Jan 5, 2022
1 parent b0c9977 commit 72297e5
Showing 38 changed files with 399 additions and 588,951 deletions.
11 changes: 11 additions & 0 deletions x-pack/plugins/monitoring/common/types/es.ts
@@ -586,6 +586,17 @@ export interface ElasticsearchMetricbeatSource {
   };
   logstash?: {
     node?: {
+      state?: {
+        pipeline?: {
+          id: string;
+          name: string;
+          representation?: {
+            graph?: {
+              vertices: ElasticsearchSourceLogstashPipelineVertex[];
+            };
+          };
+        };
+      };
       stats?: {
         timestamp?: string;
         logstash?: {
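The optional `state` branch added above mirrors the document shape the Logstash metricbeat module writes under `logstash.node.state`. A minimal consumption sketch (not from this commit; `source` is a hypothetical value):

declare const source: ElasticsearchMetricbeatSource;

// Optional chaining over the new typing; yields [] for documents produced
// by internal collection, which populate `logstash_state` instead.
const vertices =
  source.logstash?.node?.state?.pipeline?.representation?.graph?.vertices ?? [];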
@@ -142,11 +142,12 @@ export const LogStashPipelinePage: React.FC<ComponentProps> = ({ clusters }) =>
     }
   }, [detailVertexId, getPageData]);
 
-  const onChangePipelineHash = useCallback(() => {
-    window.location.hash = getSafeForExternalLink(
-      `#/logstash/pipelines/${pipelineId}/${pipelineHash}`
-    );
-  }, [pipelineId, pipelineHash]);
+  const onChangePipelineHash = useCallback(
+    (hash) => {
+      window.location.hash = getSafeForExternalLink(`#/logstash/pipelines/${pipelineId}/${hash}`);
+    },
+    [pipelineId]
+  );
 
   useEffect(() => {
     if (cluster) {
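The dropdown bug fixed here was a stale closure: the old callback interpolated the `pipelineHash` captured at render time, so picking another version redirected to the version already being displayed. A simplified sketch of the corrected flow, with the component's dependencies stubbed out:

declare const pipelineId: string;
declare function getSafeForExternalLink(link: string): string;

// The newly selected hash flows from the select event into the URL;
// nothing is read from a stale render-time closure over `pipelineHash`.
const onChangePipelineHash = (hash: string) => {
  window.location.hash = getSafeForExternalLink(`#/logstash/pipelines/${pipelineId}/${hash}`);
};
// wired as: <EuiSelect onChange={({ target }) => onChangePipelineHash(target.value)} />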
@@ -12,7 +12,7 @@ import React from 'react';
 interface Props {
   pipelineVersions: any[];
   pipelineHash?: string;
-  onChangePipelineHash: () => void;
+  onChangePipelineHash: (hash: string) => void;
 }
 
 export const PipelineVersions = (props: Props) => {
@@ -23,23 +23,25 @@ export const PipelineVersions = (props: Props) => {
       <EuiFlexItem grow={false}>
         <EuiSelect
           value={pipelineHash}
-          options={pipelineVersions.map((option) => {
-            return {
-              text: i18n.translate(
-                'xpack.monitoring.logstashNavigation.pipelineVersionDescription',
-                {
-                  defaultMessage:
-                    'Version active {relativeLastSeen} and first seen {relativeFirstSeen}',
-                  values: {
-                    relativeLastSeen: option.relativeLastSeen,
-                    relativeFirstSeen: option.relativeFirstSeen,
-                  },
-                }
-              ),
-              value: option.hash,
-            };
-          })}
-          onChange={onChangePipelineHash}
+          options={pipelineVersions.map(
+            (option: { hash: string; relativeLastSeen: number; relativeFirstSeen: number }) => {
+              return {
+                text: i18n.translate(
+                  'xpack.monitoring.logstashNavigation.pipelineVersionDescription',
+                  {
+                    defaultMessage:
+                      'Version active {relativeLastSeen} and first seen {relativeFirstSeen}',
+                    values: {
+                      relativeLastSeen: option.relativeLastSeen,
+                      relativeFirstSeen: option.relativeFirstSeen,
+                    },
+                  }
+                ),
+                value: option.hash,
+              };
+            }
+          )}
+          onChange={({ target }) => onChangePipelineHash(target.value)}
         />
       </EuiFlexItem>
     </EuiFlexGroup>
@@ -56,7 +56,7 @@ export function getLogstashForClusters(
     ignore_unavailable: true,
     body: {
       query: createQuery({
-        types: ['stats', 'logstash_stats'],
+        types: ['node_stats', 'logstash_stats'],
         start,
         end,
         clusterUuid,
2 changes: 1 addition & 1 deletion x-pack/plugins/monitoring/server/lib/logstash/get_nodes.ts
@@ -85,7 +85,7 @@ export async function getNodes(
       end,
       clusterUuid,
       metric: LogstashMetric.getMetricFields(),
-      types: ['stats', 'logstash_stats'],
+      types: ['node_stats', 'logstash_stats'],
     }),
     collapse: {
       field: 'logstash_stats.logstash.uuid',
@@ -154,7 +154,7 @@ async function getPaginatedThroughputData(
           },
           {
             term: {
-              'metricset.name': 'stats',
+              'metricset.name': 'node_stats',
             },
           },
         ],
@@ -208,7 +208,10 @@ async function getPaginatedNodesData(
     [
       {
         bool: {
-          should: [{ term: { type: 'logstash_stats' } }, { term: { 'metricset.name': 'stats' } }],
+          should: [
+            { term: { type: 'logstash_stats' } },
+            { term: { 'metricset.name': 'node_stats' } },
+          ],
         },
       },
     ],
@@ -296,7 +299,7 @@ async function getThroughputPipelines(
           },
           {
             term: {
-              'metricset.name': 'stats',
+              'metricset.name': 'node_stats',
             },
           },
         ],
@@ -330,7 +333,10 @@ async function getNodePipelines(
     [
       {
         bool: {
-          should: [{ term: { type: 'logstash_stats' } }, { term: { 'metricset.name': 'stats' } }],
+          should: [
+            { term: { type: 'logstash_stats' } },
+            { term: { 'metricset.name': 'node_stats' } },
+          ],
         },
       },
     ],
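All four hunks in this file converge on one pattern: internally collected documents carry a top-level `type` field while metricbeat documents carry `metricset.name`, so each query accepts either. The recurring shape, as a standalone sketch:

// Match a Logstash stats document regardless of collection mode: internal
// collection writes `type: 'logstash_stats'`, while the metricbeat module
// now ships the same data under `metricset.name: 'node_stats'`
// (named 'stats' before 8.0).
const logstashStatsFilter = {
  bool: {
    should: [
      { term: { type: 'logstash_stats' } },
      { term: { 'metricset.name': 'node_stats' } },
    ],
  },
};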
10 changes: 6 additions & 4 deletions x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts
@@ -76,7 +76,7 @@ export function _enrichStateWithStatsAggregation(
   statsAggregation: any,
   timeseriesIntervalInSeconds: number
 ) {
-  const logstashState = stateDocument.logstash_state;
+  const logstashState = stateDocument.logstash_state || stateDocument.logstash?.node?.state;
   const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];
 
   const verticesById: any = {};
@@ -90,7 +90,9 @@
   const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;
 
   const verticesWithStatsBuckets =
-    statsAggregation.aggregations?.pipelines.scoped.vertices.vertex_id.buckets ?? [];
+    statsAggregation.aggregations?.pipelines.scoped.vertices?.vertex_id.buckets ??
+    statsAggregation.aggregations?.pipelines_mb.scoped.vertices?.vertex_id.buckets ??
+    [];
   verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
     // Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
     const vertexId = vertexStatsBucket.key;
@@ -107,7 +109,7 @@
     }
   });
 
-  return stateDocument.logstash_state?.pipeline;
+  return logstashState?.pipeline;
 }
 
 export async function getPipeline(
@@ -121,7 +123,7 @@
   checkParam(lsIndexPattern, 'lsIndexPattern in getPipeline');
 
   // Determine metrics' timeseries interval based on version's timespan
-  const minIntervalSeconds = config.get('monitoring.ui.min_interval_seconds');
+  const minIntervalSeconds = Math.max(Number(config.get('monitoring.ui.min_interval_seconds')), 30);
   const timeseriesInterval = calculateTimeseriesInterval(
     Number(version.firstSeen),
     Number(version.lastSeen),
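Two behaviors change in this file: vertex stats are read from whichever aggregation produced buckets (`pipelines` for internal collection, `pipelines_mb` for metricbeat), and the bucket interval is floored at 30 seconds, presumably so buckets never end up finer than the metricbeat collection period. A condensed sketch of both (hypothetical `aggregations` value; shapes as in this file):

declare const aggregations: any;

// Bucket lookup falls through from the internal-collection aggregation to
// the metricbeat one; only one of the two will have matched documents.
const buckets =
  aggregations?.pipelines?.scoped.vertices?.vertex_id.buckets ??
  aggregations?.pipelines_mb?.scoped.vertices?.vertex_id.buckets ??
  [];

// The interval clamp: a configured min_interval_seconds below 30 is raised.
const clamped = Math.max(Number(10), 30); // 30; a configured 60 stays 60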
@@ -35,7 +35,7 @@ export async function getPipelineStateDocument({
     // This is important because a user may pick a very narrow time picker window. If we were to use a start/end value
     // that could result in us being unable to render the graph
     // Use the logstash_stats documents to determine whether the instance is up/down
-    type: 'logstash_state',
+    types: ['logstash_state', 'node'],
     metric: LogstashMetric.getMetricFields(),
     clusterUuid,
     filters,
@@ -52,16 +52,16 @@
   return aggs;
 }
 
-function nestedVertices(maxBucketSize: string) {
-  const fieldPath = 'logstash_stats.pipelines.vertices';
-  const ephemeralIdField = 'logstash_stats.pipelines.vertices.pipeline_ephemeral_id';
+function nestedVertices(statsPath: string, maxBucketSize: string) {
+  const fieldPath = `${statsPath}.pipelines.vertices`;
+  const ephemeralIdField = `${statsPath}.pipelines.vertices.pipeline_ephemeral_id`;
 
   return {
-    nested: { path: 'logstash_stats.pipelines.vertices' },
+    nested: { path: `${statsPath}.pipelines.vertices` },
     aggs: {
       vertex_id: {
         terms: {
-          field: 'logstash_stats.pipelines.vertices.id',
+          field: `${statsPath}.pipelines.vertices.id`,
           size: maxBucketSize,
         },
         aggs: {
@@ -79,24 +79,33 @@
   };
 }
 
-function createScopedAgg(pipelineId: string, pipelineHash: string, agg: { [key: string]: any }) {
-  return {
-    pipelines: {
-      nested: { path: 'logstash_stats.pipelines' },
+function createScopedAgg(pipelineId: string, pipelineHash: string, maxBucketSize: string) {
+  return (statsPath: string) => {
+    const verticesAgg = {
+      vertices: nestedVertices(statsPath, maxBucketSize),
+      total_processor_duration_stats: {
+        stats: {
+          field: `${statsPath}.pipelines.events.duration_in_millis`,
+        },
+      },
+    };
+
+    return {
+      nested: { path: `${statsPath}.pipelines` },
       aggs: {
         scoped: {
           filter: {
             bool: {
               filter: [
-                { term: { 'logstash_stats.pipelines.id': pipelineId } },
-                { term: { 'logstash_stats.pipelines.hash': pipelineHash } },
+                { term: { [`${statsPath}.pipelines.id`]: pipelineId } },
+                { term: { [`${statsPath}.pipelines.hash`]: pipelineHash } },
               ],
             },
          },
-          aggs: agg,
+          aggs: verticesAgg,
        },
      },
-    },
-  };
+    };
+  };
 }

@@ -109,6 +118,7 @@ function fetchPipelineLatestStats(
   callWithRequest: any,
   req: LegacyRequest
 ) {
+  const pipelineAggregation = createScopedAgg(pipelineId, version.hash, maxBucketSize);
   const params = {
     index: logstashIndexPattern,
     size: 0,
@@ -119,17 +129,18 @@
       'aggregations.pipelines.scoped.vertices.vertex_id.buckets.events_out_total',
       'aggregations.pipelines.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
       'aggregations.pipelines.scoped.total_processor_duration_stats',
+      'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.key',
+      'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.events_in_total',
+      'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.events_out_total',
+      'aggregations.pipelines_mb.scoped.vertices.vertex_id.buckets.duration_in_millis_total',
+      'aggregations.pipelines_mb.scoped.total_processor_duration_stats',
     ],
     body: {
       query,
-      aggs: createScopedAgg(pipelineId, version.hash, {
-        vertices: nestedVertices(maxBucketSize),
-        total_processor_duration_stats: {
-          stats: {
-            field: 'logstash_stats.pipelines.events.duration_in_millis',
-          },
-        },
-      }),
+      aggs: {
+        pipelines: pipelineAggregation('logstash_stats'),
+        pipelines_mb: pipelineAggregation('logstash.node.stats'),
+      },
     },
   };

@@ -154,16 +165,31 @@ export function getPipelineStatsAggregation({
   const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
   const filters = [
     {
-      nested: {
-        path: 'logstash_stats.pipelines',
-        query: {
-          bool: {
-            must: [
-              { term: { 'logstash_stats.pipelines.hash': version.hash } },
-              { term: { 'logstash_stats.pipelines.id': pipelineId } },
-            ],
-          },
-        },
-      },
+      bool: {
+        should: [
+          {
+            nested: {
+              path: 'logstash_stats.pipelines',
+              ignore_unmapped: true,
+              query: {
+                bool: {
+                  filter: [{ term: { 'logstash_stats.pipelines.id': pipelineId } }],
+                },
+              },
+            },
+          },
+          {
+            nested: {
+              path: 'logstash.node.stats.pipelines',
+              ignore_unmapped: true,
+              query: {
+                bool: {
+                  filter: [{ term: { 'logstash.node.stats.pipelines.id': pipelineId } }],
+                },
+              },
+            },
+          },
+        ],
+      },
     },
   ];
@@ -172,7 +198,7 @@
   const end = version.lastSeen;
 
   const query = createQuery({
-    types: ['stats', 'logstash_stats'],
+    types: ['node_stats', 'logstash_stats'],
     start,
     end,
     metric: LogstashMetric.getMetricFields(),
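The refactor of `createScopedAgg` above is the heart of this file: instead of hard-coding the `logstash_stats.*` field prefix, the builder now closes over the pipeline id, hash, and bucket size and returns a function of the document path, so the same aggregation tree is built once per schema. Its usage, as it appears in `fetchPipelineLatestStats` above:

declare const pipelineId: string;
declare const version: { hash: string };
declare const maxBucketSize: string;

// One builder, two document schemas: the identical scoped vertex aggregation
// is instantiated against each field prefix under a separate aggregation key.
const pipelineAggregation = createScopedAgg(pipelineId, version.hash, maxBucketSize);
const aggs = {
  pipelines: pipelineAggregation('logstash_stats'),
  pipelines_mb: pipelineAggregation('logstash.node.stats'),
};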