Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to associate an ID with tasks #27764

Merged
merged 7 commits into from
Jan 12, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand All @@ -324,6 +325,7 @@
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Transport action that can be used to cancel currently running cancellable tasks.
* <p>
* For a task to be cancellable it has to return an instance of
* {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)}
* {@link CancellableTask} from {@link TransportRequest#createTask}
*/
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param
return builder;
}

/**
* Presents a flat list of tasks
*/
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startArray("tasks");
for (TaskInfo taskInfo : getTasks()) {
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
/**
* Max length of the source document to include into toString()
*
* @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId)
* @see ReplicationRequest#createTask
*/
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -428,9 +429,9 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId) {
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -117,8 +118,8 @@ public void readFrom(StreamInput in) throws IOException {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;

import java.util.Map;

/**
* Task storing information about a currently running search request.
*/
public class SearchTask extends CancellableTask {

public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
public SearchTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -207,8 +208,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new ReplicationTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

import static java.util.Objects.requireNonNull;

Expand All @@ -36,8 +37,8 @@
public class ReplicationTask extends Task {
private volatile String phase = "starting";

public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
public ReplicationTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
Expand Down Expand Up @@ -1228,8 +1229,8 @@ public TaskId getParentTask() {
return request.getParentTask();
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return request.createTask(id, type, action, parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return request.createTask(id, type, action, parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),
boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
UUIDs.randomBase64UUID()), null);
UUIDs.randomBase64UUID()), null, Collections.emptySet());
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
Expand Down Expand Up @@ -392,8 +393,8 @@ protected Self doForSlice(Self request, TaskId slicingTask, int totalSlices) {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new BulkByScrollTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.CancellableTask;
Expand All @@ -38,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static java.lang.Math.min;
Expand All @@ -62,8 +62,8 @@ public class BulkByScrollTask extends CancellableTask {
private volatile LeaderBulkByScrollTaskState leaderState;
private volatile WorkerBulkByScrollTaskState workerState;

public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -271,8 +272,8 @@ public ResyncRequest(ShardId shardId, String allocationId) {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new ResyncTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ResyncTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand All @@ -297,8 +298,8 @@ public static class ResyncTask extends Task {
private volatile int resyncedOperations;
private volatile int skippedOperations;

public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
public ResyncTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, description, parentTaskId, headers);
}

/**
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -424,8 +425,12 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
metaDataIndexUpgradeService, metaDataUpgrader);
new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
Expand Down Expand Up @@ -543,8 +548,8 @@ static void warnIfPreRelease(final Version version, final boolean isSnapshot, fi
protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings);
ClusterSettings clusterSettings, Set<String> taskHeaders) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ default Collection<String> getRestHeaders() {
return Collections.emptyList();
}

/**
* Returns headers which should be copied from internal requests into tasks.
*/
default Collection<String> getTaskHeaders() {
return Collections.emptyList();
}

/**
* Returns a function used to wrap each rest request before handling the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,21 @@ public RestResponse buildResponse(T response, XContentBuilder builder) throws Ex
return new BytesRestResponse(RestStatus.OK, builder);
}
};
} else if ("none".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByNone(builder, channel.request());
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
};

} else if ("parents".equals(groupBy)) {
return new RestToXContentListener<>(channel);
} else {
throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]");
throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;

/**
* Shard level fetch base request. Holds all the info needed to execute a fetch.
Expand Down Expand Up @@ -115,8 +116,8 @@ public void readFrom(StreamInput in) throws IOException {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;

public class InternalScrollSearchRequest extends TransportRequest {

Expand Down Expand Up @@ -76,8 +77,8 @@ public void readFrom(StreamInput in) throws IOException {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Map;

/**
* Shard level search request that represents an actual search sent from the coordinating node to the nodes holding
Expand Down Expand Up @@ -177,8 +178,8 @@ public boolean isProfile() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
Expand Down
Loading