From bf5d9a51c9360ce3c71998a6335c8e1be518a20a Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Fri, 26 Oct 2018 09:21:54 -0600 Subject: [PATCH] Responses can use Writeable.Reader interface (#34655) In order to remove Streamable from the codebase, Response objects need to be read using the Writeable.Reader interface which this change enables. This change enables the use of Writeable.Reader by adding the `Action#getResponseReader` method. The default implementation simply uses the existing `newResponse` method and the readFrom method. As responses are migrated to the Writeable.Reader interface, Action classes can be updated to throw an UnsupportedOperationException when `newResponse` is called and override the `getResponseReader` method. Relates #34389 --- .../netty4/Netty4ScheduledPingTests.java | 3 +- .../action/ActionListenerResponseHandler.java | 18 +- .../elasticsearch/action/ActionResponse.java | 7 + .../elasticsearch/action/GenericAction.java | 15 ++ .../action/TransportActionNodeProxy.java | 2 +- .../tasks/get/TransportGetTaskAction.java | 7 +- .../shards/ClusterSearchShardsAction.java | 8 +- .../shards/ClusterSearchShardsResponse.java | 60 +++---- .../TransportClusterSearchShardsAction.java | 9 +- .../action/ingest/IngestActionForwarder.java | 2 +- .../TransportResyncReplicationAction.java | 8 +- .../action/search/MultiSearchResponse.java | 4 + .../action/search/SearchTransportService.java | 37 ++-- .../broadcast/TransportBroadcastAction.java | 7 +- .../node/TransportBroadcastByNodeAction.java | 6 +- .../master/TransportMasterNodeAction.java | 45 +++-- .../support/nodes/TransportNodesAction.java | 8 +- .../TransportReplicationAction.java | 22 ++- ...ransportInstanceSingleOperationAction.java | 8 +- .../shard/TransportSingleShardAction.java | 13 +- .../support/tasks/TransportTasksAction.java | 6 +- .../TransportClientNodesService.java | 6 +- .../discovery/zen/MasterFaultDetection.java | 14 +- .../discovery/zen/NodesFaultDetection.java | 14 +- .../gateway/LocalAllocateDangledIndices.java | 6 +- .../indices/flush/SyncedFlushService.java | 18 +- .../recovery/PeerRecoveryTargetService.java | 7 +- .../RecoveryTranslogOperationsResponse.java | 6 +- .../elasticsearch/search/SearchService.java | 4 + .../search/dfs/DfsSearchResult.java | 4 + .../search/fetch/FetchSearchResult.java | 4 + .../search/fetch/QueryFetchSearchResult.java | 4 + .../fetch/ScrollQueryFetchSearchResult.java | 4 + .../search/query/QuerySearchResult.java | 4 + .../search/query/ScrollQuerySearchResult.java | 4 + .../EmptyTransportResponseHandler.java | 3 +- .../transport/RemoteClusterAwareClient.java | 2 +- .../transport/RemoteClusterConnection.java | 16 +- .../elasticsearch/transport/TcpTransport.java | 24 ++- .../elasticsearch/transport/Transport.java | 19 ++- .../transport/TransportActionProxy.java | 56 +++--- .../TransportChannelResponseHandler.java | 13 +- .../transport/TransportMessage.java | 13 ++ .../transport/TransportResponse.java | 19 +++ .../transport/TransportResponseHandler.java | 25 --- .../transport/TransportService.java | 24 ++- .../ClusterSearchShardsResponseTests.java | 3 +- .../search/TransportSearchActionTests.java | 2 +- .../TransportClientNodesServiceTests.java | 6 +- .../RemoteClusterConnectionTests.java | 12 +- .../transport/TransportActionProxyTests.java | 42 ++--- .../test/transport/CapturingTransport.java | 7 +- .../AbstractSimpleTransportTestCase.java | 160 ++++++++++-------- ...curityServerTransportInterceptorTests.java | 5 +- ...ServerTransportFilterIntegrationTests.java | 9 +- 55 files changed, 507 insertions(+), 347 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index 3b34e323e47b5..9d2aa0d9e2add 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -103,7 +104,7 @@ public void testScheduledPing() throws Exception { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index f258be3a16137..432cef6ad3029 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -19,13 +19,15 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import java.io.IOException; import java.util.Objects; -import java.util.function.Supplier; /** * A simple base class for action response listeners, defaulting to using the SAME executor (as its @@ -34,11 +36,11 @@ public class ActionListenerResponseHandler implements TransportResponseHandler { private final ActionListener listener; - private final Supplier responseSupplier; + private final Writeable.Reader reader; - public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { + public ActionListenerResponseHandler(ActionListener listener, Writeable.Reader reader) { this.listener = Objects.requireNonNull(listener); - this.responseSupplier = Objects.requireNonNull(responseSupplier); + this.reader = Objects.requireNonNull(reader); } @Override @@ -52,12 +54,12 @@ public void handleException(TransportException e) { } @Override - public Response newInstance() { - return responseSupplier.get(); + public String executor() { + return ThreadPool.Names.SAME; } @Override - public String executor() { - return ThreadPool.Names.SAME; + public Response read(StreamInput in) throws IOException { + return reader.read(in); } } diff --git a/server/src/main/java/org/elasticsearch/action/ActionResponse.java b/server/src/main/java/org/elasticsearch/action/ActionResponse.java index a1cd3068a269f..dd019ba3f5591 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ActionResponse.java @@ -30,6 +30,13 @@ */ public abstract class ActionResponse extends TransportResponse { + public ActionResponse() { + } + + public ActionResponse(StreamInput in) throws IOException { + super(in); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/server/src/main/java/org/elasticsearch/action/GenericAction.java b/server/src/main/java/org/elasticsearch/action/GenericAction.java index 6220a1b2062bf..4e03db95a4c31 100644 --- a/server/src/main/java/org/elasticsearch/action/GenericAction.java +++ b/server/src/main/java/org/elasticsearch/action/GenericAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.transport.TransportRequestOptions; @@ -45,9 +46,23 @@ public String name() { /** * Creates a new response instance. + * @deprecated Implement {@link #getResponseReader()} instead and make this method throw an + * {@link UnsupportedOperationException} */ + @Deprecated public abstract Response newResponse(); + /** + * Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput} + */ + public Writeable.Reader getResponseReader() { + return in -> { + Response response = newResponse(); + response.readFrom(in); + return response; + }; + } + /** * Optional request options for the action. */ diff --git a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index 2e7cbec93d9ae..dc42da765c6ee 100644 --- a/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/server/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -48,6 +48,6 @@ public void execute(final DiscoveryNode node, final Request request, final Actio return; } transportService.sendRequest(node, action.name(), request, transportOptions, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 927ac2a9148ed..39648abdc01bd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -124,8 +125,10 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request, transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public GetTaskResponse newInstance() { - return new GetTaskResponse(); + public GetTaskResponse read(StreamInput in) throws IOException { + GetTaskResponse response = new GetTaskResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java index cb3240a7929b2..9aa5586d02bc8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.Action; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.Writeable; public class ClusterSearchShardsAction extends Action { @@ -33,7 +34,12 @@ private ClusterSearchShardsAction() { @Override public ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return ClusterSearchShardsResponse::new; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index 28c7903efde81..551f39156c868 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -39,36 +39,12 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], new DiscoveryNode[0], Collections.emptyMap()); - private ClusterSearchShardsGroup[] groups; - private DiscoveryNode[] nodes; - private Map indicesAndFilters; + private final ClusterSearchShardsGroup[] groups; + private final DiscoveryNode[] nodes; + private final Map indicesAndFilters; - public ClusterSearchShardsResponse() { - - } - - public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, - Map indicesAndFilters) { - this.groups = groups; - this.nodes = nodes; - this.indicesAndFilters = indicesAndFilters; - } - - public ClusterSearchShardsGroup[] getGroups() { - return groups; - } - - public DiscoveryNode[] getNodes() { - return nodes; - } - - public Map getIndicesAndFilters() { - return indicesAndFilters; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public ClusterSearchShardsResponse(StreamInput in) throws IOException { + super(in); groups = new ClusterSearchShardsGroup[in.readVInt()]; for (int i = 0; i < groups.length; i++) { groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in); @@ -85,9 +61,16 @@ public void readFrom(StreamInput in) throws IOException { AliasFilter aliasFilter = new AliasFilter(in); indicesAndFilters.put(index, aliasFilter); } + } else { + indicesAndFilters = null; } } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -108,6 +91,25 @@ public void writeTo(StreamOutput out) throws IOException { } } + public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes, + Map indicesAndFilters) { + this.groups = groups; + this.nodes = nodes; + this.indicesAndFilters = indicesAndFilters; + } + + public ClusterSearchShardsGroup[] getGroups() { + return groups; + } + + public DiscoveryNode[] getNodes() { + return nodes; + } + + public Map getIndicesAndFilters() { + return indicesAndFilters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index 20ed69ae5a92f..82d27034ca7bb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -39,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -72,7 +74,12 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C @Override protected ClusterSearchShardsResponse newResponse() { - return new ClusterSearchShardsResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java index 8b163eb1eedf8..7219efae5c1ea 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/IngestActionForwarder.java @@ -49,7 +49,7 @@ public IngestActionForwarder(TransportService transportService) { public void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) { transportService.sendRequest(randomIngestNode(), action.name(), request, - new ActionListenerResponseHandler(listener, action::newResponse)); + new ActionListenerResponseHandler(listener, action.getResponseReader())); } private DiscoveryNode randomIngestNode() { diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 6d0c35345b1fa..50d75b20dc82b 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Consumer; import java.util.function.Supplier; @@ -151,8 +153,10 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima transportOptions, new TransportResponseHandler() { @Override - public ResyncReplicationResponse newInstance() { - return newResponseInstance(); + public ResyncReplicationResponse read(StreamInput in) throws IOException { + ResyncReplicationResponse response = newResponseInstance(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index b5765a2380f7b..fffcdf5adbbee 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -133,6 +133,10 @@ public MultiSearchResponse(Item[] items) { this.items = items; } + MultiSearchResponse(StreamInput in) throws IOException { + readFrom(in); + } + @Override public Iterator iterator() { return Arrays.stream(items).iterator(); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index e1f757aff3fbd..199f1104954c0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.search.SearchPhaseResult; @@ -64,7 +65,6 @@ import java.util.HashMap; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; /** * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through @@ -132,7 +132,7 @@ public void sendCanMatch(Transport.Connection connection, final ShardSearchTrans public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) { transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, - TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task, @@ -146,11 +146,11 @@ public void sendExecuteQuery(Transport.Connection connection, final ShardSearchT // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request // this used to be the QUERY_AND_FETCH which doesn't exist anymore. final boolean fetchDocuments = request.numberOfShards() == 1; - Supplier supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + Writeable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; final ActionListener handler = responseWrapper.apply(connection, listener); transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(handler, supplier, clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())); } public void sendExecuteQuery(Transport.Connection connection, final QuerySearchRequest request, SearchTask task, @@ -168,8 +168,8 @@ public void sendExecuteScrollQuery(Transport.Connection connection, final Intern public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, - new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, - clientConnections, connection.getNode().getId())); + new ConnectionCountingHandler<>(listener, ScrollQueryFetchSearchResult::new, clientConnections, + connection.getNode().getId())); } public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSearchRequest request, SearchTask task, @@ -284,6 +284,10 @@ public static class SearchFreeContextResponse extends TransportResponse { SearchFreeContextResponse() { } + SearchFreeContextResponse(StreamInput in) throws IOException { + freed = in.readBoolean(); + } + SearchFreeContextResponse(boolean freed) { this.freed = freed; } @@ -314,9 +318,8 @@ public void messageReceived(ScrollFreeContextRequest request, TransportChannel c channel.sendResponse(new SearchFreeContextResponse(freed)); } }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception { @@ -324,8 +327,7 @@ public void messageReceived(SearchFreeContextRequest request, TransportChannel c channel.sendResponse(new SearchFreeContextResponse(freed)); } }); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, - (Supplier) SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override @@ -335,7 +337,7 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha } }); TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @@ -356,8 +358,8 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request)); } }); - TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, - (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); + TransportActionProxy.registerProxyActionWithDynamicResponseType(transportService, QUERY_ACTION_NAME, + (request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new TaskAwareTransportRequestHandler() { @@ -418,8 +420,7 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne channel.sendResponse(new CanMatchResponse(canMatch)); } }); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, - (Supplier) CanMatchResponse::new); + TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); } /** @@ -441,9 +442,9 @@ final class ConnectionCountingHandler extend private final Map clientConnections; private final String nodeId; - ConnectionCountingHandler(final ActionListener listener, final Supplier responseSupplier, + ConnectionCountingHandler(final ActionListener listener, final Writeable.Reader responseReader, final Map clientConnections, final String nodeId) { - super(listener, responseSupplier); + super(listener, responseReader); this.clientConnections = clientConnections; this.nodeId = nodeId; // Increment the number of connections for this node by one diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 01c760a6fdb47..becc4029cc52b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; @@ -176,8 +177,10 @@ protected void performOperation(final ShardIterator shardIt, final ShardRouting } else { transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler() { @Override - public ShardResponse newInstance() { - return newShardResponse(); + public ShardResponse read(StreamInput in) throws IOException { + ShardResponse response = newShardResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index ff4e73acc1877..7867b15218ef1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -320,8 +320,10 @@ private void sendNodeRequest(final DiscoveryNode node, List shards } transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return new NodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = new NodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index fb74a7d172460..bde70f977591f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -35,6 +35,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException; @@ -46,6 +49,7 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Predicate; import java.util.function.Supplier; @@ -79,8 +83,21 @@ protected TransportMasterNodeAction(Settings settings, String actionName, boolea protected abstract String executor(); + /** + * @deprecated new implementors should override {@link #read(StreamInput)} and use the + * {@link Writeable.Reader} interface. + * @return a new response instance. Typically this is used for serialization using the + * {@link Streamable#readFrom(StreamInput)} method. + */ + @Deprecated protected abstract Response newResponse(); + protected Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; + } + protected abstract void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception; /** @@ -185,21 +202,21 @@ protected void doRun() throws Exception { } else { DiscoveryNode masterNode = nodes.getMasterNode(); final String actionName = getMasterActionName(masterNode); - transportService.sendRequest(masterNode, actionName, request, new ActionListenerResponseHandler(listener, - TransportMasterNodeAction.this::newResponse) { - @Override - public void handleException(final TransportException exp) { - Throwable cause = exp.unwrapCause(); - if (cause instanceof ConnectTransportException) { - // we want to retry here a bit to see if a new master is elected - logger.debug("connection exception while trying to forward request with action name [{}] to " + - "master node [{}], scheduling a retry. Error: [{}]", - actionName, nodes.getMasterNode(), exp.getDetailedMessage()); - retry(cause, masterChangePredicate); - } else { - listener.onFailure(exp); + transportService.sendRequest(masterNode, actionName, request, + new ActionListenerResponseHandler(listener, TransportMasterNodeAction.this::read) { + @Override + public void handleException(final TransportException exp) { + Throwable cause = exp.unwrapCause(); + if (cause instanceof ConnectTransportException) { + // we want to retry here a bit to see if a new master is elected + logger.debug("connection exception while trying to forward request with action name [{}] to " + + "master node [{}], scheduling a retry. Error: [{}]", + actionName, nodes.getMasterNode(), exp.getDetailedMessage()); + retry(cause, masterChangePredicate); + } else { + listener.onFailure(exp); + } } - } }); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 0b61c7ed71247..f7cc5b3587a13 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -41,6 +42,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -197,8 +199,10 @@ void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeResponse newInstance() { - return newNodeResponse(); + public NodeResponse read(StreamInput in) throws IOException { + NodeResponse nodeResponse = newNodeResponse(); + nodeResponse.readFrom(in); + return nodeResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2bebdbbfef27a..aeb357e2482fd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -328,12 +329,17 @@ public void onResponse(PrimaryShardReference primaryShardReference) { // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. final ShardRouting primary = primaryShardReference.routingEntry(); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; + final Writeable.Reader reader = in -> { + Response response = TransportReplicationAction.this.newResponseInstance(); + response.readFrom(in); + return response; + }; DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, - TransportReplicationAction.this::newResponseInstance) { + reader) { @Override public void handleResponse(Response response) { @@ -594,7 +600,7 @@ public void onNewClusterState(ClusterState state) { String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, - () -> TransportResponse.Empty.INSTANCE); + (in) -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes), @@ -830,8 +836,10 @@ private void performAction(final DiscoveryNode node, final String action, final transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponseInstance(); + public Response read(StreamInput in) throws IOException { + Response response = newResponseInstance(); + response.readFrom(in); + return response; } @Override @@ -1203,7 +1211,11 @@ protected void sendReplicaRequest( final ConcreteReplicaRequest replicaRequest, final DiscoveryNode node, final ActionListener listener) { - final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, ReplicaResponse::new); + final ActionListenerResponseHandler handler = new ActionListenerResponseHandler<>(listener, in -> { + ReplicaResponse replicaResponse = new ReplicaResponse(); + replicaResponse.readFrom(in); + return replicaResponse; + }); transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, handler); } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index b75828327035b..8fa49f851a360 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.node.NodeClosedException; @@ -46,6 +47,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.function.Supplier; public abstract class TransportInstanceSingleOperationAction, Response extends ActionResponse> @@ -172,8 +174,10 @@ protected void doStart(ClusterState clusterState) { transportService.sendRequest(node, shardActionName, request, transportOptions(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 899f9591bda6a..5deffe72bce55 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -180,8 +181,10 @@ public void start() { // just execute it on the local node transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override @@ -244,8 +247,10 @@ private void perform(@Nullable final Exception currentFailure) { transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler() { @Override - public Response newInstance() { - return newResponse(); + public Response read(StreamInput in) throws IOException { + Response response = newResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index aad7d20073c3b..dde4d8f4c9f68 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -279,8 +279,10 @@ private void start() { transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new TransportResponseHandler() { @Override - public NodeTasksResponse newInstance() { - return new NodeTasksResponse(); + public NodeTasksResponse read(StreamInput in) throws IOException { + NodeTasksResponse response = new NodeTasksResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index aa0672d80ba1d..0cfc1f5004ce8 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -511,8 +511,10 @@ protected void doRun() throws Exception { new TransportResponseHandler() { @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + final ClusterStateResponse clusterStateResponse = new ClusterStateResponse(); + clusterStateResponse.readFrom(in); + return clusterStateResponse; } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index c38cfe88619ee..df5910878de0d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -224,8 +224,8 @@ public void run() { transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new TransportResponseHandler() { @Override - public MasterPingResponseResponse newInstance() { - return new MasterPingResponseResponse(); + public MasterPingResponseResponse read(StreamInput in) throws IOException { + return new MasterPingResponseResponse(in); } @Override @@ -432,14 +432,8 @@ private static class MasterPingResponseResponse extends TransportResponse { private MasterPingResponseResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private MasterPingResponseResponse(StreamInput in) throws IOException { + super(in); } } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index d19cc98441b79..33f30c1103e47 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -225,8 +225,8 @@ public void run() { .withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler() { @Override - public PingResponse newInstance() { - return new PingResponse(); + public PingResponse read(StreamInput in) throws IOException { + return new PingResponse(in); } @Override @@ -358,14 +358,8 @@ private static class PingResponse extends TransportResponse { private PingResponse() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + private PingResponse(StreamInput in) throws IOException { + super(in); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java index c8986b0493459..1eb4389965d1b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java +++ b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java @@ -83,8 +83,10 @@ public void allocateDangled(Collection indices, final Listener li AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices.toArray(new IndexMetaData[indices.size()])); transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler() { @Override - public AllocateDangledResponse newInstance() { - return new AllocateDangledResponse(); + public AllocateDangledResponse read(StreamInput in) throws IOException { + final AllocateDangledResponse response = new AllocateDangledResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 6ef6c1546d152..f40a4ef69f22c 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -312,8 +312,10 @@ protected void getInflightOpsCount(final ShardId shardId, ClusterState state, In transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId), new TransportResponseHandler() { @Override - public InFlightOpsResponse newInstance() { - return new InFlightOpsResponse(); + public InFlightOpsResponse read(StreamInput in) throws IOException { + InFlightOpsResponse response = new InFlightOpsResponse(); + response.readFrom(in); + return response; } @Override @@ -382,8 +384,10 @@ void sendSyncRequests(final String syncId, final List shards, Clus transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId), new TransportResponseHandler() { @Override - public ShardSyncedFlushResponse newInstance() { - return new ShardSyncedFlushResponse(); + public ShardSyncedFlushResponse read(StreamInput in) throws IOException { + ShardSyncedFlushResponse response = new ShardSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override @@ -436,8 +440,10 @@ void sendPreSyncRequests(final List shards, final ClusterState sta } transportService.sendRequest(node, PRE_SYNCED_FLUSH_ACTION_NAME, new PreShardSyncedFlushRequest(shard.shardId()), new TransportResponseHandler() { @Override - public PreSyncedFlushResponse newInstance() { - return new PreSyncedFlushResponse(); + public PreSyncedFlushResponse read(StreamInput in) throws IOException { + PreSyncedFlushResponse response = new PreSyncedFlushResponse(); + response.readFrom(in); + return response; } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 5eaad06e1f443..cefc196b73a73 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -194,8 +195,10 @@ private void doRecovery(final long recoveryId) { transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override - public RecoveryResponse newInstance() { - return new RecoveryResponse(); + public RecoveryResponse read(StreamInput in) throws IOException { + RecoveryResponse recoveryResponse = new RecoveryResponse(); + recoveryResponse.readFrom(in); + return recoveryResponse; } }).txGet())); final RecoveryResponse recoveryResponse = responseHolder.get(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java index 530b8b67415d3..8633380f3947a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsResponse.java @@ -63,8 +63,10 @@ public void readFrom(final StreamInput in) throws IOException { static TransportResponseHandler HANDLER = new FutureTransportResponseHandler() { @Override - public RecoveryTranslogOperationsResponse newInstance() { - return new RecoveryTranslogOperationsResponse(); + public RecoveryTranslogOperationsResponse read(StreamInput in) throws IOException { + RecoveryTranslogOperationsResponse response = new RecoveryTranslogOperationsResponse(); + response.readFrom(in); + return response; } }; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 59bbb60b25530..978dee84f6f72 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1101,6 +1101,10 @@ public static final class CanMatchResponse extends SearchPhaseResult { public CanMatchResponse() { } + public CanMatchResponse(StreamInput in) throws IOException { + this.canMatch = in.readBoolean(); + } + public CanMatchResponse(boolean canMatch) { this.canMatch = canMatch; } diff --git a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 0cd624b00a36b..522187dcf5d74 100644 --- a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -45,6 +45,10 @@ public class DfsSearchResult extends SearchPhaseResult { public DfsSearchResult() { } + public DfsSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public DfsSearchResult(long id, SearchShardTarget shardTarget) { this.setSearchShardTarget(shardTarget); this.requestId = id; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index a5f27733ad28a..12391151861d0 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -38,6 +38,10 @@ public final class FetchSearchResult extends SearchPhaseResult { public FetchSearchResult() { } + public FetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public FetchSearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 8d1e6276e65d9..0a5a7cec375db 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -38,6 +38,10 @@ public final class QueryFetchSearchResult extends SearchPhaseResult { public QueryFetchSearchResult() { } + public QueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { this.queryResult = queryResult; this.fetchResult = fetchResult; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index 55aa4a96d018c..6b0a8b619bff3 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -36,6 +36,10 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { public ScrollQueryFetchSearchResult() { } + public ScrollQueryFetchSearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index feb3e7876a3d7..5801f20780b7f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -65,6 +65,10 @@ public final class QuerySearchResult extends SearchPhaseResult { public QuerySearchResult() { } + public QuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public QuerySearchResult(long id, SearchShardTarget shardTarget) { this.requestId = id; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index 6401459489955..632d148ea901b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -35,6 +35,10 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult { public ScrollQuerySearchResult() { } + public ScrollQuerySearchResult(StreamInput in) throws IOException { + readFrom(in); + } + public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { this.result = result; setSearchShardTarget(shardTarget); diff --git a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java index c5814cf0fefcc..7ff1ef8391fd6 100644 --- a/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/EmptyTransportResponseHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; public class EmptyTransportResponseHandler implements TransportResponseHandler { @@ -32,7 +33,7 @@ public EmptyTransportResponseHandler(String executor) { } @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index aa476bf4dd267..413036d70233e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -49,7 +49,7 @@ void doExecute(Action action, Request request remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, action::newResponse)); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8e4b7ac8aa293..d279d12ea0fdb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -223,8 +223,8 @@ private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, new TransportResponseHandler() { @Override - public ClusterSearchShardsResponse newInstance() { - return new ClusterSearchShardsResponse(); + public ClusterSearchShardsResponse read(StreamInput in) throws IOException { + return new ClusterSearchShardsResponse(in); } @Override @@ -596,8 +596,10 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl } @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); + public ClusterStateResponse read(StreamInput in) throws IOException { + ClusterStateResponse response = new ClusterStateResponse(); + response.readFrom(in); + return response; } @Override @@ -695,8 +697,10 @@ public void getConnectionInfo(ActionListener listener) { transportService.sendRequest(connection, NodesInfoAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public NodesInfoResponse newInstance() { - return new NodesInfoResponse(); + public NodesInfoResponse read(StreamInput in) throws IOException { + NodesInfoResponse ir = new NodesInfoResponse(); + ir.readFrom(in); + return ir; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 93db5ca22dcb3..71264b3cb44e2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -203,7 +203,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final MeanMetric readBytesMetric = new MeanMetric(); private final MeanMetric transmittedBytesMetric = new MeanMetric(); - private volatile Map requestHandlers = Collections.emptyMap(); + private volatile Map> requestHandlers = Collections.emptyMap(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final BytesReference pingMessage; @@ -279,8 +279,8 @@ private static class HandshakeResponseHandler implements TransportResponseHandle } @Override - public VersionHandshakeResponse newInstance() { - return new VersionHandshakeResponse(); + public VersionHandshakeResponse read(StreamInput in) throws IOException { + return new VersionHandshakeResponse(in); } @Override @@ -1175,7 +1175,8 @@ public final void messageReceived(BytesReference reference, TcpChannel channel, if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener); + TransportResponseHandler theHandler = + responseHandlers.onResponseReceived(requestId, messageListener); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { @@ -1221,8 +1222,9 @@ static void ensureVersionCompatibility(Version version, Version currentVersion, } } - private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) { - final TransportResponse response; + private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, + final TransportResponseHandler handler) { + final T response; try { response = handler.read(stream); response.remoteAddress(new TransportAddress(remoteAddress)); @@ -1371,17 +1373,13 @@ public void onFailure(Exception e) { } private static final class VersionHandshakeResponse extends TransportResponse { - private Version version; + private final Version version; private VersionHandshakeResponse(Version version) { this.version = version; } - private VersionHandshakeResponse() { - } - - @Override - public void readFrom(StreamInput in) throws IOException { + private VersionHandshakeResponse(StreamInput in) throws IOException { super.readFrom(in); version = Version.readVersion(in); } @@ -1638,7 +1636,7 @@ public final ResponseHandlers getResponseHandlers() { } @Override - public final RequestHandlerRegistry getRequestHandler(String action) { + public final RequestHandlerRegistry getRequestHandler(String action) { return requestHandlers.get(action); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index fc1f0c9e5ec0f..e13213dca066a 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -54,7 +54,7 @@ public interface Transport extends LifecycleComponent { * Returns the registered request handler registry for the given action or null if it's not registered * @param action the action to look up */ - RequestHandlerRegistry getRequestHandler(String action); + RequestHandlerRegistry getRequestHandler(String action); void addMessageListener(TransportMessageListener listener); @@ -184,7 +184,7 @@ public String action() { * This class is a registry that allows */ final class ResponseHandlers { - private final ConcurrentMapLong handlers = ConcurrentCollections + private final ConcurrentMapLong> handlers = ConcurrentCollections .newConcurrentMapLongWithAggressiveConcurrency(); private final AtomicLong requestIdGenerator = new AtomicLong(); @@ -208,7 +208,7 @@ public ResponseContext remove(long requestId) { * @return the new request ID * @see Connection#sendRequest(long, String, TransportRequest, TransportRequestOptions) */ - public long add(ResponseContext holder) { + public long add(ResponseContext holder) { long requestId = newRequestId(); ResponseContext existing = handlers.put(requestId, holder); assert existing == null : "request ID already in use: " + requestId; @@ -226,10 +226,10 @@ long newRequestId() { /** * Removes and returns all {@link ResponseContext} instances that match the predicate */ - public List prune(Predicate predicate) { - final List holders = new ArrayList<>(); - for (Map.Entry entry : handlers.entrySet()) { - ResponseContext holder = entry.getValue(); + public List> prune(Predicate predicate) { + final List> holders = new ArrayList<>(); + for (Map.Entry> entry : handlers.entrySet()) { + ResponseContext holder = entry.getValue(); if (predicate.test(holder)) { ResponseContext remove = handlers.remove(entry.getKey()); if (remove != null) { @@ -245,8 +245,9 @@ public List prune(Predicate predicate) { * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ - public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) { - ResponseContext context = handlers.remove(requestId); + public TransportResponseHandler onResponseReceived(final long requestId, + final TransportMessageListener listener) { + ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { return null; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 8c48f08874350..242a1af016c32 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.function.Function; -import java.util.function.Supplier; /** * TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second @@ -42,10 +41,10 @@ private static class ProxyRequestHandler implements Tran private final TransportService service; private final String action; - private final Function> responseFunction; + private final Function> responseFunction; ProxyRequestHandler(TransportService service, String action, Function> responseFunction) { + Writeable.Reader> responseFunction) { this.service = service; this.action = action; this.responseFunction = responseFunction; @@ -62,17 +61,17 @@ public void messageReceived(T request, TransportChannel channel) throws Exceptio private static class ProxyResponseHandler implements TransportResponseHandler { - private final Supplier responseFactory; + private final Writeable.Reader reader; private final TransportChannel channel; - ProxyResponseHandler(TransportChannel channel, Supplier responseFactory) { - this.responseFactory = responseFactory; + ProxyResponseHandler(TransportChannel channel, Writeable.Reader reader) { + this.reader = reader; this.channel = channel; - } + @Override - public T newInstance() { - return responseFactory.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override @@ -100,26 +99,25 @@ public String executor() { } static class ProxyRequest extends TransportRequest { - T wrapped; - Writeable.Reader reader; - DiscoveryNode targetNode; - - ProxyRequest(Writeable.Reader reader) { - this.reader = reader; - } + final T wrapped; + final DiscoveryNode targetNode; ProxyRequest(T wrapped, DiscoveryNode targetNode) { this.wrapped = wrapped; this.targetNode = targetNode; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -132,21 +130,23 @@ public void writeTo(StreamOutput out) throws IOException { * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type changes based on the upcoming request (quite rare) */ - public static void registerProxyAction(TransportService service, String action, - Function> responseFunction) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, responseFunction)); + public static void registerProxyActionWithDynamicResponseType(TransportService service, String action, + Function> responseFunction) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, responseFunction)); } /** * Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the * response type is always the same (most of the cases). */ - public static void registerProxyAction(TransportService service, String action, Supplier responseSupplier) { - RequestHandlerRegistry requestHandler = service.getRequestHandler(action); - service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME, - true, false, new ProxyRequestHandler<>(service, action, request -> responseSupplier)); + public static void registerProxyAction(TransportService service, String action, + Writeable.Reader reader) { + RequestHandlerRegistry requestHandler = service.getRequestHandler(action); + service.registerRequestHandler(getProxyAction(action), ThreadPool.Names.SAME, true, false, + in -> new ProxyRequest<>(in, requestHandler::newRequest), new ProxyRequestHandler<>(service, action, request -> reader)); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 4ba2769edb4a2..6b45feec94859 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -21,10 +21,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel @@ -34,19 +35,19 @@ public class TransportChannelResponseHandler implem private final Logger logger; private final TransportChannel channel; private final String extraInfoOnError; - private final Supplier responseSupplier; + private final Writeable.Reader reader; public TransportChannelResponseHandler(Logger logger, TransportChannel channel, String extraInfoOnError, - Supplier responseSupplier) { + Writeable.Reader reader) { this.logger = logger; this.channel = channel; this.extraInfoOnError = extraInfoOnError; - this.responseSupplier = responseSupplier; + this.reader = reader; } @Override - public T newInstance() { - return responseSupplier.get(); + public T read(StreamInput in) throws IOException { + return reader.read(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index ecaca73b2db57..05deab8eafbf0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -39,6 +39,19 @@ public TransportAddress remoteAddress() { return remoteAddress; } + /** + * Constructs a new empty transport message + */ + public TransportMessage() { + } + + /** + * Constructs a new transport message with the data from the {@link StreamInput}. This is + * currently a no-op + */ + public TransportMessage(StreamInput in) throws IOException { + } + @Override public void readFrom(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java index 25ae72a479f7d..5ad9c9fee544e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponse.java @@ -19,8 +19,27 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + public abstract class TransportResponse extends TransportMessage { + /** + * Constructs a new empty transport response + */ + public TransportResponse() { + } + + /** + * Constructs a new transport response with the data from the {@link StreamInput}. This is + * currently a no-op. However, this exists to allow extenders to call super(in) + * so that reading can mirror writing where we often call super.writeTo(out). + */ + public TransportResponse(StreamInput in) throws IOException { + super(in); + } + public static class Empty extends TransportResponse { public static final Empty INSTANCE = new Empty(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java index 447bbd92dd2b0..29720216cf400 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseHandler.java @@ -19,35 +19,10 @@ package org.elasticsearch.transport; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; -import java.io.IOException; - public interface TransportResponseHandler extends Writeable.Reader { - /** - * @deprecated Implement {@link #read(StreamInput)} instead. - */ - @Deprecated - default T newInstance() { - throw new UnsupportedOperationException(); - } - - /** - * deserializes a new instance of the return type from the stream. - * called by the infra when de-serializing the response. - * - * @return the deserialized response. - */ - @SuppressWarnings("deprecation") - @Override - default T read(StreamInput in) throws IOException { - T instance = newInstance(); - instance.readFrom(in); - return instance; - } - void handleResponse(T response); void handleException(TransportException exp); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 8559963bf7836..1fa48588543fd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -433,8 +433,8 @@ public HandshakeResponse handshake( PlainTransportFuture futureHandler = new PlainTransportFuture<>( new FutureTransportResponseHandler() { @Override - public HandshakeResponse newInstance() { - return new HandshakeResponse(); + public HandshakeResponse read(StreamInput in) throws IOException { + return new HandshakeResponse(in); } }); sendRequest(connection, HANDSHAKE_ACTION_NAME, HandshakeRequest.INSTANCE, @@ -467,12 +467,9 @@ private HandshakeRequest() { } public static class HandshakeResponse extends TransportResponse { - private DiscoveryNode discoveryNode; - private ClusterName clusterName; - private Version version; - - HandshakeResponse() { - } + private final DiscoveryNode discoveryNode; + private final ClusterName clusterName; + private final Version version; public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) { this.discoveryNode = discoveryNode; @@ -480,9 +477,8 @@ public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, V this.clusterName = clusterName; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public HandshakeResponse(StreamInput in) throws IOException { + super(in); discoveryNode = in.readOptionalWriteable(DiscoveryNode::new); clusterName = new ClusterName(in); version = Version.readVersion(in); @@ -929,7 +925,7 @@ public void onRequestReceived(long requestId, String action) { } } - public RequestHandlerRegistry getRequestHandler(String action) { + public RequestHandlerRegistry getRequestHandler(String action) { return transport.getRequestHandler(action); } @@ -976,8 +972,8 @@ private void checkForTimeout(long requestId) { @Override public void onConnectionClosed(Transport.Connection connection) { try { - List pruned = responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection - .getCacheKey())); + List> pruned = + responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey())); // callback that an exception happened, but on a different thread since we don't // want handlers to worry about stack overflows getExecutorService().execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java index 90eb7cdcfd46a..9444b5e47ba8b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponseTests.java @@ -83,8 +83,7 @@ public void testSerialization() throws Exception { clusterSearchShardsResponse.writeTo(out); try(StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) { in.setVersion(version); - ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(); - deserialized.readFrom(in); + ClusterSearchShardsResponse deserialized = new ClusterSearchShardsResponse(in); assertArrayEquals(clusterSearchShardsResponse.getNodes(), deserialized.getNodes()); assertEquals(clusterSearchShardsResponse.getGroups().length, deserialized.getGroups().length); for (int i = 0; i < clusterSearchShardsResponse.getGroups().length; i++) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index c763709a04e40..e529af97c800d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -254,7 +254,7 @@ public void testBuildClusters() { remoteIndices.put(cluster, randomOriginalIndices()); if (onlySuccessful || randomBoolean()) { //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse()); + searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); successful++; } else { searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index b9f645a97af2a..5d76018503b06 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -256,8 +256,8 @@ public void onFailure(Exception e) { iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) { + return new TestResponse(in); } @Override @@ -438,5 +438,7 @@ public static class TestRequest extends TransportRequest { private static class TestResponse extends TransportResponse { + private TestResponse() {} + private TestResponse(StreamInput in) {} } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 8c9222a375a0a..77f7aed844a66 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -179,9 +179,7 @@ public void testRemoteProfileIsUsedForLocalCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -222,9 +220,7 @@ public void testRemoteProfileIsUsedForRemoteCluster() throws Exception { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK) @@ -240,9 +236,7 @@ public ClusterSearchShardsResponse read(StreamInput in) throws IOException { new FutureTransportResponseHandler() { @Override public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - ClusterSearchShardsResponse inst = new ClusterSearchShardsResponse(); - inst.readFrom(in); - return inst; + return new ClusterSearchShardsResponse(in); } }); TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index f684fb7dae7b1..096a410ee1484 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -90,8 +90,7 @@ public void testSendMessage() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -100,8 +99,7 @@ public void testSendMessage() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -109,8 +107,7 @@ public void testSendMessage() throws InterruptedException { serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_C"; + SimpleTestResponse response = new SimpleTestResponse("TS_C"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceC, "internal:test", SimpleTestResponse::new); @@ -119,8 +116,8 @@ public void testSendMessage() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -135,7 +132,7 @@ public void handleResponse(SimpleTestResponse response) { @Override public void handleException(TransportException exp) { try { - throw new AssertionError(exp); + throw new AssertionError(exp); } finally { latch.countDown(); } @@ -153,8 +150,7 @@ public void testException() throws InterruptedException { serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_A"; + SimpleTestResponse response = new SimpleTestResponse("TS_A"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); @@ -163,8 +159,7 @@ public void testException() throws InterruptedException { serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, (request, channel) -> { assertEquals(request.sourceNode, "TS_A"); - SimpleTestResponse response = new SimpleTestResponse(); - response.targetNode = "TS_B"; + SimpleTestResponse response = new SimpleTestResponse("TS_B"); channel.sendResponse(response); }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); @@ -179,8 +174,8 @@ public void testException() throws InterruptedException { serviceA.sendRequest(nodeB, TransportActionProxy.getProxyAction("internal:test"), TransportActionProxy.wrapRequest(nodeC, new SimpleTestRequest("TS_A")), new TransportResponseHandler() { @Override - public SimpleTestResponse newInstance() { - return new SimpleTestResponse(); + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); } @Override @@ -232,11 +227,20 @@ public void writeTo(StreamOutput out) throws IOException { } public static class SimpleTestResponse extends TransportResponse { - String targetNode; + final String targetNode; + + SimpleTestResponse(String targetNode) { + this.targetNode = targetNode; + } + + SimpleTestResponse(StreamInput in) throws IOException { + super(in); + this.targetNode = in.readString(); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - targetNode = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -267,7 +271,7 @@ public void testIsProxyAction() { } public void testIsProxyRequest() { - assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null))); + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(TransportRequest.Empty.INSTANCE, null))); assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 132a07d5b7f48..1b8405a2d591a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportStats; @@ -163,8 +164,10 @@ public void clear() { /** * simulate a response for the given requestId */ - public void handleResponse(final long requestId, final TransportResponse response) { - responseHandlers.onResponseReceived(requestId, listener).handleResponse(response); + public void handleResponse(final long requestId, final Response response) { + TransportResponseHandler handler = + (TransportResponseHandler) responseHandlers.onResponseReceived(requestId, listener); + handler.handleResponse(response); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index ac19e62ec4751..230b055908243 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -234,8 +234,8 @@ public void testHelloWorld() { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -265,8 +265,8 @@ public void handleException(TransportException exp) { res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -312,8 +312,8 @@ public void testThreadContext() throws ExecutionException, InterruptedException final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0])); TransportResponseHandler responseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -367,8 +367,8 @@ public void testLocalNodeConnection() throws InterruptedException { serviceA.sendRequest(nodeA, "internal:localNode", new StringMessageRequest("test"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -516,7 +516,7 @@ public void testVoidMessageCompressed() { TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public TransportResponse.Empty newInstance() { + public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -564,8 +564,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -606,8 +606,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHelloException", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -658,7 +658,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti serviceA.registerRequestHandler("internal:test", TestRequest::new, randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { logger.info("caught exception while responding", e); responseErrors.add(e); @@ -666,7 +666,7 @@ public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierExcepti }); final TransportRequestHandler ignoringRequestHandler = (request, channel) -> { try { - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { // we don't really care what's going on B, we're testing through A logger.trace("caught exception while responding from node B", e); @@ -822,8 +822,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -886,8 +886,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann new StringMessageRequest("forever"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -924,8 +924,8 @@ public void handleException(TransportException exp) { new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -975,8 +975,8 @@ public void messageReceived(StringMessageRequest request, TransportChannel chann TransportResponseHandler noopResponseHandler = new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1174,19 +1174,19 @@ public void writeTo(StreamOutput out) throws IOException { static class StringMessageResponse extends TransportResponse { - private String message; + private final String message; StringMessageResponse(String message) { this.message = message; } - StringMessageResponse() { + StringMessageResponse(StreamInput in) throws IOException { + this.message = in.readString(); } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - message = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1238,12 +1238,19 @@ public void writeTo(StreamOutput out) throws IOException { static class Version0Response extends TransportResponse { - int value1; + final int value1; + + Version0Response(int value1) { + this.value1 = value1; + } + + Version0Response(StreamInput in) throws IOException { + this.value1 = in.readInt(); + } @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - value1 = in.readInt(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1255,16 +1262,27 @@ public void writeTo(StreamOutput out) throws IOException { static class Version1Response extends Version0Response { - int value2; + final int value2; - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + Version1Response(int value1, int value2) { + super(value1); + this.value2 = value2; + } + + Version1Response(StreamInput in) throws IOException { + super(in); if (in.getVersion().onOrAfter(version1)) { value2 = in.readInt(); + } else { + value2 = 0; } } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -1281,9 +1299,7 @@ public void testVersionFrom0to1() throws Exception { public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(0)); // not set, coming from service A - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1294,8 +1310,8 @@ public void messageReceived(Version1Request request, TransportChannel channel) t Version0Response version0Response = serviceA.submitRequest(nodeB, "internal:version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1324,8 +1340,7 @@ public void testVersionFrom1to0() throws Exception { @Override public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); } @@ -1337,8 +1352,8 @@ public void messageReceived(Version0Request request, TransportChannel channel) t Version1Response version1Response = serviceB.submitRequest(nodeA, "internal:version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1368,9 +1383,7 @@ public void testVersionFrom1to1() throws Exception { (request, channel) -> { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(2)); - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; + Version1Response response = new Version1Response(1, 2); channel.sendResponse(response); assertEquals(version1, channel.getVersion()); }); @@ -1381,8 +1394,8 @@ public void testVersionFrom1to1() throws Exception { Version1Response version1Response = serviceB.submitRequest(nodeB, "internal:/version", version1Request, new TransportResponseHandler() { @Override - public Version1Response newInstance() { - return new Version1Response(); + public Version1Response read(StreamInput in) throws IOException { + return new Version1Response(in); } @Override @@ -1411,8 +1424,7 @@ public void testVersionFrom0to0() throws Exception { serviceA.registerRequestHandler("internal:/version", Version0Request::new, ThreadPool.Names.SAME, (request, channel) -> { assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; + Version0Response response = new Version0Response(1); channel.sendResponse(response); assertEquals(version0, channel.getVersion()); }); @@ -1422,8 +1434,8 @@ public void testVersionFrom0to0() throws Exception { Version0Response version0Response = serviceA.submitRequest(nodeA, "internal:/version", version0Request, new TransportResponseHandler() { @Override - public Version0Response newInstance() { - return new Version0Response(); + public Version0Response read(StreamInput in) throws IOException { + return new Version0Response(in); } @Override @@ -1458,8 +1470,8 @@ public void testMockFailToSendNoConnectRule() throws Exception { TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1516,8 +1528,8 @@ public void testMockUnresponsiveRule() throws IOException { new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); } @Override @@ -1564,14 +1576,14 @@ public void testHostOnMessages() throws InterruptedException { @Override public void messageReceived(TestRequest request, TransportChannel channel) throws Exception { addressA.set(request.remoteAddress()); - channel.sendResponse(new TestResponse()); + channel.sendResponse(new TestResponse((String) null)); latch.countDown(); } }); serviceA.sendRequest(nodeB, "internal:action1", new TestRequest(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1618,8 +1630,8 @@ public void testBlockingIncomingRequests() throws Exception { serviceA.sendRequest(connection, "internal:action", new TestRequest(), TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1684,9 +1696,10 @@ public String toString() { private static class TestResponse extends TransportResponse { - String info; + final String info; - TestResponse() { + TestResponse(StreamInput in) throws IOException { + this.info = in.readOptionalString(); } TestResponse(String info) { @@ -1695,8 +1708,7 @@ private static class TestResponse extends TransportResponse { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - info = in.readOptionalString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1781,8 +1793,8 @@ public void messageReceived(TestRequest request, TransportChannel channel) throw TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TransportResponseHandler() { @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -1838,8 +1850,8 @@ class TestResponseHandler implements TransportResponseHandler { } @Override - public TestResponse newInstance() { - return new TestResponse(); + public TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); } @Override @@ -2105,7 +2117,7 @@ public void testResponseHeadersArePreserved() throws InterruptedException { TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2159,7 +2171,7 @@ public void testHandlerIsInvokedOnConnectionClose() throws IOException, Interrup CountDownLatch latch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2236,7 +2248,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2304,7 +2316,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } @@ -2417,7 +2429,7 @@ protected void doRun() throws Exception { CountDownLatch responseLatch = new CountDownLatch(1); TransportResponseHandler transportResponseHandler = new TransportResponseHandler() { @Override - public TransportResponse newInstance() { + public TransportResponse read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java index a7351ccfe14d1..1b85049da235b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptorTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -335,7 +336,7 @@ public void testContextRestoreResponseHandler() throws Exception { threadContext.wrapRestorable(storedContext), new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } @@ -374,7 +375,7 @@ public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exc new TransportResponseHandler() { @Override - public Empty newInstance() { + public Empty read(StreamInput in) { return Empty.INSTANCE; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java index 2e5c1000cc858..f598ce901bc58 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -172,8 +173,12 @@ public void testThatConnectionToClientTypeConnectionIsRejected() throws IOExcept TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override - public TransportResponse newInstance() { - fail("never get that far"); + public TransportResponse read(StreamInput in) { + try { + fail("never get that far"); + } finally { + latch.countDown(); + } return null; }