Skip to content

Commit

Permalink
[ML] Audit job open failures and stop any corresponding datafeed
Browse files Browse the repository at this point in the history
The anomaly detection code contained an assumption dating back
to 2016 that if a job failed then its datafeed would notice and
stop itself. That works if the job fails on a node after it has
successfully started up. But it doesn't work if the job fails
during the startup sequence. If the job is being started for the
first time then the datafeed won't be running, so there's no
problem, but if the job fails when it's being reassigned to a
new node then it breaks down, because the datafeed is started
by not assigned to any node at that instant.

This PR addresses this by making the job force-stop its own
datafeed if it fails during its startup sequence and the datafeed
is started.

Fixes elastic#48934

Additionally, auditing of job failures during the startup
sequence is moved so that it happens for all failure scenarios
instead of just one.

Fixes elastic#80621
  • Loading branch information
droberts195 committed Nov 11, 2021
1 parent 1b300ac commit 49290ba
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private AssignmentFailure checkAssignment() {
}

if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
// lets try again later when the job has been opened:
// let's try again later when the job has been opened:
String reason = "cannot start datafeed ["
+ datafeedId
+ "], because the job's ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
// Step 3. Set scheduled events on message and write update process message
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(events -> {
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e));
}, handler);

// Step 2. Set the filters on the message and get scheduled events
Expand Down Expand Up @@ -545,20 +539,18 @@ public void openJob(

// Start the process
ActionListener<Boolean> stateAliasHandler = ActionListener.wrap(
r -> {
jobManager.getJob(
jobId,
ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
);
},
r -> jobManager.getJob(
jobId,
ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) {
String msg = "Detected a problem with your setup of machine learning, the state index alias ["
+ AnomalyDetectorsIndex.jobStateIndexWriteAlias()
+ "] exists as index but must be an alias.";
logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e);
auditor.error(jobId, msg);
setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true));
// The close handler is responsible for auditing this and setting the job state to failed
closeHandler.accept(new IllegalStateException(msg, e), true);
} else {
closeHandler.accept(e, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.job.config.Blocked;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
Expand Down Expand Up @@ -298,8 +299,8 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams
return;
}

ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> {
if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {

// This job has a running datafeed attached to it.
// In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
Expand All @@ -319,45 +320,78 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams
}
});

hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener);
}

private void failTask(JobTask jobTask, String reason) {
auditor.error(jobTask.getJobId(), reason);
JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason);
jobTask.updatePersistentTaskState(
failedState,
ActionListener.wrap(
r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())),
e -> {
logger.error(
new ParameterizedMessage(
"[{}] error while setting task state to failed; marking task as failed",
jobTask.getJobId()
),
e
);
jobTask.markAsFailed(e);
}
)
);
jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> {
logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId()));
stopAssociatedDatafeedForFailedJob(jobTask.getJobId());
}, e -> {
logger.error(
new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobTask.getJobId()),
e
);
jobTask.markAsFailed(e);
}));
}

private void stopAssociatedDatafeedForFailedJob(String jobId) {

ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
if (runningDatafeedId == null) {
return;
}
StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId);
request.setForce(true);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
StopDatafeedAction.INSTANCE,
request,
ActionListener.wrap(
r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId),
e -> {
if (autodetectProcessManager.isNodeDying() == false) {
logger.error(
new ParameterizedMessage(
"[{}] failed to stop associated datafeed [{}] after job failure",
jobId,
runningDatafeedId
),
e
);
}
}
)
);
}, e -> {
if (autodetectProcessManager.isNodeDying() == false) {
logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e);
}
});

getRunningDatafeed(jobId, getRunningDatafeedListener);
}

private boolean isMasterNodeVersionOnOrAfter(Version version) {
return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version);
}

private void hasRunningDatafeedTask(String jobId, ActionListener<Boolean> listener) {
private void getRunningDatafeed(String jobId, ActionListener<String> listener) {
ActionListener<Set<String>> datafeedListener = ActionListener.wrap(datafeeds -> {
assert datafeeds.size() <= 1;
if (datafeeds.isEmpty()) {
listener.onResponse(false);
listener.onResponse(null);
return;
}

String datafeedId = datafeeds.iterator().next();
PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
listener.onResponse(datafeedTask != null);
listener.onResponse(datafeedTask != null ? datafeedId : null);
}, listener::onFailure);

datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener);
Expand Down Expand Up @@ -504,7 +538,7 @@ private void openJob(JobTask jobTask) {
}
} else if (autodetectProcessManager.isNodeDying() == false) {
logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2);
failTask(jobTask, "failed to open job");
failTask(jobTask, "failed to open job: " + e2.getMessage());
}
});
}
Expand Down

0 comments on commit 49290ba

Please sign in to comment.