Skip to content

Commit

Permalink
review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed Sep 21, 2018
1 parent 4d782b2 commit 526ae4b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
15 changes: 6 additions & 9 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class IngestService implements ClusterStateApplier {
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, Pipeline> pipelines = new ConcurrentHashMap<>();
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric();

Expand Down Expand Up @@ -255,17 +255,14 @@ Map<String, Pipeline> pipelines() {
@Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
int beforeHashCode = pipelines.hashCode();
//grab the metrics before the pipeline instances are potentially re-created
Map<String, IngestMetric> oldMetrics =
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics()));
Map<String, Pipeline> originalPipelines = pipelines;
innerUpdatePipelines(event.previousState(), state);
//pipelines changed, so add the old metrics to the new metrics
if (beforeHashCode != pipelines.hashCode()) {
if (originalPipelines != pipelines) {
pipelines.forEach((id, pipeline) -> {
IngestMetric oldMetric = oldMetrics.get(id);
if (oldMetric != null) {
pipeline.getMetrics().add(oldMetric);
Pipeline originalPipeline = originalPipelines.get(id);
if (originalPipeline != null) {
pipeline.getMetrics().add(originalPipeline.getMetrics());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pipeline2Id, null, null, new CompoundProcessor(true,
new TestProcessor(ingestDocument -> {
ingestDocument.setFieldValue(key1, randomInt());
try {
Thread.sleep(100); //force the stat time to be non-zero
Thread.sleep(2); //force the stat time to be non-zero
} catch (InterruptedException e) {
//do nothing
}
Expand Down

0 comments on commit 526ae4b

Please sign in to comment.