ingest: correctly measure chained pipeline stats #33912

Merged Sep 27, 2018 (9 commits)

Changes from 2 commits
55 changes: 55 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestMetric.java
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;

class IngestMetric {

private final MeanMetric ingestMetric = new MeanMetric();
private final CounterMetric ingestCurrent = new CounterMetric();
Member:

What is "current"? I see this existed before, but the purpose is unclear. Some java docs here would be good.

Contributor Author:

current is briefly incremented while the measured ingest construct (total, pipeline, (soon) processor) is working on a document. It makes the most sense for total, where it shows how many documents are currently being processed.

I will add some Javadoc.
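A sketch of what that Javadoc might say (hypothetical wording, not necessarily the committed text):

```java
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;

// Hypothetical Javadoc sketch; the wording is illustrative only.
class IngestMetric {

    /** The cumulative time (and count) of ingest executions, used to derive totals and means. */
    private final MeanMetric ingestMetric = new MeanMetric();

    /**
     * The number of documents the measured construct (total, pipeline, or processor)
     * is working on right now. Incremented in preIngest() and decremented in
     * postIngest(), so it is non-zero only while work is in flight.
     */
    private final CounterMetric ingestCurrent = new CounterMetric();
}
```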

private final CounterMetric ingestCount = new CounterMetric();
private final CounterMetric ingestFailed = new CounterMetric();

void preIngest() {
ingestCurrent.inc();
}

void postIngest(long ingestTimeInMillis) {
ingestCurrent.dec();
ingestMetric.inc(ingestTimeInMillis);
ingestCount.inc();
}

void ingestFailed() {
ingestFailed.inc();
}

void add(IngestMetric metrics) {
ingestCount.inc(metrics.ingestCount.count());
Member:

Isn't this missing "current"?

Contributor Author:

No, but I see why it looks that way.

add is used to carry forward metrics between cluster state changes (when the pipeline changes). When the cluster state changes for the pipelines, the pipeline instances are rebuilt, and now (with this change) the metrics get wiped out too. add is used to add the old metrics to the metrics on the newly created pipeline instances (pre-populate/carry forward the old). Since the old metrics are a snapshot taken at the time of the cluster state change, we don't want to carry forward current, since it would never be decremented. current may incorrectly show a zero value during a pipelines cluster state change until the existing bulk request has finished processing. This should be a fairly rare and fairly short period during which current drops to zero.

I can change the name to carryForward or prePopulate? Other suggestions to make this more obvious?

Contributor Author:

Hopefully the added Javadoc sufficiently addresses the missing addition of current.
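For reference, a sketch of Javadoc that spells out why current is excluded from add (hypothetical wording):

```java
/**
 * Adds the metrics from another instance into this one. Used to carry metrics
 * forward when pipeline instances are rebuilt on a cluster state change.
 * Note: the in-flight "current" count is deliberately not added, because the
 * other instance is a snapshot and its current count would never be decremented.
 */
void add(IngestMetric metrics) {
    ingestCount.inc(metrics.ingestCount.count());
    ingestMetric.inc(metrics.ingestMetric.sum());
    ingestFailed.inc(metrics.ingestFailed.count());
}
```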

ingestMetric.inc(metrics.ingestMetric.sum());
ingestFailed.inc(metrics.ingestFailed.count());
}

IngestStats.Stats createStats() {
return new IngestStats.Stats(ingestCount.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
}
}
93 changes: 24 additions & 69 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -23,15 +23,16 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
@@ -49,8 +50,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -77,10 +76,9 @@ public class IngestService implements ClusterStateApplier {
// We know of all the processor factories when a node with all its plugins has been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
private volatile Map<String, Pipeline> pipelines = new ConcurrentHashMap<>();
Member:

I think the whole reason we have a volatile field and just use a normal non-concurrent HashMap is that we only ever update this field's reference and never mutate a specific instance of the map => I don't think you need to move to CHM here?

Contributor Author:

Ah, you are right: the updated map is rebuilt locally in a method and then atomically swapped in, and the published map is never modified => no need for CHM.

Will put it back to a normal HashMap and update the commit message.
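A minimal, self-contained sketch of that copy-on-write pattern (class and member names here are illustrative, not the actual IngestService code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class CopyOnWriteRegistry {
    // Only the field reference is ever replaced; a published map is never mutated,
    // so a plain HashMap behind a volatile field is safe without a ConcurrentHashMap.
    private volatile Map<String, String> entries = Collections.emptyMap();

    void replaceAll(Map<String, String> rebuilt) {
        // Build the new map locally, then publish it with a single volatile write.
        entries = Collections.unmodifiableMap(new HashMap<>(rebuilt));
    }

    String get(String key) {
        return entries.get(key); // readers always see a fully constructed map
    }
}
```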

private final ThreadPool threadPool;
private final StatsHolder totalStats = new StatsHolder();
private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
private final IngestMetric totalMetrics = new IngestMetric();

public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
@@ -257,10 +255,19 @@ Map<String, Pipeline> pipelines() {
@Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
int beforeHashCode = pipelines.hashCode();
Member:

NIT: I think you can just keep a reference here to the pipelines field and then use straight instance inequality instead of hash codes below to check if the pipelines changed. That would be a little clearer, since we specifically use the volatile pipelines field such that we only ever replace the pipelines instance as a whole.

Contributor Author:

Agreed. Now that I see the update is an atomic swap, I can clean this up by holding onto the original reference.
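A sketch of that cleanup (illustrative; the final commit may differ):

```java
// Hold the volatile reference before the update...
Map<String, Pipeline> originalPipelines = pipelines;
innerUpdatePipelines(event.previousState(), state);
// ...then instance inequality tells us whether the map was swapped.
if (originalPipelines != pipelines) {
    pipelines.forEach((id, pipeline) -> {
        Pipeline oldPipeline = originalPipelines.get(id);
        if (oldPipeline != null) {
            pipeline.getMetrics().add(oldPipeline.getMetrics());
        }
    });
}
```

Holding the old map would also make the separate oldMetrics snapshot unnecessary, since the old pipelines (and their metrics) stay reachable through the retained reference.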

//grab the metrics before the pipeline instances are potentially re-created
Map<String, IngestMetric> oldMetrics =
pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics()));
innerUpdatePipelines(event.previousState(), state);
IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
if (ingestMetadata != null) {
updatePipelineStats(ingestMetadata);
//pipelines changed, so add the old metrics to the new metrics
if (beforeHashCode != pipelines.hashCode()) {
pipelines.forEach((id, pipeline) -> {
IngestMetric oldMetric = oldMetrics.get(id);
if (oldMetric != null) {
pipeline.getMetrics().add(oldMetric);
}
});
}
}

Expand Down Expand Up @@ -325,6 +332,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler,
Consumer<IndexRequest> itemDroppedHandler) {

threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {

@Override
@@ -367,37 +375,11 @@ protected void doRun() {
}

public IngestStats stats() {
Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;

Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
}
Map<String, IngestStats.Stats> statsPerPipeline =
Member:

NIT: I think even in 2018 JDKs the old version with the manual iteration is faster. I don't think it matters much, but it's probably not worth the noise to change to functional iteration here.

Contributor Author (@jakelandis, Sep 21, 2018):

The variable name is the same, but the object and how it iterates are different, hence the update (i.e., more than just noise).
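For comparison, the two shapes under discussion, which are functionally equivalent here (sketch only; assumes the surrounding IngestService imports):

```java
// Manual iteration:
Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(pipelines.size());
for (Map.Entry<String, Pipeline> entry : pipelines.entrySet()) {
    statsPerPipeline.put(entry.getKey(), entry.getValue().getMetrics().createStats());
}

// Stream collector:
Map<String, IngestStats.Stats> viaStream = pipelines.entrySet().stream()
    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMetrics().createStats()));
```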

pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats()));

return new IngestStats(totalStats.createStats(), statsPerPipeline);
}

void updatePipelineStats(IngestMetadata ingestMetadata) {
boolean changed = false;
Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
while (iterator.hasNext()) {
String pipeline = iterator.next();
if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
iterator.remove();
changed = true;
}
}
for (String pipeline : ingestMetadata.getPipelines().keySet()) {
if (newStatsPerPipeline.containsKey(pipeline) == false) {
newStatsPerPipeline.put(pipeline, new StatsHolder());
changed = true;
}
}

if (changed) {
statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
}
return new IngestStats(totalMetrics.createStats(), statsPerPipeline);
}

private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer<IndexRequest> itemDroppedHandler) throws Exception {
@@ -408,10 +390,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
long startTimeInNanos = System.nanoTime();
// the pipeline specific stat holder may not exist and that is fine:
// (e.g. the pipeline may have been removed while we're ingesting a document)
Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
try {
totalStats.preIngest();
pipelineStats.ifPresent(StatsHolder::preIngest);
totalMetrics.preIngest();
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
Expand All @@ -437,13 +417,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer
indexRequest.source(ingestDocument.getSourceAndMetadata());
}
} catch (Exception e) {
totalStats.ingestFailed();
pipelineStats.ifPresent(StatsHolder::ingestFailed);
totalMetrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
totalStats.postIngest(ingestTimeInMillis);
pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
totalMetrics.postIngest(ingestTimeInMillis);
}
}

@@ -480,27 +458,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

private static class StatsHolder {

private final MeanMetric ingestMetric = new MeanMetric();
private final CounterMetric ingestCurrent = new CounterMetric();
private final CounterMetric ingestFailed = new CounterMetric();

void preIngest() {
ingestCurrent.inc();
}

void postIngest(long ingestTimeInMillis) {
ingestCurrent.dec();
ingestMetric.inc(ingestTimeInMillis);
}

void ingestFailed() {
ingestFailed.inc();
}

IngestStats.Stats createStats() {
return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
}
}
}
22 changes: 21 additions & 1 deletion server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -26,6 +26,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.script.ScriptService;

/**
@@ -44,12 +46,14 @@ public final class Pipeline {
@Nullable
private final Integer version;
private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics;

public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this.id = id;
this.description = description;
this.compoundProcessor = compoundProcessor;
this.version = version;
this.metrics = new IngestMetric();
}

public static Pipeline create(String id, Map<String, Object> config,
@@ -78,7 +82,17 @@ public static Pipeline create(String id, Map<String, Object> config,
* Modifies the data of a document to be indexed based on the processor this pipeline holds
*/
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return compoundProcessor.execute(ingestDocument);
long startTimeInNanos = System.nanoTime();
try {
metrics.preIngest();
return compoundProcessor.execute(ingestDocument);
} catch (Exception e) {
metrics.ingestFailed();
throw e;
} finally {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
metrics.postIngest(ingestTimeInMillis);
}
}

/**
Expand Down Expand Up @@ -136,4 +150,10 @@ public List<Processor> flattenAllProcessors() {
return compoundProcessor.flattenProcessors();
}

/**
* The metrics associated with this pipeline.
*/
public IngestMetric getMetrics() {
return metrics;
}
}
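With this change, a Pipeline measures itself, so callers no longer need an external stats holder. A minimal usage sketch (hypothetical setup; compoundProcessor and ingestDocument are assumed to be built elsewhere, e.g. in a test fixture):

```java
// Hypothetical usage sketch; Pipeline.execute(...) declares "throws Exception".
Pipeline pipeline = new Pipeline("my-id", "example pipeline", 1, compoundProcessor);
pipeline.execute(ingestDocument);   // preIngest/postIngest (and ingestFailed on error) recorded internally
IngestStats.Stats stats = pipeline.getMetrics().createStats();  // snapshot of count/time/current/failed
```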
@@ -19,16 +19,6 @@

package org.elasticsearch.ingest;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
@@ -59,13 +49,22 @@
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@@ -769,16 +768,14 @@ public void testStats() {
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));


@SuppressWarnings("unchecked") final BiConsumer<IndexRequest, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") final Consumer<Exception> completionHandler = mock(Consumer.class);

final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterFirstRequestStats = ingestService.stats();
assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2));
Expand All @@ -793,23 +790,21 @@ public void testStats() {
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L));
}

// issue: https://github.com/elastic/elasticsearch/issues/18126
public void testUpdatingStatsWhenRemovingPipelineWorks() {
IngestService ingestService = createWithProcessors();
Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));
configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON));
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1"));
assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2"));

configurationMap = new HashMap<>();
configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON));
ingestService.updatePipelineStats(new IngestMetadata(configurationMap));
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1")));
assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2")));
//update cluster state and ensure that new stats are added to old stats
putRequest = new PutPipelineRequest("_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON);
previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
indexRequest.setPipeline("_id1");
ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
final IngestStats afterThirdRequestStats = ingestService.stats();
assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2));
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L));
assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L));

}

private IngestDocument eqIndexTypeId(final Map<String, Object> source) {