From 523cb23e02f1db3470fcbd725645e99fd21f2589 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Oct 2019 11:51:16 -0700 Subject: [PATCH] Add ingest info to Cluster Stats (#48485) This commit enhances the ClusterStatsNodes response to include global processor usage stats on a per-processor basis. example output: ``` ... "processor_stats": { "gsub": { "count": 0, "failed": 0 "current": 0 "time_in_millis": 0 }, "script": { "count": 0, "failed": 0 "current": 0, "time_in_millis": 0 } } ... ``` The purpose for this enhancement is to make it easier to collect stats on how specific processors are being used across the cluster beyond the current per-node usage statistics that currently exist in node stats. Closes #46146. --- docs/reference/cluster/stats.asciidoc | 7 ++ .../cluster/stats/ClusterStatsNodes.java | 72 +++++++++++++++++++ .../stats/TransportClusterStatsAction.java | 2 +- .../elasticsearch/ingest/IngestService.java | 2 +- .../org/elasticsearch/ingest/IngestStats.java | 25 +++++-- .../cluster/node/stats/NodeStatsTests.java | 5 +- .../cluster/stats/ClusterStatsNodesTests.java | 43 +++++++++++ .../ingest/IngestStatsTests.java | 7 +- .../ClusterStatsMonitoringDocTests.java | 6 +- 9 files changed, 157 insertions(+), 12 deletions(-) diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index dbb4ea25d365f..60cee385eabbc 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -227,6 +227,12 @@ The API returns the following response: }, ... ], + "ingest": { + "number_of_pipelines" : 1, + "processor_stats": { + ... + } + }, "network_types": { ... }, @@ -244,6 +250,7 @@ The API returns the following response: // TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/] // TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/] // TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/] +// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/] // TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/] // TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/] // TESTRESPONSE[s/: true|false/: $body.$_path/] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index bcadbc8e3292a..485f215e594b3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.plugins.PluginInfo; @@ -49,7 +50,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ClusterStatsNodes implements ToXContentFragment { @@ -64,6 +67,7 @@ public class ClusterStatsNodes implements ToXContentFragment { private final NetworkTypes networkTypes; private final DiscoveryTypes discoveryTypes; private final PackagingTypes packagingTypes; + private final IngestStats ingestStats; ClusterStatsNodes(List nodeResponses) { this.versions = new HashSet<>(); @@ -97,6 +101,7 @@ public class ClusterStatsNodes implements ToXContentFragment { this.networkTypes = new NetworkTypes(nodeInfos); this.discoveryTypes = new DiscoveryTypes(nodeInfos); this.packagingTypes = new PackagingTypes(nodeInfos); + this.ingestStats = new IngestStats(nodeStats); } public Counts getCounts() { @@ -178,6 +183,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws discoveryTypes.toXContent(builder, params); packagingTypes.toXContent(builder, params); + + ingestStats.toXContent(builder, params); + return builder; } @@ -690,4 +698,68 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } + static class IngestStats implements ToXContentFragment { + + final int pipelineCount; + final SortedMap stats; + + IngestStats(final List nodeStats) { + Set pipelineIds = new HashSet<>(); + SortedMap stats = new TreeMap<>(); + for (NodeStats nodeStat : nodeStats) { + if (nodeStat.getIngestStats() != null) { + for (Map.Entry> processorStats : nodeStat.getIngestStats() + .getProcessorStats().entrySet()) { + pipelineIds.add(processorStats.getKey()); + for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { + stats.compute(stat.getType(), (k, v) -> { + org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats(); + if (v == null) { + return new long[] { + nodeIngestStats.getIngestCount(), + nodeIngestStats.getIngestFailedCount(), + nodeIngestStats.getIngestCurrent(), + nodeIngestStats.getIngestTimeInMillis() + }; + } else { + v[0] += nodeIngestStats.getIngestCount(); + v[1] += nodeIngestStats.getIngestFailedCount(); + v[2] += nodeIngestStats.getIngestCurrent(); + v[3] += nodeIngestStats.getIngestTimeInMillis(); + return v; + } + }); + } + } + } + } + this.pipelineCount = pipelineIds.size(); + this.stats = Collections.unmodifiableSortedMap(stats); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject("ingest"); + { + builder.field("number_of_pipelines", pipelineCount); + builder.startObject("processor_stats"); + for (Map.Entry stat : stats.entrySet()) { + long[] statValues = stat.getValue(); + builder.startObject(stat.getKey()); + builder.field("count", statValues[0]); + builder.field("failed", statValues[1]); + builder.field("current", statValues[2]); + builder.humanReadableField("time_in_millis", "time", + new TimeValue(statValues[3], TimeUnit.MILLISECONDS)); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5ecf569397d4f..4f23874a54fbd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) { NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, - true, true, true, false, true, false, false, false, false, false, false, false); + true, true, true, false, true, false, false, false, false, false, true, false); List shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index dc5285d0c48c1..b17b530aca9f0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -413,7 +413,7 @@ public IngestStats stats() { processorMetrics.forEach(t -> { Processor processor = t.v1(); IngestMetric processorMetric = t.v2(); - statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric); }); }); return statsBuilder.build(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index f140c5f155563..b7696ec61268f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -67,8 +68,12 @@ public IngestStats(StreamInput in) throws IOException { List processorStatsPerPipeline = new ArrayList<>(processorsSize); for (int j = 0; j < processorsSize; j++) { String processorName = in.readString(); + String processorType = null; + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + processorType = in.readString(); + } Stats processorStat = new Stats(in); - processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat)); } this.processorStats.put(pipelineId, processorStatsPerPipeline); } @@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(processorStatsForPipeline.size()); for (ProcessorStat processorStat : processorStatsForPipeline) { out.writeString(processorStat.getName()); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(processorStat.getType()); + } processorStat.getStats().writeTo(out); } } @@ -110,9 +118,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (ProcessorStat processorStat : processorStatsForPipeline) { builder.startObject(); builder.startObject(processorStat.getName()); + builder.field("type", processorStat.getType()); + builder.startObject("stats"); processorStat.getStats().toXContent(builder, params); builder.endObject(); builder.endObject(); + builder.endObject(); } } builder.endArray(); @@ -224,9 +235,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { return this; } - Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) { this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) - .add(new ProcessorStat(processorName, metric.createStats())); + .add(new ProcessorStat(processorName, processorType, metric.createStats())); return this; } @@ -262,10 +273,12 @@ public Stats getStats() { */ public static class ProcessorStat { private final String name; + private final String type; private final Stats stats; - public ProcessorStat(String name, Stats stats) { + public ProcessorStat(String name, String type, Stats stats) { this.name = name; + this.type = type; this.stats = stats; } @@ -273,6 +286,10 @@ public String getName() { return name; } + public String getType() { + return type; + } + public Stats getStats() { return stats; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 57395859c503f..fdd71ba98b495 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -315,7 +315,7 @@ public void testSerialization() throws IOException { } } - private static NodeStats createNodeStats() { + public static NodeStats createNodeStats() { DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), VersionUtils.randomVersion(random())); OsStats osStats = null; @@ -456,7 +456,8 @@ private static NodeStats createNodeStats() { for (int j =0; j < numProcessors;j++) { IngestStats.Stats processorStats = new IngestStats.Stats (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), + randomAlphaOfLengthBetween(3, 10), processorStats)); } ingestProcessorStats.put(pipelineId,processorPerPipeline); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 04edc775a2d53..c6ba462eb9e33 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -27,11 +28,18 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests.createNodeStats; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; +import static org.hamcrest.Matchers.equalTo; public class ClusterStatsNodesTests extends ESTestCase { @@ -59,6 +67,41 @@ public void testNetworkTypesToXContent() throws Exception { + "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString()); } + public void testIngestStats() throws Exception { + NodeStats nodeStats = createNodeStats(); + + SortedMap processorStats = new TreeMap<>(); + nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(), + new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(), + s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()}))); + ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats)); + assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size())); + String processorStatsString = "{"; + Iterator> iter = processorStats.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + long[] statValues = entry.getValue(); + long count = statValues[0]; + long failedCount = statValues[1]; + long current = statValues[2]; + long timeInMillis = statValues[3]; + processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + + ",\"failed\":" + failedCount + + ",\"current\":" + current + + ",\"time_in_millis\":" + timeInMillis + + "}"; + if (iter.hasNext()) { + processorStatsString += ","; + } + } + processorStatsString += "}"; + assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo( + "{\"ingest\":{" + + "\"number_of_pipelines\":" + stats.pipelineCount + "," + + "\"processor_stats\":" + processorStatsString + + "}}")); + } + private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) { Settings.Builder settings = Settings.builder(); if (transportType != null) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index bb132f73840f3..d50df691ac948 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -52,9 +52,10 @@ private List createPipelineStats() { private Map> createProcessorStats(List pipelineStats){ assert(pipelineStats.size() >= 2); - IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); - IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); - IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type", + new IngestStats.Stats(47, 97, 197, 297)); //pipeline1 -> processor1,processor2; pipeline2 -> processor3 return MapBuilder.>newMapBuilder() .put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index a4e76d60f041c..f7da3118c5e29 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -536,7 +536,11 @@ public void testToXContent() throws IOException { + "\"type\":\"docker\"," + "\"count\":1" + "}" - + "]" + + "]," + + "\"ingest\":{" + + "\"number_of_pipelines\":0," + + "\"processor_stats\":{}" + + "}" + "}" + "}," + "\"cluster_state\":{"