diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index dfec70e3c9560..edb3df5e9f72c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -76,7 +76,7 @@ public class IngestService implements ClusterStateApplier { // We know of all the processor factories when a node with all its plugin have been initialized. Also some // processor factories rely on other node services. Custom metadata is statically registered when classes // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. - private volatile Map pipelines = new ConcurrentHashMap<>(); + private volatile Map pipelines = new HashMap<>(); private final ThreadPool threadPool; private final IngestMetric totalMetrics = new IngestMetric(); @@ -255,17 +255,14 @@ Map pipelines() { @Override public void applyClusterState(final ClusterChangedEvent event) { ClusterState state = event.state(); - int beforeHashCode = pipelines.hashCode(); - //grab the metrics before the pipeline instances are potentially re-created - Map oldMetrics = - pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics())); + Map originalPipelines = pipelines; innerUpdatePipelines(event.previousState(), state); //pipelines changed, so add the old metrics to the new metrics - if (beforeHashCode != pipelines.hashCode()) { + if (originalPipelines != pipelines) { pipelines.forEach((id, pipeline) -> { - IngestMetric oldMetric = oldMetrics.get(id); - if (oldMetric != null) { - pipeline.getMetrics().add(oldMetric); + Pipeline originalPipeline = originalPipelines.get(id); + if (originalPipeline != null) { + pipeline.getMetrics().add(originalPipeline.getMetrics()); } }); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 38e7aa4d319c6..d7a77bc2fdfc0 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -153,7 +153,7 @@ pipeline2Id, null, null, new CompoundProcessor(true, new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); try { - Thread.sleep(100); //force the stat time to be non-zero + Thread.sleep(2); //force the stat time to be non-zero } catch (InterruptedException e) { //do nothing }