Skip to content

Commit

Permalink
Add RenameResponseProcessor for Search Pipelines (opensearch-project#…
Browse files Browse the repository at this point in the history
…7377) (opensearch-project#7635)

* Adding RenameResponseProcessor



* running spotlessApply



* adding javadocs



* addressing comments



* fixing conflict in changelog



* minor change to use OpenSearchTestCase for tests



* renaming processor to RenameFieldResponseProcessor



* updating changelog



* updating processor name in yaml rest tests



---------


(cherry picked from commit 30a1a43)

Signed-off-by: Sean Li <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 7a6996b commit 479aa19
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Map;

/**
 * A search pipeline processor that renames a field in every hit of a search response before the response is returned.
 * <p>
 * This class implements {@link SearchResponseProcessor} (not {@link SearchRequestProcessor}): it rewrites the hits of the
 * {@link SearchResponse} and never touches the {@link SearchRequest} it is handed. The rename is applied both to the
 * hit's document fields and, when the hit carries a source, to the top-level keys of that source.
 */
public class RenameFieldResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {

    /**
     * Key to reference this processor type from a search pipeline.
     */
    public static final String TYPE = "rename_field";

    private final String oldField;
    private final String newField;
    private final boolean ignoreMissing;

    /**
     * Creates a processor that renames {@code oldField} to {@code newField}.
     *
     * @param tag           processor tag
     * @param description   processor description
     * @param oldField      name of the field to be renamed
     * @param newField      name that replaces {@code oldField}
     * @param ignoreMissing if true, do not throw an error when {@code oldField} is absent from the search response
     */
    public RenameFieldResponseProcessor(String tag, String description, String oldField, String newField, boolean ignoreMissing) {
        super(tag, description);
        this.oldField = oldField;
        this.newField = newField;
        this.ignoreMissing = ignoreMissing;
    }

    /**
     * @return the pipeline type key of this processor, {@link #TYPE}
     */
    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * @return the name of the field that gets renamed
     */
    public String getOldField() {
        return oldField;
    }

    /**
     * @return the name the field is renamed to
     */
    public String getNewField() {
        return newField;
    }

    /**
     * @return whether a missing {@code oldField} is tolerated instead of raising an error
     */
    public boolean isIgnoreMissing() {
        return ignoreMissing;
    }

    /**
     * Renames {@code oldField} to {@code newField} in the document fields and in the source of each hit.
     *
     * @param request  the search request that produced the response; not read by this processor
     * @param response the search response whose hits are modified in place
     * @return the same {@code response} instance, after its hits have been rewritten
     * @throws IllegalArgumentException if {@code ignoreMissing} is false and the field has not been found by the time a
     *                                  hit has been processed
     * @throws Exception                if the source of a hit cannot be re-serialized
     */
    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
        // NOTE(review): this flag is shared by all hits and is never reset, so once one hit contains the field, later
        // hits that lack it no longer trigger the exception below. Confirm whether a per-hit check was intended.
        boolean renamedSoFar = false;

        for (SearchHit hit : response.getHits().getHits()) {
            // Non-short-circuit updates: both rename attempts must run for every hit.
            renamedSoFar |= renameInDocumentFields(hit);
            renamedSoFar |= renameInSource(hit);

            if (ignoreMissing == false && renamedSoFar == false) {
                throw new IllegalArgumentException("Document with id " + hit.getId() + " is missing field " + oldField);
            }
        }

        return response;
    }

    /**
     * Moves the values of the document field {@code oldField} of the given hit to a new document field {@code newField}.
     *
     * @param hit the hit to modify in place
     * @return true if the hit had a field named {@code oldField}
     */
    private boolean renameInDocumentFields(SearchHit hit) {
        Map<String, DocumentField> hitFields = hit.getFields();
        if (hitFields.containsKey(oldField) == false) {
            return false;
        }
        DocumentField removed = hit.removeDocumentField(oldField);
        hit.setDocumentField(newField, new DocumentField(newField, removed.getValues()));
        return true;
    }

    /**
     * Renames the top-level source key {@code oldField} of the given hit to {@code newField} and writes the source back
     * using the content type it was parsed with.
     *
     * @param hit the hit to modify in place
     * @return true if the hit has a source containing the key {@code oldField}
     * @throws Exception if the modified source cannot be serialized
     */
    private boolean renameInSource(SearchHit hit) throws Exception {
        if (hit.hasSource() == false) {
            return false;
        }

        Tuple<? extends MediaType, Map<String, Object>> parsedSource = XContentHelper.convertToMap(
            hit.getSourceRef(),
            false,
            (MediaType) null
        );
        Map<String, Object> sourceMap = parsedSource.v2();
        if (sourceMap.containsKey(oldField) == false) {
            return false;
        }

        // remove(...) is evaluated before put(...), so the value is detached from the old key first.
        sourceMap.put(newField, sourceMap.remove(oldField));

        XContentBuilder sourceBuilder = XContentBuilder.builder(parsedSource.v1().xContent());
        sourceBuilder.map(sourceMap);
        hit.sourceRef(BytesReference.bytes(sourceBuilder));
        return true;
    }

    /**
     * Factory that creates {@link RenameFieldResponseProcessor} instances from a pipeline configuration.
     */
    public static final class Factory implements Processor.Factory {

        /**
         * Constructor for factory
         */
        Factory() {}

        /**
         * Builds a processor from {@code config}, reading the string properties {@code field} and {@code target_field}
         * and the boolean property {@code ignore_missing}, which defaults to false.
         *
         * @param processorFactories the known processor factories; not used by this factory
         * @param tag                processor tag
         * @param description        processor description
         * @param config             the processor configuration
         * @return the configured processor
         * @throws Exception if the configuration cannot be read
         */
        @Override
        public RenameFieldResponseProcessor create(
            Map<String, Processor.Factory> processorFactories,
            String tag,
            String description,
            Map<String, Object> config
        ) throws Exception {
            final String sourceFieldName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
            final String targetFieldName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
            final boolean tolerateMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
            return new RenameFieldResponseProcessor(tag, description, sourceFieldName, targetFieldName, tolerateMissing);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public SearchPipelineCommonModulePlugin() {}

/**
 * Returns the search pipeline processor factories contributed by this plugin, keyed by each processor's TYPE constant.
 *
 * NOTE(review): this span appears to be a rendered diff hunk whose +/- markers were stripped. The single-line
 * return is presumably the removed line and the multi-line Map.of(...) its replacement, which additionally
 * registers RenameFieldResponseProcessor. Confirm against the repository before treating both as live code.
 *
 * @param parameters processor parameters; only its namedXContentRegistry is read here
 * @return a map from processor type name to the factory that creates that processor
 */
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
// Presumably the pre-change line of the diff: registers the filter-query request processor only.
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
// Presumably the post-change lines of the diff: registers both the filter-query and the rename-field processors.
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
RenameFieldResponseProcessor.TYPE,
new RenameFieldResponseProcessor.Factory()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.apache.lucene.search.TotalHits;
import org.opensearch.OpenSearchParseException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchResponseSections;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.document.DocumentField;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Unit tests for {@link RenameFieldResponseProcessor}: renaming a field in search hits, the error raised when the
 * field is missing, and construction of the processor through its factory.
 */
public class RenameFieldResponseProcessorTests extends OpenSearchTestCase {

    /**
     * Builds a minimal search request; the processor under test does not read it.
     */
    private SearchRequest createDummyRequest() {
        QueryBuilder query = new TermQueryBuilder("field", "value");
        SearchSourceBuilder source = new SearchSourceBuilder().query(query);
        return new SearchRequest().source(source);
    }

    /**
     * Builds a response with {@code size} hits. Hit {@code i} has the id {@code "doc i"}, a single document field stored
     * under the key {@code "field i"} and a JSON source {@code { "field i" : "value i" }}.
     *
     * @param size           number of hits to create
     * @param includeMapping retained for call-site compatibility. The original implementation added the very same
     *                       field entry a second time when this flag was set, so the flag never changed the
     *                       generated response; the redundant put has been removed.
     * @return the generated search response
     */
    private SearchResponse createTestResponse(int size, boolean includeMapping) {
        SearchHit[] hits = new SearchHit[size];
        for (int i = 0; i < size; i++) {
            Map<String, DocumentField> searchHitFields = new HashMap<>();
            searchHitFields.put("field " + i, new DocumentField("value " + i, Collections.emptyList()));
            hits[i] = new SearchHit(i, "doc " + i, searchHitFields, Collections.emptyMap());
            hits[i].sourceRef(new BytesArray("{ \"field " + i + "\" : \"value " + i + "\" }"));
            hits[i].score(i);
        }
        SearchHits searchHits = new SearchHits(hits, new TotalHits(size * 2L, TotalHits.Relation.EQUAL_TO), size);
        SearchResponseSections searchResponseSections = new SearchResponseSections(searchHits, null, null, false, false, null, 0);
        return new SearchResponse(searchResponseSections, null, 1, 1, 0, 10, null, null);
    }

    /**
     * Renaming "field 0" must change the hits compared to an unprocessed response of the same size, and the first hit
     * must expose the field under its new name only.
     */
    public void testRenameResponse() throws Exception {
        SearchRequest request = createDummyRequest();

        RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
            null,
            null,
            "field 0",
            "new field",
            false
        );
        // The baseline has the same number of hits as the processed response; comparing responses of different sizes
        // would make the inequality below trivially true.
        SearchResponse response = createTestResponse(5, false);
        SearchResponse renameResponse = renameFieldResponseProcessor.processResponse(request, createTestResponse(5, false));

        assertNotEquals(response.getHits(), renameResponse.getHits());

        Map<String, DocumentField> firstHitFields = renameResponse.getHits().getHits()[0].getFields();
        assertTrue(firstHitFields.containsKey("new field"));
        assertFalse(firstHitFields.containsKey("field 0"));
    }

    /**
     * Same scenario as {@link #testRenameResponse()} but with {@code ignore_missing} enabled and the
     * {@code includeMapping} flag set on the generated response.
     */
    public void testRenameResponseWithMapping() throws Exception {
        SearchRequest request = createDummyRequest();

        RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
            null,
            null,
            "field 0",
            "new field",
            true
        );
        SearchResponse response = createTestResponse(5, true);
        SearchResponse renameResponse = renameFieldResponseProcessor.processResponse(request, createTestResponse(5, true));

        assertNotEquals(response.getHits(), renameResponse.getHits());

        boolean foundField = false;
        for (SearchHit hit : renameResponse.getHits().getHits()) {
            if (hit.getFields().containsKey("new field")) {
                foundField = true;
            }
            // The old name must not survive in any hit once it has been renamed.
            assertFalse(hit.getFields().containsKey("field 0"));
        }
        assertTrue(foundField);
    }

    /**
     * With {@code ignore_missing} disabled, a field that exists in no hit must raise an IllegalArgumentException.
     */
    public void testMissingField() throws Exception {
        SearchRequest request = createDummyRequest();
        RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
            null,
            null,
            "field",
            "new field",
            false
        );
        assertThrows(
            IllegalArgumentException.class,
            () -> renameFieldResponseProcessor.processResponse(request, createTestResponse(3, true))
        );
    }

    /**
     * The factory must map "field" and "target_field" onto the processor, default "ignore_missing" to false, and
     * reject a configuration that lacks the required properties.
     */
    public void testFactory() throws Exception {
        String oldField = RandomDocumentPicks.randomFieldName(random());
        String newField = RandomDocumentPicks.randomFieldName(random());
        Map<String, Object> config = new HashMap<>();
        config.put("field", oldField);
        config.put("target_field", newField);

        RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
        RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config);
        // assertEquals takes the expected value first, so failure messages report the right side as "expected".
        assertEquals("rename_field", processor.getType());
        assertEquals(oldField, processor.getOldField());
        assertEquals(newField, processor.getNewField());
        assertFalse(processor.isIgnoreMissing());

        expectThrows(OpenSearchParseException.class, () -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap()));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@

- contains: { nodes.$cluster_manager.modules: { name: search-pipeline-common } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: filter_query } }
- contains: { nodes.$cluster_manager.search_pipelines.processors: { type: rename_field } }
Loading

0 comments on commit 479aa19

Please sign in to comment.