diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index c1958edbee44b..83a1839654b78 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -257,7 +257,6 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46907") public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception { String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java index 2a172fd6d9c7e..67b5e988826cc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java @@ -60,6 +60,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S private volatile Long reindexingTaskId; private volatile boolean isReindexingFinished; private volatile boolean isStopping; + private volatile boolean isMarkAsCompletedCalled; private final ProgressTracker progressTracker = new ProgressTracker(); public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map headers, @@ -102,10 +103,17 @@ protected void onCancelled() { public void markAsCompleted() { // It is possible that the stop API has been called in the meantime and that // may also cause this method to be called. We check whether we have already - // been marked completed to avoid doing it twice. - if (isCompleted() == false) { - persistProgress(() -> super.markAsCompleted()); + // been marked completed to avoid doing it twice. We need to capture that + // locally instead of relying to isCompleted() because of the asynchronous + // persistence of progress. + synchronized (this) { + if (isMarkAsCompletedCalled) { + return; + } + isMarkAsCompletedCalled = true; } + + persistProgress(() -> super.markAsCompleted()); } @Override @@ -224,6 +232,7 @@ private TaskId getReindexTaskId() { } private void persistProgress(Runnable runnable) { + LOGGER.debug("[{}] Persisting progress", taskParams.getId()); GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId()); executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap( statsResponse -> {