Skip to content

Commit

Permalink
simplify the IngestStats pipeline->per-processor object model
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed Oct 15, 2018
1 parent 2714338 commit 8067758
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 198 deletions.
45 changes: 24 additions & 21 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,23 +290,30 @@ public void applyClusterState(final ClusterChangedEvent event) {
}
}

static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
List<Tuple<Processor, IngestMetric>> processorStats) {
/**
* Recursive method to obtain all of the non-failure processors for the given compoundProcessor. Since conditionals are implemented as
* wrappers around the actual processor, the metric held by the conditional wrapper (see {@link ConditionalProcessor#getMetric()}) is
* always preferred over the metric the compound processor pairs with that wrapper.
* @param compoundProcessor The compound processor at which to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples that the results are appended to.
* @return the processorMetrics for all non-failure processors that belong to the original compoundProcessor
private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
List<Tuple<Processor, IngestMetric>> processorMetrics) {
//only surface the top level non-failure processors; on-failure processor times will be included in the top level non-failure processor times
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();
if (processor instanceof CompoundProcessor) {
getProcessorMetrics((CompoundProcessor) processor, processorStats);
getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
} else {
//Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor) {
metric = ((ConditionalProcessor) processor).getMetric();
}
processorStats.add(new Tuple<>(processor, metric));
processorMetrics.add(new Tuple<>(processor, metric));
}
}
return processorStats;
return processorMetrics;
}

private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
Expand Down Expand Up @@ -413,27 +420,24 @@ protected void doRun() {
}

public IngestStats stats() {
//This is a map of a pipelineId to its own pipeline stats and then a List of all its processor stats.
//Each processor stat has a name (see getName) and the stats associated to that processor
Map<String, Tuple<IngestStats.Stats, List<Tuple<String, IngestStats.Stats>>>> statsPerPipeline = new HashMap<>(pipelines.size());
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.forEach((id, pipeline) -> {
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
List<Tuple<String, IngestStats.Stats>> processorStats = new ArrayList<>();
statsPerPipeline.put(id, new Tuple<>(pipeline.getMetrics().createStats(), getProcessorStats(rootProcessor, processorStats)));
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
getProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
});
});

return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
}

public static List<Tuple<String, IngestStats.Stats>> getProcessorStats(CompoundProcessor rootProcessor, List<Tuple<String, IngestStats.Stats>> processorStats){
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
getProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> processorStats.add(new Tuple<>(getName(t.v1()), t.v2().createStats())));
return processorStats;
return statsBuilder.build();
}

//package private for testing
static String getName(Processor processor){
static String getProcessorName(Processor processor){
// conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
if(processor instanceof ConditionalProcessor){
processor = ((ConditionalProcessor) processor).getProcessor();
Expand All @@ -451,7 +455,6 @@ static String getName(Processor processor){
sb.append(":");
sb.append(tag);
}

return sb.toString();
}

Expand Down
192 changes: 129 additions & 63 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.ingest;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -29,24 +28,27 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class IngestStats implements Writeable, ToXContentFragment {
private final Stats totalStats;
private final Map<String, Tuple<IngestStats.Stats, List<Tuple<String, IngestStats.Stats>>>> statsPerPipeline;
private final List<PipelineStat> pipelineStats;
private final Map<String, List<ProcessorStat>> processorStats;

/**
* @param totalStats - The total stats for Ingest. This is logically the sum of all pipeline stats,
* and pipeline stats are logically the sum of the processor stats.
* @param statsPerPipeline - A Map(pipelineId -> Tuple(pipelineStats, List(perProcessorStats))
* where perProcessorStats = Tuple(processorDisplayName, processorStats)
* @param totalStats - The total stats for Ingest. This is logically the sum of all pipeline stats,
* and pipeline stats are logically the sum of the processor stats.
* @param pipelineStats - The stats for each ingest pipeline, one entry per pipeline.
* @param processorStats - The per-processor stats for each pipeline. A map keyed by the pipeline identifier.
*/
public IngestStats(Stats totalStats, Map<String, Tuple<IngestStats.Stats, List<Tuple<String, IngestStats.Stats>>>> statsPerPipeline) {
public IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Map<String, List<ProcessorStat>> processorStats) {
this.totalStats = totalStats;
this.statsPerPipeline = statsPerPipeline;
this.pipelineStats = pipelineStats;
this.processorStats = processorStats;
}

/**
Expand All @@ -55,84 +57,64 @@ public IngestStats(Stats totalStats, Map<String, Tuple<IngestStats.Stats, List<T
public IngestStats(StreamInput in) throws IOException {
this.totalStats = new Stats(in);
int size = in.readVInt();
this.statsPerPipeline = new HashMap<>(size);
this.pipelineStats = new ArrayList<>(size);
this.processorStats = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String pipelineName = in.readString();
Stats pipelineStats = new Stats(in);
String pipelineId = in.readString();
Stats pipelineStat = new Stats(in);
this.pipelineStats.add(new PipelineStat(pipelineId, pipelineStat));
int processorsSize = in.readVInt();
List<Tuple<String, IngestStats.Stats>> processors = new ArrayList<>(processorsSize);
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
processors.add(new Tuple<>(processorName, new Stats(in)));
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
}
statsPerPipeline.put(pipelineName, new Tuple<>(pipelineStats, processors));
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
totalStats.writeTo(out);
out.writeVInt(statsPerPipeline.size());
for (Map.Entry<String, Tuple<Stats, List<Tuple<String, Stats>>>> entry : statsPerPipeline.entrySet()) {
String pipelineName = entry.getKey();
Stats pipelineStats = entry.getValue().v1();
out.writeString(pipelineName);
pipelineStats.writeTo(out);
List<Tuple<String, Stats>> processorStats = entry.getValue().v2();
out.writeVInt(processorStats.size());
for (Tuple<String, Stats> processorTuple : processorStats) {
String processorName = processorTuple.v1();
Stats processorStat = processorTuple.v2();
out.writeString(processorName);
processorStat.writeTo(out);
out.writeVInt(pipelineStats.size());
for (PipelineStat pipelineStat : pipelineStats) {
out.writeString(pipelineStat.getPipelineId());
pipelineStat.getStats().writeTo(out);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
if(processorStatsForPipeline == null) {
out.writeVInt(0);
}else{
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
processorStat.getStats().writeTo(out);
}
}
}
}

/**
* @return The accumulated stats for all pipelines
*/
public Stats getTotalStats() {
return totalStats;
}

/**
* @return The stats on a per pipeline basis. A Map(pipelineId -> Tuple(pipelineStats, List(perProcessorStats))
* where perProcessorStats = Tuple(processorDisplayName, processorStats)
*/
public Map<String, Tuple<IngestStats.Stats, List<Tuple<String, IngestStats.Stats>>>> getStatsPerPipeline() {
return statsPerPipeline;
}

public IngestStats.Stats getStatsForPipeline(String id) {
return statsPerPipeline.get(id).v1();
}

public List<Tuple<String, Stats>> getProcessorStatsForPipeline(String id) {
return statsPerPipeline.get(id).v2();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("ingest");
builder.startObject("total");
totalStats.toXContent(builder, params);
builder.endObject();
builder.startObject("pipelines");
for (Map.Entry<String, Tuple<Stats, List<Tuple<String,Stats>>>> entry : statsPerPipeline.entrySet()) {
builder.startObject(entry.getKey());
Stats pipelineStats = entry.getValue().v1();
pipelineStats.toXContent(builder, params);
List<Tuple<String,Stats>> perProcessorStats = entry.getValue().v2();
for (PipelineStat pipelineStat : pipelineStats) {
builder.startObject(pipelineStat.getPipelineId());
pipelineStat.getStats().toXContent(builder, params);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
builder.startArray("processors");
for (Tuple<String,Stats> processorStats : perProcessorStats) {
builder.startObject();
builder.startObject(processorStats.v1());
processorStats.v2().toXContent(builder, params);
builder.endObject();
builder.endObject();
if (processorStatsForPipeline != null) {
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
}
}

builder.endArray();
builder.endObject();
}
Expand All @@ -141,6 +123,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

/**
 * @return The accumulated stats for all pipelines
 */
public Stats getTotalStats() {
return totalStats;
}

/**
 * @return The per-pipeline stats, one {@link PipelineStat} entry per pipeline, exactly as supplied to the constructor
 * or read from the stream.
 */
public List<PipelineStat> getPipelineStats() {
return pipelineStats;
}

/**
 * @return The per-processor stats, keyed by pipeline identifier. Serialization and rendering in this class tolerate a
 * pipeline that has no entry in this map, so callers should not assume every pipeline id is present.
 */
public Map<String, List<ProcessorStat>> getProcessorStats() {
return processorStats;
}

public static class Stats implements Writeable, ToXContentFragment {

private final long ingestCount;
Expand Down Expand Up @@ -181,7 +175,6 @@ public long getIngestCount() {
}

/**
*
* @return The total time spent on ingest preprocessing in millis.
*/
public long getIngestTimeInMillis() {
Expand Down Expand Up @@ -211,4 +204,77 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}
}

/**
 * Easy conversion from scoped {@link IngestMetric} objects to serializable {@link Stats} objects.
 * Every metric handed to one of the {@code add*} methods is converted with {@code createStats()} at the time of the call,
 * and {@link #build()} assembles the collected stats into an {@link IngestStats}.
 */
static class Builder {
    private Stats totalStats;
    private final List<PipelineStat> pipelineStats = new ArrayList<>();
    private final Map<String, List<ProcessorStat>> processorStats = new HashMap<>();

    /**
     * Sets the total stats from the given metric. A later call replaces the value set by an earlier one.
     *
     * @param totalMetric the metric to convert into the total stats
     * @return this builder
     */
    Builder addTotalMetrics(IngestMetric totalMetric) {
        this.totalStats = totalMetric.createStats();
        return this;
    }

    /**
     * Appends the stats of one pipeline. Entries keep the order in which they were added.
     *
     * @param pipelineId the identifier of the pipeline
     * @param pipelineMetric the metric to convert into that pipeline's stats
     * @return this builder
     */
    Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
        this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats()));
        return this;
    }

    /**
     * Appends the stats of one processor to the list kept for the given pipeline, creating that list on first use.
     *
     * @param pipelineId the identifier of the pipeline the processor belongs to
     * @param processorName the display name of the processor
     * @param metric the metric to convert into that processor's stats
     * @return this builder
     */
    Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
        this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
            .add(new ProcessorStat(processorName, metric.createStats()));
        return this;
    }

    /**
     * Creates the {@link IngestStats} from everything added so far.
     *
     * @return a new instance holding read-only copies of the collected stats
     */
    IngestStats build() {
        // Copy rather than wrap: an unmodifiable wrapper is only a view, so further add* calls on this builder
        // would otherwise show up in (and the inner lists could be mutated through) an already built IngestStats.
        Map<String, List<ProcessorStat>> processorStatsCopy = new HashMap<>();
        processorStats.forEach((pipelineId, stats) ->
            processorStatsCopy.put(pipelineId, Collections.unmodifiableList(new ArrayList<>(stats))));
        return new IngestStats(totalStats, Collections.unmodifiableList(new ArrayList<>(pipelineStats)),
            Collections.unmodifiableMap(processorStatsCopy));
    }
}

/**
 * Holder that pairs a pipeline identifier with the {@link Stats} recorded for that pipeline.
 */
public static class PipelineStat {
    private final String pipelineId;
    private final Stats stats;

    /**
     * @param pipelineId the identifier of the pipeline the stats belong to
     * @param stats the stats of that pipeline
     */
    public PipelineStat(String pipelineId, Stats stats) {
        this.stats = stats;
        this.pipelineId = pipelineId;
    }

    /**
     * @return the stats of the pipeline
     */
    public Stats getStats() {
        return this.stats;
    }

    /**
     * @return the identifier of the pipeline
     */
    public String getPipelineId() {
        return this.pipelineId;
    }
}

/**
 * Holder that pairs a processor name with the {@link Stats} recorded for that processor.
 */
public static class ProcessorStat {
    private final String name;
    private final Stats stats;

    /**
     * @param name the name of the processor the stats belong to
     * @param stats the stats of that processor
     */
    public ProcessorStat(String name, Stats stats) {
        this.stats = stats;
        this.name = name;
    }

    /**
     * @return the stats of the processor
     */
    public Stats getStats() {
        return this.stats;
    }

    /**
     * @return the name of the processor
     */
    public String getName() {
        return this.name;
    }
}
}
Loading

0 comments on commit 8067758

Please sign in to comment.