Add PersistentTasksClusterService::unassignPersistentTask method #37576

Merged
PersistentTasksClusterService.java
@@ -247,6 +247,45 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

/**
* This unassigns a task from any node, i.e. it is assigned to a {@code null} node with the provided reason.
*
* Since the assignment executor node is null, the {@link PersistentTasksClusterService} will attempt to reassign it to a valid
* node quickly.
*
* @param taskId the id of a persistent task
* @param taskAllocationId the expected allocation id of the persistent task
* @param reason the reason for unassigning the task from any node
     * @param listener         the listener that will be called when the task is unassigned
*/
public void unassignPersistentTask(final String taskId,
final long taskAllocationId,
final String reason,
final ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
} else {
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
}
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
}
});
}
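
For reviewers, a minimal caller-side sketch of the new API (the persistentTasksClusterService, task and logger variables and the reason string are illustrative, not part of this change):

    // Hypothetical caller: 'task' is a PersistentTask<?> read from the cluster state and
    // 'persistentTasksClusterService' is the injected PersistentTasksClusterService instance.
    persistentTasksClusterService.unassignPersistentTask(
            task.getId(),                  // persistent task id
            task.getAllocationId(),        // expected allocation id; a stale id fails with ResourceNotFoundException
            "node is shutting down",       // recorded as the assignment explanation
            ActionListener.wrap(
                    unassigned -> logger.trace("task [{}] unassigned: [{}]",
                            unassigned.getId(), unassigned.getAssignment().getExplanation()),
                    e -> logger.warn("failed to unassign task [" + task.getId() + "]", e)));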

/**
* Creates a new {@link Assignment} for the given persistent task.
*
@@ -263,7 +302,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final

AssignmentDecision decision = decider.canAssign();
if (decision.getType() == AssignmentDecision.Type.NO) {
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
}

return persistentTasksExecutor.getAssignment(taskParams, currentState);
@@ -404,6 +443,10 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
}
}

private static Assignment unassignedAssignment(String reason) {
return new Assignment(null, reason);
}

/**
* Class to periodically try to reassign unassigned persistent tasks.
*/
PersistentTasksClusterServiceTests.java
@@ -20,7 +20,9 @@
package org.elasticsearch.persistent;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@@ -63,10 +65,13 @@
import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
@@ -464,6 +469,56 @@ public void testPeriodicRecheck() throws Exception {
});
}

public void testUnassignTask() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.localNodeId("_node_1")
.masterNodeId("_node_1")
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));

String unassignedId = addTask(tasks, "unassign", "_node_2");

MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
setState(clusterService, clusterState);
PersistentTasksClusterService service = createService((params, currentState) ->
new Assignment("_node_2", "test"));
service.unassignPersistentTask(unassignedId, tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
task -> {
assertThat(task.getAssignment().getExecutorNode(), is(nullValue()));
assertThat(task.getId(), equalTo(unassignedId));
assertThat(task.getAssignment().getExplanation(), equalTo("unassignment test"));
},
e -> fail()
));
}

public void testUnassignNonExistentTask() {
ClusterState clusterState = initialState();
ClusterState.Builder builder = ClusterState.builder(clusterState);
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.localNodeId("_node_1")
.masterNodeId("_node_1")
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));

MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
clusterState = builder.metaData(metaData).nodes(nodes).build();
setState(clusterService, clusterState);
PersistentTasksClusterService service = createService((params, currentState) ->
new Assignment("_node_2", "test"));
service.unassignPersistentTask("missing-task", tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
task -> fail(),
e -> assertThat(e, instanceOf(ResourceNotFoundException.class))
));
}

private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
@@ -728,9 +783,11 @@ private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuil
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build()));
}

private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param),
private String addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
String id = UUIDs.base64UUID();
tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param),
new Assignment(node, "explanation: " + param));
return id;
}

private DiscoveryNode newNode(String nodeId) {
PersistentTasksExecutorIT.java
@@ -46,8 +46,10 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class PersistentTasksExecutorIT extends ESIntegTestCase {
@@ -155,11 +157,8 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
String newNode = internalCluster().startNode(nodeSettings);
String newNodeId = internalCluster().clusterService(newNode).localNode().getId();
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
.size(), equalTo(1));
});
waitForTaskToStart();

TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);

@@ -199,11 +198,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception

TestPersistentTasksExecutor.setNonClusterStateCondition(true);

assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
.size(), equalTo(1));
});
waitForTaskToStart();
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);

@@ -221,12 +216,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
String taskId = future.get().getId();

assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
.size(), equalTo(1));
});
waitForTaskToStart();
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);

@@ -307,6 +297,62 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
});
}

public void testUnassignRunningPersistentTask() throws Exception {
PersistentTasksClusterService persistentTasksClusterService =
internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName());
// Speed up rechecks to a rate that is quicker than what settings would allow
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
Review comment (Member):
Is there a risk in setting it to such a low value?

Reply (Contributor):
There is already another test that sets 1ms for the recheck interval. I agree 1ms would be a completely inappropriate setting for production, but that's why it's being set via a method that end users cannot call. I think if a test fails because the interval is low then it will be exposing a bug that could happen with a higher setting, just much less frequently. During this test it's true that the master node will be doing a lot of work iterating through the persistent tasks list, but it won't be doing the other work that a production master node would be doing, so a modern CPU should be able to cope.

Reply (Member):
Thanks for the clarification. I just wanted to be sure that this value couldn't overwhelm any thread pool and cause other issues.

PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
TestParams testParams = new TestParams("Blah");
testParams.setExecutorNodeAttr("test");
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
PersistentTask<TestParams> task = future.get();
String taskId = task.getId();

Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
internalCluster().startNode(nodeSettings);

waitForTaskToStart();

PlainActionFuture<PersistentTask<?>> unassignmentFuture = new PlainActionFuture<>();

// Disallow re-assignment after it is unallocated to verify master and node state
TestPersistentTasksExecutor.setNonClusterStateCondition(false);

persistentTasksClusterService.unassignPersistentTask(taskId,
task.getAllocationId() + 1,
"unassignment test",
unassignmentFuture);
PersistentTask<?> unassignedTask = unassignmentFuture.get();
assertThat(unassignedTask.getId(), equalTo(taskId));
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));

assertBusy(() -> {
// Verify that the task is NOT running on the node
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
.getTasks();
assertThat(tasks.size(), equalTo(0));

// Verify that the task is STILL in internal cluster state
assertClusterStateHasTask(taskId);
});

// Allow it to be reassigned again to the same node
TestPersistentTasksExecutor.setNonClusterStateCondition(true);

// Verify it starts again
waitForTaskToStart();

assertClusterStateHasTask(taskId);

// Complete or cancel the running task
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
.get().getTasks().get(0);
stopOrCancelTask(taskInfo.getTaskId());
}

private void stopOrCancelTask(TaskId taskId) {
if (randomBoolean()) {
logger.info("Completing the running task");
@@ -322,6 +368,25 @@ private void stopOrCancelTask(TaskId taskId) {
}
}

private static void waitForTaskToStart() throws Exception {
assertBusy(() -> {
// Wait for the task to start
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
.size(), equalTo(1));
});
}

private static void assertClusterStateHasTask(String taskId) {
Collection<PersistentTask<?>> clusterTasks = ((PersistentTasksCustomMetaData) internalCluster()
.clusterService()
.state()
.getMetaData()
.custom(PersistentTasksCustomMetaData.TYPE))
.tasks();
assertThat(clusterTasks, hasSize(1));
assertThat(clusterTasks.iterator().next().getId(), equalTo(taskId));
}

private void assertNoRunningTasks() throws Exception {
assertBusy(() -> {
// Wait for the task to finish