Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search pipeline] Add script processor #7607

Merged
merged 4 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- [Search Pipelines] Add script processor ([#7607](https://github.com/opensearch-project/OpenSearch/pull/7607))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
- do:
scripts_painless_context: {}
- match: { contexts.0: aggregation_selector}
- match: { contexts.22: update}
- match: { contexts.23: update}
---

"Action to get all API values for score context":
Expand Down
2 changes: 2 additions & 0 deletions modules/search-pipeline-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ apply plugin: 'opensearch.internal-cluster-test'
opensearchplugin {
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources'
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin'
extendedPlugins = ['lang-painless']
}

dependencies {
compileOnly project(':modules:lang-painless')
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class FilterQueryRequestProcessor extends AbstractProcessor implements Se

final QueryBuilder filterQuery;

/**
* Returns the type of the processor.
*
* @return The processor type.
*/
@Override
public String getType() {
return TYPE;
Expand All @@ -57,6 +62,14 @@ public FilterQueryRequestProcessor(String tag, String description, QueryBuilder
this.filterQuery = filterQuery;
}

/**
* Modifies the search request by adding a filtered query to the existing query, if any, and sets it as the new query
* in the search request's SearchSourceBuilder.
*
* @param request The search request to be processed.
* @return The modified search request.
* @throws Exception if an error occurs while processing the request.
*/
@Override
public SearchRequest processRequest(SearchRequest request) throws Exception {
QueryBuilder originalQuery = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;

import org.opensearch.common.Nullable;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;

import org.opensearch.script.Script;
import org.opensearch.script.ScriptException;
import org.opensearch.script.ScriptService;
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that evaluates a script with a search request in its context
* and then returns the modified search request.
*/
public final class ScriptRequestProcessor extends AbstractProcessor implements SearchRequestProcessor {
/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "script";

private final Script script;
private final ScriptService scriptService;
private final SearchScript precompiledSearchScript;

/**
* Processor that evaluates a script with a search request in its context
*
* @param tag The processor's tag.
* @param description The processor's description.
* @param script The {@link Script} to execute.
* @param precompiledSearchScript The {@link Script} precompiled
* @param scriptService The {@link ScriptService} used to execute the script.
*/
ScriptRequestProcessor(
String tag,
String description,
Script script,
@Nullable SearchScript precompiledSearchScript,
ScriptService scriptService
) {
super(tag, description);
this.script = script;
this.precompiledSearchScript = precompiledSearchScript;
this.scriptService = scriptService;
}

/**
* Executes the script with the search request in context.
*
* @param request The search request passed into the script context.
* @return The modified search request.
* @throws Exception if an error occurs while processing the request.
*/
@Override
public SearchRequest processRequest(SearchRequest request) throws Exception {
// assert request is not null and source is not null
if (request == null || request.source() == null) {
throw new IllegalArgumentException("search request must not be null");
}
final SearchScript searchScript;
if (precompiledSearchScript == null) {
SearchScript.Factory factory = scriptService.compile(script, SearchScript.CONTEXT);
searchScript = factory.newInstance(script.getParams());
} else {
searchScript = precompiledSearchScript;
}
// execute the script with the search request in context
searchScript.execute(Map.of("_source", new SearchRequestMap(request)));
return request;
}

/**
* Returns the type of the processor.
*
* @return The processor type.
*/
@Override
public String getType() {
return TYPE;
}

/**
* Returns the script used by the processor.
*
* @return The script.
*/
Script getScript() {
return script;
}

/**
* Returns the precompiled search script used by the processor.
*
* @return The precompiled search script.
*/
SearchScript getPrecompiledSearchScript() {
return precompiledSearchScript;
}

/**
* Factory class for creating {@link ScriptRequestProcessor}.
*/
public static final class Factory implements Processor.Factory<SearchRequestProcessor> {
private final ScriptService scriptService;

/**
* Constructs a new Factory instance with the specified {@link ScriptService}.
*
* @param scriptService The {@link ScriptService} used to execute scripts.
*/
public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

/**
* Creates a new instance of {@link ScriptRequestProcessor}.
*
* @param registry The registry of processor factories.
* @param processorTag The processor's tag.
* @param description The processor's description.
* @param config The configuration options for the processor.
* @return The created {@link ScriptRequestProcessor} instance.
* @throws Exception if an error occurs during the creation process.
*/
@Override
public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
Script script = Script.parse(parser);

Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
SearchScript searchScript = null;
try {
final SearchScript.Factory factory = scriptService.compile(script, SearchScript.CONTEXT);
if (ScriptType.INLINE.equals(script.getType())) {
searchScript = factory.newInstance(script.getParams());
}
} catch (ScriptException e) {
throw newConfigurationException(TYPE, processorTag, null, e);
}
return new ScriptRequestProcessor(processorTag, description, script, searchScript, scriptService);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi
*/
public SearchPipelineCommonModulePlugin() {}

/**
* Returns a map of processor factories.
*
* @param parameters The parameters required for creating the processor factories.
* @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances.
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
ScriptRequestProcessor.TYPE,
new ScriptRequestProcessor.Factory(parameters.scriptService)
);
}

@Override
Expand Down
Loading