diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index e4e5e6e86aa53..705e77028a1ef 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -290,23 +290,30 @@ public void applyClusterState(final ClusterChangedEvent event) { } } - static List> getProcessorMetrics(CompoundProcessor compoundProcessor, - List> processorStats) { + /** + * Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as + * wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric. + * @param compoundProcessor The compound processor to start walking the non-failure processors + * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples. + * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor + */ + private static List> getProcessorMetrics(CompoundProcessor compoundProcessor, + List> processorMetrics) { //only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure for (Tuple processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) { Processor processor = processorWithMetric.v1(); IngestMetric metric = processorWithMetric.v2(); if (processor instanceof CompoundProcessor) { - getProcessorMetrics((CompoundProcessor) processor, processorStats); + getProcessorMetrics((CompoundProcessor) processor, processorMetrics); } else { //Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true. if (processor instanceof ConditionalProcessor) { metric = ((ConditionalProcessor) processor).getMetric(); } - processorStats.add(new Tuple<>(processor, metric)); + processorMetrics.add(new Tuple<>(processor, metric)); } } - return processorStats; + return processorMetrics; } private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { @@ -413,27 +420,24 @@ protected void doRun() { } public IngestStats stats() { - //This is a map of a pipelineId to a it's owns pipeline stats and then a List of all its processor stats. - //Each processor stat has a name (see getName) and the stats associated to that processor - Map>>> statsPerPipeline = new HashMap<>(pipelines.size()); + IngestStats.Builder statsBuilder = new IngestStats.Builder(); + statsBuilder.addTotalMetrics(totalMetrics); pipelines.forEach((id, pipeline) -> { CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); - List> processorStats = new ArrayList<>(); - statsPerPipeline.put(id, new Tuple<>(pipeline.getMetrics().createStats(), getProcessorStats(rootProcessor, processorStats))); + statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); + List> processorMetrics = new ArrayList<>(); + getProcessorMetrics(rootProcessor, processorMetrics); + processorMetrics.forEach(t -> { + Processor processor = t.v1(); + IngestMetric processorMetric = t.v2(); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + }); }); - - return new IngestStats(totalMetrics.createStats(), statsPerPipeline); - } - - public static List> getProcessorStats(CompoundProcessor rootProcessor, List> processorStats){ - List> processorMetrics = new ArrayList<>(); - getProcessorMetrics(rootProcessor, processorMetrics); - processorMetrics.forEach(t -> processorStats.add(new Tuple<>(getName(t.v1()), t.v2().createStats()))); - return processorStats; + return statsBuilder.build(); } //package private for testing - static String getName(Processor processor){ + static String getProcessorName(Processor processor){ // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name if(processor instanceof ConditionalProcessor){ processor = ((ConditionalProcessor) processor).getProcessor(); @@ -451,7 +455,6 @@ static String getName(Processor processor){ sb.append(":"); sb.append(tag); } - return sb.toString(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index bd38ee70a6b25..e3d671bc8b2a0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -19,7 +19,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -29,6 +28,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,17 +36,19 @@ public class IngestStats implements Writeable, ToXContentFragment { private final Stats totalStats; - private final Map>>> statsPerPipeline; + private final List pipelineStats; + private final Map> processorStats; /** - * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, - * and pipeline stats are logically the sum of the processor stats. - * @param statsPerPipeline - A Map(pipelineId -> Tuple(pipelineStats, List(perProcessorStats)) - * where perProcessorStats = Tuple(processorDisplayName, processorStats) + * @param totalStats - The total stats for Ingest. This is the logically the sum of all pipeline stats, + * and pipeline stats are logically the sum of the processor stats. + * @param pipelineStats - The stats for a given ingest pipeline. + * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier. */ - public IngestStats(Stats totalStats, Map>>> statsPerPipeline) { + public IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) { this.totalStats = totalStats; - this.statsPerPipeline = statsPerPipeline; + this.pipelineStats = pipelineStats; + this.processorStats = processorStats; } /** @@ -55,63 +57,43 @@ public IngestStats(Stats totalStats, Map(size); + this.pipelineStats = new ArrayList<>(size); + this.processorStats = new HashMap<>(size); for (int i = 0; i < size; i++) { - String pipelineName = in.readString(); - Stats pipelineStats = new Stats(in); + String pipelineId = in.readString(); + Stats pipelineStat = new Stats(in); + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat)); int processorsSize = in.readVInt(); - List> processors = new ArrayList<>(processorsSize); + List processorStatsPerPipeline = new ArrayList<>(processorsSize); for (int j = 0; j < processorsSize; j++) { String processorName = in.readString(); - processors.add(new Tuple<>(processorName, new Stats(in))); + Stats processorStat = new Stats(in); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); } - statsPerPipeline.put(pipelineName, new Tuple<>(pipelineStats, processors)); + this.processorStats.put(pipelineId, processorStatsPerPipeline); } } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - out.writeVInt(statsPerPipeline.size()); - for (Map.Entry>>> entry : statsPerPipeline.entrySet()) { - String pipelineName = entry.getKey(); - Stats pipelineStats = entry.getValue().v1(); - out.writeString(pipelineName); - pipelineStats.writeTo(out); - List> processorStats = entry.getValue().v2(); - out.writeVInt(processorStats.size()); - for (Tuple processorTuple : processorStats) { - String processorName = processorTuple.v1(); - Stats processorStat = processorTuple.v2(); - out.writeString(processorName); - processorStat.writeTo(out); + out.writeVInt(pipelineStats.size()); + for (PipelineStat pipelineStat : pipelineStats) { + out.writeString(pipelineStat.getPipelineId()); + pipelineStat.getStats().writeTo(out); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); + if(processorStatsForPipeline == null) { + out.writeVInt(0); + }else{ + out.writeVInt(processorStatsForPipeline.size()); + for (ProcessorStat processorStat : processorStatsForPipeline) { + out.writeString(processorStat.getName()); + processorStat.getStats().writeTo(out); + } } } } - /** - * @return The accumulated stats for all pipelines - */ - public Stats getTotalStats() { - return totalStats; - } - - /** - * @return The stats on a per pipeline basis. A Map(pipelineId -> Tuple(pipelineStats, List(perProcessorStats)) - * where perProcessorStats = Tuple(processorDisplayName, processorStats) - */ - public Map>>> getStatsPerPipeline() { - return statsPerPipeline; - } - - public IngestStats.Stats getStatsForPipeline(String id) { - return statsPerPipeline.get(id).v1(); - } - - public List> getProcessorStatsForPipeline(String id) { - return statsPerPipeline.get(id).v2(); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("ingest"); @@ -119,20 +101,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws totalStats.toXContent(builder, params); builder.endObject(); builder.startObject("pipelines"); - for (Map.Entry>>> entry : statsPerPipeline.entrySet()) { - builder.startObject(entry.getKey()); - Stats pipelineStats = entry.getValue().v1(); - pipelineStats.toXContent(builder, params); - List> perProcessorStats = entry.getValue().v2(); + for (PipelineStat pipelineStat : pipelineStats) { + builder.startObject(pipelineStat.getPipelineId()); + pipelineStat.getStats().toXContent(builder, params); + List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); builder.startArray("processors"); - for (Tuple processorStats : perProcessorStats) { - builder.startObject(); - builder.startObject(processorStats.v1()); - processorStats.v2().toXContent(builder, params); - builder.endObject(); - builder.endObject(); + if (processorStatsForPipeline != null) { + for (ProcessorStat processorStat : processorStatsForPipeline) { + builder.startObject(); + builder.startObject(processorStat.getName()); + processorStat.getStats().toXContent(builder, params); + builder.endObject(); + builder.endObject(); + } } - builder.endArray(); builder.endObject(); } @@ -141,6 +123,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public Stats getTotalStats() { + return totalStats; + } + + public List getPipelineStats() { + return pipelineStats; + } + + public Map> getProcessorStats() { + return processorStats; + } + public static class Stats implements Writeable, ToXContentFragment { private final long ingestCount; @@ -181,7 +175,6 @@ public long getIngestCount() { } /** - * * @return The total time spent of ingest preprocessing in millis. */ public long getIngestTimeInMillis() { @@ -211,4 +204,77 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * Easy conversion from scoped {@link IngestMetric} objects to a serializable Stats objects + */ + static class Builder { + private Stats totalStats; + private List pipelineStats = new ArrayList<>(); + private Map> processorStats = new HashMap<>(); + + + Builder addTotalMetrics(IngestMetric totalMetric) { + this.totalStats = totalMetric.createStats(); + return this; + } + + Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { + this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats())); + return this; + } + + Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + .add(new ProcessorStat(processorName, metric.createStats())); + return this; + } + + IngestStats build() { + return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), + Collections.unmodifiableMap(processorStats)); + } + } + + /** + * Container for pipeline stats. + */ + public static class PipelineStat { + private final String pipelineId; + private final Stats stats; + + public PipelineStat(String pipelineId, Stats stats) { + this.pipelineId = pipelineId; + this.stats = stats; + } + + public String getPipelineId() { + return pipelineId; + } + + public Stats getStats() { + return stats; + } + } + + /** + * Container for processor stats. + */ + public static class ProcessorStat { + private final String name; + private final Stats stats; + + public ProcessorStat(String name, Stats stats) { + this.name = name; + this.stats = stats; + } + + public String getName() { + return name; + } + + public Stats getStats() { + return stats; + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index a865fabc48bdc..8f51fb08dd23f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.admin.cluster.node.stats; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.DiscoveryStats; @@ -271,30 +270,29 @@ public void testSerialization() throws IOException { assertEquals(totalStats.getIngestCurrent(), deserializedIngestStats.getTotalStats().getIngestCurrent()); assertEquals(totalStats.getIngestFailedCount(), deserializedIngestStats.getTotalStats().getIngestFailedCount()); assertEquals(totalStats.getIngestTimeInMillis(), deserializedIngestStats.getTotalStats().getIngestTimeInMillis()); - assertEquals(ingestStats.getStatsPerPipeline().size(), deserializedIngestStats.getStatsPerPipeline().size()); - for (Map.Entry>>> entry : - ingestStats.getStatsPerPipeline().entrySet()) { - String pipelineName = entry.getKey(); - IngestStats.Stats pipelineStats = entry.getValue().v1(); - IngestStats.Stats deserializedPipelineStats = deserializedIngestStats.getStatsForPipeline(pipelineName); - assertEquals(pipelineStats.getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); - assertEquals(pipelineStats.getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); - assertEquals(pipelineStats.getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); - assertEquals(pipelineStats.getIngestCount(), deserializedPipelineStats.getIngestCount()); - List> processorStats = entry.getValue().v2(); - List> deserializedProcessorStats = - deserializedIngestStats.getProcessorStatsForPipeline(pipelineName); - Iterator> it = deserializedProcessorStats.iterator(); - for (Tuple processorTuple : processorStats) { - Tuple deserializedTuple = it.next(); - assertEquals(processorTuple.v1(), deserializedTuple.v1()); - IngestStats.Stats pStats = processorTuple.v2(); - IngestStats.Stats dStats = deserializedTuple.v2(); - assertEquals(pStats.getIngestFailedCount(), dStats.getIngestFailedCount()); - assertEquals(pStats.getIngestTimeInMillis(), dStats.getIngestTimeInMillis()); - assertEquals(pStats.getIngestCurrent(), dStats.getIngestCurrent()); - assertEquals(pStats.getIngestCount(), dStats.getIngestCount()); + assertEquals(ingestStats.getPipelineStats().size(), deserializedIngestStats.getPipelineStats().size()); + for (IngestStats.PipelineStat pipelineStat : ingestStats.getPipelineStats()) { + String pipelineId = pipelineStat.getPipelineId(); + IngestStats.Stats deserializedPipelineStats = + getPipelineStats(deserializedIngestStats.getPipelineStats(), pipelineId); + assertEquals(pipelineStat.getStats().getIngestFailedCount(), deserializedPipelineStats.getIngestFailedCount()); + assertEquals(pipelineStat.getStats().getIngestTimeInMillis(), deserializedPipelineStats.getIngestTimeInMillis()); + assertEquals(pipelineStat.getStats().getIngestCurrent(), deserializedPipelineStats.getIngestCurrent()); + assertEquals(pipelineStat.getStats().getIngestCount(), deserializedPipelineStats.getIngestCount()); + List processorStats = ingestStats.getProcessorStats().get(pipelineId); + //intentionally validating identical order + Iterator it = deserializedIngestStats.getProcessorStats().get(pipelineId).iterator(); + for (IngestStats.ProcessorStat processorStat : processorStats) { + IngestStats.ProcessorStat deserializedProcessorStat = it.next(); + assertEquals(processorStat.getStats().getIngestFailedCount(), + deserializedProcessorStat.getStats().getIngestFailedCount()); + assertEquals(processorStat.getStats().getIngestTimeInMillis(), + deserializedProcessorStat.getStats().getIngestTimeInMillis()); + assertEquals(processorStat.getStats().getIngestCurrent(), + deserializedProcessorStat.getStats().getIngestCurrent()); + assertEquals(processorStat.getStats().getIngestCount(), deserializedProcessorStat.getStats().getIngestCount()); } + assertFalse(it.hasNext()); } } AdaptiveSelectionStats adaptiveStats = nodeStats.getAdaptiveSelectionStats(); @@ -445,22 +443,24 @@ private static NodeStats createNodeStats() { if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - int numPipelines = randomIntBetween(0, 10); - Map>>> statsPerPipeline = new HashMap<>(numPipelines); int numProcessors = randomIntBetween(0, 10); - List> processorsStats = new ArrayList<>(numProcessors); + List ingestPipelineStats = new ArrayList<>(numPipelines); + Map> ingestProcessorStats = new HashMap<>(numPipelines); for (int i = 0; i < numPipelines; i++) { - for (int j =0; j < numProcessors;j++){ - IngestStats.Stats processorStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong()); - processorsStats.add(new Tuple<>(randomAlphaOfLengthBetween(3, 10), processorStats)); + String pipelineId = randomAlphaOfLengthBetween(3, 10); + ingestPipelineStats.add(new IngestStats.PipelineStat(pipelineId, new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()))); + + List processorPerPipeline = new ArrayList<>(numProcessors); + for (int j =0; j < numProcessors;j++) { + IngestStats.Stats processorStats = new IngestStats.Stats + (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); } - IngestStats.Stats pipelineStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong()); - statsPerPipeline.put(randomAlphaOfLengthBetween(3, 10), new Tuple<>(pipelineStats, processorsStats)); + ingestProcessorStats.put(pipelineId,processorPerPipeline); } - ingestStats = new IngestStats(totalStats, statsPerPipeline); + ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); } AdaptiveSelectionStats adaptiveSelectionStats = null; if (frequently()) { @@ -489,4 +489,8 @@ private static NodeStats createNodeStats() { fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats, ingestStats, adaptiveSelectionStats); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 44d966e07f8a1..3dde7babb0a96 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -762,7 +762,7 @@ public void testStats() throws Exception { IngestService ingestService = createWithProcessors(map); final IngestStats initialStats = ingestService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getPipelineStats().size(), equalTo(0)); assertStats(initialStats.getTotalStats(), 0, 0, 0); PutPipelineRequest putRequest = new PutPipelineRequest("_id1", @@ -785,33 +785,30 @@ public void testStats() throws Exception { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getProcessorStatsForPipeline("_id1").size(), equalTo(1)); - assertThat(afterFirstRequestStats.getProcessorStatsForPipeline("_id2").size(), equalTo(1)); - afterFirstRequestStats.getStatsPerPipeline().get("_id1").v2() - .forEach(statsTuple -> assertThat(statsTuple.v1(), equalTo("mock:mockTag"))); - afterFirstRequestStats.getStatsPerPipeline().get("_id2").v2() - .forEach(statsTuple -> assertThat(statsTuple.v1(), equalTo("mock:mockTag"))); + assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); + + afterFirstRequestStats.getProcessorStats().get("_id1").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + afterFirstRequestStats.getProcessorStats().get("_id2").forEach(p -> assertEquals(p.getName(), "mock:mockTag")); + //total assertStats(afterFirstRequestStats.getTotalStats(), 1, 0 ,0); //pipeline - assertPipelineStats(afterFirstRequestStats, "_id1", 1, 0, 0); - assertPipelineStats(afterFirstRequestStats, "_id2", 0, 0, 0); + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterFirstRequestStats.getPipelineStats(), "_id2", 0, 0, 0); //processor assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); + indexRequest.setPipeline("_id2"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getProcessorStatsForPipeline("_id1").size(), equalTo(1)); - assertThat(afterFirstRequestStats.getProcessorStatsForPipeline("_id2").size(), equalTo(1)); + assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); //total assertStats(afterSecondRequestStats.getTotalStats(), 2, 0 ,0); //pipeline - assertPipelineStats(afterSecondRequestStats, "_id1", 1, 0, 0); - assertPipelineStats(afterSecondRequestStats, "_id2", 1, 0, 0); + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(afterSecondRequestStats.getPipelineStats(), "_id2", 1, 0, 0); //processor assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); @@ -825,14 +822,12 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); - assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterThirdRequestStats.getProcessorStatsForPipeline("_id1").size(), equalTo(2)); - assertThat(afterThirdRequestStats.getProcessorStatsForPipeline("_id2").size(), equalTo(1)); + assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); //total assertStats(afterThirdRequestStats.getTotalStats(), 3, 0 ,0); //pipeline - assertPipelineStats(afterThirdRequestStats, "_id1", 2, 0, 0); - assertPipelineStats(afterThirdRequestStats, "_id2", 1, 0, 0); + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id1", 2, 0, 0); + assertPipelineStats(afterThirdRequestStats.getPipelineStats(), "_id2", 1, 0, 0); //The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is //due to the parallel array's used to identify which metrics to carry forward. With out unique ids or semantic equals for each //processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases, @@ -843,21 +838,20 @@ public void testStats() throws Exception { //test a failure, and that the processor stats are added from the old stats putRequest = new PutPipelineRequest("_id1", - new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), XContentType.JSON); + new BytesArray("{\"processors\": [{\"failure-mock\" : { \"on_failure\": [{\"mock\" : {}}]}}, {\"mock\" : {}}]}"), + XContentType.JSON); previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterForthRequestStats = ingestService.stats(); - assertThat(afterForthRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterForthRequestStats.getProcessorStatsForPipeline("_id1").size(), equalTo(2)); - assertThat(afterForthRequestStats.getProcessorStatsForPipeline("_id2").size(), equalTo(1)); + assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); //total assertStats(afterForthRequestStats.getTotalStats(), 4, 0 ,0); //pipeline - assertPipelineStats(afterForthRequestStats, "_id1", 3, 0, 0); - assertPipelineStats(afterForthRequestStats, "_id2", 1, 0, 0); + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id1", 3, 0, 0); + assertPipelineStats(afterForthRequestStats.getPipelineStats(), "_id2", 1, 0, 0); //processor assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); //not carried forward since type changed assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); //carried forward and added from old stats @@ -868,23 +862,23 @@ public void testStatName(){ Processor processor = mock(Processor.class); String name = randomAlphaOfLength(10); when(processor.getType()).thenReturn(name); - assertThat(IngestService.getName(processor), equalTo(name)); + assertThat(IngestService.getProcessorName(processor), equalTo(name)); String tag = randomAlphaOfLength(10); when(processor.getTag()).thenReturn(tag); - assertThat(IngestService.getName(processor), equalTo(name + ":" + tag)); + assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag)); ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class); when(conditionalProcessor.getProcessor()).thenReturn(processor); - assertThat(IngestService.getName(conditionalProcessor), equalTo(name + ":" + tag)); + assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag)); PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class); String pipelineName = randomAlphaOfLength(10); when(pipelineProcessor.getPipelineName()).thenReturn(pipelineName); name = PipelineProcessor.TYPE; when(pipelineProcessor.getType()).thenReturn(name); - assertThat(IngestService.getName(pipelineProcessor), equalTo(name + ":" + pipelineName)); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName)); when(pipelineProcessor.getTag()).thenReturn(tag); - assertThat(IngestService.getName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); + assertThat(IngestService.getProcessorName(pipelineProcessor), equalTo(name + ":" + pipelineName + ":" + tag)); } @@ -1018,11 +1012,11 @@ public boolean matches(Object o) { } private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { - assertStats(stats.getProcessorStatsForPipeline(pipelineId).get(processor).v2(), count, failed, time); + assertStats(stats.getProcessorStats().get(pipelineId).get(processor).getStats(), count, failed, time); } - private void assertPipelineStats(IngestStats stats, String pipelineId, long count, long failed, long time) { - assertStats(stats.getStatsForPipeline(pipelineId), count, failed, time); + private void assertPipelineStats(List pipelineStats, String pipelineId, long count, long failed, long time) { + assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time); } private void assertStats(IngestStats.Stats stats, long count, long failed, long time) { @@ -1031,4 +1025,8 @@ private void assertStats(IngestStats.Stats stats, long count, long failed, long assertThat(stats.getIngestFailedCount(), equalTo(failed)); assertThat(stats.getIngestTimeInMillis(), greaterThanOrEqualTo(time)); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 0b5906d0f3c89..3d39faf9a7447 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -19,69 +19,67 @@ package org.elasticsearch.ingest; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.test.ESTestCase; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; - -import static org.hamcrest.CoreMatchers.equalTo; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { + //total IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); - IngestStats.Stats pipeline1Stats = new IngestStats.Stats(3, 3, 3, 3); - IngestStats.Stats processor1Stats = new IngestStats.Stats(1, 1, 1, 1); - IngestStats.Stats processor2Stats = new IngestStats.Stats(2, 2, 2, 2); - IngestStats.Stats pipeline2Stats = new IngestStats.Stats(47, 97, 197, 297); - IngestStats.Stats processor3Stats = new IngestStats.Stats(47, 97, 197, 297); - - Map>>> pipelinesStats = new HashMap<>(2); - List> processorStats = new ArrayList<>(2); - //pipeline1 -> processor1,processor2 - processorStats.add(new Tuple<>("processor1", processor1Stats)); - processorStats.add(new Tuple<>("processor2", processor2Stats)); - pipelinesStats.put("pipeline1", new Tuple<>(pipeline1Stats, processorStats)); - //pipeline2 -> processor3 - processorStats.clear(); - processorStats.add(new Tuple<>("processor3", processor3Stats)); - pipelinesStats.put("pipeline2", new Tuple<>(pipeline2Stats, processorStats)); + //pipeline + IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3)); + IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0)); + List pipelineStats = + Stream.of(pipeline1Stats, pipeline2Stats, pipeline3Stats).collect(Collectors.toList()); + //processor + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + //pipeline1 -> processor1,processor2; pipeline2 -> processor3 + Map> processorStats = MapBuilder.>newMapBuilder() + .put(pipeline1Stats.getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) + .put(pipeline2Stats.getPipelineId(), Collections.singletonList(processor3Stat)) + .map(); - IngestStats ingestStats = new IngestStats(totalStats, pipelinesStats); - IngestStats serialize = serialize(ingestStats); - assertNotSame(serialize, ingestStats); - assertNotSame(serialize.getTotalStats(), totalStats); - assertEquals(totalStats.getIngestCount(), serialize.getTotalStats().getIngestCount()); - assertEquals(totalStats.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount()); - assertEquals(totalStats.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis()); - assertEquals(totalStats.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent()); + IngestStats ingestStats = new IngestStats(totalStats,pipelineStats, processorStats); - assertEquals(ingestStats.getStatsPerPipeline().size(), 2); - assertTrue(ingestStats.getStatsPerPipeline().containsKey("pipeline1")); - assertTrue(ingestStats.getStatsPerPipeline().containsKey("pipeline2")); - assertStats(ingestStats.getStatsForPipeline("pipeline1"), serialize.getStatsForPipeline("pipeline1")); - assertStats(ingestStats.getStatsForPipeline("pipeline2"), serialize.getStatsForPipeline("pipeline2")); + IngestStats serializedStats = serialize(ingestStats); + assertNotSame(ingestStats, serializedStats); + assertNotSame(totalStats, serializedStats.getTotalStats()); + assertNotSame(pipelineStats, serializedStats.getPipelineStats()); + assertNotSame(processorStats, serializedStats.getProcessorStats()); - Iterator> it = serialize.getProcessorStatsForPipeline("pipeline1").iterator(); - for(Tuple objectTuple : ingestStats.getProcessorStatsForPipeline("pipeline1")){ - Tuple streamTuple = it.next(); - assertThat(objectTuple.v1(), equalTo(streamTuple.v1())); - assertStats(objectTuple.v2(), streamTuple.v2()); - } + assertStats(totalStats, serializedStats.getTotalStats()); + assertEquals(serializedStats.getPipelineStats().size(), 3); - it = serialize.getProcessorStatsForPipeline("pipeline2").iterator(); - for(Tuple objectTuple : ingestStats.getProcessorStatsForPipeline("pipeline2")){ - Tuple streamTuple = it.next(); - assertThat(objectTuple.v1(), equalTo(streamTuple.v1())); - assertStats(objectTuple.v2(), streamTuple.v2()); + for (IngestStats.PipelineStat serializedPipelineStat : serializedStats.getPipelineStats()) { + assertStats(getPipelineStats(pipelineStats, serializedPipelineStat.getPipelineId()), serializedPipelineStat.getStats()); + List serializedProcessorStats = + serializedStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + List processorStat = ingestStats.getProcessorStats().get(serializedPipelineStat.getPipelineId()); + if(processorStat != null) { + Iterator it = processorStat.iterator(); + //intentionally enforcing the identical ordering + for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { + IngestStats.ProcessorStat ps = it.next(); + assertEquals(ps.getName(), serializedProcessorStat.getName()); + assertStats(ps.getStats(), serializedProcessorStat.getStats()); + } + assertFalse(it.hasNext()); + } } } @@ -98,4 +96,8 @@ private IngestStats serialize(IngestStats stats) throws IOException { StreamInput in = out.bytes().streamInput(); return new IngestStats(in); } + + private IngestStats.Stats getPipelineStats(List pipelineStats, String id) { + return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); + } }