-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ingest: correctly measure chained pipeline stats #33912
Changes from 7 commits
cab6dbf
4d782b2
526ae4b
a4c44d4
bc7403f
5d2adf9
cba6306
1408127
e385159
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.ingest; | ||
|
||
import org.elasticsearch.common.metrics.CounterMetric; | ||
import org.elasticsearch.common.metrics.MeanMetric; | ||
|
||
/** | ||
* Metrics to measure ingest actions. This counts documents and measures timings for a given scope. | ||
* The scope is determined by the calling code. For example you can use this class to count all documents across all pipelines, | ||
* or you can use this class to count documents for a given pipeline or a specific processor. | ||
* This class does not make assumptions about its given scope. | ||
*/ | ||
class IngestMetric { | ||
|
||
/** | ||
* The time it takes to complete the measured item. | ||
*/ | ||
private final MeanMetric ingestTime = new MeanMetric(); | ||
/** | ||
* The current count of things being measured. Should most likely only ever be 0 or 1. | ||
* Useful when aggregating multiple metrics to see how many things are in flight. | ||
*/ | ||
private final CounterMetric ingestCurrent = new CounterMetric(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is "current"? I see this existed before, but the purpose is unclear. Some java docs here would be good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I will add some Javadoc. |
||
/** | ||
* The ever increasing count of things being measured | ||
*/ | ||
private final CounterMetric ingestCount = new CounterMetric(); | ||
/** | ||
* The ever increasing count of failures | ||
*/ | ||
private final CounterMetric ingestFailed = new CounterMetric(); | ||
|
||
/** | ||
* Call this prior to the ingest action. | ||
*/ | ||
void preIngest() { | ||
ingestCurrent.inc(); | ||
} | ||
|
||
/** | ||
* Call this after performing the ingest action, even if the action failed. | ||
* @param ingestTimeInMillis The time it took to perform the action. | ||
*/ | ||
void postIngest(long ingestTimeInMillis) { | ||
ingestCurrent.dec(); | ||
ingestTime.inc(ingestTimeInMillis); | ||
ingestCount.inc(); | ||
} | ||
|
||
/** | ||
* Call this if the ingest action failed. | ||
*/ | ||
void ingestFailed() { | ||
ingestFailed.inc(); | ||
} | ||
|
||
/** | ||
* Add two sets of metrics together. *Important* does NOT add the current count intentionally, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would move this important section down to its own paragraph, and please use proper javadoc formatting for emphasis and paragraph breaks. |
||
* since the current count value is ephemeral and requires an increase/decrease pairing. | ||
* | ||
* @param metrics The metric to add. | ||
*/ | ||
void add(IngestMetric metrics) { | ||
ingestCount.inc(metrics.ingestCount.count()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this missing "current"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, but I see why it looks that.
I can change the name to carryForward, prePopulate ? other suggestions to make this more obvious ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hopefully the added javadoc sufficiently addresses the missing addition of |
||
ingestTime.inc(metrics.ingestTime.sum()); | ||
ingestFailed.inc(metrics.ingestFailed.count()); | ||
} | ||
|
||
/** | ||
* Creates a serializable representation for these metrics. | ||
*/ | ||
IngestStats.Stats createStats() { | ||
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,15 +23,15 @@ | |
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.stream.Collectors; | ||
|
||
import org.elasticsearch.ElasticsearchParseException; | ||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.ResourceNotFoundException; | ||
|
@@ -49,8 +49,6 @@ | |
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.metrics.CounterMetric; | ||
import org.elasticsearch.common.metrics.MeanMetric; | ||
import org.elasticsearch.common.regex.Regex; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
|
@@ -79,8 +77,7 @@ public class IngestService implements ClusterStateApplier { | |
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. | ||
private volatile Map<String, Pipeline> pipelines = new HashMap<>(); | ||
private final ThreadPool threadPool; | ||
private final StatsHolder totalStats = new StatsHolder(); | ||
private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap(); | ||
private final IngestMetric totalMetrics = new IngestMetric(); | ||
|
||
public IngestService(ClusterService clusterService, ThreadPool threadPool, | ||
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, | ||
|
@@ -257,10 +254,16 @@ Map<String, Pipeline> pipelines() { | |
@Override | ||
public void applyClusterState(final ClusterChangedEvent event) { | ||
ClusterState state = event.state(); | ||
Map<String, Pipeline> originalPipelines = pipelines; | ||
innerUpdatePipelines(event.previousState(), state); | ||
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); | ||
if (ingestMetadata != null) { | ||
updatePipelineStats(ingestMetadata); | ||
//pipelines changed, so add the old metrics to the new metrics | ||
if (originalPipelines != pipelines) { | ||
pipelines.forEach((id, pipeline) -> { | ||
Pipeline originalPipeline = originalPipelines.get(id); | ||
if (originalPipeline != null) { | ||
pipeline.getMetrics().add(originalPipeline.getMetrics()); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
|
@@ -325,6 +328,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq | |
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests, | ||
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler, | ||
Consumer<IndexRequest> itemDroppedHandler) { | ||
|
||
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { | ||
|
||
@Override | ||
|
@@ -367,37 +371,11 @@ protected void doRun() { | |
} | ||
|
||
public IngestStats stats() { | ||
Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline; | ||
|
||
Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); | ||
for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) { | ||
statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); | ||
} | ||
Map<String, IngestStats.Stats> statsPerPipeline = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: I think even in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. variable name is the same, but the object and how it iterates is different, hence the update. (e.g. more then noise) |
||
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); | ||
|
||
return new IngestStats(totalStats.createStats(), statsPerPipeline); | ||
} | ||
|
||
void updatePipelineStats(IngestMetadata ingestMetadata) { | ||
boolean changed = false; | ||
Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); | ||
Iterator<String> iterator = newStatsPerPipeline.keySet().iterator(); | ||
while (iterator.hasNext()) { | ||
String pipeline = iterator.next(); | ||
if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { | ||
iterator.remove(); | ||
changed = true; | ||
} | ||
} | ||
for (String pipeline : ingestMetadata.getPipelines().keySet()) { | ||
if (newStatsPerPipeline.containsKey(pipeline) == false) { | ||
newStatsPerPipeline.put(pipeline, new StatsHolder()); | ||
changed = true; | ||
} | ||
} | ||
|
||
if (changed) { | ||
statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); | ||
} | ||
return new IngestStats(totalMetrics.createStats(), statsPerPipeline); | ||
} | ||
|
||
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception { | ||
|
@@ -408,10 +386,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer | |
long startTimeInNanos = System.nanoTime(); | ||
// the pipeline specific stat holder may not exist and that is fine: | ||
// (e.g. the pipeline may have been removed while we're ingesting a document) | ||
Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); | ||
try { | ||
totalStats.preIngest(); | ||
pipelineStats.ifPresent(StatsHolder::preIngest); | ||
totalMetrics.preIngest(); | ||
String index = indexRequest.index(); | ||
String type = indexRequest.type(); | ||
String id = indexRequest.id(); | ||
|
@@ -437,13 +413,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer | |
indexRequest.source(ingestDocument.getSourceAndMetadata()); | ||
} | ||
} catch (Exception e) { | ||
totalStats.ingestFailed(); | ||
pipelineStats.ifPresent(StatsHolder::ingestFailed); | ||
totalMetrics.ingestFailed(); | ||
throw e; | ||
} finally { | ||
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); | ||
totalStats.postIngest(ingestTimeInMillis); | ||
pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); | ||
totalMetrics.postIngest(ingestTimeInMillis); | ||
} | ||
} | ||
|
||
|
@@ -480,27 +454,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state | |
ExceptionsHelper.rethrowAndSuppress(exceptions); | ||
} | ||
|
||
private static class StatsHolder { | ||
|
||
private final MeanMetric ingestMetric = new MeanMetric(); | ||
private final CounterMetric ingestCurrent = new CounterMetric(); | ||
private final CounterMetric ingestFailed = new CounterMetric(); | ||
|
||
void preIngest() { | ||
ingestCurrent.inc(); | ||
} | ||
|
||
void postIngest(long ingestTimeInMillis) { | ||
ingestCurrent.dec(); | ||
ingestMetric.inc(ingestTimeInMillis); | ||
} | ||
|
||
void ingestFailed() { | ||
ingestFailed.inc(); | ||
} | ||
|
||
IngestStats.Stats createStats() { | ||
return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wording is confusing here: most likely?