From 611701d25943ccf18a47996151d6a46237fef522 Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Fri, 29 Sep 2023 09:49:21 -0400
Subject: [PATCH] Reenable EsqlActionTaskIT (#99686)

This test was failing every few runs. After these changes I ran it for
hours in a `while` loop and it hasn't failed. Yay.

This makes a small change to how we report the status of our
`LuceneSourceOperator`s to line them up better with how they currently
work.

Closes #99589
Closes #99582
---
 .../compute/lucene/LuceneOperator.java        |  91 ++++++++-----
 .../compute/lucene/LuceneSliceQueue.java      |   2 +-
 .../lucene/PartialLeafReaderContext.java      |  19 +--
 .../LuceneSourceOperatorStatusTests.java      |  61 ++++-----
 .../xpack/esql/action/EsqlActionTaskIT.java   | 121 ++++++++++++------
 5 files changed, 174 insertions(+), 120 deletions(-)

diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java
index ec1e13d033a8b..e7ba2f0d55891 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java
@@ -22,6 +22,8 @@
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.SourceOperator;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 
@@ -31,6 +33,7 @@
 import java.util.function.Function;
 
 public abstract class LuceneOperator extends SourceOperator {
+    private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
 
     public static final int NO_LIMIT = Integer.MAX_VALUE;
 
@@ -76,14 +79,15 @@ LuceneScorer getCurrentOrLoadNextScorer() {
                 }
             }
             final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
-            final LeafReaderContext leaf = partialLeaf.leafReaderContext;
+            logger.trace("Starting {}", partialLeaf);
+            final LeafReaderContext leaf = partialLeaf.leafReaderContext();
             if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
                 final Weight weight = currentSlice.weight().get();
                 currentScorer = new LuceneScorer(currentSlice.shardIndex(), currentSlice.searchContext(), weight, leaf);
             }
-            assert currentScorer.maxPosition <= partialLeaf.maxDoc : currentScorer.maxPosition + ">" + partialLeaf.maxDoc;
-            currentScorer.maxPosition = partialLeaf.maxDoc;
-            currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc);
+            assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
+            currentScorer.maxPosition = partialLeaf.maxDoc();
+            currentScorer.position = Math.max(currentScorer.position, partialLeaf.minDoc());
         }
         if (Thread.currentThread() != currentScorer.executingThread) {
             currentScorer.reinitialize();
@@ -175,49 +179,62 @@ public static class Status implements Operator.Status {
         private final int processedSlices;
         private final int totalSlices;
         private final int pagesEmitted;
-        private final int slicePosition;
-        private final int sliceSize;
+        private final int sliceIndex;
+        private final int sliceMin;
+        private final int sliceMax;
+        private final int current;
 
         private Status(LuceneOperator operator) {
             processedSlices = operator.processSlices;
+            sliceIndex = operator.sliceIndex;
             totalSlices = operator.sliceQueue.totalSlices();
             LuceneSlice slice = operator.currentSlice;
-            final PartialLeafReaderContext leaf;
-            int sliceIndex = operator.sliceIndex;
             if (slice != null && sliceIndex < slice.numLeaves()) {
-                leaf = slice.getLeaf(sliceIndex);
+                PartialLeafReaderContext leaf = slice.getLeaf(sliceIndex);
+                sliceMin = leaf.minDoc();
+                sliceMax = leaf.maxDoc();
             } else {
-                leaf = null;
+                sliceMin = 0;
+                sliceMax = 0;
             }
             LuceneScorer scorer = operator.currentScorer;
-            slicePosition = scorer != null ? scorer.position : 0;
-            sliceSize = leaf != null ? leaf.maxDoc - leaf.minDoc : 0;
+            if (scorer == null) {
+                current = 0;
+            } else {
+                current = scorer.position;
+            }
             pagesEmitted = operator.pagesEmitted;
         }
 
-        Status(int processedSlices, int totalSlices, int pagesEmitted, int slicePosition, int sliceSize) {
+        Status(int processedSlices, int sliceIndex, int totalSlices, int pagesEmitted, int sliceMin, int sliceMax, int current) {
             this.processedSlices = processedSlices;
+            this.sliceIndex = sliceIndex;
             this.totalSlices = totalSlices;
-            this.slicePosition = slicePosition;
-            this.sliceSize = sliceSize;
             this.pagesEmitted = pagesEmitted;
+            this.sliceMin = sliceMin;
+            this.sliceMax = sliceMax;
+            this.current = current;
        }
 
         Status(StreamInput in) throws IOException {
             processedSlices = in.readVInt();
+            sliceIndex = in.readVInt();
             totalSlices = in.readVInt();
-            slicePosition = in.readVInt();
-            sliceSize = in.readVInt();
             pagesEmitted = in.readVInt();
+            sliceMin = in.readVInt();
+            sliceMax = in.readVInt();
+            current = in.readVInt();
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(processedSlices);
+            out.writeVInt(sliceIndex);
             out.writeVInt(totalSlices);
-            out.writeVInt(slicePosition);
-            out.writeVInt(sliceSize);
             out.writeVInt(pagesEmitted);
+            out.writeVInt(sliceMin);
+            out.writeVInt(sliceMax);
+            out.writeVInt(current);
         }
 
         @Override
@@ -225,11 +242,15 @@ public String getWriteableName() {
             return ENTRY.name;
         }
 
-        public int currentLeaf() {
+        public int processedSlices() {
             return processedSlices;
         }
 
-        public int totalLeaves() {
+        public int sliceIndex() {
+            return sliceIndex;
+        }
+
+        public int totalSlices() {
             return totalSlices;
         }
 
@@ -237,22 +258,28 @@ public int pagesEmitted() {
             return pagesEmitted;
         }
 
-        public int slicePosition() {
-            return slicePosition;
+        public int sliceMin() {
+            return sliceMin;
+        }
+
+        public int sliceMax() {
+            return sliceMax;
         }
 
-        public int sliceSize() {
-            return sliceSize;
+        public int current() {
+            return current;
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
-            builder.field("processed_sliced", processedSlices);
+            builder.field("processed_slices", processedSlices);
+            builder.field("slice_index", sliceIndex);
             builder.field("total_slices", totalSlices);
-            builder.field("slice_position", slicePosition);
-            builder.field("slice_size", sliceSize);
             builder.field("pages_emitted", pagesEmitted);
+            builder.field("slice_min", sliceMin);
+            builder.field("slice_max", sliceMax);
+            builder.field("current", current);
             return builder.endObject();
         }
 
@@ -262,15 +289,17 @@ public boolean equals(Object o) {
             if (o == null || getClass() != o.getClass()) return false;
             Status status = (Status) o;
             return processedSlices == status.processedSlices
+                && sliceIndex == status.sliceIndex
                 && totalSlices == status.totalSlices
                 && pagesEmitted == status.pagesEmitted
-                && slicePosition == status.slicePosition
-                && sliceSize == status.sliceSize;
+                && sliceMin == status.sliceMin
+                && sliceMax == status.sliceMax
+                && current == status.current;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(processedSlices, totalSlices, pagesEmitted, slicePosition, sliceSize);
+            return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
         }
 
         @Override
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java
index 7d96416649636..faf3d6437282a 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSliceQueue.java
@@ -117,7 +117,7 @@ static List<List<PartialLeafReaderContext>> docSlices(IndexReader indexReader, i
         }
         if (slices.stream()
             .flatMapToInt(
-                l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc - partialLeafReaderContext.minDoc)
+                l -> l.stream().mapToInt(partialLeafReaderContext -> partialLeafReaderContext.maxDoc() - partialLeafReaderContext.minDoc())
             )
             .sum() != totalDocCount) {
             throw new IllegalStateException("wrong doc count");
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/PartialLeafReaderContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/PartialLeafReaderContext.java
index 964827a41516e..e9063c9597c5f 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/PartialLeafReaderContext.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/PartialLeafReaderContext.java
@@ -9,18 +9,13 @@
 
 import org.apache.lucene.index.LeafReaderContext;
 
-public final class PartialLeafReaderContext {
-
-    final LeafReaderContext leafReaderContext;
-    final int minDoc; // incl
-    final int maxDoc; // excl
-
-    public PartialLeafReaderContext(LeafReaderContext leafReaderContext, int minDoc, int maxDoc) {
-        this.leafReaderContext = leafReaderContext;
-        this.minDoc = minDoc;
-        this.maxDoc = maxDoc;
-    }
-
+/**
+ * A subset of a {@link LeafReaderContext}.
+ * @param leafReaderContext the context to subset
+ * @param minDoc the first document
+ * @param maxDoc one more than the last document
+ */
+public record PartialLeafReaderContext(LeafReaderContext leafReaderContext, int minDoc, int maxDoc) {
     public PartialLeafReaderContext(LeafReaderContext leafReaderContext) {
         this(leafReaderContext, 0, leafReaderContext.reader().maxDoc());
     }
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java
index 60d5dd394afb7..fad1f793122d8 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java
@@ -16,12 +16,12 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTestCase<LuceneSourceOperator.Status> {
     public static LuceneSourceOperator.Status simple() {
-        return new LuceneSourceOperator.Status(0, 1, 5, 123, 99990);
+        return new LuceneSourceOperator.Status(0, 0, 1, 5, 123, 99990, 8000);
     }
 
     public static String simpleToJson() {
         return """
-            {"processed_sliced":0,"total_slices":1,"slice_position":123,"slice_size":99990,"pages_emitted":5}""";
+            {"processed_slices":0,"slice_index":0,"total_slices":1,"pages_emitted":5,"slice_min":123,"slice_max":99990,"current":8000}""";
     }
 
     public void testToXContent() {
@@ -40,49 +40,32 @@ public LuceneSourceOperator.Status createTestInstance() {
             randomNonNegativeInt(),
             randomNonNegativeInt(),
             randomNonNegativeInt(),
+            randomNonNegativeInt(),
+            randomNonNegativeInt(),
             randomNonNegativeInt()
         );
     }
 
     @Override
     protected LuceneSourceOperator.Status mutateInstance(LuceneSourceOperator.Status instance) {
-        return switch (between(0, 4)) {
-            case 0 -> new LuceneSourceOperator.Status(
-                randomValueOtherThan(instance.currentLeaf(), ESTestCase::randomNonNegativeInt),
-                instance.totalLeaves(),
-                instance.pagesEmitted(),
-                instance.slicePosition(),
-                instance.sliceSize()
-            );
-            case 1 -> new LuceneSourceOperator.Status(
-                instance.currentLeaf(),
-                randomValueOtherThan(instance.totalLeaves(), ESTestCase::randomNonNegativeInt),
-                instance.pagesEmitted(),
-                instance.slicePosition(),
-                instance.sliceSize()
-            );
-            case 2 -> new LuceneSourceOperator.Status(
-                instance.currentLeaf(),
-                instance.totalLeaves(),
-                randomValueOtherThan(instance.pagesEmitted(), ESTestCase::randomNonNegativeInt),
-                instance.slicePosition(),
-                instance.sliceSize()
-            );
-            case 3 -> new LuceneSourceOperator.Status(
-                instance.currentLeaf(),
-                instance.totalLeaves(),
-                instance.pagesEmitted(),
-                randomValueOtherThan(instance.slicePosition(), ESTestCase::randomNonNegativeInt),
-                instance.sliceSize()
-            );
-            case 4 -> new LuceneSourceOperator.Status(
-                instance.currentLeaf(),
-                instance.totalLeaves(),
-                instance.pagesEmitted(),
-                instance.slicePosition(),
-                randomValueOtherThan(instance.sliceSize(), ESTestCase::randomNonNegativeInt)
-            );
+        int processedSlices = instance.processedSlices();
+        int sliceIndex = instance.sliceIndex();
+        int totalSlices = instance.totalSlices();
+        int pagesEmitted = instance.pagesEmitted();
+        int sliceMin = instance.sliceMin();
+        int sliceMax = instance.sliceMax();
+        int current = instance.current();
+        switch (between(0, 6)) {
+            case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
+            case 1 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
+            case 2 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
+            case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
+            case 4 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
+            case 5 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
+            case 6 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
             default -> throw new UnsupportedOperationException();
-        };
+        }
+        ;
+        return new LuceneSourceOperator.Status(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
     }
 }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
index 16f704aa8f7c3..6c4ee71f6983c 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
@@ -7,13 +7,14 @@
 package org.elasticsearch.xpack.esql.action;
 
-import org.apache.lucene.tests.util.LuceneTestCase;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
@@ -23,6 +24,7 @@
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
+import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.mapper.OnScriptError;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
@@ -50,6 +52,9 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.emptyOrNullString;
@@ -63,10 +68,9 @@
  * Tests that we expose a reasonable task status.
 */
 @TestLogging(
-    value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.tasks.TaskCancellationService:TRACE",
-    reason = "These tests are failing frequently; we need logs before muting them"
+    value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE",
+    reason = "These tests were failing frequently, let's learn as much as we can"
 )
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99589")
 public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
     private static int PAGE_SIZE;
     private static int NUM_DOCS;
@@ -93,7 +97,8 @@ public void setupIndex() throws IOException {
         MERGE_DESCRIPTION = """
            \\_ExchangeSourceOperator[]
            \\_AggregationOperator[mode = FINAL, aggs = sum of longs]
-           \\_LimitOperator[limit = 10000]
+           \\_ProjectOperator[projection = [0]]
+           \\_LimitOperator[limit = 500]
            \\_OutputOperator[columns = sum(pause_me)]""";
 
         XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
@@ -107,13 +112,35 @@
             mapping.endObject();
         }
         mapping.endObject();
-        client().admin().indices().prepareCreate("test").setSettings(Map.of("number_of_shards", 1)).setMapping(mapping.endObject()).get();
+        client().admin()
+            .indices()
+            .prepareCreate("test")
+            .setSettings(Map.of("number_of_shards", 1, "number_of_replicas", 0))
+            .setMapping(mapping.endObject())
+            .get();
 
         BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         for (int i = 0; i < NUM_DOCS; i++) {
             bulk.add(client().prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
         }
         bulk.get();
+        /*
+         * forceMerge so we can be sure that we don't bump into tiny
+         * segments that finish super quickly and cause us to report strange
+         * statuses when we expect "starting".
+         */
+        client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).get();
+        /*
+         * Double super extra paranoid check that force merge worked. It's
+         * failed to reduce the index to a single segment and caused this test
+         * to fail in very difficult to debug ways. If it fails again, it'll
+         * trip here. Or maybe it won't! And we'll learn something. Maybe
+         * it's ghosts.
+         */
+        SegmentsStats stats = client().admin().indices().prepareStats("test").get().getPrimaries().getSegments();
+        if (stats.getCount() != 1L) {
+            fail(Strings.toString(stats));
+        }
     }
 
     public void testTaskContents() throws Exception {
@@ -130,19 +157,27 @@
                 DriverStatus status = (DriverStatus) task.status();
                 assertThat(status.sessionId(), not(emptyOrNullString()));
                 for (DriverStatus.OperatorStatus o : status.activeOperators()) {
+                    logger.info("status {}", o);
                     if (o.operator().startsWith("LuceneSourceOperator[maxPageSize=" + PAGE_SIZE)) {
                         LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
-                        assertThat(oStatus.currentLeaf(), lessThanOrEqualTo(oStatus.totalLeaves()));
-                        assertThat(oStatus.slicePosition(), greaterThanOrEqualTo(0));
-                        if (oStatus.sliceSize() != 0) {
-                            assertThat(oStatus.slicePosition(), lessThanOrEqualTo(oStatus.sliceSize()));
+                        assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices()));
+                        assertThat(oStatus.sliceIndex(), lessThanOrEqualTo(oStatus.totalSlices()));
+                        assertThat(oStatus.sliceMin(), greaterThanOrEqualTo(0));
+                        assertThat(oStatus.sliceMax(), greaterThanOrEqualTo(oStatus.sliceMin()));
+                        if (oStatus.sliceMin() != 0 && oStatus.sliceMax() != 0) {
+                            assertThat(
+                                oStatus.current(),
+                                either(both(greaterThanOrEqualTo(oStatus.sliceMin())).and(lessThanOrEqualTo(oStatus.sliceMax()))).or(
+                                    equalTo(DocIdSetIterator.NO_MORE_DOCS)
+                                )
+                            );
                         }
                         luceneSources++;
                         continue;
                     }
                     if (o.operator().equals("ValuesSourceReaderOperator[field = pause_me]")) {
                         ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
-                        assertThat(oStatus.readersBuilt(), equalTo(Map.of("LongValuesReader", 1)));
+                        assertMap(oStatus.readersBuilt(), matchesMap().entry("LongValuesReader", greaterThanOrEqualTo(1)));
                         assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));
                         valuesSourceReaders++;
                         continue;
                     }
@@ -166,40 +201,51 @@
             assertThat(exchangeSinks, greaterThanOrEqualTo(1));
             assertThat(exchangeSources, equalTo(1));
         } finally {
-            scriptPermits.release(Integer.MAX_VALUE);
+            scriptPermits.release(NUM_DOCS);
             assertThat(Iterators.flatMap(response.get().values(), i -> i).next(), equalTo((long) NUM_DOCS));
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99582")
     public void testCancelRead() throws Exception {
         ActionFuture<EsqlQueryResponse> response = startEsql();
-        List<TaskInfo> infos = getTasksStarting();
-        TaskInfo running = infos.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).findFirst().get();
-        cancelTask(running.taskId());
-        assertCancelled(response);
+        try {
+            List<TaskInfo> infos = getTasksStarting();
+            TaskInfo running = infos.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).findFirst().get();
+            cancelTask(running.taskId());
+            assertCancelled(response);
+        } finally {
+            scriptPermits.release(NUM_DOCS);
+        }
     }
 
     public void testCancelMerge() throws Exception {
         ActionFuture<EsqlQueryResponse> response = startEsql();
-        List<TaskInfo> infos = getTasksStarting();
-        TaskInfo running = infos.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).findFirst().get();
-        cancelTask(running.taskId());
-        assertCancelled(response);
+        try {
+            List<TaskInfo> infos = getTasksStarting();
+            TaskInfo running = infos.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).findFirst().get();
+            cancelTask(running.taskId());
+            assertCancelled(response);
+        } finally {
+            scriptPermits.release(NUM_DOCS);
+        }
     }
 
     public void testCancelEsqlTask() throws Exception {
         ActionFuture<EsqlQueryResponse> response = startEsql();
-        getTasksStarting();
-        List<TaskInfo> tasks = client().admin()
-            .cluster()
-            .prepareListTasks()
-            .setActions(EsqlQueryAction.NAME)
-            .setDetailed(true)
-            .get()
-            .getTasks();
-        cancelTask(tasks.get(0).taskId());
-        assertCancelled(response);
+        try {
+            getTasksStarting();
+            List<TaskInfo> tasks = client().admin()
+                .cluster()
+                .prepareListTasks()
+                .setActions(EsqlQueryAction.NAME)
+                .setDetailed(true)
+                .get()
+                .getTasks();
+            cancelTask(tasks.get(0).taskId());
+            assertCancelled(response);
+        } finally {
+            scriptPermits.release(NUM_DOCS);
+        }
     }
 
     private ActionFuture<EsqlQueryResponse> startEsql() {
@@ -225,7 +271,7 @@ private void cancelTask(TaskId taskId) {
         request.setWaitForCompletion(false);
         LOGGER.debug("--> cancelling task [{}] without waiting for completion", taskId);
         client().admin().cluster().execute(CancelTasksAction.INSTANCE, request).actionGet();
-        scriptPermits.release(Integer.MAX_VALUE / 2);
+        scriptPermits.release(NUM_DOCS);
         request = new CancelTasksRequest().setTargetTaskId(taskId).setReason("test cancel");
         request.setWaitForCompletion(true);
         LOGGER.debug("--> cancelling task [{}] with waiting for completion", taskId);
@@ -233,8 +279,10 @@
     }
 
     /**
-     * Fetches tasks until it finds all of them are "starting".
-     */
+     * Fetches tasks until it finds all of them are "starting" or "async".
+     * The "async" part is because the coordinating task almost immediately goes async
+     * because there isn't any data for it to process.
+     */
     private List<TaskInfo> getTasksStarting() throws Exception {
         List<TaskInfo> foundTasks = new ArrayList<>();
         assertBusy(() -> {
@@ -248,10 +296,9 @@
             assertThat(tasks, hasSize(equalTo(2)));
             for (TaskInfo task : tasks) {
                 assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
-                logger.info("{}", task.description());
-                assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION)));
                 DriverStatus status = (DriverStatus) task.status();
-                logger.info("{}", status.status());
+                logger.info("task {} {}", task.description(), status);
+                assertThat(task.description(), either(equalTo(READ_DESCRIPTION)).or(equalTo(MERGE_DESCRIPTION)));
                 /*
                  * Accept tasks that are either starting or have gone
                  * immediately async. The coordinating task is likely