Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Search requests to use Writeable reading strategies #26428

Merged
merged 8 commits into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
@Inject
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters
actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchRequest::new);
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new,
indexNameExpressionResolver);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ public void readFrom(StreamInput in) throws IOException {
maxConcurrentSearchRequests = in.readVInt();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
SearchRequest request = new SearchRequest();
request.readFrom(in);
SearchRequest request = new SearchRequest(in);
requests.add(request);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,55 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
this.source = source;
}

/**
* Constructs a new search request from reading the specified stream.
*
* @param in The stream the request is read from
* @throws IOException if there is an issue reading the stream
*/
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
batchedReduceSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
out.writeStringArray(types);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
out.writeVInt(batchedReduceSize);
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -385,7 +434,7 @@ public String getDescription() {
sb.append("], ");
sb.append("search_type[").append(searchType).append("], ");
if (source != null) {

sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]");
} else {
sb.append("source[]");
Expand All @@ -397,46 +446,7 @@ public String getDescription() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
batchedReduceSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
out.writeStringArray(types);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
out.writeVInt(batchedReduceSize);
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ public SearchScrollRequest(String scrollId) {
this.scrollId = scrollId;
}

public SearchScrollRequest(StreamInput in) throws IOException {
super(in);
scrollId = in.readString();
scroll = in.readOptionalWriteable(Scroll::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(scrollId);
out.writeOptionalWriteable(scroll);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -100,16 +113,7 @@ public SearchScrollRequest scroll(String keepAlive) {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
scrollId = in.readString();
scroll = in.readOptionalWriteable(Scroll::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(scrollId);
out.writeOptionalWriteable(scroll);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,8 @@ static class ScrollFreeContextRequest extends TransportRequest {
this.id = id;
}

public long id() {
return this.id;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ScrollFreeContextRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
}

Expand All @@ -218,6 +213,15 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
}

public long id() {
return this.id;
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
Expand All @@ -231,6 +235,17 @@ static class SearchFreeContextRequest extends ScrollFreeContextRequest implement
this.originalIndices = originalIndices;
}

SearchFreeContextRequest(StreamInput in) throws IOException {
super(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}

@Override
public String[] indices() {
if (originalIndices == null) {
Expand All @@ -249,14 +264,7 @@ public IndicesOptions indicesOptions() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

Expand Down Expand Up @@ -289,7 +297,7 @@ public void writeTo(StreamOutput out) throws IOException {
}

public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
@Override
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -298,7 +306,7 @@ public void messageReceived(ScrollFreeContextRequest request, TransportChannel c
}
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -318,7 +326,7 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);

transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
Expand Down Expand Up @@ -346,7 +354,7 @@ public void onFailure(Exception e) {
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -373,7 +381,7 @@ public void onFailure(Exception e) {
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -383,7 +391,7 @@ public void messageReceived(QuerySearchRequest request, TransportChannel channel
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -393,7 +401,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -403,7 +411,7 @@ public void messageReceived(InternalScrollSearchRequest request, TransportChanne
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -413,7 +421,7 @@ public void messageReceived(ShardFetchRequest request, TransportChannel channel,
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
Expand All @@ -424,7 +432,7 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);

// this is super cheap and should not hit thread-pool rejections
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public TransportSearchAction(Settings settings, ThreadPool threadPool, Transport
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new, indexNameExpressionResolver);
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, Tra
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchScrollRequest::new);
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest::new,
indexNameExpressionResolver);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId)
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
searchRequest = new SearchRequest();
searchRequest.readFrom(in);
searchRequest = new SearchRequest(in);
abortOnVersionConflict = in.readBoolean();
size = in.readVInt();
refresh = in.readBoolean();
Expand Down
Loading