From 479aa19b84975b5f723848a6b1c20e82d6f65fc1 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 19 May 2023 09:02:27 -0400 Subject: [PATCH] Add RenameResponseProcessor for Search Pipelines (#7377) (#7635) * Adding RenameResponseProcessor * running spotlessApply * adding javadocs * addressing comments * fixing conflict in changelog * minor change to use OpenSearchTestCase for tests * renaming processor to RenameFieldResponseProcessor * updating changelog * updating processor name in yaml rest tests --------- (cherry picked from commit 30a1a439bac66b7d1ce17d35f8f815c5a5aebbd4) Signed-off-by: Sean Li Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- CHANGELOG.md | 1 + .../common/RenameFieldResponseProcessor.java | 151 ++++++++++++++++++ .../SearchPipelineCommonModulePlugin.java | 7 +- .../RenameFieldResponseProcessorTests.java | 126 +++++++++++++++ .../test/search_pipeline/10_basic.yml | 1 + .../search_pipeline/40_rename_response.yml | 151 ++++++++++++++++++ .../java/org/opensearch/search/SearchHit.java | 4 + 7 files changed, 440 insertions(+), 1 deletion(-) create mode 100644 modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java create mode 100644 modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java create mode 100644 modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/40_rename_response.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index e4422aa30817c..b85fb13b7452d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866)) - [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253)) - [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470)) +- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377)) - Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244)) - Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237)) - Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375)) diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java new file mode 100644 index 0000000000000..3a2f0e9fb2492 --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.document.DocumentField; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.ingest.ConfigurationUtils; +import org.opensearch.search.SearchHit; +import org.opensearch.search.pipeline.Processor; +import org.opensearch.search.pipeline.SearchRequestProcessor; +import org.opensearch.search.pipeline.SearchResponseProcessor; + +import java.util.Map; + +/** + * This is a {@link SearchRequestProcessor} that renames a field before returning the search response + */ +public class RenameFieldResponseProcessor extends AbstractProcessor implements SearchResponseProcessor { + + private final String oldField; + private final String newField; + private final boolean ignoreMissing; + + /** + * Key to reference this processor type from a search pipeline. + */ + public static final String TYPE = "rename_field"; + + /** + * Constructor that takes a target field to rename and the new name + * + * @param tag processor tag + * @param description processor description + * @param oldField name of field to be renamed + * @param newField name of field that will replace the old field + * @param ignoreMissing if true, do not throw error if oldField does not exist within search response + */ + public RenameFieldResponseProcessor(String tag, String description, String oldField, String newField, boolean ignoreMissing) { + super(tag, description); + this.oldField = oldField; + this.newField = newField; + this.ignoreMissing = ignoreMissing; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Getter function for oldField + * @return oldField + */ + public String getOldField() { + return oldField; + } + + /** + * Getter function for newField + * @return newField + */ + public String getNewField() { + return newField; + } + + /** + * Getter function for ignoreMissing + * @return ignoreMissing + */ + public boolean isIgnoreMissing() { + return ignoreMissing; + } + + @Override + public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception { + boolean foundField = false; + + SearchHit[] hits = response.getHits().getHits(); + for (SearchHit hit : hits) { + Map fields = hit.getFields(); + if (fields.containsKey(oldField)) { + foundField = true; + DocumentField old = hit.removeDocumentField(oldField); + DocumentField newDocField = new DocumentField(newField, old.getValues()); + hit.setDocumentField(newField, newDocField); + } + + if (hit.hasSource()) { + BytesReference sourceRef = hit.getSourceRef(); + Tuple> typeAndSourceMap = XContentHelper.convertToMap( + sourceRef, + false, + (MediaType) null + ); + + Map sourceAsMap = typeAndSourceMap.v2(); + if (sourceAsMap.containsKey(oldField)) { + foundField = true; + Object val = sourceAsMap.remove(oldField); + sourceAsMap.put(newField, val); + + XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent()); + builder.map(sourceAsMap); + hit.sourceRef(BytesReference.bytes(builder)); + } + } + + if (!foundField && !ignoreMissing) { + throw new IllegalArgumentException("Document with id " + hit.getId() + " is missing field " + oldField); + } + } + + return response; + } + + /** + * This is a factor that creates the RenameResponseProcessor + */ + public static final class Factory implements Processor.Factory { + + /** + * Constructor for factory + */ + Factory() {} + + @Override + public RenameFieldResponseProcessor create( + Map processorFactories, + String tag, + String description, + Map config + ) throws Exception { + String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field"); + String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field"); + boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false); + return new RenameFieldResponseProcessor(tag, description, oldField, newField, ignoreMissing); + } + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java index caca753caf819..a0e5182f71443 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java @@ -26,6 +26,11 @@ public SearchPipelineCommonModulePlugin() {} @Override public Map getProcessors(Processor.Parameters parameters) { - return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry)); + return Map.of( + FilterQueryRequestProcessor.TYPE, + new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry), + RenameFieldResponseProcessor.TYPE, + new RenameFieldResponseProcessor.Factory() + ); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java new file mode 100644 index 0000000000000..a2fc7f6acfa7c --- /dev/null +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a.java + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common; + +import org.apache.lucene.search.TotalHits; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchResponseSections; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.document.DocumentField; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class RenameFieldResponseProcessorTests extends OpenSearchTestCase { + + private SearchRequest createDummyRequest() { + QueryBuilder query = new TermQueryBuilder("field", "value"); + SearchSourceBuilder source = new SearchSourceBuilder().query(query); + return new SearchRequest().source(source); + } + + private SearchResponse createTestResponse(int size, boolean includeMapping) { + SearchHit[] hits = new SearchHit[size]; + for (int i = 0; i < size; i++) { + Map searchHitFields = new HashMap<>(); + if (includeMapping) { + searchHitFields.put("field " + i, new DocumentField("value " + i, Collections.emptyList())); + } + searchHitFields.put("field " + i, new DocumentField("value " + i, Collections.emptyList())); + hits[i] = new SearchHit(i, "doc " + i, searchHitFields, Collections.emptyMap()); + hits[i].sourceRef(new BytesArray("{ \"field " + i + "\" : \"value " + i + "\" }")); + hits[i].score(i); + } + SearchHits searchHits = new SearchHits(hits, new TotalHits(size * 2L, TotalHits.Relation.EQUAL_TO), size); + SearchResponseSections searchResponseSections = new SearchResponseSections(searchHits, null, null, false, false, null, 0); + return new SearchResponse(searchResponseSections, null, 1, 1, 0, 10, null, null); + } + + public void testRenameResponse() throws Exception { + SearchRequest request = createDummyRequest(); + + RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor( + null, + null, + "field 0", + "new field", + false + ); + SearchResponse response = createTestResponse(2, false); + SearchResponse renameResponse = renameFieldResponseProcessor.processResponse(request, createTestResponse(5, false)); + + assertNotEquals(response.getHits(), renameResponse.getHits()); + } + + public void testRenameResponseWithMapping() throws Exception { + SearchRequest request = createDummyRequest(); + + RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor( + null, + null, + "field 0", + "new field", + true + ); + SearchResponse response = createTestResponse(5, true); + SearchResponse renameResponse = renameFieldResponseProcessor.processResponse(request, createTestResponse(5, true)); + + assertNotEquals(response.getHits(), renameResponse.getHits()); + + boolean foundField = false; + for (SearchHit hit : renameResponse.getHits().getHits()) { + if (hit.getFields().containsKey("new field")) { + foundField = true; + } + } + assertTrue(foundField); + } + + public void testMissingField() throws Exception { + SearchRequest request = createDummyRequest(); + RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor( + null, + null, + "field", + "new field", + false + ); + assertThrows( + IllegalArgumentException.class, + () -> renameFieldResponseProcessor.processResponse(request, createTestResponse(3, true)) + ); + } + + public void testFactory() throws Exception { + String oldField = RandomDocumentPicks.randomFieldName(random()); + String newField = RandomDocumentPicks.randomFieldName(random()); + Map config = new HashMap<>(); + config.put("field", oldField); + config.put("target_field", newField); + + RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory(); + RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config); + assertEquals(processor.getType(), "rename_field"); + assertEquals(processor.getOldField(), oldField); + assertEquals(processor.getNewField(), newField); + assertFalse(processor.isIgnoreMissing()); + + expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap())); + } +} diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml index 473d92aa18052..0d931f8587664 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/10_basic.yml @@ -13,3 +13,4 @@ - contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } } - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } } + - contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } } diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/40_rename_response.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/40_rename_response.yml new file mode 100644 index 0000000000000..3b705f9bd5356 --- /dev/null +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/40_rename_response.yml @@ -0,0 +1,151 @@ +--- +teardown: + - do: + search_pipeline.delete: + id: "my_pipeline" + ignore: 404 + +--- +"Test filter_query processor": + - do: + search_pipeline.put: + id: "my_pipeline" + body: > + { + "description": "test pipeline", + "response_processors": [ + { + "rename_field": + { + "field": "a", + "target_field": "b" + } + } + ] + } + - match: { acknowledged: true } + + - do: + search_pipeline.put: + id: "my_pipeline_2" + body: > + { + "description": "test pipeline with ignore missing true", + "response_processors": [ + { + "rename_field": + { + "field": "aa", + "target_field": "b", + "ignore_missing": true + } + } + ] + } + - match: { acknowledged: true } + + - do: + search_pipeline.put: + id: "my_pipeline_3" + body: > + { + "description": "test pipeline with ignore missing false", + "response_processors": [ + { + "rename_field": + { + "field": "aa", + "target_field": "b", + "ignore_missing": false + } + } + ] + } + - match: { acknowledged: true } + + - do: + indices.create: + index: test + + - do: + indices.put_mapping: + index: test + body: + properties: + a: + type: keyword + store: true + doc_values: true + + - do: + index: + index: test + id: 1 + body: { + "a": "foo" + } + + - do: + indices.refresh: + index: test + + - do: + search: + body: { } + - match: { hits.total.value: 1 } + + - do: + search: + index: test + search_pipeline: "my_pipeline" + body: { } + - match: { hits.total.value: 1 } + - match: { hits.hits.0._source: { "b": "foo" } } + + # Should also work with no search body specified + - do: + search: + index: test + search_pipeline: "my_pipeline" + - match: { hits.total.value: 1 } + - match: { hits.hits.0._source: { "b": "foo" } } + + # Pipeline with ignore_missing set to true + # Should still pass even though index does not contain field + - do: + search: + index: test + search_pipeline: "my_pipeline_2" + - match: { hits.total.value: 1 } + - match: {hits.hits.0._source: { "a": "foo" } } + + # Pipeline with ignore_missing set to true + # Should still pass even though index does not contain field + - do: + catch: bad_request + search: + index: test + search_pipeline: "my_pipeline_3" + - match: { error.type: "illegal_argument_exception" } + + # No source, using stored_fields + - do: + search: + index: test + search_pipeline: "my_pipeline" + body: { + "_source": false, + "stored_fields": [ "a" ] + } + - match: { hits.hits.0.fields: { "b": ["foo"] } } + + # No source, using docvalue_fields + - do: + search: + index: test + search_pipeline: "my_pipeline" + body: { + "_source": false, + "docvalue_fields": [ "a" ] + } + - match: { hits.hits.0.fields: { "b": [ "foo" ] } } diff --git a/server/src/main/java/org/opensearch/search/SearchHit.java b/server/src/main/java/org/opensearch/search/SearchHit.java index 8c1018f8dff4e..f7b8776bc0b49 100644 --- a/server/src/main/java/org/opensearch/search/SearchHit.java +++ b/server/src/main/java/org/opensearch/search/SearchHit.java @@ -481,6 +481,10 @@ public void setDocumentField(String fieldName, DocumentField field) { this.documentFields.put(fieldName, field); } + public DocumentField removeDocumentField(String fieldName) { + return documentFields.remove(fieldName); + } + /** * A map of hit fields (from field name to hit fields) if additional fields * were required to be loaded.