Skip to content

Commit

Permalink
Cut over SearchPhaseResult to Writeable (elastic#41853)
Browse files Browse the repository at this point in the history
Relates to elastic#34389
  • Loading branch information
javanna authored May 8, 2019
1 parent 4e24ef1 commit 42050e9
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.elasticsearch.search;

import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

/**
* This class is a base class for all search related results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
Expand All @@ -32,15 +34,22 @@
* across search phases to ensure the same point in time snapshot is used for querying and
* fetching etc.
*/
public abstract class SearchPhaseResult extends TransportResponse implements Streamable {
public abstract class SearchPhaseResult extends TransportResponse {

private SearchShardTarget searchShardTarget;
private int shardIndex = -1;
protected long requestId;

protected SearchPhaseResult() {

}

protected SearchPhaseResult(StreamInput in) throws IOException {
super(in);
}

/**
* Returns the results request ID that is used to reference the search context on the executing
* node
* Returns the results request ID that is used to reference the search context on the executing node
*/
public long getRequestId() {
return requestId;
Expand Down Expand Up @@ -79,4 +88,9 @@ public QuerySearchResult queryResult() {
* Returns the fetch result iff it's included in this response otherwise <code>null</code>
*/
public FetchSearchResult fetchResult() { return null; }

@Override
public final void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1093,22 +1093,17 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce
}

public static final class CanMatchResponse extends SearchPhaseResult {
private boolean canMatch;
private final boolean canMatch;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
}

public CanMatchResponse(boolean canMatch) {
this.canMatch = canMatch;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
canMatch = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,22 @@ public class DfsSearchResult extends SearchPhaseResult {
private ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap();
private int maxDoc;

public DfsSearchResult() {
}

public DfsSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
requestId = in.readLong();
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
} else {
terms = new Term[termsSize];
for (int i = 0; i < terms.length; i++) {
terms[i] = new Term(in.readString(), in.readBytesRef());
}
}
this.termStatistics = readTermStats(in, terms);
fieldStatistics = readFieldStats(in);

maxDoc = in.readVInt();
}

public DfsSearchResult(long id, SearchShardTarget shardTarget) {
Expand Down Expand Up @@ -86,26 +97,6 @@ public ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics() {
return fieldStatistics;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestId = in.readLong();
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
} else {
terms = new Term[termsSize];
for (int i = 0; i < terms.length; i++) {
terms[i] = new Term(in.readString(), in.readBytesRef());
}
}
this.termStatistics = readTermStats(in, terms);
readFieldStats(in, fieldStatistics);


maxDoc = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -154,16 +145,9 @@ public static void writeSingleTermStats(StreamOutput out, TermStatistics termSt
}
}

public static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in) throws IOException {
return readFieldStats(in, null);
}

public static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in,
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics) throws IOException {
static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(StreamInput in) throws IOException {
final int numFieldStatistics = in.readVInt();
if (fieldStatistics == null) {
fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics);
}
ObjectObjectHashMap<String, CollectionStatistics> fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics);
for (int i = 0; i < numFieldStatistics; i++) {
final String field = in.readString();
assert field != null;
Expand All @@ -178,7 +162,7 @@ public static ObjectObjectHashMap<String, CollectionStatistics> readFieldStats(S
return fieldStatistics;
}

public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException {
static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException {
int termsStatsSize = in.readVInt();
final TermStatistics[] termStatistics;
if (termsStatsSize == 0) {
Expand All @@ -200,7 +184,6 @@ public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throw
return termStatistics;
}


/*
* optional statistics are set to -1 in lucene by default.
* Since we are using var longs to encode values we add one to each value
Expand All @@ -211,7 +194,6 @@ public static long addOne(long value) {
return value + 1;
}


/*
* See #addOne this just subtracting one and asserts that the actual value
* is positive.
Expand All @@ -220,5 +202,4 @@ public static long subOne(long value) {
assert value >= 0;
return value - 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public FetchSearchResult() {
}

public FetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
requestId = in.readLong();
hits = new SearchHits(in);
}

public FetchSearchResult(long id, SearchShardTarget shardTarget) {
Expand Down Expand Up @@ -82,19 +84,6 @@ public int counterGetAndIncrement() {
return counter++;
}

public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOException {
FetchSearchResult result = new FetchSearchResult();
result.readFrom(in);
return result;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestId = in.readLong();
hits = new SearchHits(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@

import java.io.IOException;

import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult;
import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;

public final class QueryFetchSearchResult extends SearchPhaseResult {

private QuerySearchResult queryResult;
private FetchSearchResult fetchResult;

public QueryFetchSearchResult() {
}
private final QuerySearchResult queryResult;
private final FetchSearchResult fetchResult;

public QueryFetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
queryResult = new QuerySearchResult(in);
fetchResult = new FetchSearchResult(in);
}

public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) {
Expand Down Expand Up @@ -81,19 +77,6 @@ public FetchSearchResult fetchResult() {
return fetchResult;
}

public static QueryFetchSearchResult readQueryFetchSearchResult(StreamInput in) throws IOException {
QueryFetchSearchResult result = new QueryFetchSearchResult();
result.readFrom(in);
return result;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
queryResult = readQuerySearchResult(in);
fetchResult = readFetchSearchResult(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@

import java.io.IOException;

import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult;

public final class ScrollQueryFetchSearchResult extends SearchPhaseResult {

private QueryFetchSearchResult result;

public ScrollQueryFetchSearchResult() {
}
private final QueryFetchSearchResult result;

public ScrollQueryFetchSearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
SearchShardTarget searchShardTarget = new SearchShardTarget(in);
result = new QueryFetchSearchResult(in);
setSearchShardTarget(searchShardTarget);
}

public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) {
Expand Down Expand Up @@ -71,14 +69,6 @@ public FetchSearchResult fetchResult() {
return result.fetchResult();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
SearchShardTarget searchShardTarget = new SearchShardTarget(in);
result = readQueryFetchSearchResult(in);
setSearchShardTarget(searchShardTarget);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public QuerySearchResult() {
}

public QuerySearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
long id = in.readLong();
readFromWithId(id, in);
}

public QuerySearchResult(long id, SearchShardTarget shardTarget) {
Expand Down Expand Up @@ -256,19 +258,6 @@ public boolean hasSearchContext() {
return hasScoreDocs || hasSuggestHits();
}

public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
QuerySearchResult result = new QuerySearchResult();
result.readFrom(in);
return result;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
long id = in.readLong();
readFromWithId(id, in);
}

public void readFromWithId(long id, StreamInput in) throws IOException {
this.requestId = id;
from = in.readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@

import java.io.IOException;

import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult;

public final class ScrollQuerySearchResult extends SearchPhaseResult {

private QuerySearchResult result;

public ScrollQuerySearchResult() {
}
private final QuerySearchResult result;

public ScrollQuerySearchResult(StreamInput in) throws IOException {
readFrom(in);
super(in);
SearchShardTarget shardTarget = new SearchShardTarget(in);
result = new QuerySearchResult(in);
setSearchShardTarget(shardTarget);
}

public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) {
Expand All @@ -61,14 +59,6 @@ public QuerySearchResult queryResult() {
return result;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
SearchShardTarget shardTarget = new SearchShardTarget(in);
result = readQuerySearchResult(in);
setSearchShardTarget(shardTarget);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -418,11 +417,6 @@ public TestSearchPhaseResult(long id, DiscoveryNode node) {
this.node = node;
}

@Override
public void readFrom(StreamInput in) throws IOException {

}

@Override
public void writeTo(StreamOutput out) throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static QuerySearchResult createTestInstance() throws Exception {
public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
Version version = VersionUtils.randomVersion(random());
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
Expand Down

0 comments on commit 42050e9

Please sign in to comment.