From f810af25f23c37b999477e0c2be8ce581ab3448e Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 13:01:24 -0700 Subject: [PATCH 1/8] convert searchrequest usage to Writeable --- .../action/search/SearchRequest.java | 48 +++++++++++-------- .../search/SearchRequestTests.java | 3 +- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index f0b315a461ef3..8130dcfb42572 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -109,6 +109,33 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) { this.source = source; } + /** + * Constructs a new search request from reading the specified stream. + * + * @param in The stream the request is read from + * @throws IOException if there is an issue reading the stream + */ + public SearchRequest(StreamInput in) throws IOException { + super(in); + searchType = SearchType.fromId(in.readByte()); + indices = new String[in.readVInt()]; + for (int i = 0; i < indices.length; i++) { + indices[i] = in.readString(); + } + routing = in.readOptionalString(); + preference = in.readOptionalString(); + scroll = in.readOptionalWriteable(Scroll::new); + source = in.readOptionalWriteable(SearchSourceBuilder::new); + types = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + requestCache = in.readOptionalBoolean(); + batchedReduceSize = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_5_6_0)) { + maxConcurrentShardRequests = in.readVInt(); + preFilterShardSize = in.readVInt(); + } + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -385,7 +412,7 @@ public String getDescription() { sb.append("], "); sb.append("search_type[").append(searchType).append("], "); if (source != null) { - + sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]"); } else { sb.append("source[]"); @@ -397,24 +424,7 @@ public String getDescription() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - searchType = SearchType.fromId(in.readByte()); - indices = new String[in.readVInt()]; - for (int i = 0; i < indices.length; i++) { - indices[i] = in.readString(); - } - routing = in.readOptionalString(); - preference = in.readOptionalString(); - scroll = in.readOptionalWriteable(Scroll::new); - source = in.readOptionalWriteable(SearchSourceBuilder::new); - types = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - requestCache = in.readOptionalBoolean(); - batchedReduceSize = in.readVInt(); - if (in.getVersion().onOrAfter(Version.V_5_6_0)) { - maxConcurrentShardRequests = in.readVInt(); - preFilterShardSize = in.readVInt(); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/test/java/org/elasticsearch/search/SearchRequestTests.java b/core/src/test/java/org/elasticsearch/search/SearchRequestTests.java index b83429e95e48a..d37b8b4b13392 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchRequestTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchRequestTests.java @@ -43,8 +43,7 @@ public void testSerialization() throws Exception { try (BytesStreamOutput output = new BytesStreamOutput()) { searchRequest.writeTo(output); try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) { - SearchRequest deserializedRequest = new SearchRequest(); - deserializedRequest.readFrom(in); + SearchRequest deserializedRequest = new SearchRequest(in); assertEquals(deserializedRequest, searchRequest); assertEquals(deserializedRequest.hashCode(), searchRequest.hashCode()); assertNotSame(deserializedRequest, searchRequest); From 272e5dca3e7074c3aab089be6b8654460bdddcba Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 13:33:20 -0700 Subject: [PATCH 2/8] moar refactors --- .../search/TransportNoopSearchAction.java | 4 +-- .../action/search/MultiSearchRequest.java | 3 +- .../action/search/SearchTransportService.java | 36 +++++++++++-------- .../action/search/TransportSearchAction.java | 2 +- .../search/fetch/ShardFetchRequest.java | 34 ++++++++++-------- .../search/fetch/ShardFetchSearchRequest.java | 8 +++-- .../internal/InternalScrollSearchRequest.java | 10 ++++-- .../internal/ShardSearchTransportRequest.java | 14 +++++--- .../search/query/QuerySearchRequest.java | 12 ++++--- .../mustache/SearchTemplateRequest.java | 2 +- 10 files changed, 76 insertions(+), 49 deletions(-) diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java index 280e0b08f2c72..a6796c76f9279 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java @@ -42,8 +42,8 @@ public class TransportNoopSearchAction extends HandledTransportAction() { @Override public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception { @@ -298,7 +306,7 @@ public void messageReceived(ScrollFreeContextRequest request, TransportChannel c } }); TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception { @@ -318,7 +326,7 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportResponse.Empty.INSTANCE); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, + transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { @@ -346,7 +354,7 @@ public void onFailure(Exception e) { }); TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, + transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { @@ -373,7 +381,7 @@ public void onFailure(Exception e) { }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { @@ -383,7 +391,7 @@ public void messageReceived(QuerySearchRequest request, TransportChannel channel }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { @@ -393,7 +401,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { @@ -403,7 +411,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception { @@ -413,7 +421,7 @@ public void messageReceived(ShardFetchRequest request, TransportChannel channel, }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception { @@ -424,7 +432,7 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); // this is super cheap and should not hit thread-pool rejections - transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, + transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index e1ae2a5a66866..795ccdd3f203a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -82,7 +82,7 @@ public TransportSearchAction(Settings settings, ThreadPool threadPool, Transport SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new); + super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new, indexNameExpressionResolver); this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index dcea42e5ecb7f..df80d10c3ed07 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -56,6 +56,24 @@ public ShardFetchRequest(long id, IntArrayList list, ScoreDoc lastEmittedDoc) { this.lastEmittedDoc = lastEmittedDoc; } + public ShardFetchRequest(StreamInput in) throws IOException { + super(in); + id = in.readLong(); + size = in.readVInt(); + docIds = new int[size]; + for (int i = 0; i < size; i++) { + docIds[i] = in.readVInt(); + } + byte flag = in.readByte(); + if (flag == 1) { + lastEmittedDoc = Lucene.readFieldDoc(in); + } else if (flag == 2) { + lastEmittedDoc = Lucene.readScoreDoc(in); + } else if (flag != 0) { + throw new IOException("Unknown flag: " + flag); + } + } + public long id() { return id; } @@ -74,21 +92,7 @@ public ScoreDoc lastEmittedDoc() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readLong(); - size = in.readVInt(); - docIds = new int[size]; - for (int i = 0; i < size; i++) { - docIds[i] = in.readVInt(); - } - byte flag = in.readByte(); - if (flag == 1) { - lastEmittedDoc = Lucene.readFieldDoc(in); - } else if (flag == 2) { - lastEmittedDoc = Lucene.readScoreDoc(in); - } else if (flag != 0) { - throw new IOException("Unknown flag: " + flag); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java index fdfc582c95295..1b953bf46c5f2 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java @@ -46,6 +46,11 @@ public ShardFetchSearchRequest(OriginalIndices originalIndices, long id, IntArra this.originalIndices = originalIndices; } + public ShardFetchSearchRequest(StreamInput in) throws IOException { + super(in); + originalIndices = OriginalIndices.readOriginalIndices(in); + } + @Override public String[] indices() { if (originalIndices == null) { @@ -64,8 +69,7 @@ public IndicesOptions indicesOptions() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - originalIndices = OriginalIndices.readOriginalIndices(in); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index f112c97dd0f63..8cd25e1d5695c 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -44,6 +44,12 @@ public InternalScrollSearchRequest(SearchScrollRequest request, long id) { this.scroll = request.scroll(); } + public InternalScrollSearchRequest(StreamInput in) throws IOException { + super(in); + id = in.readLong(); + scroll = in.readOptionalWriteable(Scroll::new); + } + public long id() { return id; } @@ -59,9 +65,7 @@ public InternalScrollSearchRequest scroll(Scroll scroll) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readLong(); - scroll = in.readOptionalWriteable(Scroll::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index 1c2ac0e4d179c..d85fb30439dc6 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -62,6 +62,13 @@ public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchReques this.originalIndices = originalIndices; } + public ShardSearchTransportRequest(StreamInput in) throws IOException { + super(in); + shardSearchLocalRequest = new ShardSearchLocalRequest(); + shardSearchLocalRequest.innerReadFrom(in); + originalIndices = OriginalIndices.readOriginalIndices(in); + } + public void searchType(SearchType searchType) { shardSearchLocalRequest.setSearchType(searchType); } @@ -144,13 +151,10 @@ public Scroll scroll() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardSearchLocalRequest = new ShardSearchLocalRequest(); - shardSearchLocalRequest.innerReadFrom(in); - originalIndices = OriginalIndices.readOriginalIndices(in); - + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index 86a9c70dc0be1..1b44aecc6982e 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -52,6 +52,13 @@ public QuerySearchRequest(OriginalIndices originalIndices, long id, AggregatedDf this.originalIndices = originalIndices; } + public QuerySearchRequest(StreamInput in) throws IOException { + super(in); + id = in.readLong(); + dfs = readAggregatedDfs(in); + originalIndices = OriginalIndices.readOriginalIndices(in); + } + public long id() { return id; } @@ -72,10 +79,7 @@ public IndicesOptions indicesOptions() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readLong(); - dfs = readAggregatedDfs(in); - originalIndices = OriginalIndices.readOriginalIndices(in); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateRequest.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateRequest.java index 8ff30fb0e5b4e..b0186b7b0e3cf 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateRequest.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateRequest.java @@ -137,7 +137,7 @@ public ActionRequestValidationException validate() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - request = in.readOptionalStreamable(SearchRequest::new); + request = in.readOptionalWriteable(SearchRequest::new); simulate = in.readBoolean(); explain = in.readBoolean(); profile = in.readBoolean(); From 12499c9073a2d1e0ced55999a7b962a234607346 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 13:38:21 -0700 Subject: [PATCH 3/8] fix core tests --- .../action/search/SearchScrollRequest.java | 10 ++++++--- .../search/TransportSearchScrollAction.java | 4 ++-- .../search/SearchScrollRequestTests.java | 6 ++---- .../ShardSearchTransportRequestTests.java | 3 +-- .../hamcrest/ElasticsearchAssertions.java | 21 ++++++++++++++++++- 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index fbe648cceaa80..8cfa71d545148 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -48,6 +48,12 @@ public SearchScrollRequest(String scrollId) { this.scrollId = scrollId; } + public SearchScrollRequest(StreamInput in) throws IOException { + super(in); + scrollId = in.readString(); + scroll = in.readOptionalWriteable(Scroll::new); + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -100,9 +106,7 @@ public SearchScrollRequest scroll(String keepAlive) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - scrollId = in.readString(); - scroll = in.readOptionalWriteable(Scroll::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java index e334b95180122..6f230c9bd8b89 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java @@ -45,8 +45,8 @@ public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, Tra ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController) { - super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, - SearchScrollRequest::new); + super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest::new, + indexNameExpressionResolver); this.clusterService = clusterService; this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchScrollRequestTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchScrollRequestTests.java index 6ec9f95f489de..f40819ec08958 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchScrollRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchScrollRequestTests.java @@ -46,8 +46,7 @@ public void testSerialization() throws Exception { try (BytesStreamOutput output = new BytesStreamOutput()) { searchScrollRequest.writeTo(output); try (StreamInput in = output.bytes().streamInput()) { - SearchScrollRequest deserializedRequest = new SearchScrollRequest(); - deserializedRequest.readFrom(in); + SearchScrollRequest deserializedRequest = new SearchScrollRequest(in); assertEquals(deserializedRequest, searchScrollRequest); assertEquals(deserializedRequest.hashCode(), searchScrollRequest.hashCode()); assertNotSame(deserializedRequest, searchScrollRequest); @@ -61,8 +60,7 @@ public void testInternalScrollSearchRequestSerialization() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { internalScrollSearchRequest.writeTo(output); try (StreamInput in = output.bytes().streamInput()) { - InternalScrollSearchRequest deserializedRequest = new InternalScrollSearchRequest(); - deserializedRequest.readFrom(in); + InternalScrollSearchRequest deserializedRequest = new InternalScrollSearchRequest(in); assertEquals(deserializedRequest.id(), internalScrollSearchRequest.id()); assertEquals(deserializedRequest.scroll(), internalScrollSearchRequest.scroll()); assertNotSame(deserializedRequest, internalScrollSearchRequest); diff --git a/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java b/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java index a16b91872d086..f68d3a3583503 100644 --- a/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java +++ b/core/src/test/java/org/elasticsearch/search/internal/ShardSearchTransportRequestTests.java @@ -61,8 +61,7 @@ public void testSerialization() throws Exception { try (BytesStreamOutput output = new BytesStreamOutput()) { shardSearchTransportRequest.writeTo(output); try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) { - ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest(); - deserializedRequest.readFrom(in); + ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest(in); assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll()); assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter()); assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index c1facb772c79d..8ea8a77746188 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -688,7 +689,12 @@ public static void assertVersionSerializable(Version version, Streamable streama input = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry); } input.setVersion(version); - newInstance.readFrom(input); + // This is here since some Streamables are being converted into Writeables + // and the readFrom method throws an exception if called + Streamable newInstanceFromStream = tryCreateFromStream(streamable, input); + if (newInstanceFromStream == null) { + newInstance.readFrom(input); + } assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(), equalTo(0)); BytesReference newBytes = serialize(version, streamable); @@ -749,6 +755,19 @@ private static Streamable tryCreateNewInstance(Streamable streamable) throws NoS } } + private static Streamable tryCreateFromStream(Streamable streamable, StreamInput in) throws NoSuchMethodException, + InstantiationException, IllegalAccessException, InvocationTargetException { + try { + Class clazz = streamable.getClass(); + Constructor constructor = clazz.getConstructor(StreamInput.class); + assertThat(constructor, Matchers.notNullValue()); + Streamable newInstance = constructor.newInstance(in); + return newInstance; + } catch (Exception e) { + return null; + } + } + /** * Applies basic assertions on the SearchResponse. This method checks if all shards were successful, if * any of the shards threw an exception and if the response is serializable. From 0d38663da74af440f85db284d1a0dd630e0ea195 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 15:15:34 -0700 Subject: [PATCH 4/8] respond to reviews --- .../action/search/SearchRequest.java | 44 +++++++++---------- .../action/search/SearchScrollRequest.java | 14 +++--- .../search/fetch/ShardFetchRequest.java | 38 ++++++++-------- .../search/fetch/ShardFetchSearchRequest.java | 12 ++--- .../internal/InternalScrollSearchRequest.java | 14 +++--- .../search/query/QuerySearchRequest.java | 16 +++---- .../hamcrest/ElasticsearchAssertions.java | 20 +++++++-- 7 files changed, 85 insertions(+), 73 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 8130dcfb42572..030d19d8b6879 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -136,6 +136,28 @@ public SearchRequest(StreamInput in) throws IOException { } } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeByte(searchType.id()); + out.writeVInt(indices.length); + for (String index : indices) { + out.writeString(index); + } + out.writeOptionalString(routing); + out.writeOptionalString(preference); + out.writeOptionalWriteable(scroll); + out.writeOptionalWriteable(source); + out.writeStringArray(types); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalBoolean(requestCache); + out.writeVInt(batchedReduceSize); + if (out.getVersion().onOrAfter(Version.V_5_6_0)) { + out.writeVInt(maxConcurrentShardRequests); + out.writeVInt(preFilterShardSize); + } + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -427,28 +449,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeByte(searchType.id()); - out.writeVInt(indices.length); - for (String index : indices) { - out.writeString(index); - } - out.writeOptionalString(routing); - out.writeOptionalString(preference); - out.writeOptionalWriteable(scroll); - out.writeOptionalWriteable(source); - out.writeStringArray(types); - indicesOptions.writeIndicesOptions(out); - out.writeOptionalBoolean(requestCache); - out.writeVInt(batchedReduceSize); - if (out.getVersion().onOrAfter(Version.V_5_6_0)) { - out.writeVInt(maxConcurrentShardRequests); - out.writeVInt(preFilterShardSize); - } - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index 8cfa71d545148..be83ef6d5839e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -54,6 +54,13 @@ public SearchScrollRequest(StreamInput in) throws IOException { scroll = in.readOptionalWriteable(Scroll::new); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(scrollId); + out.writeOptionalWriteable(scroll); + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -109,13 +116,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(scrollId); - out.writeOptionalWriteable(scroll); - } - @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { return new SearchTask(id, type, action, getDescription(), parentTaskId); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index df80d10c3ed07..ac71b84d54f34 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -74,6 +74,25 @@ public ShardFetchRequest(StreamInput in) throws IOException { } } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); + out.writeVInt(size); + for (int i = 0; i < size; i++) { + out.writeVInt(docIds[i]); + } + if (lastEmittedDoc == null) { + out.writeByte((byte) 0); + } else if (lastEmittedDoc instanceof FieldDoc) { + out.writeByte((byte) 1); + Lucene.writeFieldDoc(out, (FieldDoc) lastEmittedDoc); + } else { + out.writeByte((byte) 2); + Lucene.writeScoreDoc(out, lastEmittedDoc); + } + } + public long id() { return id; } @@ -95,25 +114,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(id); - out.writeVInt(size); - for (int i = 0; i < size; i++) { - out.writeVInt(docIds[i]); - } - if (lastEmittedDoc == null) { - out.writeByte((byte) 0); - } else if (lastEmittedDoc instanceof FieldDoc) { - out.writeByte((byte) 1); - Lucene.writeFieldDoc(out, (FieldDoc) lastEmittedDoc); - } else { - out.writeByte((byte) 2); - Lucene.writeScoreDoc(out, lastEmittedDoc); - } - } - @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { return new SearchTask(id, type, action, getDescription(), parentTaskId); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java index 1b953bf46c5f2..b81d9eded9cd6 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java @@ -51,6 +51,12 @@ public ShardFetchSearchRequest(StreamInput in) throws IOException { originalIndices = OriginalIndices.readOriginalIndices(in); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); + } + @Override public String[] indices() { if (originalIndices == null) { @@ -71,10 +77,4 @@ public IndicesOptions indicesOptions() { public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - OriginalIndices.writeOriginalIndices(originalIndices, out); - } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index 8cd25e1d5695c..d1fba0f761526 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -50,6 +50,13 @@ public InternalScrollSearchRequest(StreamInput in) throws IOException { scroll = in.readOptionalWriteable(Scroll::new); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); + out.writeOptionalWriteable(scroll); + } + public long id() { return id; } @@ -68,13 +75,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(id); - out.writeOptionalWriteable(scroll); - } - @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { return new SearchTask(id, type, action, getDescription(), parentTaskId); diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index 1b44aecc6982e..c893ed93046f0 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -59,6 +59,14 @@ public QuerySearchRequest(StreamInput in) throws IOException { originalIndices = OriginalIndices.readOriginalIndices(in); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); + dfs.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); + } + public long id() { return id; } @@ -82,14 +90,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(id); - dfs.writeTo(out); - OriginalIndices.writeOriginalIndices(originalIndices, out); - } - @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { return new SearchTask(id, type, action, getDescription(), parentTaskId); diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 8ea8a77746188..944a40cd2b2c8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -755,15 +755,27 @@ private static Streamable tryCreateNewInstance(Streamable streamable) throws NoS } } + /** + * This attemps to construct a new {@link Streamable} object that is in the process of + * being converted from {@link Streamable} to {@link Writeable}. Assuming this constructs + * the object successfully, #readFrom should not be called on the constructed object. + * + * @param streamable the object to retrieve the type of class to construct the new instance from + * @param in the stream to read the object from + * @return the newly constructed object from reading the stream + * @throws NoSuchMethodException + * @throws InstantiationException + * @throws IllegalAccessException + * @throws InvocationTargetException + */ private static Streamable tryCreateFromStream(Streamable streamable, StreamInput in) throws NoSuchMethodException, - InstantiationException, IllegalAccessException, InvocationTargetException { + InstantiationException, IllegalAccessException, InvocationTargetException { try { Class clazz = streamable.getClass(); Constructor constructor = clazz.getConstructor(StreamInput.class); assertThat(constructor, Matchers.notNullValue()); - Streamable newInstance = constructor.newInstance(in); - return newInstance; - } catch (Exception e) { + return constructor.newInstance(in); + } catch (NoSuchMethodException e) { return null; } } From 1e355d0ea64425114ffd30f544ea1fa536beaf14 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 15:29:25 -0700 Subject: [PATCH 5/8] fix javadoc descriptions --- .../test/hamcrest/ElasticsearchAssertions.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 944a40cd2b2c8..0f7a6945eb0a7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -763,10 +763,13 @@ private static Streamable tryCreateNewInstance(Streamable streamable) throws NoS * @param streamable the object to retrieve the type of class to construct the new instance from * @param in the stream to read the object from * @return the newly constructed object from reading the stream - * @throws NoSuchMethodException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws InvocationTargetException + * @throws NoSuchMethodException if constuctor cannot be found + * @throws InstantiationException if the class represents an abstract class + * @throws IllegalAccessException if this {@code Constructor} object + * is enforcing Java language access control and the underlying + * constructor is inaccessible. + * @throws InvocationTargetException if the underlying constructor + * throws an exception. */ private static Streamable tryCreateFromStream(Streamable streamable, StreamInput in) throws NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { From e0b8e21fcb3e6364eddba6b59ccc457b34ce9256 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 15:55:13 -0700 Subject: [PATCH 6/8] last migration of writeTo to be near the constructors --- .../action/search/SearchTransportService.java | 24 +++++++++---------- .../internal/ShardSearchTransportRequest.java | 15 ++++++------ 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 10fdea7883e2f..755bfe48410a0 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -208,6 +208,12 @@ static class ScrollFreeContextRequest extends TransportRequest { id = in.readLong(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(id); + } + public long id() { return this.id; } @@ -216,12 +222,6 @@ public long id() { public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeLong(id); - } } static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { @@ -240,6 +240,12 @@ static class SearchFreeContextRequest extends ScrollFreeContextRequest implement originalIndices = OriginalIndices.readOriginalIndices(in); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); + } + @Override public String[] indices() { if (originalIndices == null) { @@ -260,12 +266,6 @@ public IndicesOptions indicesOptions() { public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - OriginalIndices.writeOriginalIndices(originalIndices, out); - } } public static class SearchFreeContextResponse extends TransportResponse { diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index d85fb30439dc6..76a13b7b02f24 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -69,6 +69,13 @@ public ShardSearchTransportRequest(StreamInput in) throws IOException { originalIndices = OriginalIndices.readOriginalIndices(in); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardSearchLocalRequest.innerWriteTo(out, false); + OriginalIndices.writeOriginalIndices(originalIndices, out); + } + public void searchType(SearchType searchType) { shardSearchLocalRequest.setSearchType(searchType); } @@ -154,14 +161,6 @@ public void readFrom(StreamInput in) throws IOException { throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - shardSearchLocalRequest.innerWriteTo(out, false); - OriginalIndices.writeOriginalIndices(originalIndices, out); - } - @Override public BytesReference cacheKey() throws IOException { return shardSearchLocalRequest.cacheKey(); From 1abf97ea08844584eb13a8b0a27ac441c1611bc2 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 29 Aug 2017 16:16:51 -0700 Subject: [PATCH 7/8] remove null assertion on constructor existance --- .../org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 0f7a6945eb0a7..bf2ffc5236e3f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -776,7 +776,6 @@ private static Streamable tryCreateFromStream(Streamable streamable, StreamInput try { Class clazz = streamable.getClass(); Constructor constructor = clazz.getConstructor(StreamInput.class); - assertThat(constructor, Matchers.notNullValue()); return constructor.newInstance(in); } catch (NoSuchMethodException e) { return null; From cb19601aee169fc3e9bae5c37a1c1df067e18a3a Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 30 Aug 2017 07:43:31 -0700 Subject: [PATCH 8/8] fix abstractrbulkscrollrequest serialization --- .../index/reindex/AbstractBulkByScrollRequest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 0355eeaee4788..ac700f9de7419 100644 --- a/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -399,8 +399,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId) @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - searchRequest = new SearchRequest(); - searchRequest.readFrom(in); + searchRequest = new SearchRequest(in); abortOnVersionConflict = in.readBoolean(); size = in.readVInt(); refresh = in.readBoolean();