Skip to content

Commit

Permalink
[ML] Ensure data frame analytics task is only marked completed once (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitris-athanasiou authored Sep 26, 2019
1 parent 90ae28d commit 5c23d29
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
"Finished analysis");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46907")
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private volatile boolean isMarkAsCompletedCalled;
private final ProgressTracker progressTracker = new ProgressTracker();

public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Expand Down Expand Up @@ -102,10 +103,17 @@ protected void onCancelled() {
public void markAsCompleted() {
// It is possible that the stop API has been called in the meantime and that
// may also cause this method to be called. We check whether we have already
// been marked completed to avoid doing it twice.
if (isCompleted() == false) {
persistProgress(() -> super.markAsCompleted());
// been marked completed to avoid doing it twice. We need to capture that
// locally instead of relying to isCompleted() because of the asynchronous
// persistence of progress.
synchronized (this) {
if (isMarkAsCompletedCalled) {
return;
}
isMarkAsCompletedCalled = true;
}

persistProgress(() -> super.markAsCompleted());
}

@Override
Expand Down Expand Up @@ -224,6 +232,7 @@ private TaskId getReindexTaskId() {
}

private void persistProgress(Runnable runnable) {
LOGGER.debug("[{}] Persisting progress", taskParams.getId());
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
statsResponse -> {
Expand Down

0 comments on commit 5c23d29

Please sign in to comment.