From eb51f2b73a68b6f6886e2aefff0dccb32f1a323b Mon Sep 17 00:00:00 2001 From: Michael Froh Date: Mon, 22 May 2023 13:07:04 -0700 Subject: [PATCH] [Search Pipelines] Split search pipeline processor factories by type (#7597) In the initial search pipelines commit, I threw request and response processor factories into one combined map. I think that was a mistake. We should embrace type-safety by making sure that the kind of processor is clear from end to end. As we add more processor types (e.g. search phase processor), throwing them all in one big map would get messier. As a bonus, we'll be able to reuse processor names across different types of processor. Signed-off-by: Michael Froh --- CHANGELOG.md | 1 + .../common/FilterQueryRequestProcessor.java | 4 +- .../common/RenameFieldResponseProcessor.java | 4 +- .../SearchPipelineCommonModulePlugin.java | 16 ++-- .../test/search_pipeline/10_basic.yml | 4 +- .../plugins/SearchPipelinePlugin.java | 17 +++- .../opensearch/search/pipeline/Pipeline.java | 39 ++------ .../opensearch/search/pipeline/Processor.java | 5 +- .../search/pipeline/SearchPipelineInfo.java | 75 +++++++++++---- .../pipeline/SearchPipelineService.java | 93 ++++++++++++------- .../nodesinfo/NodeInfoStreamingTests.java | 2 +- .../pipeline/SearchPipelineInfoTests.java | 75 +++++++++++++++ .../pipeline/SearchPipelineServiceTests.java | 73 ++++++++------- 13 files changed, 276 insertions(+), 132 deletions(-) create mode 100644 server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a5f816ac9a85..be5ab5f8e03a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) - [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) - [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377)) +- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java index 81c00012daec6..0ca090780bb60 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java @@ -75,7 +75,7 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { return request; } - static class Factory implements Processor.Factory { + static class Factory implements Processor.Factory { private final NamedXContentRegistry namedXContentRegistry; public static final ParseField QUERY_FIELD = new ParseField("query"); @@ -85,7 +85,7 @@ static class Factory implements Processor.Factory { @Override public FilterQueryRequestProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java index 3a2f0e9fb2492..4c40dda5928f0 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java @@ -128,7 +128,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp /** * This is a factor that creates the RenameResponseProcessor */ - public static final class Factory implements Processor.Factory { + public static final class Factory implements Processor.Factory { /** * Constructor for factory @@ -137,7 +137,7 @@ public static final class Factory implements Processor.Factory { @Override public RenameFieldResponseProcessor create( - Map processorFactories, + Map> processorFactories, String tag, String description, Map config diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index a0e5182f71443..aa56714085b48 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -11,6 +11,8 @@ import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Map; @@ -25,12 +27,12 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi public SearchPipelineCommonModulePlugin() {} @Override - public Map getProcessors(Processor.Parameters parameters) { - return Map.of( - FilterQueryRequestProcessor.TYPE, - new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), - RenameFieldResponseProcessor.TYPE, - new RenameFieldResponseProcessor.Factory() - ); + public Map> getRequestProcessors(Processor.Parameters parameters) { + return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of(RenameFieldResponseProcessor.TYPE, new RenameFieldResponseProcessor.Factory()); } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 0d931f8587664..644181d601ea4 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -12,5 +12,5 @@ nodes.info: {} - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } - - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } } + - contains: { nodes.$cluster_manager.search_pipelines.request_processors: { type: filter_query } } + - contains: { nodes.$cluster_manager.search_pipelines.response_processors: { type: rename_field } } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java index 8e6fbef6c8b1d..b8ceddecd3d20 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPipelinePlugin.java @@ -9,6 +9,8 @@ package org.opensearch.plugins; import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; import java.util.Collections; import java.util.Map; @@ -20,13 +22,24 @@ */ public interface SearchPipelinePlugin { /** - * Returns additional search pipeline processor types added by this plugin. + * Returns additional search pipeline request processor types added by this plugin. * * The key of the returned {@link Map} is the unique name for the processor which is specified * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} * to create the processor from a given pipeline configuration. */ - default Map getProcessors(Processor.Parameters parameters) { + default Map> getRequestProcessors(Processor.Parameters parameters) { + return Collections.emptyMap(); + } + + /** + * Returns additional search pipeline response processor types added by this plugin. + * + * The key of the returned {@link Map} is the unique name for the processor which is specified + * in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory} + * to create the processor from a given pipeline configuration. + */ + default Map> getResponseProcessors(Processor.Parameters parameters) { return Collections.emptyMap(); } } diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index f5dce8ec728b2..c9a5f865d507e 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -46,7 +46,7 @@ class Pipeline { private final NamedWriteableRegistry namedWriteableRegistry; - Pipeline( + private Pipeline( String id, @Nullable String description, @Nullable Integer version, @@ -62,31 +62,24 @@ class Pipeline { this.namedWriteableRegistry = namedWriteableRegistry; } - public static Pipeline create( + static Pipeline create( String id, Map config, - Map processorFactories, + Map> requestProcessorFactories, + Map> responseProcessorFactories, NamedWriteableRegistry namedWriteableRegistry ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); List> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY); - List requestProcessors = readProcessors( - SearchRequestProcessor.class, - processorFactories, - requestProcessorConfigs - ); + List requestProcessors = readProcessors(requestProcessorFactories, requestProcessorConfigs); List> responseProcessorConfigs = ConfigurationUtils.readOptionalList( null, null, config, RESPONSE_PROCESSORS_KEY ); - List responseProcessors = readProcessors( - SearchResponseProcessor.class, - processorFactories, - responseProcessorConfigs - ); + List responseProcessors = readProcessors(responseProcessorFactories, responseProcessorConfigs); if (config.isEmpty() == false) { throw new OpenSearchParseException( "pipeline [" @@ -98,10 +91,8 @@ public static Pipeline create( return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry); } - @SuppressWarnings("unchecked") // Cast is checked using isInstance private static List readProcessors( - Class processorType, - Map processorFactories, + Map> processorFactories, List> requestProcessorConfigs ) throws Exception { List processors = new ArrayList<>(); @@ -117,22 +108,10 @@ private static List readProcessors( Map config = (Map) entry.getValue(); String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY); String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY); - Processor processor = processorFactories.get(type).create(processorFactories, tag, description, config); - if (processorType.isInstance(processor)) { - processors.add((T) processor); - } else { - throw new IllegalArgumentException("Processor type " + type + " is not a " + processorType.getSimpleName()); - } + processors.add(processorFactories.get(type).create(processorFactories, tag, description, config)); } } - return processors; - } - - List flattenAllProcessors() { - List allProcessors = new ArrayList<>(searchRequestProcessors.size() + searchResponseProcessors.size()); - allProcessors.addAll(searchRequestProcessors); - allProcessors.addAll(searchResponseProcessors); - return allProcessors; + return Collections.unmodifiableList(processors); } String getId() { diff --git a/server/src/main/java/org/opensearch/search/pipeline/Processor.java b/server/src/main/java/org/opensearch/search/pipeline/Processor.java index 44f268242b83c..ee28db1cc334d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Processor.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Processor.java @@ -52,7 +52,7 @@ public interface Processor { /** * A factory that knows how to construct a processor based on a map of maps. */ - interface Factory { + interface Factory { /** * Creates a processor based on the specified map of maps config. @@ -65,8 +65,7 @@ interface Factory { * Note: Implementations are responsible for removing the used configuration * keys, so that after creation the config map should be empty. */ - Processor create(Map processorFactories, String tag, String description, Map config) - throws Exception; + T create(Map> processorFactories, String tag, String description, Map config) throws Exception; } /** diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java index 95d1e3720cbb3..b91f18c44cc7b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineInfo.java @@ -8,15 +8,19 @@ package org.opensearch.search.pipeline; +import org.opensearch.Version; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.ReportingService; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -26,45 +30,84 @@ */ public class SearchPipelineInfo implements ReportingService.Info { - private final Set processors; + private final Map> processors = new TreeMap<>(); - public SearchPipelineInfo(List processors) { - this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order + public SearchPipelineInfo(Map> processors) { + for (Map.Entry> processorsEntry : processors.entrySet()) { + // we use a treeset here to have a test-able / predictable order + this.processors.put(processorsEntry.getKey(), new TreeSet<>(processorsEntry.getValue())); + } } /** * Read from a stream. */ public SearchPipelineInfo(StreamInput in) throws IOException { - processors = new TreeSet<>(); - final int size = in.readVInt(); - for (int i = 0; i < size; i++) { - processors.add(new ProcessorInfo(in)); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (in.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we had a flat list of processors. For best compatibility, assume they're valid + // request and response processor, since we couldn't tell the difference back then. + final int size = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int i = 0; i < size; i++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(Pipeline.REQUEST_PROCESSORS_KEY, processorInfos); + processors.put(Pipeline.RESPONSE_PROCESSORS_KEY, processorInfos); + } else { + final int numTypes = in.readVInt(); + for (int i = 0; i < numTypes; i++) { + String type = in.readString(); + int numProcessors = in.readVInt(); + Set processorInfos = new TreeSet<>(); + for (int j = 0; j < numProcessors; j++) { + processorInfos.add(new ProcessorInfo(in)); + } + processors.put(type, processorInfos); + } } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject("search_pipelines"); - builder.startArray("processors"); - for (ProcessorInfo info : processors) { - info.toXContent(builder, params); + for (Map.Entry> processorEntry : processors.entrySet()) { + builder.startArray(processorEntry.getKey()); + for (ProcessorInfo info : processorEntry.getValue()) { + info.toXContent(builder, params); + } + builder.endArray(); } - builder.endArray(); builder.endObject(); return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - out.write(processors.size()); - for (ProcessorInfo info : processors) { - info.writeTo(out); + // TODO: When we backport this to 2.8, we must change this condition to out.getVersion().before(V_2_8_0) + if (out.getVersion().onOrBefore(Version.V_2_8_0)) { + // Prior to version 2.8, we grouped all processors into a single list. + Set processorInfos = new TreeSet<>(); + processorInfos.addAll(processors.getOrDefault(Pipeline.REQUEST_PROCESSORS_KEY, Collections.emptySet())); + processorInfos.addAll(processors.getOrDefault(Pipeline.RESPONSE_PROCESSORS_KEY, Collections.emptySet())); + out.writeVInt(processorInfos.size()); + for (ProcessorInfo processorInfo : processorInfos) { + processorInfo.writeTo(out); + } + } else { + out.write(processors.size()); + for (Map.Entry> processorsEntry : processors.entrySet()) { + out.writeString(processorsEntry.getKey()); + out.writeVInt(processorsEntry.getValue().size()); + for (ProcessorInfo processorInfo : processorsEntry.getValue()) { + processorInfo.writeTo(out); + } + } } } - public boolean containsProcessor(String type) { - return processors.contains(new ProcessorInfo(type)); + public boolean containsProcessor(String processorType, String type) { + return processors.containsKey(processorType) && processors.get(processorType).contains(new ProcessorInfo(type)); } @Override diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index f96a6eb4a6b76..a486e636cbb7d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -55,6 +55,8 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines @@ -68,7 +70,8 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ private static final Logger logger = LogManager.getLogger(SearchPipelineService.class); private final ClusterService clusterService; private final ScriptService scriptService; - private final Map processorFactories; + private final Map> requestProcessorFactories; + private final Map> responseProcessorFactories; private volatile Map pipelines = Collections.emptyMap(); private final ThreadPool threadPool; private final List> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>(); @@ -95,34 +98,33 @@ public SearchPipelineService( this.scriptService = scriptService; this.threadPool = threadPool; this.namedWriteableRegistry = namedWriteableRegistry; - this.processorFactories = processorFactories( - searchPipelinePlugins, - new Processor.Parameters( - env, - scriptService, - analysisRegistry, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), - this, - client, - threadPool.generic()::execute, - namedXContentRegistry - ) + Processor.Parameters parameters = new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + namedXContentRegistry ); + this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters)); + this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters)); putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true); deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true); this.isEnabled = isEnabled; } - private static Map processorFactories( + private static Map> processorFactories( List searchPipelinePlugins, - Processor.Parameters parameters + Function>> processorLoader ) { - Map processorFactories = new HashMap<>(); + Map> processorFactories = new HashMap<>(); for (SearchPipelinePlugin searchPipelinePlugin : searchPipelinePlugins) { - Map newProcessors = searchPipelinePlugin.getProcessors(parameters); - for (Map.Entry entry : newProcessors.entrySet()) { + Map> newProcessors = processorLoader.apply(searchPipelinePlugin); + for (Map.Entry> entry : newProcessors.entrySet()) { if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { throw new IllegalArgumentException("Search processor [" + entry.getKey() + "] is already registered"); } @@ -173,7 +175,8 @@ void innerUpdatePipelines(SearchPipelineMetadata newSearchPipelineMetadata) { Pipeline newPipeline = Pipeline.create( newConfiguration.getId(), newConfiguration.getConfigAsMap(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -268,12 +271,27 @@ void validatePipeline(Map searchPipelineInfos throw new IllegalStateException("Search pipeline info is empty"); } Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, namedWriteableRegistry); + Pipeline pipeline = Pipeline.create( + request.getId(), + pipelineConfig, + requestProcessorFactories, + responseProcessorFactories, + namedWriteableRegistry + ); List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { + for (SearchRequestProcessor processor : pipeline.getSearchRequestProcessors()) { + for (Map.Entry entry : searchPipelineInfos.entrySet()) { + String type = processor.getType(); + if (entry.getValue().containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, type) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); + } + } + } + for (SearchResponseProcessor processor : pipeline.getSearchResponseProcessors()) { for (Map.Entry entry : searchPipelineInfos.entrySet()) { String type = processor.getType(); - if (entry.getValue().containsProcessor(type) == false) { + if (entry.getValue().containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, type) == false) { String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); } @@ -352,7 +370,8 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = Pipeline.create( AD_HOC_PIPELINE_ID, searchRequest.source().searchPipelineSource(), - processorFactories, + requestProcessorFactories, + responseProcessorFactories, namedWriteableRegistry ); } catch (Exception e) { @@ -385,17 +404,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce return new PipelinedRequest(pipeline, transformedRequest); } - Map getProcessorFactories() { - return processorFactories; + Map> getRequestProcessorFactories() { + return requestProcessorFactories; + } + + Map> getResponseProcessorFactories() { + return responseProcessorFactories; } @Override public SearchPipelineInfo info() { - List processorInfoList = new ArrayList<>(); - for (Map.Entry entry : processorFactories.entrySet()) { - processorInfoList.add(new ProcessorInfo(entry.getKey())); - } - return new SearchPipelineInfo(processorInfoList); + List requestProcessorInfoList = requestProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + List responseProcessorInfoList = responseProcessorFactories.keySet() + .stream() + .map(ProcessorInfo::new) + .collect(Collectors.toList()); + return new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, requestProcessorInfoList, Pipeline.RESPONSE_PROCESSORS_KEY, responseProcessorInfoList) + ); } public static List getPipelines(ClusterState clusterState, String... ids) { diff --git a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java index bec921bc5bf5d..cdd1c682b40dc 100644 --- a/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/opensearch/nodesinfo/NodeInfoStreamingTests.java @@ -251,7 +251,7 @@ private static NodeInfo createNodeInfo() { for (int i = 0; i < numProcessors; i++) { processors.add(new org.opensearch.search.pipeline.ProcessorInfo(randomAlphaOfLengthBetween(3, 10))); } - searchPipelineInfo = new SearchPipelineInfo(processors); + searchPipelineInfo = new SearchPipelineInfo(Map.of(randomAlphaOfLengthBetween(3, 10), processors)); } return new NodeInfo( diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java new file mode 100644 index 0000000000000..6eb137cb28e8f --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineInfoTests.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline; + +import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class SearchPipelineInfoTests extends OpenSearchTestCase { + public void testSerializationRoundtrip() throws IOException { + SearchPipelineInfo searchPipelineInfo = new SearchPipelineInfo( + Map.of( + "a", + List.of(new ProcessorInfo("a1"), new ProcessorInfo("a2"), new ProcessorInfo("a3")), + "b", + List.of(new ProcessorInfo("b1"), new ProcessorInfo("b2")), + "c", + List.of(new ProcessorInfo("c1")) + ) + ); + SearchPipelineInfo deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + searchPipelineInfo.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + deserialized = new SearchPipelineInfo(bytesStreamInput); + } + } + assertTrue(deserialized.containsProcessor("a", "a1")); + assertTrue(deserialized.containsProcessor("a", "a2")); + assertTrue(deserialized.containsProcessor("a", "a3")); + assertTrue(deserialized.containsProcessor("b", "b1")); + assertTrue(deserialized.containsProcessor("b", "b2")); + assertTrue(deserialized.containsProcessor("c", "c1")); + } + + /** + * When serializing / deserializing to / from old versions, processor type info is lost. + * + * Also, we only supported request/response processors. + */ + public void testSerializationRoundtripBackcompat() throws IOException { + SearchPipelineInfo searchPipelineInfo = new SearchPipelineInfo( + Map.of( + Pipeline.REQUEST_PROCESSORS_KEY, + List.of(new ProcessorInfo("a1"), new ProcessorInfo("a2"), new ProcessorInfo("a3")), + Pipeline.RESPONSE_PROCESSORS_KEY, + List.of(new ProcessorInfo("b1"), new ProcessorInfo("b2")) + ) + ); + SearchPipelineInfo deserialized; + try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { + bytesStreamOutput.setVersion(Version.V_2_7_0); + searchPipelineInfo.writeTo(bytesStreamOutput); + try (StreamInput bytesStreamInput = bytesStreamOutput.bytes().streamInput()) { + bytesStreamInput.setVersion(Version.V_2_7_0); + deserialized = new SearchPipelineInfo(bytesStreamInput); + } + } + for (String proc : List.of("a1", "a2", "a3", "b1", "b2")) { + assertTrue(deserialized.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, proc)); + assertTrue(deserialized.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, proc)); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 36978d5310810..516227e9a13d8 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -60,9 +60,13 @@ public class SearchPipelineServiceTests extends OpenSearchTestCase { private static final SearchPipelinePlugin DUMMY_PLUGIN = new SearchPipelinePlugin() { @Override - public Map getProcessors(Processor.Parameters parameters) { + public Map> getRequestProcessors(Processor.Parameters parameters) { return Map.of("foo", (factories, tag, description, config) -> null); } + + public Map> getResponseProcessors(Processor.Parameters parameters) { + return Map.of("bar", (factories, tag, description, config) -> null); + } }; private ThreadPool threadPool; @@ -89,9 +93,14 @@ public void testSearchPipelinePlugin() { client, false ); - Map factories = searchPipelineService.getProcessorFactories(); - assertEquals(1, factories.size()); - assertTrue(factories.containsKey("foo")); + Map> requestProcessorFactories = searchPipelineService + .getRequestProcessorFactories(); + assertEquals(1, requestProcessorFactories.size()); + assertTrue(requestProcessorFactories.containsKey("foo")); + Map> responseProcessorFactories = searchPipelineService + .getResponseProcessorFactories(); + assertEquals(1, responseProcessorFactories.size()); + assertTrue(responseProcessorFactories.containsKey("bar")); } public void testSearchPipelinePluginDuplicate() { @@ -235,8 +244,8 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp } private SearchPipelineService createWithProcessors() { - Map processors = new HashMap<>(); - processors.put("scale_request_size", (processorFactories, tag, description, config) -> { + Map> requestProcessors = new HashMap<>(); + requestProcessors.put("scale_request_size", (processorFactories, tag, description, config) -> { float scale = ((Number) config.remove("scale")).floatValue(); return new FakeRequestProcessor( "scale_request_size", @@ -245,11 +254,12 @@ private SearchPipelineService createWithProcessors() { req -> req.source().size((int) (req.source().size() * scale)) ); }); - processors.put("fixed_score", (processorFactories, tag, description, config) -> { + Map> responseProcessors = new HashMap<>(); + responseProcessors.put("fixed_score", (processorFactories, tag, description, config) -> { float score = ((Number) config.remove("score")).floatValue(); return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score))); }); - return createWithProcessors(processors); + return createWithProcessors(requestProcessors, responseProcessors); } @Override @@ -258,7 +268,10 @@ protected NamedWriteableRegistry writableRegistry() { return new NamedWriteableRegistry(searchModule.getNamedWriteables()); } - private SearchPipelineService createWithProcessors(Map processors) { + private SearchPipelineService createWithProcessors( + Map> requestProcessors, + Map> responseProcessors + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -274,8 +287,13 @@ private SearchPipelineService createWithProcessors(Map getProcessors(Processor.Parameters parameters) { - return processors; + public Map> getRequestProcessors(Processor.Parameters parameters) { + return requestProcessors; + } + + @Override + public Map> getResponseProcessors(Processor.Parameters parameters) { + return responseProcessors; } }), client, @@ -619,13 +637,14 @@ public void testValidatePipeline() throws Exception { XContentType.JSON ); + SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo( + Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor)) + ); + SearchPipelineInfo incompletePipelineInfo = new SearchPipelineInfo(Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor))); // One node is missing a processor expectThrows( OpenSearchParseException.class, - () -> searchPipelineService.validatePipeline( - Map.of(n1, new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), n2, new SearchPipelineInfo(List.of(reqProcessor))), - putRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, incompletePipelineInfo), putRequest) ); // Discovery failed, no infos passed. @@ -644,27 +663,11 @@ public void testValidatePipeline() throws Exception { ); expectThrows( ClassCastException.class, - () -> searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - badPutRequest - ) + () -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), badPutRequest) ); // Success - searchPipelineService.validatePipeline( - Map.of( - n1, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)), - n2, - new SearchPipelineInfo(List.of(reqProcessor, rspProcessor)) - ), - putRequest - ); + searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo, n2, completePipelineInfo), putRequest); } /** @@ -717,7 +720,7 @@ public void testInlinePipeline() throws Exception { public void testInfo() { SearchPipelineService searchPipelineService = createWithProcessors(); SearchPipelineInfo info = searchPipelineService.info(); - assertTrue(info.containsProcessor("scale_request_size")); - assertTrue(info.containsProcessor("fixed_score")); + assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size")); + assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score")); } }