Skip to content

Commit

Permalink
Handle default search pipeline for multiple indices (#13276) (#13454)
Browse files Browse the repository at this point in the history
(cherry picked from commit defbd60)

Signed-off-by: Owais Kazi <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent cbcc150 commit 5fe3568
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ private void executeRequest(
PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver);
listener = searchRequest.transformResponseListener(updatedListener);
} catch (Exception e) {
updatedListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
Expand All @@ -35,10 +36,12 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.service.ReportingService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.gateway.GatewayService;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.ingest.ConfigurationUtils;
Expand All @@ -62,6 +65,8 @@
/**
* The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines
* against requests and responses.
*
* @opensearch.internal
*/
public class SearchPipelineService implements ClusterStateApplier, ReportingService<SearchPipelineInfo> {

Expand Down Expand Up @@ -360,7 +365,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat
return newState.build();
}

public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) {
Pipeline pipeline = Pipeline.NO_OP_PIPELINE;

if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) {
Expand Down Expand Up @@ -390,14 +395,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
if (searchRequest.pipeline() != null) {
// Named pipeline specified for the request
pipelineId = searchRequest.pipeline();
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) {
// Check for index default pipeline
IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]);
if (indexMetadata != null) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) {
try {
// Check for index default pipeline
Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest);
for (Index index : concreteIndices) {
IndexMetadata indexMetadata = state.metadata().index(index);
if (indexMetadata != null) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
if (NOOP_PIPELINE_ID.equals(pipelineId)) {
pipelineId = currentPipelineId;
} else if (!pipelineId.equals(currentPipelineId)) {
pipelineId = NOOP_PIPELINE_ID;
break;
}
}
}
}
} catch (IndexNotFoundException e) {
logger.debug("Default pipeline not applied for {}", (Object) searchRequest.indices());
}
}
if (NOOP_PIPELINE_ID.equals(pipelineId) == false) {
Expand Down
Loading

0 comments on commit 5fe3568

Please sign in to comment.