Skip to content

Commit

Permalink
Unroll these loops
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Oct 22, 2024
1 parent 3583357 commit e88dda9
Showing 1 changed file with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
Expand Down Expand Up @@ -264,10 +265,15 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
@SuppressWarnings("unchecked")
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
return configurations.stream().filter(configuration -> {
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).map(PipelineConfiguration::getId).collect(Collectors.toSet());
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
}
}
return Collections.unmodifiableSet(ids);
}

/**
Expand All @@ -277,7 +283,15 @@ private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
return true;
}
}
}
return false;
}

/**
Expand Down Expand Up @@ -321,16 +335,17 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
) {
return processor != null
&& processor.values()
.stream()
.anyMatch(
value -> value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)
);
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)) {
return true;
}
}
return false;
}

/**
Expand Down

0 comments on commit e88dda9

Please sign in to comment.