Skip to content

Commit

Permalink
Rewrite this method to return the pipeline ids
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Oct 22, 2024
1 parent 493045e commit 3583357
Showing 1 changed file with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,11 @@ public void clusterChanged(ClusterChangedEvent event) {
}

static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
if (pipelinesWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
return true;
}

Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
.map(PipelineConfiguration::getId)
.collect(Collectors.toSet());

final Set<String> checkReferencedPipelines = pipelinesWithGeoIpProcessor(clusterState, false);
if (checkReferencedPipelines.isEmpty()) {
return false;
}
Expand All @@ -258,22 +255,19 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
}

/**
* Retrieve list of pipelines that have at least one geoip processor.
* Retrieve the set of pipeline ids that have at least one geoip processor.
* @param clusterState Cluster state.
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
* matching the param.
* @return A list of {@link PipelineConfiguration} matching criteria.
* @return A set of pipeline ids matching criteria.
*/
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
ClusterState clusterState,
boolean downloadDatabaseOnPipelineCreation
) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().filter(pipelineConfig -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
private static Set<String> pipelinesWithGeoIpProcessor(ClusterState clusterState, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(clusterState);
return configurations.stream().filter(configuration -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).toList();
}).map(PipelineConfiguration::getId).collect(Collectors.toSet());
}

/**
Expand Down

0 comments on commit 3583357

Please sign in to comment.