From 7384c4ea2be22ba82a83edd83fd76fa0df6d2d27 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 May 2021 15:04:40 +0100 Subject: [PATCH 1/6] Identify cancelled tasks in list tasks API This commit adds a `cancelled` flag to each cancellable task in the response to the list tasks API, allowing users to see that a task has been properly cancelled and will complete as soon as possible. Closes #72907 --- .../elasticsearch/client/tasks/TaskInfo.java | 25 ++- .../core/tasks/GetTaskResponseTests.java | 14 +- .../client/enrich/StatsResponseTests.java | 14 +- .../tasks/CancelTasksResponseTests.java | 5 +- docs/reference/cluster/tasks.asciidoc | 8 + .../TransportRethrottleActionTests.java | 28 ++- .../admin/cluster/node/tasks/TasksIT.java | 15 +- .../java/org/elasticsearch/tasks/Task.java | 14 +- .../org/elasticsearch/tasks/TaskInfo.java | 68 ++++++- .../admin/cluster/node/tasks/TaskTests.java | 21 ++- .../tasks/ListTasksResponseTests.java | 2 +- .../elasticsearch/tasks/TaskInfoTests.java | 173 +++++++++++++++--- .../action/EnrichStatsResponseTests.java | 15 +- .../ml/MlDailyMaintenanceServiceTests.java | 1 + 14 files changed, 353 insertions(+), 50 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskInfo.java index 7ace6a52df315..a2321064061e1 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskInfo.java @@ -29,6 +29,7 @@ public class TaskInfo { private long startTime; private long runningTimeNanos; private boolean cancellable; + private boolean cancelled; private TaskId parentTaskId; private final Map status = new HashMap<>(); private final Map headers = new HashMap<>(); @@ -93,6 +94,14 @@ void setCancellable(boolean cancellable) { this.cancellable = cancellable; } + public boolean isCancelled() { + return cancelled; + } + + void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + public TaskId getParentTaskId() { return parentTaskId; } @@ -134,6 +143,7 @@ private void noOpParse(Object s) {} parser.declareLong(TaskInfo::setStartTime, new ParseField("start_time_in_millis")); parser.declareLong(TaskInfo::setRunningTimeNanos, new ParseField("running_time_in_nanos")); parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable")); + parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled")); parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id")); parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers")); PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null); @@ -147,6 +157,7 @@ public boolean equals(Object o) { return getStartTime() == taskInfo.getStartTime() && getRunningTimeNanos() == taskInfo.getRunningTimeNanos() && isCancellable() == taskInfo.isCancellable() && + isCancelled() == taskInfo.isCancelled() && Objects.equals(getTaskId(), taskInfo.getTaskId()) && Objects.equals(getType(), taskInfo.getType()) && Objects.equals(getAction(), taskInfo.getAction()) && @@ -159,8 +170,17 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - getTaskId(), getType(), getAction(), getDescription(), getStartTime(), - getRunningTimeNanos(), isCancellable(), getParentTaskId(), status, getHeaders() + getTaskId(), + getType(), + getAction(), + getDescription(), + getStartTime(), + getRunningTimeNanos(), + isCancellable(), + isCancelled(), + getParentTaskId(), + status, + getHeaders() ); } @@ -175,6 +195,7 @@ public String toString() { ", startTime=" + startTime + ", runningTimeNanos=" + runningTimeNanos + ", cancellable=" + cancellable + + ", cancelled=" + cancelled + ", parentTaskId=" + parentTaskId + ", status=" + status + ", headers=" + headers + diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java index 53acbbbdce17f..202ad9e6bbe68 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java @@ -69,11 +69,23 @@ static TaskInfo randomTaskInfo() { long startTime = randomLong(); long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + return new TaskInfo( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers); } private static TaskId randomTaskId() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java index a3b3d9e608884..7efbca89294fb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java @@ -78,10 +78,22 @@ private static TaskInfo randomTaskInfo() { long startTime = randomLong(); long runningTimeNanos = randomNonNegativeLong(); boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); TaskId parentTaskId = TaskId.EMPTY_TASK_ID; Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + return new TaskInfo( + taskId, + type, + action, + description, + null, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/CancelTasksResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/CancelTasksResponseTests.java index 2bf558e8d71c0..5d2855f73509d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/CancelTasksResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/CancelTasksResponseTests.java @@ -57,6 +57,7 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns } for (int i = 0; i < 4; i++) { + boolean isCancellable = randomBoolean(); tasks.add(new org.elasticsearch.tasks.TaskInfo( new TaskId(NODE_ID, (long) i), randomAlphaOfLength(4), @@ -65,7 +66,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns new FakeTaskStatus(randomAlphaOfLength(4), randomInt()), randomLongBetween(1, 3), randomIntBetween(5, 10), - false, + isCancellable, + isCancellable && randomBoolean(), new TaskId("node1", randomLong()), Map.of("x-header-of", "some-value"))); } @@ -99,6 +101,7 @@ protected void assertInstances(ByNodeCancelTasksResponse serverTestInstance, assertEquals(ti.getStartTime(), taskInfo.getStartTime()); assertEquals(ti.getRunningTimeNanos(), taskInfo.getRunningTimeNanos()); assertEquals(ti.isCancellable(), taskInfo.isCancellable()); + assertEquals(ti.isCancelled(), taskInfo.isCancelled()); assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId()); assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId()); FakeTaskStatus status = (FakeTaskStatus) ti.getStatus(); diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index 94fa143f6258a..bf98266574b34 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -172,6 +172,7 @@ The API returns the following result: "start_time_in_millis" : 1483478610008, "running_time_in_nanos" : 13991383, "cancellable" : true + "cancelled" : false } } } @@ -243,6 +244,13 @@ nodes `nodeId1` and `nodeId2`. POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex -------------------------------------------------- +A task may continue to run for some time after it has been cancelled since it +may not be able to safely stop its current activity straight away. The list +tasks API will continue to list these cancelled tasks until they complete. The +`cancelled` flag in the response to the list tasks API indicates that the +cancellation command has been processed and the task will stop as soon as +possible. + ===== Task Grouping The task lists returned by task API commands can be grouped either by nodes diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java index 828316a10a291..d24b7220d79d7 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java @@ -91,8 +91,18 @@ public void testRethrottleSuccessfulResponse() { List sliceStatuses = new ArrayList<>(slices); for (int i = 0; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), - Collections.emptyMap())); + tasks.add(new TaskInfo( + new TaskId("test", 123), + "test", + "test", + "test", + status, + 0, + 0, + true, + false, + new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices, @@ -112,8 +122,18 @@ public void testRethrottleWithSomeSucceeded() { List tasks = new ArrayList<>(); for (int i = succeeded; i < slices; i++) { BulkByScrollTask.Status status = believeableInProgressStatus(i); - tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()), - Collections.emptyMap())); + tasks.add(new TaskInfo( + new TaskId("test", 123), + "test", + "test", + "test", + status, + 0, + 0, + true, + false, + new TaskId("test", task.getId()), + Collections.emptyMap())); sliceStatuses.add(new BulkByScrollTask.StatusOrException(status)); } rethrottleTestCase(slices - succeeded, diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 73e253e63422c..32d3ab62aa7cb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -819,8 +819,19 @@ public void testNodeNotFoundButTaskFound() throws Exception { CyclicBarrier b = new CyclicBarrier(2); TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); resultsService.storeResult(new TaskResult( - new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()), - new RuntimeException("test")), + new TaskInfo( + new TaskId("fake", 1), + "test", + "test", + "", + null, + 0, + 0, + false, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap()), + new RuntimeException("test")), new ActionListener() { @Override public void onResponse(Void response) { diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index bd296a3ed02c7..9b0a77f5be9d3 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -90,8 +90,18 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) { * Build a proper {@link TaskInfo} for this task. */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { - return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime, - System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers); + return new TaskInfo( + new TaskId(localNodeId, getId()), + getType(), + getAction(), + description, + status, + startTime, + System.nanoTime() - startTimeNanos, + this instanceof CancellableTask, + this instanceof CancellableTask && ((CancellableTask)this).isCancelled(), + parentTask, + headers); } /** diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java index 8307830bfc830..6aa94425b26b3 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java @@ -8,6 +8,7 @@ package org.elasticsearch.tasks; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -56,12 +57,25 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final boolean cancellable; + private final boolean cancelled; + private final TaskId parentTaskId; private final Map headers; - public TaskInfo(TaskId taskId, String type, String action, String description, Task.Status status, long startTime, - long runningTimeNanos, boolean cancellable, TaskId parentTaskId, Map headers) { + public TaskInfo( + TaskId taskId, + String type, + String action, + String description, + Task.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + TaskId parentTaskId, + Map headers) { + assert cancellable || cancelled == false : "uncancellable task cannot be cancelled"; this.taskId = taskId; this.type = type; this.action = action; @@ -70,6 +84,7 @@ public TaskInfo(TaskId taskId, String type, String action, String description, T this.startTime = startTime; this.runningTimeNanos = runningTimeNanos; this.cancellable = cancellable; + this.cancelled = cancelled; this.parentTaskId = parentTaskId; this.headers = headers; } @@ -86,6 +101,12 @@ public TaskInfo(StreamInput in) throws IOException { startTime = in.readLong(); runningTimeNanos = in.readLong(); cancellable = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + cancelled = in.readBoolean(); + } else { + cancelled = false; // older versions do not report when tasks are cancelled so we just assume they aren't + } + assert cancellable || cancelled == false : "uncancellable task cannot be cancelled"; parentTaskId = TaskId.readFromStream(in); headers = in.readMap(StreamInput::readString, StreamInput::readString); } @@ -100,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(startTime); out.writeLong(runningTimeNanos); out.writeBoolean(cancellable); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(cancelled); + } // older versions do not report when tasks are cancelled anyway so it's ok just to drop this flag parentTaskId.writeTo(out); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -153,6 +177,13 @@ public boolean isCancellable() { return cancellable; } + /** + * Returns true if the task supports cancellation and has been cancelled + */ + public boolean isCancelled() { + return cancelled; + } + /** * Returns the parent task id */ @@ -185,6 +216,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field("running_time_in_nanos", runningTimeNanos); builder.field("cancellable", cancellable); + if (cancellable) { + builder.field("cancelled", cancelled); + } if (parentTaskId.isSet()) { builder.field("parent_task_id", parentTaskId.toString()); } @@ -211,6 +245,7 @@ public static TaskInfo fromXContent(XContentParser parser) { long startTime = (Long) a[i++]; long runningTimeNanos = (Long) a[i++]; boolean cancellable = (Boolean) a[i++]; + boolean cancelled = a[i++] == Boolean.TRUE; String parentTaskIdString = (String) a[i++]; @SuppressWarnings("unchecked") Map headers = (Map) a[i++]; if (headers == null) { @@ -219,8 +254,18 @@ public static TaskInfo fromXContent(XContentParser parser) { } RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); - return new TaskInfo(id, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, - headers); + return new TaskInfo( + id, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers); }); static { // Note for the future: this has to be backwards and forwards compatible with all changes to the task storage format @@ -234,6 +279,7 @@ public static TaskInfo fromXContent(XContentParser parser) { PARSER.declareLong(constructorArg(), new ParseField("start_time_in_millis")); PARSER.declareLong(constructorArg(), new ParseField("running_time_in_nanos")); PARSER.declareBoolean(constructorArg(), new ParseField("cancellable")); + PARSER.declareBoolean(optionalConstructorArg(), new ParseField("cancelled")); PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); } @@ -258,12 +304,24 @@ public boolean equals(Object obj) { && Objects.equals(runningTimeNanos, other.runningTimeNanos) && Objects.equals(parentTaskId, other.parentTaskId) && Objects.equals(cancellable, other.cancellable) + && Objects.equals(cancelled, other.cancelled) && Objects.equals(status, other.status) && Objects.equals(headers, other.headers); } @Override public int hashCode() { - return Objects.hash(taskId, type, action, description, startTime, runningTimeNanos, parentTaskId, cancellable, status, headers); + return Objects.hash( + taskId, + type, + action, + description, + startTime, + runningTimeNanos, + parentTaskId, + cancellable, + cancelled, + status, + headers); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java index f746508598104..bb8b16a149430 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskTests.java @@ -25,9 +25,19 @@ public void testTaskInfoToString() { long startTime = randomNonNegativeLong(); long runningTime = randomNonNegativeLong(); boolean cancellable = randomBoolean(); - TaskInfo taskInfo = new TaskInfo(new TaskId(nodeId, taskId), "test_type", - "test_action", "test_description", null, startTime, runningTime, cancellable, TaskId.EMPTY_TASK_ID, - Collections.singletonMap("foo", "bar")); + boolean cancelled = cancellable && randomBoolean(); + TaskInfo taskInfo = new TaskInfo( + new TaskId(nodeId, taskId), + "test_type", + "test_action", + "test_description", + null, + startTime, + runningTime, + cancellable, + cancelled, + TaskId.EMPTY_TASK_ID, + Collections.singletonMap("foo", "bar")); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); assertEquals(((Number)map.get("id")).longValue(), taskId); @@ -37,6 +47,11 @@ public void testTaskInfoToString() { assertEquals(((Number)map.get("start_time_in_millis")).longValue(), startTime); assertEquals(((Number)map.get("running_time_in_nanos")).longValue(), runningTime); assertEquals(map.get("cancellable"), cancellable); + if (cancellable) { + assertEquals(map.get("cancelled"), cancelled); + } else { + assertFalse(map.containsKey("cancelled")); + } assertEquals(map.get("headers"), Collections.singletonMap("foo", "bar")); } diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index bdbd2cefd7724..b4952305c9106 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -40,7 +40,7 @@ public void testEmptyToString() { public void testNonEmptyToString() { TaskInfo info = new TaskInfo( - new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0), + new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, false, new TaskId("node1", 0), Collections.singletonMap("foo", "bar")); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals("{\n" + diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java index 4dc20abb3ff95..c5c2017ec9c3b 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java @@ -61,40 +61,137 @@ protected TaskInfo mutateInstance(TaskInfo info) { switch (between(0, 9)) { case 0: TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId()); - return new TaskInfo(taskId, info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), info.getHeaders()); + return new TaskInfo( + taskId, + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 1: - return new TaskInfo(info.getTaskId(), info.getType() + randomAlphaOfLength(5), info.getAction(), info.getDescription(), - info.getStatus(), info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType() + randomAlphaOfLength(5), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 2: - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction() + randomAlphaOfLength(5), info.getDescription(), - info.getStatus(), info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction() + randomAlphaOfLength(5), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 3: - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription() + randomAlphaOfLength(5), - info.getStatus(), info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription() + randomAlphaOfLength(5), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 4: Task.Status newStatus = randomValueOtherThan(info.getStatus(), TaskInfoTests::randomRawTaskStatus); - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), newStatus, - info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + newStatus, + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 5: - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime() + between(1, 100), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), - info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime() + between(1, 100), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 6: - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime(), info.getRunningTimeNanos() + between(1, 100), info.isCancellable(), info.getParentTaskId(), - info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos() + between(1, 100), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders()); case 7: - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable() == false, info.getParentTaskId(), - info.getHeaders()); + // if not cancellable then mutate cancellable flag but leave cancelled flag unset + // if cancelled then mutate cancelled flag but leave cancellable flag set + // if cancellable but not cancelled then mutate exactly one of the flags + // + // cancellable | cancelled | random | cancellable == cancelled | isNowCancellable | isNowCancelled + // false | false | false | true | true | false + // false | false | true | true | true | false + // true | true | false | true | true | false + // true | true | true | true | true | false + // true | false | false | false | false | false + // true | false | true | false | true | true + boolean isNowCancellable = info.isCancellable() == info.isCancelled() || randomBoolean(); + boolean isNowCancelled = isNowCancellable != (info.isCancellable() == info.isCancelled()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + isNowCancellable, + isNowCancelled, + info.getParentTaskId(), + info.getHeaders()); case 8: TaskId parentId = new TaskId(info.getParentTaskId().getNodeId() + randomAlphaOfLength(5), info.getParentTaskId().getId()); - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), parentId, info.getHeaders()); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + parentId, + info.getHeaders()); case 9: Map headers = info.getHeaders(); if (headers == null) { @@ -103,8 +200,18 @@ protected TaskInfo mutateInstance(TaskInfo info) { headers = new HashMap<>(info.getHeaders()); } headers.put(randomAlphaOfLength(15), randomAlphaOfLength(15)); - return new TaskInfo(info.getTaskId(), info.getType(), info.getAction(), info.getDescription(), info.getStatus(), - info.getStartTime(), info.getRunningTimeNanos(), info.isCancellable(), info.getParentTaskId(), headers); + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + headers); default: throw new IllegalStateException(); } @@ -119,11 +226,23 @@ static TaskInfo randomTaskInfo() { long startTime = randomLong(); long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + return new TaskInfo( + taskId, + type, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers); } private static TaskId randomTaskId() { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java index f2df5648c9d23..07690ceac9b6f 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java @@ -57,10 +57,23 @@ public static TaskInfo randomTaskInfo() { long startTime = randomLong(); long runningTimeNanos = randomNonNegativeLong(); boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); TaskId parentTaskId = TaskId.EMPTY_TASK_ID; Map headers = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); - return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers); + return new TaskInfo( + taskId, + type, + action, + description, + null, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers + ); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index ce69b5508c8f4..57b32d8cb8f5f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -163,6 +163,7 @@ public void testJobInDeletingStateAlreadyHasDeletionTask() throws InterruptedExc 0, 0, true, + false, new TaskId("test", 456), Collections.emptyMap()); From 0937d19c4a6836cc264a55f62c26b3f61d7ecced Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 May 2021 15:18:31 +0100 Subject: [PATCH 2/6] fix invalid JSON --- docs/reference/cluster/tasks.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index bf98266574b34..13f966aa58cb0 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -171,7 +171,7 @@ The API returns the following result: "description" : "indices[test], types[test], search_type[QUERY_THEN_FETCH], source[{\"query\":...}]", "start_time_in_millis" : 1483478610008, "running_time_in_nanos" : 13991383, - "cancellable" : true + "cancellable" : true, "cancelled" : false } } @@ -244,7 +244,7 @@ nodes `nodeId1` and `nodeId2`. POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex -------------------------------------------------- -A task may continue to run for some time after it has been cancelled since it +A task may continue to run for some time after it has been cancelled because it may not be able to safely stop its current activity straight away. The list tasks API will continue to list these cancelled tasks until they complete. The `cancelled` flag in the response to the list tasks API indicates that the From a23f80c5b79983df91c2756efb5356dd9020fe29 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 May 2021 16:17:42 +0100 Subject: [PATCH 3/6] Drop flag in .tasks index --- .../src/main/java/org/elasticsearch/tasks/TaskInfo.java | 8 +++++++- .../java/org/elasticsearch/tasks/TaskResultsService.java | 4 +++- .../org/elasticsearch/tasks/ListTasksResponseTests.java | 1 + 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java index 6aa94425b26b3..51ebac9ba4bb5 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskInfo.java @@ -41,6 +41,9 @@ * snapshot information about currently running tasks. */ public final class TaskInfo implements Writeable, ToXContentFragment { + + static final String INCLUDE_CANCELLED_PARAM = "include_cancelled"; + private final TaskId taskId; private final String type; @@ -216,7 +219,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field("running_time_in_nanos", runningTimeNanos); builder.field("cancellable", cancellable); - if (cancellable) { + + if (params.paramAsBoolean(INCLUDE_CANCELLED_PARAM, true) && cancellable) { + // don't record this on entries in the tasks index, since we can't add this field to the mapping dynamically and it's not + // important for completed tasks anyway builder.field("cancelled", cancelled); } if (parentTaskId.isSet()) { diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index 3d2f9dfc182cc..10cbc33524848 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -33,10 +33,12 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Iterator; +import java.util.Map; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.tasks.TaskInfo.INCLUDE_CANCELLED_PARAM; /** * Service that can store task results. @@ -80,7 +82,7 @@ public TaskResultsService(Client client, ThreadPool threadPool) { public void storeResult(TaskResult taskResult, ActionListener listener) { IndexRequestBuilder index = client.prepareIndex(TASK_INDEX).setId(taskResult.getTask().getTaskId().toString()); try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { - taskResult.toXContent(builder, ToXContent.EMPTY_PARAMS); + taskResult.toXContent(builder, new ToXContent.MapParams(Map.of(INCLUDE_CANCELLED_PARAM, "false"))); index.setSource(builder); } catch (IOException e) { throw new ElasticsearchException("Couldn't convert task result to XContent for [{}]", e, taskResult.getTask()); diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index b4952305c9106..623c041498808 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -56,6 +56,7 @@ public void testNonEmptyToString() { " \"running_time\" : \"1nanos\",\n" + " \"running_time_in_nanos\" : 1,\n" + " \"cancellable\" : true,\n" + + " \"cancelled\" : false,\n" + " \"parent_task_id\" : \"node1:0\",\n" + " \"headers\" : {\n" + " \"foo\" : \"bar\"\n" + From 886887106f88a5cf1abee3e3d2795a8927e55531 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 May 2021 16:37:24 +0100 Subject: [PATCH 4/6] More test fix --- .../monitoring/collector/enrich/ExecutingPolicyDocTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java index 68a1c9273c5ee..eacf484f9568d 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java @@ -124,6 +124,9 @@ public void testToXContent() throws IOException { + "," + "\"cancellable\":" + executingPolicy.getTaskInfo().isCancellable() + + (executingPolicy.getTaskInfo().isCancellable() + ? ",\"cancelled\":"+ executingPolicy.getTaskInfo().isCancellable() + : "") + "," + header.map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue())) .orElse("\"headers\":{}") From 35894da34f328617c32962369b6be2c64598a9d3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 May 2021 16:42:12 +0100 Subject: [PATCH 5/6] Spotless --- .../monitoring/collector/enrich/ExecutingPolicyDocTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java index eacf484f9568d..8f0886fd3bad3 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/ExecutingPolicyDocTests.java @@ -125,7 +125,7 @@ public void testToXContent() throws IOException { + "\"cancellable\":" + executingPolicy.getTaskInfo().isCancellable() + (executingPolicy.getTaskInfo().isCancellable() - ? ",\"cancelled\":"+ executingPolicy.getTaskInfo().isCancellable() + ? ",\"cancelled\":" + executingPolicy.getTaskInfo().isCancellable() : "") + "," + header.map(entry -> String.format(Locale.ROOT, "\"headers\":{\"%s\":\"%s\"}", entry.getKey(), entry.getValue())) From f617497e577534b2886aad5becb5c79b669e7b2c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 17 May 2021 10:00:39 +0100 Subject: [PATCH 6/6] Tidy comment --- .../test/java/org/elasticsearch/tasks/TaskInfoTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java b/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java index c5c2017ec9c3b..42e2a56366a22 100644 --- a/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/TaskInfoTests.java @@ -158,10 +158,8 @@ protected TaskInfo mutateInstance(TaskInfo info) { // if cancellable but not cancelled then mutate exactly one of the flags // // cancellable | cancelled | random | cancellable == cancelled | isNowCancellable | isNowCancelled - // false | false | false | true | true | false - // false | false | true | true | true | false - // true | true | false | true | true | false - // true | true | true | true | true | false + // false | false | - | true | true | false + // true | true | - | true | true | false // true | false | false | false | false | false // true | false | true | false | true | true boolean isNowCancellable = info.isCancellable() == info.isCancelled() || randomBoolean();