Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix .tasks index strict mapping: parent_id should be parent_task_id #48393

Merged
merged 3 commits into from
Oct 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"id": {
"type": "long"
},
"parent_id": {
"parent_task_id": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to increment the version in TaskResultsService.TASK_RESULT_MAPPING_VERSION in order to get the new mapping applied.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I was not aware of that. Do you know if there are tests around applying the updated mappings?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not, since there would at least be no verification that the parent_task_id field is in the index after/during a rolling upgrade. I also seem to remember that some of the test frameworks delete the .tasks index between each test (but not sure if it applies to all upgrade tests).

"type": "keyword"
},
"node": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ public void testTaskCounts() {
}

public void testMasterNodeOperationTasks() {
registerTaskManageListeners(ClusterHealthAction.NAME);
registerTaskManagerListeners(ClusterHealthAction.NAME);

// First run the health on the master node - should produce only one task on the master node
internalCluster().masterClient().admin().cluster().prepareHealth().get();
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, Tuple::v1)); // counting only registration events
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, event -> event.v1() == false)); // counting only unregistration events

resetTaskManageListeners(ClusterHealthAction.NAME);
resetTaskManagerListeners(ClusterHealthAction.NAME);

// Now run the health on a non-master node - should produce one task on master and one task on another node
internalCluster().nonMasterClient().admin().cluster().prepareHealth().get();
Expand All @@ -162,8 +162,8 @@ public void testMasterNodeOperationTasks() {
}

public void testTransportReplicationAllShardsTasks() {
registerTaskManageListeners(ValidateQueryAction.NAME); // main task
registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard
registerTaskManagerListeners(ValidateQueryAction.NAME); // main task
registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard
// level
// tasks
createIndex("test");
Expand All @@ -181,8 +181,8 @@ public void testTransportReplicationAllShardsTasks() {
}

public void testTransportBroadcastByNodeTasks() {
registerTaskManageListeners(UpgradeAction.NAME); // main task
registerTaskManageListeners(UpgradeAction.NAME + "[n]"); // node level tasks
registerTaskManagerListeners(UpgradeAction.NAME); // main task
registerTaskManagerListeners(UpgradeAction.NAME + "[n]"); // node level tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareUpgrade("test").get();
Expand All @@ -197,8 +197,8 @@ public void testTransportBroadcastByNodeTasks() {
}

public void testTransportReplicationSingleShardTasks() {
registerTaskManageListeners(ValidateQueryAction.NAME); // main task
registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks
registerTaskManagerListeners(ValidateQueryAction.NAME); // main task
registerTaskManagerListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareValidateQuery("test").get();
Expand All @@ -213,9 +213,9 @@ public void testTransportReplicationSingleShardTasks() {


public void testTransportBroadcastReplicationTasks() {
registerTaskManageListeners(RefreshAction.NAME); // main task
registerTaskManageListeners(RefreshAction.NAME + "[s]"); // shard level tasks
registerTaskManageListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks
registerTaskManagerListeners(RefreshAction.NAME); // main task
registerTaskManagerListeners(RefreshAction.NAME + "[s]"); // shard level tasks
registerTaskManagerListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated
client().admin().indices().prepareRefresh("test").get();
Expand Down Expand Up @@ -287,10 +287,10 @@ public void testTransportBroadcastReplicationTasks() {
}

public void testTransportBulkTasks() {
registerTaskManageListeners(BulkAction.NAME); // main task
registerTaskManageListeners(BulkAction.NAME + "[s]"); // shard task
registerTaskManageListeners(BulkAction.NAME + "[s][p]"); // shard task on primary
registerTaskManageListeners(BulkAction.NAME + "[s][r]"); // shard task on replica
registerTaskManagerListeners(BulkAction.NAME); // main task
registerTaskManagerListeners(BulkAction.NAME + "[s]"); // shard task
registerTaskManagerListeners(BulkAction.NAME + "[s][p]"); // shard task on primary
registerTaskManagerListeners(BulkAction.NAME + "[s][r]"); // shard task on replica
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
// ensures the mapping is available on all nodes so we won't retry the request (in case replicas don't have the right mapping).
Expand Down Expand Up @@ -340,10 +340,9 @@ public void testTransportBulkTasks() {
assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
}


public void testSearchTaskDescriptions() {
registerTaskManageListeners(SearchAction.NAME); // main task
registerTaskManageListeners(SearchAction.NAME + "[*]"); // shard task
registerTaskManagerListeners(SearchAction.NAME); // main task
registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
Expand Down Expand Up @@ -489,8 +488,9 @@ public void waitForTaskCompletion(Task task) {
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);

logger.info("--> started test tasks");

// Wait for the task to start on all nodes
Expand All @@ -511,8 +511,8 @@ public void testTasksCancellation() throws Exception {

public void testTasksUnblocking() throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future =
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
Expand Down Expand Up @@ -575,8 +575,9 @@ public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).setShouldStoreResult(storeResult).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldStoreResult(storeResult);
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);

ActionFuture<T> waitResponseFuture;
TaskId taskId;
Expand Down Expand Up @@ -649,8 +650,8 @@ public void testGetTaskWaitForTimeout() throws Exception {
*/
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
// Start blocking test task
ActionFuture<TestTaskPlugin.NodesResponse> future = new TestTaskPlugin.NodesRequestBuilder(client(),
TestTaskPlugin.TestTaskAction.INSTANCE).execute();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
try {
TaskId taskId = waitForTestTaskStartOnAllNodes();

Expand Down Expand Up @@ -717,12 +718,17 @@ public void testTasksWaitForAllTask() throws Exception {
assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
}

public void testTaskStoringSuccesfulResult() throws Exception {
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
public void testTaskStoringSuccessfulResult() throws Exception {
registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process

// Start non-blocking test task
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE)
.setShouldStoreResult(true).setShouldBlock(false).get();
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldStoreResult(true);
request.setShouldBlock(false);
TaskId parentTaskId = new TaskId("parent_node", randomLong());
request.setParentTask(parentTaskId);

client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get();

List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);

Expand All @@ -736,6 +742,7 @@ public void testTaskStoringSuccesfulResult() throws Exception {
assertNull(taskResult.getError());

assertEquals(taskInfo.getTaskId(), taskResult.getTask().getTaskId());
assertEquals(taskInfo.getParentTaskId(), taskResult.getTask().getParentTaskId());
assertEquals(taskInfo.getType(), taskResult.getTask().getType());
assertEquals(taskInfo.getAction(), taskResult.getTask().getAction());
assertEquals(taskInfo.getDescription(), taskResult.getTask().getDescription());
Expand Down Expand Up @@ -764,14 +771,16 @@ public void testTaskStoringSuccesfulResult() throws Exception {
}

public void testTaskStoringFailureResult() throws Exception {
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process

TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
request.setShouldFail(true);
request.setShouldStoreResult(true);
request.setShouldBlock(false);

// Start non-blocking test task that should fail
assertThrows(
new TestTaskPlugin.NodesRequestBuilder(client(), TestTaskPlugin.TestTaskAction.INSTANCE)
.setShouldFail(true)
.setShouldStoreResult(true)
.setShouldBlock(false),
client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request),
IllegalStateException.class
);

Expand Down Expand Up @@ -852,7 +861,7 @@ public void tearDown() throws Exception {
/**
* Registers recording task event listeners with the given action mask on all nodes
*/
private void registerTaskManageListeners(String actionMasks) {
private void registerTaskManagerListeners(String actionMasks) {
for (String nodeName : internalCluster().getNodeNames()) {
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(","));
Expand All @@ -865,7 +874,7 @@ private void registerTaskManageListeners(String actionMasks) {
/**
* Resets all recording task event listeners with the given action mask on all nodes
*/
private void resetTaskManageListeners(String actionMasks) {
private void resetTaskManagerListeners(String actionMasks) {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
entry.getValue().reset();
Expand Down Expand Up @@ -919,11 +928,12 @@ private void assertParentTask(TaskInfo task, TaskInfo parentTask) {
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}

private ResourceNotFoundException expectNotFound(ThrowingRunnable r) {
private void expectNotFound(ThrowingRunnable r) {
Exception e = expectThrows(Exception.class, r);
ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class);
if (notFound == null) throw new RuntimeException("Expected ResourceNotFoundException", e);
return notFound;
if (notFound == null) {
throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
Expand Down Expand Up @@ -138,11 +137,11 @@ public NodeResponse(DiscoveryNode node) {

public static class NodesResponse extends BaseNodesResponse<NodeResponse> implements ToXContentFragment {

public NodesResponse(StreamInput in) throws IOException {
NodesResponse(StreamInput in) throws IOException {
super(in);
}

public NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
NodesResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

Expand All @@ -168,8 +167,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
protected boolean shouldBlock;
protected final String requestName;
protected final boolean shouldBlock;

public NodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -214,7 +213,7 @@ public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
shouldFail = in.readBoolean();
}

public NodesRequest(String requestName, String... nodesIds) {
NodesRequest(String requestName, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
}
Expand Down Expand Up @@ -330,37 +329,13 @@ private TestTaskAction() {
}
}

public static class NodesRequestBuilder extends NodesOperationRequestBuilder<NodesRequest, NodesResponse, NodesRequestBuilder> {

protected NodesRequestBuilder(ElasticsearchClient client, ActionType<NodesResponse> action) {
super(client, action, new NodesRequest("test"));
}


public NodesRequestBuilder setShouldStoreResult(boolean shouldStoreResult) {
request().setShouldStoreResult(shouldStoreResult);
return this;
}

public NodesRequestBuilder setShouldBlock(boolean shouldBlock) {
request().setShouldBlock(shouldBlock);
return this;
}

public NodesRequestBuilder setShouldFail(boolean shouldFail) {
request().setShouldFail(shouldFail);
return this;
}
}


public static class UnblockTestTaskResponse implements Writeable {

public UnblockTestTaskResponse() {
UnblockTestTaskResponse() {

}

public UnblockTestTaskResponse(StreamInput in) {
UnblockTestTaskResponse(StreamInput in) {
}

@Override
Expand All @@ -387,13 +362,13 @@ public static class UnblockTestTasksResponse extends BaseTasksResponse {

private List<UnblockTestTaskResponse> tasks;

public UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : List.copyOf(tasks);
}

public UnblockTestTasksResponse(StreamInput in) throws IOException {
UnblockTestTasksResponse(StreamInput in) throws IOException {
super(in);
int taskCount = in.readVInt();
List<UnblockTestTaskResponse> builder = new ArrayList<>();
Expand Down