Skip to content

Commit

Permalink
Responses can use Writeable.Reader interface (#34655)
Browse files Browse the repository at this point in the history
In order to remove Streamable from the codebase, Response objects need
to be read using the Writeable.Reader interface which this change
enables. This change enables the use of Writeable.Reader by adding the
`Action#getResponseReader` method. The default implementation simply
uses the existing `newResponse` method and the readFrom method. As
responses are migrated to the Writeable.Reader interface, Action
classes can be updated to throw an UnsupportedOperationException when
`newResponse` is called and override the `getResponseReader` method.

Relates #34389
  • Loading branch information
jaymode committed Oct 26, 2018
1 parent f2280c4 commit bf5d9a5
Show file tree
Hide file tree
Showing 55 changed files with 507 additions and 347 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -103,7 +104,7 @@ public void testScheduledPing() throws Exception {
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty newInstance() {
public TransportResponse.Empty read(StreamInput in) {
return TransportResponse.Empty.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

package org.elasticsearch.action;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;

/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
Expand All @@ -34,11 +36,11 @@
public class ActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
private final Supplier<Response> responseSupplier;
private final Writeable.Reader<Response> reader;

public ActionListenerResponseHandler(ActionListener<? super Response> listener, Supplier<Response> responseSupplier) {
public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
this.listener = Objects.requireNonNull(listener);
this.responseSupplier = Objects.requireNonNull(responseSupplier);
this.reader = Objects.requireNonNull(reader);
}

@Override
Expand All @@ -52,12 +54,12 @@ public void handleException(TransportException e) {
}

@Override
public Response newInstance() {
return responseSupplier.get();
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
public Response read(StreamInput in) throws IOException {
return reader.read(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
*/
public abstract class ActionResponse extends TransportResponse {

public ActionResponse() {
}

public ActionResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/action/GenericAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;

Expand All @@ -45,9 +46,23 @@ public String name() {

/**
* Creates a new response instance.
* @deprecated Implement {@link #getResponseReader()} instead and make this method throw an
* {@link UnsupportedOperationException}
*/
@Deprecated
public abstract Response newResponse();

/**
* Get a reader that can create a new instance of the class from a {@link org.elasticsearch.common.io.stream.StreamInput}
*/
public Writeable.Reader<Response> getResponseReader() {
return in -> {
Response response = newResponse();
response.readFrom(in);
return response;
};
}

/**
* Optional request options for the action.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ public void execute(final DiscoveryNode node, final Request request, final Actio
return;
}
transportService.sendRequest(node, action.name(), request, transportOptions,
new ActionListenerResponseHandler<>(listener, action::newResponse));
new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -124,8 +125,10 @@ private void runOnNodeWithTaskIfPossible(Task thisTask, GetTaskRequest request,
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
new TransportResponseHandler<GetTaskResponse>() {
@Override
public GetTaskResponse newInstance() {
return new GetTaskResponse();
public GetTaskResponse read(StreamInput in) throws IOException {
GetTaskResponse response = new GetTaskResponse();
response.readFrom(in);
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.Writeable;

public class ClusterSearchShardsAction extends Action<ClusterSearchShardsRequest, ClusterSearchShardsResponse, ClusterSearchShardsRequestBuilder> {

Expand All @@ -33,7 +34,12 @@ private ClusterSearchShardsAction() {

@Override
public ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
public Writeable.Reader<ClusterSearchShardsResponse> getResponseReader() {
return ClusterSearchShardsResponse::new;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,12 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
new DiscoveryNode[0], Collections.emptyMap());

private ClusterSearchShardsGroup[] groups;
private DiscoveryNode[] nodes;
private Map<String, AliasFilter> indicesAndFilters;
private final ClusterSearchShardsGroup[] groups;
private final DiscoveryNode[] nodes;
private final Map<String, AliasFilter> indicesAndFilters;

public ClusterSearchShardsResponse() {

}

public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
Map<String, AliasFilter> indicesAndFilters) {
this.groups = groups;
this.nodes = nodes;
this.indicesAndFilters = indicesAndFilters;
}

public ClusterSearchShardsGroup[] getGroups() {
return groups;
}

public DiscoveryNode[] getNodes() {
return nodes;
}

public Map<String, AliasFilter> getIndicesAndFilters() {
return indicesAndFilters;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public ClusterSearchShardsResponse(StreamInput in) throws IOException {
super(in);
groups = new ClusterSearchShardsGroup[in.readVInt()];
for (int i = 0; i < groups.length; i++) {
groups[i] = ClusterSearchShardsGroup.readSearchShardsGroupResponse(in);
Expand All @@ -85,9 +61,16 @@ public void readFrom(StreamInput in) throws IOException {
AliasFilter aliasFilter = new AliasFilter(in);
indicesAndFilters.put(index, aliasFilter);
}
} else {
indicesAndFilters = null;
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -108,6 +91,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public ClusterSearchShardsResponse(ClusterSearchShardsGroup[] groups, DiscoveryNode[] nodes,
Map<String, AliasFilter> indicesAndFilters) {
this.groups = groups;
this.nodes = nodes;
this.indicesAndFilters = indicesAndFilters;
}

public ClusterSearchShardsGroup[] getGroups() {
return groups;
}

public DiscoveryNode[] getNodes() {
return nodes;
}

public Map<String, AliasFilter> getIndicesAndFilters() {
return indicesAndFilters;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -72,7 +74,12 @@ protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, C

@Override
protected ClusterSearchShardsResponse newResponse() {
return new ClusterSearchShardsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
protected ClusterSearchShardsResponse read(StreamInput in) throws IOException {
return new ClusterSearchShardsResponse(in);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public IngestActionForwarder(TransportService transportService) {

public void forwardIngestRequest(Action<?, ?, ?> action, ActionRequest request, ActionListener<?> listener) {
transportService.sendRequest(randomIngestNode(), action.name(), request,
new ActionListenerResponseHandler(listener, action::newResponse));
new ActionListenerResponseHandler(listener, action.getResponseReader()));
}

private DiscoveryNode randomIngestNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -151,8 +153,10 @@ public void sync(ResyncReplicationRequest request, Task parentTask, String prima
transportOptions,
new TransportResponseHandler<ResyncReplicationResponse>() {
@Override
public ResyncReplicationResponse newInstance() {
return newResponseInstance();
public ResyncReplicationResponse read(StreamInput in) throws IOException {
ResyncReplicationResponse response = newResponseInstance();
response.readFrom(in);
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public MultiSearchResponse(Item[] items) {
this.items = items;
}

MultiSearchResponse(StreamInput in) throws IOException {
readFrom(in);
}

@Override
public Iterator<Item> iterator() {
return Arrays.stream(items).iterator();
Expand Down
Loading

0 comments on commit bf5d9a5

Please sign in to comment.