Skip to content

Commit

Permalink
Cleanup duplication and dead code in o.e.action.search (#101789)
Browse files Browse the repository at this point in the history
Removing some obvious code duplication and dead code found during
today's test failure fixing.
  • Loading branch information
original-brownbear authored Nov 6, 2023
1 parent 676e28e commit f3a4813
Show file tree
Hide file tree
Showing 24 changed files with 74 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,7 @@ public final void run() {
skipShard(iterator);
}
if (shardsIts.size() > 0) {
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
// Fail-fast verification of all shards being available
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if (missingShards.length() > 0) {
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
}
if (missingShards.length() > 0) {
// Status red - shard is missing all copies and would produce partial results for an index search
final String msg = "Search rejected due to missing shards ["
+ missingShards
+ "]. Consider using `allow_partial_search_results` setting to bypass this error.";
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
doCheckNoMissingShards(getName(), request, shardsIts);
Version version = request.minCompatibleShardNode();
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
if (checkMinimumVersion(shardsIts) == false) {
Expand Down Expand Up @@ -434,7 +414,6 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
logger.debug(() -> format("%s shards failed for phase: [%s]", numShardFailures, currentPhase.getName()), cause);
}
onPhaseFailure(currentPhase, "Partial shards failure", null);
return;
} else {
int discrepancy = getNumShards() - successfulOps.get();
assert discrepancy > 0 : "discrepancy: " + discrepancy;
Expand All @@ -449,8 +428,8 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
);
}
onPhaseFailure(currentPhase, "Partial shards failure (" + discrepancy + " shards unavailable)", null);
return;
}
return;
}
if (logger.isTraceEnabled()) {
final String resultsFrom = results.getSuccessfulResults()
Expand Down Expand Up @@ -840,7 +819,7 @@ void executeNext(Runnable runnable, Thread originalThread) {
private static final class PendingExecutions {
private final int permits;
private int permitsTaken = 0;
private ArrayDeque<Runnable> queue = new ArrayDeque<>();
private final ArrayDeque<Runnable> queue = new ArrayDeque<>();

PendingExecutions(int permits) {
assert permits > 0 : "not enough permits: " + permits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -127,7 +126,7 @@ private static boolean assertSearchCoordinationThread() {
}

@Override
public void run() throws IOException {
public void run() {
assert assertSearchCoordinationThread();
checkNoMissingShards();
Version version = request.minCompatibleShardNode();
Expand Down Expand Up @@ -159,9 +158,7 @@ private void runCoordinatorRewritePhase() {
);
final ShardSearchRequest request = canMatchNodeRequest.createShardSearchRequest(buildShardLevelRequest(searchShardIterator));
if (searchShardIterator.prefiltered()) {
CanMatchShardResponse result = new CanMatchShardResponse(searchShardIterator.skip() == false, null);
result.setShardIndex(request.shardRequestIndex());
results.consumeResult(result, () -> {});
consumeResult(searchShardIterator.skip() == false, request);
continue;
}
boolean canMatch = true;
Expand All @@ -178,9 +175,7 @@ private void runCoordinatorRewritePhase() {
if (canMatch) {
matchedShardLevelRequests.add(searchShardIterator);
} else {
CanMatchShardResponse result = new CanMatchShardResponse(canMatch, null);
result.setShardIndex(request.shardRequestIndex());
results.consumeResult(result, () -> {});
consumeResult(false, request);
}
}
if (matchedShardLevelRequests.isEmpty()) {
Expand All @@ -190,29 +185,15 @@ private void runCoordinatorRewritePhase() {
}
}

/**
 * Records a coordinator-produced can-match decision for the given shard-level request
 * without sending anything over the wire.
 *
 * @param canMatch whether the shard can possibly match the query
 * @param request  the shard-level request whose shard index identifies the result slot
 */
private void consumeResult(boolean canMatch, ShardSearchRequest request) {
    final CanMatchShardResponse response = new CanMatchShardResponse(canMatch, null);
    response.setShardIndex(request.shardRequestIndex());
    results.consumeResult(response, () -> {});
}

private void checkNoMissingShards() {
assert assertSearchCoordinationThread();
assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.allowPartialSearchResults() == false) {
final StringBuilder missingShards = new StringBuilder();
// Fail-fast verification of all shards being available
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
if (shardRoutings.size() == 0) {
if (missingShards.length() > 0) {
missingShards.append(", ");
}
missingShards.append(shardRoutings.shardId());
}
}
if (missingShards.length() > 0) {
// Status red - shard is missing all copies and would produce partial results for an index search
final String msg = "Search rejected due to missing shards ["
+ missingShards
+ "]. Consider using `allow_partial_search_results` setting to bypass this error.";
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
}
}
doCheckNoMissingShards(getName(), request, shardsIts);
}

private Map<SendingTarget, List<SearchShardIterator>> groupByNode(GroupShardsIterator<SearchShardIterator> shards) {
Expand Down Expand Up @@ -425,7 +406,7 @@ public void onFailure(Exception e) {
}

@Override
protected void doRun() throws IOException {
protected void doRun() {
CanMatchPreFilterSearchPhase.this.run();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
/**
* Parse the clear scroll response body into a new {@link ClearScrollResponse} object
*/
public static ClosePointInTimeResponse fromXContent(XContentParser parser) throws IOException {
public static ClosePointInTimeResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.search.vectors.KnnScoreDocQueryBuilder;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -71,7 +70,7 @@ final class DfsQueryPhase extends SearchPhase {
}

@Override
public void run() throws IOException {
public void run() {
// TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs
// to free up memory early
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
*/
public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestSearchAction.class);
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal]" + " Specifying types in search requests is deprecated.";
public static final String FIRST_LINE_EMPTY_DEPRECATION_MESSAGE =
"support for empty first line before any action metadata in msearch API is deprecated "
+ "and will be removed in the next major version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,4 @@ public MultiSearchRequestBuilder setIndicesOptions(IndicesOptions indicesOptions
return this;
}

/**
 * Sets how many of the search requests contained in this multi-search request may run concurrently.
 *
 * @param maxConcurrentSearchRequests the maximum number of concurrently executed search requests
 * @return this builder, for method chaining
 */
public MultiSearchRequestBuilder setMaxConcurrentSearchRequests(int maxConcurrentSearchRequests) {
    request().maxConcurrentSearchRequests(maxConcurrentSearchRequests);
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,16 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;

public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
private static final ParseField ID = new ParseField("id");

private static final ConstructingObjectParser<OpenPointInTimeResponse, Void> PARSER;

static {
PARSER = new ConstructingObjectParser<>("open_point_in_time", true, a -> new OpenPointInTimeResponse((String) a[0]));
PARSER.declareField(constructorArg(), (parser, context) -> parser.text(), ID, ObjectParser.ValueType.STRING);
}
private final String pointInTimeId;

public OpenPointInTimeResponse(String pointInTimeId) {
Expand Down Expand Up @@ -60,7 +49,4 @@ public String getPointInTimeId() {
return pointInTimeId;
}

public static OpenPointInTimeResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,15 @@ public class ParsedScrollId {

public static final String QUERY_AND_FETCH_TYPE = "queryAndFetch";

private final String source;

private final String type;

private final SearchContextIdForNode[] context;

ParsedScrollId(String source, String type, SearchContextIdForNode[] context) {
this.source = source;
ParsedScrollId(String type, SearchContextIdForNode[] context) {
this.type = type;
this.context = context;
}

/**
 * Returns the raw source string this instance was constructed with.
 */
public String getSource() {
    return source;
}

/**
 * Returns the scroll type, e.g. {@link #QUERY_AND_FETCH_TYPE}.
 */
public String getType() {
    return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ private record MergeResult(
private static class MergeTask {
private final List<SearchShard> emptyResults;
private QuerySearchResult[] buffer;
private long aggsBufferSize;
private final long aggsBufferSize;
private Runnable next;

private MergeTask(QuerySearchResult[] buffer, long aggsBufferSize, List<SearchShard> emptyResults, Runnable next) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand All @@ -37,7 +36,7 @@ public List<Route> routes() {
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
openRequest.indicesOptions(IndicesOptions.fromRequest(request, OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public final class SearchContextId {
private final Map<ShardId, SearchContextIdForNode> shards;
private final Map<String, AliasFilter> aliasFilter;
private transient Set<ShardSearchContextId> contextIds;
private final transient Set<ShardSearchContextId> contextIds;

SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
this.shards = shards;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.search;

import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.core.CheckedRunnable;

import java.io.IOException;
Expand Down Expand Up @@ -37,4 +38,28 @@ public void start() {
throw new UncheckedIOException(e);
}
}

/**
 * Fail-fast verification used when partial results are not allowed: rejects the search outright
 * if any target shard has no available copies, rather than producing partial results later.
 *
 * @param phaseName the name of the search phase, used in the rejection message
 * @param request   the search request; its {@code allowPartialSearchResults} setting must be non-null
 * @param shardsIts the shard iterators the phase is about to execute against
 * @throws SearchPhaseExecutionException if partial results are disallowed and at least one shard
 *                                       has no available copies
 */
static void doCheckNoMissingShards(String phaseName, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts) {
    assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
    if (request.allowPartialSearchResults()) {
        // partial results are acceptable, nothing to verify up front
        return;
    }
    // Collect the ids of every shard that has no copies left to query, joined by ", ".
    final StringBuilder missing = new StringBuilder();
    for (int i = 0; i < shardsIts.size(); i++) {
        final SearchShardIterator shardIt = shardsIts.get(i);
        if (shardIt.size() == 0) {
            if (missing.isEmpty() == false) {
                missing.append(", ");
            }
            missing.append(shardIt.shardId());
        }
    }
    if (missing.isEmpty() == false) {
        // Status red - shard is missing all copies and would produce partial results for an index search
        throw new SearchPhaseExecutionException(
            phaseName,
            "Search rejected due to missing shards ["
                + missing
                + "]. Consider using `allow_partial_search_results` setting to bypass this error.",
            null,
            ShardSearchFailure.EMPTY_ARRAY
        );
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ private static void validateMergeSortValueFormats(Collection<? extends SearchPha
firstResult = false;
ulFormats = new boolean[formats.length];
for (int i = 0; i < formats.length; i++) {
ulFormats[i] = formats[i] == DocValueFormat.UNSIGNED_LONG_SHIFTED ? true : false;
ulFormats[i] = formats[i] == DocValueFormat.UNSIGNED_LONG_SHIFTED;
}
} else {
for (int i = 0; i < formats.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,6 @@ public SearchRequestBuilder setSearchType(SearchType searchType) {
return this;
}

/**
 * The search type to execute, given as its string representation; defaults to
 * {@link SearchType#DEFAULT}. Can be one of "dfs_query_then_fetch" or "query_then_fetch".
 *
 * @param searchType the string representation of the search type
 * @return this builder, for method chaining
 */
public SearchRequestBuilder setSearchType(String searchType) {
    request.searchType(searchType);
    return this;
}

/**
* If set, will enable scrolling of the search request.
*/
Expand Down Expand Up @@ -251,17 +242,6 @@ public SearchRequestBuilder seqNoAndPrimaryTerm(boolean seqNoAndPrimaryTerm) {
return this;
}

/**
 * Sets the boost a specific index will receive when the query is executed against it.
 *
 * @param index The index to apply the boost against
 * @param indexBoost The boost to apply to the index
 * @return this builder, for method chaining
 */
public SearchRequestBuilder addIndexBoost(String index, float indexBoost) {
    sourceBuilder().indexBoost(index, indexBoost);
    return this;
}

/**
* The stats groups this request will be aggregated under.
*/
Expand All @@ -270,14 +250,6 @@ public SearchRequestBuilder setStats(String... statsGroups) {
return this;
}

/**
 * The stats groups this request will be aggregated under.
 *
 * @param statsGroups the names of the stats groups to associate this request with
 * @return this builder, for method chaining
 */
public SearchRequestBuilder setStats(List<String> statsGroups) {
    sourceBuilder().stats(statsGroups);
    return this;
}

/**
* Indicates whether the response should contain the stored _source for every hit
*/
Expand Down
Loading

0 comments on commit f3a4813

Please sign in to comment.