Skip to content

Commit

Permalink
Coordinator can return partial results after the timeout when allow_p…
Browse files Browse the repository at this point in the history
…artial_search_results is true

Signed-off-by: kkewwei <[email protected]>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Nov 24, 2024
1 parent c82cd2e commit 3e56548
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ public static void readMultiLineFormat(
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
} else if ("phase_took".equals(entry.getKey())) {
searchRequest.setPhaseTook(nodeBooleanValue(value));
} else if ("coordinator_timeout".equals(entry.getKey())) {
searchRequest.setCoordinatorTimeout(nodeTimeValue(value));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
Expand Down Expand Up @@ -385,6 +387,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.isPhaseTook() != null) {
xContentBuilder.field("phase_took", request.isPhaseTook());
}
if (request.getCoordinatorTimeout() != null) {
xContentBuilder.field("coordinator_timeout", request.getCoordinatorTimeout().getStringRep());
}
xContentBuilder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.search.SearchService.NO_TIMEOUT;

/**
* A request to execute search against one or more indices (or all). Best created using
Expand Down Expand Up @@ -123,6 +124,9 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private Boolean phaseTook = null;

// it's only been used in coordinator, so we don't need to serialize/deserialize it
private TimeValue coordinatorTimeout = null;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -228,6 +232,7 @@ private SearchRequest(
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
this.phaseTook = searchRequest.phaseTook;
this.coordinatorTimeout = searchRequest.coordinatorTimeout;
}

/**
Expand Down Expand Up @@ -275,6 +280,7 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
phaseTook = in.readOptionalBoolean();
}
coordinatorTimeout = null;
}

@Override
Expand Down Expand Up @@ -341,6 +347,13 @@ public ActionRequestValidationException validate() {
if (source.aggregations() != null) {
validationException = source.aggregations().validate(validationException);
}
if (source.timeout() != null && coordinatorTimeout != null && source.timeout().compareTo(coordinatorTimeout) < 0) {
validationException = addValidationError(
"timeout [" + source.timeout() + "] cannot be smaller than coordinator timeout [" + coordinatorTimeout + "]",
validationException
);

}
}
if (pointInTimeBuilder() != null) {
if (scroll) {
Expand Down Expand Up @@ -711,9 +724,18 @@ public String pipeline() {
return pipeline;
}

public void setCoordinatorTimeout(TimeValue coordinatorTimeout) {
assert coordinatorTimeout != NO_TIMEOUT;
this.coordinatorTimeout = coordinatorTimeout;
}

public TimeValue getCoordinatorTimeout() {
return coordinatorTimeout;
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval, coordinatorTimeout);
}

public final String buildDescription() {
Expand Down Expand Up @@ -765,7 +787,8 @@ public boolean equals(Object o) {
&& ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips
&& Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval)
&& Objects.equals(pipeline, that.pipeline)
&& Objects.equals(phaseTook, that.phaseTook);
&& Objects.equals(phaseTook, that.phaseTook)
&& Objects.equals(coordinatorTimeout, that.coordinatorTimeout);
}

@Override
Expand All @@ -787,7 +810,8 @@ public int hashCode() {
absoluteStartMillis,
ccsMinimizeRoundtrips,
cancelAfterTimeInterval,
phaseTook
phaseTook,
coordinatorTimeout
);
}

Expand Down Expand Up @@ -832,6 +856,8 @@ public String toString() {
+ pipeline
+ ", phaseTook="
+ phaseTook
+ ", coordinatorTimeout="
+ coordinatorTimeout
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
private final TimeValue coordinatorTimeout;

public SearchTask(
long id,
Expand All @@ -62,7 +63,7 @@ public SearchTask(
TaskId parentTaskId,
Map<String, String> headers
) {
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null);
}

public SearchTask(
Expand All @@ -72,10 +73,12 @@ public SearchTask(
Supplier<String> descriptionSupplier,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval
TimeValue cancelAfterTimeInterval,
TimeValue coordinatorTimeout
) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
this.coordinatorTimeout = coordinatorTimeout;
}

@Override
Expand Down Expand Up @@ -106,4 +109,8 @@ public final SearchProgressListener getProgressListener() {
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

public TimeValue getCoordinatorTimeout() {
return coordinatorTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void createPitContext(
CREATE_READER_CONTEXT_ACTION_NAME,
request,
task,
TransportRequestOptions.EMPTY,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
);
}
Expand All @@ -188,7 +189,7 @@ public void sendCanMatch(
QUERY_CAN_MATCH_NAME,
request,
task,
TransportRequestOptions.EMPTY,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new)
);
}
Expand Down Expand Up @@ -228,6 +229,7 @@ public void sendExecuteDfs(
DFS_ACTION_NAME,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -249,6 +251,7 @@ public void sendExecuteQuery(
QUERY_ACTION_NAME,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -264,6 +267,7 @@ public void sendExecuteQuery(
QUERY_ID_ACTION_NAME,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -279,6 +283,7 @@ public void sendExecuteScrollQuery(
QUERY_SCROLL_ACTION_NAME,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand Down Expand Up @@ -328,6 +333,7 @@ private void sendExecuteFetch(
action,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId())
);
}
Expand All @@ -342,10 +348,19 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, f
MultiSearchAction.NAME,
request,
task,
getTransportRequestOptions(task.getCoordinatorTimeout()),
new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId())
);
}

static TransportRequestOptions getTransportRequestOptions(TimeValue coordinatorTimeout) {
if (coordinatorTimeout != null) {
return TransportRequestOptions.builder().withTimeout(coordinatorTimeout).build();
} else {
return TransportRequestOptions.EMPTY;
}
}

public RemoteClusterService getRemoteClusterService() {
return transportService.getRemoteClusterService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public static MultiSearchRequest parseRequest(
multiRequest.add(searchRequest);
});
List<SearchRequest> requests = multiRequest.requests();
final TimeValue coordinatorTimeout = restRequest.paramAsTime("coordinator_timeout", null);
final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null);
for (SearchRequest request : requests) {
// preserve if it's set on the request
Expand All @@ -171,6 +172,9 @@ public static MultiSearchRequest parseRequest(
if (request.getCancelAfterTimeInterval() == null) {
request.setCancelAfterTimeInterval(cancelAfterTimeInterval);
}
if (request.getCoordinatorTimeout() == null) {
request.setCoordinatorTimeout(coordinatorTimeout);
}
}
return multiRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ public static void parseSearchRequest(
}

searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));
searchRequest.setCoordinatorTimeout(request.paramAsTime("coordinator_timeout", null));
}

/**
Expand Down
Loading

0 comments on commit 3e56548

Please sign in to comment.