Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Ensure data frame analytics task is only marked completed once #47119

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
"Finished analysis");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46907")
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
String sourceIndex = "test-stop-outlier-detection-with-enough-docs-to-scroll";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
private volatile Long reindexingTaskId;
private volatile boolean isReindexingFinished;
private volatile boolean isStopping;
private volatile boolean isMarkAsCompletedCalled;
private final ProgressTracker progressTracker = new ProgressTracker();

public DataFrameAnalyticsTask(long id, String type, String action, TaskId parentTask, Map<String, String> headers,
Expand Down Expand Up @@ -102,10 +103,17 @@ protected void onCancelled() {
public void markAsCompleted() {
// It is possible that the stop API has been called in the meantime and that
// may also cause this method to be called. We check whether we have already
// been marked completed to avoid doing it twice.
if (isCompleted() == false) {
persistProgress(() -> super.markAsCompleted());
// been marked completed to avoid doing it twice. We need to capture that
// locally instead of relying to isCompleted() because of the asynchronous
// persistence of progress.
synchronized (this) {
if (isMarkAsCompletedCalled) {
return;
}
isMarkAsCompletedCalled = true;
}

persistProgress(() -> super.markAsCompleted());
}

@Override
Expand Down Expand Up @@ -224,6 +232,7 @@ private TaskId getReindexTaskId() {
}

private void persistProgress(Runnable runnable) {
LOGGER.debug("[{}] Persisting progress", taskParams.getId());
GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(taskParams.getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
statsResponse -> {
Expand Down