-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Initial search pipelines implementation
This commit includes the basic features of search pipelines (see opensearch-project/search-processor#80). Search pipelines are modeled after ingest pipelines and provide a simple, clean API for components to modify search requests and responses. With this commit we can: 1. Can create, retrieve, update, and delete search pipelines. 2. Transform search requests and responses by explicitly referencing a pipeline. Later work will include: 1. Adding an index setting to specify a default search pipeline. 2. Allowing search pipelines to be defined within a search request (for development/testing purposes, akin to simulating an ingest pipeline). 3. Adding a collection of search pipeline processors to support common useful transformations. (Suggestions welcome!) Signed-off-by: Michael Froh <[email protected]>
- Loading branch information
Showing
53 changed files
with
3,269 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
apply plugin: 'opensearch.yaml-rest-test' | ||
apply plugin: 'opensearch.internal-cluster-test' | ||
|
||
opensearchplugin { | ||
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources' | ||
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin' | ||
extendedPlugins = ['lang-painless'] | ||
} | ||
|
||
dependencies { | ||
compileOnly project(':modules:lang-painless') | ||
api project(':libs:opensearch-grok') | ||
api project(':libs:opensearch-dissect') | ||
} | ||
|
||
restResources { | ||
restApi { | ||
includeCore '_common', 'ingest', 'cluster', 'indices', 'index', 'bulk', 'nodes', 'get', 'update', 'cat', 'mget' | ||
} | ||
} | ||
|
||
testClusters.all { | ||
// Needed in order to test ingest pipeline templating: | ||
// (this is because the integTest node is not using default distribution, but only the minimal number of required modules) | ||
module ':modules:lang-mustache' | ||
} | ||
|
||
thirdPartyAudit.ignoreMissingClasses( | ||
// from log4j | ||
'org.osgi.framework.AdaptPermission', | ||
'org.osgi.framework.AdminPermission', | ||
'org.osgi.framework.Bundle', | ||
'org.osgi.framework.BundleActivator', | ||
'org.osgi.framework.BundleContext', | ||
'org.osgi.framework.BundleEvent', | ||
'org.osgi.framework.SynchronousBundleListener', | ||
'org.osgi.framework.wiring.BundleWire', | ||
'org.osgi.framework.wiring.BundleWiring' | ||
) |
13 changes: 13 additions & 0 deletions
13
...nternalClusterTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.pipeline.common; | ||
|
||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {} |
129 changes: 129 additions & 0 deletions
129
...mmon/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.pipeline.common; | ||
|
||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.common.bytes.BytesReference; | ||
import org.opensearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.common.xcontent.json.JsonXContent; | ||
import org.opensearch.core.ParseField; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.index.query.BoolQueryBuilder; | ||
import org.opensearch.index.query.QueryBuilder; | ||
import org.opensearch.search.builder.SearchSourceBuilder; | ||
import org.opensearch.search.pipeline.Processor; | ||
import org.opensearch.search.pipeline.SearchRequestProcessor; | ||
|
||
import java.io.InputStream; | ||
import java.util.Map; | ||
|
||
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; | ||
|
||
/** | ||
* This is a {@link SearchRequestProcessor} that replaces the incoming query with a BooleanQuery | ||
* that MUST match the incoming query with a FILTER clause based on the configured query. | ||
*/ | ||
public class FilterQueryRequestProcessor implements SearchRequestProcessor { | ||
/** | ||
* Key to reference this processor type from a search pipeline. | ||
*/ | ||
public static final String TYPE = "filter_query"; | ||
|
||
private final String tag; | ||
private final String description; | ||
|
||
final QueryBuilder filterQuery; | ||
|
||
@Override | ||
public String getType() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public String getTag() { | ||
return tag; | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return description; | ||
} | ||
|
||
/** | ||
* @param tag processor tag | ||
* @param description processor description | ||
* @param filterQuery the query that will be added as a filter to incoming queries | ||
*/ | ||
public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) { | ||
this.tag = tag; | ||
this.description = description; | ||
this.filterQuery = filterQuery; | ||
} | ||
|
||
@Override | ||
public SearchRequest processRequest(SearchRequest request) throws Exception { | ||
QueryBuilder originalQuery = null; | ||
if (request.source() != null) { | ||
originalQuery = request.source().query(); | ||
} | ||
|
||
BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(filterQuery); | ||
if (originalQuery != null) { | ||
filteredQuery.must(originalQuery); | ||
} | ||
if (request.source() == null) { | ||
request.source(new SearchSourceBuilder()); | ||
} | ||
request.source().query(filteredQuery); | ||
return request; | ||
} | ||
|
||
static class Factory implements Processor.Factory { | ||
private final NamedXContentRegistry namedXContentRegistry; | ||
public static final ParseField QUERY_FIELD = new ParseField("query"); | ||
|
||
Factory(NamedXContentRegistry namedXContentRegistry) { | ||
this.namedXContentRegistry = namedXContentRegistry; | ||
} | ||
|
||
@Override | ||
public FilterQueryRequestProcessor create( | ||
Map<String, Processor.Factory> processorFactories, | ||
String tag, | ||
String description, | ||
Map<String, Object> config | ||
) throws Exception { | ||
try ( | ||
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config); | ||
InputStream stream = BytesReference.bytes(builder).streamInput(); | ||
XContentParser parser = XContentType.JSON.xContent() | ||
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream) | ||
) { | ||
XContentParser.Token token = parser.nextToken(); | ||
assert token == XContentParser.Token.START_OBJECT; | ||
String currentFieldName = null; | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
} else if (token == XContentParser.Token.START_OBJECT) { | ||
if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { | ||
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser)); | ||
} | ||
} | ||
} | ||
} | ||
throw new IllegalArgumentException( | ||
"Did not specify the " + QUERY_FIELD.getPreferredName() + " property in processor of type " + TYPE | ||
); | ||
} | ||
} | ||
} |
26 changes: 26 additions & 0 deletions
26
...src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.pipeline.common; | ||
|
||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.plugins.SearchPipelinesPlugin; | ||
import org.opensearch.search.pipeline.Processor; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Plugin providing common search request/response processors for use in search pipelines. | ||
*/ | ||
public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinesPlugin { | ||
|
||
@Override | ||
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { | ||
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); | ||
} | ||
} |
51 changes: 51 additions & 0 deletions
51
...src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.pipeline.common; | ||
|
||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.index.query.BoolQueryBuilder; | ||
import org.opensearch.index.query.QueryBuilder; | ||
import org.opensearch.index.query.TermQueryBuilder; | ||
import org.opensearch.search.builder.SearchSourceBuilder; | ||
import org.opensearch.test.AbstractBuilderTestCase; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase { | ||
|
||
public void testFilterQuery() throws Exception { | ||
QueryBuilder filterQuery = new TermQueryBuilder("field", "value"); | ||
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery); | ||
QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo"); | ||
SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery); | ||
SearchRequest request = new SearchRequest().source(source); | ||
SearchRequest transformedRequest = filterQueryRequestProcessor.processRequest(request); | ||
assertEquals(new BoolQueryBuilder().must(incomingQuery).filter(filterQuery), transformedRequest.source().query()); | ||
|
||
// Test missing incoming query | ||
request = new SearchRequest(); | ||
transformedRequest = filterQueryRequestProcessor.processRequest(request); | ||
assertEquals(new BoolQueryBuilder().filter(filterQuery), transformedRequest.source().query()); | ||
} | ||
|
||
public void testFactory() throws Exception { | ||
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry()); | ||
FilterQueryRequestProcessor processor = factory.create( | ||
Collections.emptyMap(), | ||
null, | ||
null, | ||
Map.of("query", Map.of("term", Map.of("field", "value"))) | ||
); | ||
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery); | ||
|
||
// Missing "query" parameter: | ||
expectThrows(IllegalArgumentException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); | ||
} | ||
} |
25 changes: 25 additions & 0 deletions
25
...tTest/java/org/opensearch/search/pipeline/common/SearchPipelineCommonYamlTestSuiteIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.search.pipeline.common; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.Name; | ||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; | ||
import org.opensearch.test.rest.yaml.ClientYamlTestCandidate; | ||
import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase; | ||
|
||
public class SearchPipelineCommonYamlTestSuiteIT extends OpenSearchClientYamlSuiteTestCase { | ||
public SearchPipelineCommonYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { | ||
super(testCandidate); | ||
} | ||
|
||
@ParametersFactory | ||
public static Iterable<Object[]> parameters() throws Exception { | ||
return OpenSearchClientYamlSuiteTestCase.createParameters(); | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
...ipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
"Search pipeline common installed": | ||
- skip: | ||
reason: "contains is a newly added assertion" | ||
features: contains | ||
- do: | ||
cluster.state: {} | ||
|
||
# Get cluster-manager node id | ||
- set: { cluster_manager_node: cluster_manager } | ||
|
||
- do: | ||
nodes.info: {} | ||
|
||
- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } | ||
- contains: { nodes.$cluster_manager.search_pipeline.processors: { type: filter_query } } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.