diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index c3d333b7d142b..e0785d3bda106 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -145,7 +145,7 @@ private AssignmentFailure checkAssignment() { } if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { - // lets try again later when the job has been opened: + // let's try again later when the job has been opened: String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 4dcd3954488f7..127c529d2fd71 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -404,13 +404,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams // Step 3. Set scheduled events on message and write update process message ActionListener> eventsListener = ActionListener.wrap(events -> { updateProcessMessage.setScheduledEvents(events == null ? null : events.results()); - communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> { - if (e == null) { - handler.accept(null); - } else { - handler.accept(e); - } - }); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e)); }, handler); // Step 2. Set the filters on the message and get scheduled events @@ -545,20 +539,18 @@ public void openJob( // Start the process ActionListener stateAliasHandler = ActionListener.wrap( - r -> { - jobManager.getJob( - jobId, - ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true)) - ); - }, + r -> jobManager.getJob( + jobId, + ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true)) + ), e -> { if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) { String msg = "Detected a problem with your setup of machine learning, the state index alias [" + AnomalyDetectorsIndex.jobStateIndexWriteAlias() + "] exists as index but must be an alias."; logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e); - auditor.error(jobId, msg); - setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true)); + // The close handler is responsible for auditing this and setting the job state to failed + closeHandler.accept(new IllegalStateException(msg, e), true); } else { closeHandler.accept(e, true); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index f46220b82944b..753c8aa9d0ed1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -298,8 +299,8 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams return; } - ActionListener hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> { - if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { + ActionListener getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> { + if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { // This job has a running datafeed attached to it. // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results. @@ -319,45 +320,78 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams } }); - hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener); + getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener); } private void failTask(JobTask jobTask, String reason) { + auditor.error(jobTask.getJobId(), reason); JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason); - jobTask.updatePersistentTaskState( - failedState, - ActionListener.wrap( - r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())), - e -> { - logger.error( - new ParameterizedMessage( - "[{}] error while setting task state to failed; marking task as failed", - jobTask.getJobId() - ), - e - ); - jobTask.markAsFailed(e); - } - ) - ); + jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> { + logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())); + stopAssociatedDatafeedForFailedJob(jobTask.getJobId()); + }, e -> { + logger.error( + new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobTask.getJobId()), + e + ); + jobTask.markAsFailed(e); + })); + } + + private void stopAssociatedDatafeedForFailedJob(String jobId) { + + ActionListener getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> { + if (runningDatafeedId == null) { + return; + } + StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId); + request.setForce(true); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + StopDatafeedAction.INSTANCE, + request, + ActionListener.wrap( + r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId), + e -> { + if (autodetectProcessManager.isNodeDying() == false) { + logger.error( + new ParameterizedMessage( + "[{}] failed to stop associated datafeed [{}] after job failure", + jobId, + runningDatafeedId + ), + e + ); + } + } + ) + ); + }, e -> { + if (autodetectProcessManager.isNodeDying() == false) { + logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e); + } + }); + + getRunningDatafeed(jobId, getRunningDatafeedListener); } private boolean isMasterNodeVersionOnOrAfter(Version version) { return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version); } - private void hasRunningDatafeedTask(String jobId, ActionListener listener) { + private void getRunningDatafeed(String jobId, ActionListener listener) { ActionListener> datafeedListener = ActionListener.wrap(datafeeds -> { assert datafeeds.size() <= 1; if (datafeeds.isEmpty()) { - listener.onResponse(false); + listener.onResponse(null); return; } String datafeedId = datafeeds.iterator().next(); PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); PersistentTasksCustomMetadata.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); - listener.onResponse(datafeedTask != null); + listener.onResponse(datafeedTask != null ? datafeedId : null); }, listener::onFailure); datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener); @@ -504,7 +538,7 @@ private void openJob(JobTask jobTask) { } } else if (autodetectProcessManager.isNodeDying() == false) { logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2); - failTask(jobTask, "failed to open job"); + failTask(jobTask, "failed to open job: " + e2.getMessage()); } }); }