Skip to content

Commit

Permalink
Add ingest info to Cluster Stats (#48485)
Browse files Browse the repository at this point in the history
This commit enhances the ClusterStatsNodes response to include global 
processor usage stats on a per-processor basis.

example output:

```
...
    "processor_stats": {
      "gsub": {
        "count": 0,
        "failed": 0
        "current": 0
        "time_in_millis": 0
      },
      "script": {
        "count": 0,
        "failed": 0
        "current": 0,
        "time_in_millis": 0
      }
    }
...
```

The purpose for this enhancement is to make it easier to collect stats on how specific processors are being used across the cluster beyond the current per-node usage statistics that currently exist in node stats.

Closes #46146.
  • Loading branch information
talevy authored Oct 29, 2019
1 parent 4b89171 commit 523cb23
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 12 deletions.
7 changes: 7 additions & 0 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ The API returns the following response:
},
...
],
"ingest": {
"number_of_pipelines" : 1,
"processor_stats": {
...
}
},
"network_types": {
...
},
Expand All @@ -244,6 +250,7 @@ The API returns the following response:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.plugins.PluginInfo;
Expand All @@ -49,7 +50,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClusterStatsNodes implements ToXContentFragment {
Expand All @@ -64,6 +67,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final NetworkTypes networkTypes;
private final DiscoveryTypes discoveryTypes;
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
Expand Down Expand Up @@ -97,6 +101,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
}

public Counts getCounts() {
Expand Down Expand Up @@ -178,6 +183,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
discoveryTypes.toXContent(builder, params);

packagingTypes.toXContent(builder, params);

ingestStats.toXContent(builder, params);

return builder;
}

Expand Down Expand Up @@ -690,4 +698,68 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

}

static class IngestStats implements ToXContentFragment {

final int pipelineCount;
final SortedMap<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String,
List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat.getIngestStats()
.getProcessorStats().entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis()
};
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
return v;
}
});
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.stats = Collections.unmodifiableSortedMap(stats);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("ingest");
{
builder.field("number_of_pipelines", pipelineCount);
builder.startObject("processor_stats");
for (Map.Entry<String, long[]> stat : stats.entrySet()) {
long[] statValues = stat.getValue();
builder.startObject(stat.getKey());
builder.field("count", statValues[0]);
builder.field("failed", statValues[1]);
builder.field("current", statValues[2]);
builder.humanReadableField("time_in_millis", "time",
new TimeValue(statValues[3], TimeUnit.MILLISECONDS));
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public IngestStats stats() {
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
return statsBuilder.build();
Expand Down
25 changes: 21 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -67,8 +68,12 @@ public IngestStats(StreamInput in) throws IOException {
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
String processorType = null;
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
processorType = in.readString();
}
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
Expand All @@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeString(processorStat.getType());
}
processorStat.getStats().writeTo(out);
}
}
Expand All @@ -110,9 +118,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
Expand Down Expand Up @@ -224,9 +235,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
return this;
}

Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
.add(new ProcessorStat(processorName, processorType, metric.createStats()));
return this;
}

Expand Down Expand Up @@ -262,17 +273,23 @@ public Stats getStats() {
*/
public static class ProcessorStat {
private final String name;
private final String type;
private final Stats stats;

public ProcessorStat(String name, Stats stats) {
public ProcessorStat(String name, String type, Stats stats) {
this.name = name;
this.type = type;
this.stats = stats;
}

public String getName() {
return name;
}

public String getType() {
return type;
}

public Stats getStats() {
return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testSerialization() throws IOException {
}
}

private static NodeStats createNodeStats() {
public static NodeStats createNodeStats() {
DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), VersionUtils.randomVersion(random()));
OsStats osStats = null;
Expand Down Expand Up @@ -456,7 +456,8 @@ private static NodeStats createNodeStats() {
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10),
randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,26 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests.createNodeStats;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.Matchers.equalTo;

public class ClusterStatsNodesTests extends ESTestCase {

Expand Down Expand Up @@ -59,6 +67,41 @@ public void testNetworkTypesToXContent() throws Exception {
+ "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString());
}

public void testIngestStats() throws Exception {
NodeStats nodeStats = createNodeStats();

SortedMap<String, long[]> processorStats = new TreeMap<>();
nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(),
new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(),
s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()})));
ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats));
assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size()));
String processorStatsString = "{";
Iterator<Map.Entry<String, long[]>> iter = processorStats.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, long[]> entry = iter.next();
long[] statValues = entry.getValue();
long count = statValues[0];
long failedCount = statValues[1];
long current = statValues[2];
long timeInMillis = statValues[3];
processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count
+ ",\"failed\":" + failedCount
+ ",\"current\":" + current
+ ",\"time_in_millis\":" + timeInMillis
+ "}";
if (iter.hasNext()) {
processorStatsString += ",";
}
}
processorStatsString += "}";
assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo(
"{\"ingest\":{"
+ "\"number_of_pipelines\":" + stats.pipelineCount + ","
+ "\"processor_stats\":" + processorStatsString
+ "}}"));
}

private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ private List<IngestStats.PipelineStat> createPipelineStats() {

private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type",
new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,11 @@ public void testToXContent() throws IOException {
+ "\"type\":\"docker\","
+ "\"count\":1"
+ "}"
+ "]"
+ "],"
+ "\"ingest\":{"
+ "\"number_of_pipelines\":0,"
+ "\"processor_stats\":{}"
+ "}"
+ "}"
+ "},"
+ "\"cluster_state\":{"
Expand Down

0 comments on commit 523cb23

Please sign in to comment.