Skip to content

Commit

Permalink
Identify cancelled tasks in list tasks API (#72931)
Browse files Browse the repository at this point in the history
This commit adds a `cancelled` flag to each cancellable task in the
response to the list tasks API, allowing users to see that a task has
been properly cancelled and will complete as soon as possible.

Closes #72907
  • Loading branch information
DaveCTurner committed May 17, 2021
1 parent 15d5f2a commit 6d8b841
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class TaskInfo {
private long startTime;
private long runningTimeNanos;
private boolean cancellable;
private boolean cancelled;
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
Expand Down Expand Up @@ -93,6 +94,14 @@ void setCancellable(boolean cancellable) {
this.cancellable = cancellable;
}

public boolean isCancelled() {
return cancelled;
}

void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}

public TaskId getParentTaskId() {
return parentTaskId;
}
Expand Down Expand Up @@ -134,6 +143,7 @@ private void noOpParse(Object s) {}
parser.declareLong(TaskInfo::setStartTime, new ParseField("start_time_in_millis"));
parser.declareLong(TaskInfo::setRunningTimeNanos, new ParseField("running_time_in_nanos"));
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
Expand All @@ -147,6 +157,7 @@ public boolean equals(Object o) {
return getStartTime() == taskInfo.getStartTime() &&
getRunningTimeNanos() == taskInfo.getRunningTimeNanos() &&
isCancellable() == taskInfo.isCancellable() &&
isCancelled() == taskInfo.isCancelled() &&
Objects.equals(getTaskId(), taskInfo.getTaskId()) &&
Objects.equals(getType(), taskInfo.getType()) &&
Objects.equals(getAction(), taskInfo.getAction()) &&
Expand All @@ -159,8 +170,17 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
getTaskId(), getType(), getAction(), getDescription(), getStartTime(),
getRunningTimeNanos(), isCancellable(), getParentTaskId(), status, getHeaders()
getTaskId(),
getType(),
getAction(),
getDescription(),
getStartTime(),
getRunningTimeNanos(),
isCancellable(),
isCancelled(),
getParentTaskId(),
status,
getHeaders()
);
}

Expand All @@ -175,6 +195,7 @@ public String toString() {
", startTime=" + startTime +
", runningTimeNanos=" + runningTimeNanos +
", cancellable=" + cancellable +
", cancelled=" + cancelled +
", parentTaskId=" + parentTaskId +
", status=" + status +
", headers=" + headers +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,23 @@ static TaskInfo randomTaskInfo() {
long startTime = randomLong();
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers);
}

private static TaskId randomTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,22 @@ private static TaskInfo randomTaskInfo() {
long startTime = randomLong();
long runningTimeNanos = randomNonNegativeLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable && randomBoolean();
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
Map<String, String> headers = randomBoolean() ?
Collections.emptyMap() :
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
null,
startTime,
runningTimeNanos,
cancellable,
cancelled,
parentTaskId,
headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
}

for (int i = 0; i < 4; i++) {
boolean isCancellable = randomBoolean();
tasks.add(new org.elasticsearch.tasks.TaskInfo(
new TaskId(NODE_ID, (long) i),
randomAlphaOfLength(4),
Expand All @@ -66,7 +67,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
new FakeTaskStatus(randomAlphaOfLength(4), randomInt()),
randomLongBetween(1, 3),
randomIntBetween(5, 10),
false,
isCancellable,
isCancellable && randomBoolean(),
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")));
}
Expand Down Expand Up @@ -100,6 +102,7 @@ protected void assertInstances(ByNodeCancelTasksResponse serverTestInstance,
assertEquals(ti.getStartTime(), taskInfo.getStartTime());
assertEquals(ti.getRunningTimeNanos(), taskInfo.getRunningTimeNanos());
assertEquals(ti.isCancellable(), taskInfo.isCancellable());
assertEquals(ti.isCancelled(), taskInfo.isCancelled());
assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId());
assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId());
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();
Expand Down
10 changes: 9 additions & 1 deletion docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ The API returns the following result:
"description" : "indices[test], types[test], search_type[QUERY_THEN_FETCH], source[{\"query\":...}]",
"start_time_in_millis" : 1483478610008,
"running_time_in_nanos" : 13991383,
"cancellable" : true
"cancellable" : true,
"cancelled" : false
}
}
}
Expand Down Expand Up @@ -243,6 +244,13 @@ nodes `nodeId1` and `nodeId2`.
POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex
--------------------------------------------------

A task may continue to run for some time after it has been cancelled because it
may not be able to safely stop its current activity straight away. The list
tasks API will continue to list these cancelled tasks until they complete. The
`cancelled` flag in the response to the list tasks API indicates that the
cancellation command has been processed and the task will stop as soon as
possible.

===== Task Grouping

The task lists returned by task API commands can be grouped either by nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,18 @@ public void testRethrottleSuccessfulResponse() {
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
for (int i = 0; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
tasks.add(new TaskInfo(
new TaskId("test", 123),
"test",
"test",
"test",
status,
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices,
Expand All @@ -112,8 +122,18 @@ public void testRethrottleWithSomeSucceeded() {
List<TaskInfo> tasks = new ArrayList<>();
for (int i = succeeded; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
tasks.add(new TaskInfo(
new TaskId("test", 123),
"test",
"test",
"test",
status,
0,
0,
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices - succeeded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
CyclicBarrier b = new CyclicBarrier(2);
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new RuntimeException("test")),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()),
new RuntimeException("test")),
new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) {
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime,
System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers);
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
getAction(),
description,
status,
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask)this).isCancelled(),
parentTask,
headers);
}

/**
Expand Down
Loading

0 comments on commit 6d8b841

Please sign in to comment.