Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SplitResponseProcessor to Search Pipelines #14800

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines ([#14800](https://github.com/opensearch-project/OpenSearch/pull/14800))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
* Processor that splits a string field into a list of substrings using a configured separator.
* Throws an exception if the specified field is present in a hit's document fields but is not a string.
*/
public class SplitResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "split";
/** Key defining the string field to be split. */
public static final String SPLIT_FIELD = "field";
/** Key defining the delimiter used to split the string. This can be a regular expression pattern. */
public static final String SEPARATOR = "separator";
/** Optional key for handling empty trailing fields. */
public static final String PRESERVE_TRAILING = "preserve_trailing";
/** Optional key to put the split values in a different field. */
public static final String TARGET_FIELD = "target_field";

// Name of the field whose string value is split; never null (enforced in the constructor).
private final String splitField;
// Delimiter passed as the regex argument of String.split; never null (enforced in the constructor).
private final String separator;
// When true, String.split is called with limit -1, otherwise with limit 0.
private final boolean preserveTrailing;
// Name of the field that receives the split values; the constructor falls back to splitField when given null.
private final String targetField;

/**
 * Creates a split response processor.
 *
 * @param tag              processor tag, passed through to {@link AbstractProcessor}
 * @param description      processor description, passed through to {@link AbstractProcessor}
 * @param ignoreFailure    ignore-failure flag, passed through to {@link AbstractProcessor}
 * @param splitField       name of the field to split; must not be null
 * @param separator        delimiter handed to {@link String#split(String, int)}; must not be null
 * @param preserveTrailing whether trailing empty strings are kept in the split result
 * @param targetField      field that receives the split values; {@code null} means the result is
 *                         written back to {@code splitField}
 * @throws NullPointerException if {@code splitField} or {@code separator} is null
 */
SplitResponseProcessor(
    String tag,
    String description,
    boolean ignoreFailure,
    String splitField,
    String separator,
    boolean preserveTrailing,
    String targetField
) {
    super(tag, description, ignoreFailure);
    // Name the offending argument so a caller passing null gets an actionable message.
    this.splitField = Objects.requireNonNull(splitField, "splitField must not be null");
    this.separator = Objects.requireNonNull(separator, "separator must not be null");
    this.preserveTrailing = preserveTrailing;
    // Without an explicit target, the split result replaces the original field.
    this.targetField = targetField == null ? splitField : targetField;
}

/**
 * Returns the name of the field whose value is split.
 *
 * @return splitField
 */
public String getSplitField() {
    return this.splitField;
}

/**
 * Returns the delimiter used to split the field value.
 *
 * @return separator
 */
public String getSeparator() {
    return this.separator;
}

/**
 * Reports whether trailing empty strings are preserved when splitting.
 *
 * @return preserveTrailing
 */
public boolean isPreserveTrailing() {
    return this.preserveTrailing;
}

/**
 * Returns the name of the field that receives the split values.
 *
 * @return targetField
 */
public String getTargetField() {
    return this.targetField;
}

/**
 * Returns the key identifying this processor type.
 *
 * @return {@link #TYPE}
 */
@Override
public String getType() {
    return SplitResponseProcessor.TYPE;
}

/**
 * Splits the configured field of every hit in the response, modifying the hits in place.
 * <p>
 * For each hit, the field is handled in two places: the hit's document fields and, when the hit
 * carries a source, the source map. In both cases the split result is stored under the target field.
 *
 * @param request  the search request (not read by this method)
 * @param response the search response whose hits are processed
 * @return the same {@code response} instance that was passed in
 * @throws IllegalArgumentException if the field is present in a hit's document fields but is null
 *                                  or its value is not a string
 */
@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
// Part 1: the field as it appears in the hit's document fields.
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(splitField)) {
DocumentField docField = hit.getFields().get(splitField);
if (docField == null) {
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
// A non-string value is an error here, unlike the source handling below, which skips it silently.
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
// String.split limit: -1 keeps trailing empty strings, 0 discards them.
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
String[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check for if (val instanceof String) { before L117

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check for if (val instanceof String) { before L117

We do:

if (val == null || !String.class.isAssignableFrom(val.getClass()))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if (!(val instanceof String)) would be more performant :|

hit.setDocumentField(targetField, new DocumentField(targetField, Arrays.asList(strings)));
}
// Part 2: the same field inside the hit's source, when the hit has one.
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
// v1() of the tuple is the media type, v2() is the source as a map.
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(splitField)) {
Object val = sourceAsMap.get(splitField);
// Non-string (including null) source values are left untouched rather than rejected.
if (val instanceof String) {
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A method for splitting string to avoid duplication. Can be used here and on L117

private String[] splitString(String stringVal) {
    return stringVal.split(separator, preserveTrailing ? -1 : 0);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That already is a single method. It also matches exactly the ingest split processor implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it needs to return Object[] and you'd need to either take Object arg or pre-cast it when calling that method.

sourceAsMap.put(targetField, Arrays.asList(strings));
}
// Re-serialize the map using the xContent of the media type obtained above and store it back on the hit.
// NOTE(review): this runs whenever the key is present, even if the value was not a string and the map is unchanged.
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

/**
 * Factory that builds a {@link SplitResponseProcessor} from a pipeline configuration map.
 */
static class Factory implements Processor.Factory<SearchResponseProcessor> {

    /**
     * Reads the processor settings from {@code config} and constructs the processor.
     * <p>
     * {@code field} and {@code separator} are read with no default supplied;
     * {@code preserve_trailing} defaults to {@code false} and {@code target_field}
     * defaults to the value read for {@code field}.
     *
     * @return a new processor configured from {@code config}
     */
    @Override
    public SplitResponseProcessor create(
        Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
        String tag,
        String description,
        boolean ignoreFailure,
        Map<String, Object> config,
        PipelineContext pipelineContext
    ) {
        // Properties are read in the same order as before: field, separator, preserve_trailing, target_field.
        final String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, SPLIT_FIELD);
        final String delimiter = ConfigurationUtils.readStringProperty(TYPE, tag, config, SEPARATOR);
        final boolean keepTrailing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, PRESERVE_TRAILING, false);
        final String destination = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, field);
        return new SplitResponseProcessor(tag, description, ignoreFailure, field, delimiter, keepTrailing, destination);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse"),
Set.of("rename_field", "truncate_hits", "collapse", "split"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading
Loading