diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 0f234cc7a3599..ded261b8512ec 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -63,6 +63,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.After; @@ -542,7 +543,108 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() t assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs)); assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); }); + } + + public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception { + internalCluster().ensureAtMostNumDataNodes(0); + logger.info("Starting dedicated master node..."); + internalCluster().startMasterOnlyNode(); + logger.info("Starting ml and data node..."); + internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE))); + logger.info("Starting another ml and data node..."); + internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE))); + ensureStableCluster(); + + // index some datafeed data + client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get(); + long numDocs = 80000; + long now = System.currentTimeMillis(); + long weekAgo = now - 604800000; + long twoWeeksAgo = weekAgo - 604800000; + indexDocs(logger, "data", numDocs, twoWeeksAgo, weekAgo); + + String jobId = "test-node-goes-down-while-running-job"; + String datafeedId = jobId + "-datafeed"; + + Job.Builder job = createScheduledJob(jobId); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), TimeValue.timeValueHours(1)); + PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); + client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); + + client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())); + + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = client().execute( + GetJobsStatsAction.INSTANCE, + new GetJobsStatsAction.Request(job.getId()) + ).actionGet(); + assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState()); + }, 30, TimeUnit.SECONDS); + + DiscoveryNode nodeRunningJob = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())) + .actionGet() + .getResponse() + .results() + .get(0) + .getNode(); + + setMlIndicesDelayedNodeLeftTimeoutToZero(); + + StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); + client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get(); + + waitForJobToHaveProcessedAtLeast(jobId, 1000); + + // The datafeed should be started + assertBusy(() -> { + GetDatafeedsStatsAction.Response statsResponse = client().execute( + GetDatafeedsStatsAction.INSTANCE, + new GetDatafeedsStatsAction.Request(config.getId()) + ).actionGet(); + assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState()); + }, 30, TimeUnit.SECONDS); + + // Create a problem that will make the job fail when it restarts on a different node + String snapshotId = "123"; + ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build(); + JobResultsPersister jobResultsPersister = internalCluster().getInstance( + JobResultsPersister.class, + internalCluster().getMasterName() + ); + jobResultsPersister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE, () -> true); + UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal( + jobId, + new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build() + ); + client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet(); + refresh(AnomalyDetectorsIndex.resultsWriteAlias(jobId)); + + // Make the job move to a different node + internalCluster().stopNode(nodeRunningJob.getName()); + + // Wait for the job to fail during reassignment + assertBusy(() -> { + GetJobsStatsAction.Response statsResponse = client().execute( + GetJobsStatsAction.INSTANCE, + new GetJobsStatsAction.Request(job.getId()) + ).actionGet(); + assertEquals(JobState.FAILED, statsResponse.getResponse().results().get(0).getState()); + }, 30, TimeUnit.SECONDS); + + // The datafeed should then be stopped + assertBusy(() -> { + GetDatafeedsStatsAction.Response statsResponse = client().execute( + GetDatafeedsStatsAction.INSTANCE, + new GetDatafeedsStatsAction.Request(config.getId()) + ).actionGet(); + assertEquals(DatafeedState.STOPPED, statsResponse.getResponse().results().get(0).getDatafeedState()); + }, 30, TimeUnit.SECONDS); + // Force close the failed job to clean up + client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId).setForce(true)).actionGet(); } private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index c3d333b7d142b..e0785d3bda106 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -145,7 +145,7 @@ private AssignmentFailure checkAssignment() { } if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { - // lets try again later when the job has been opened: + // let's try again later when the job has been opened: String reason = "cannot start datafeed [" + datafeedId + "], because the job's [" diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 4dcd3954488f7..127c529d2fd71 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -404,13 +404,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams // Step 3. Set scheduled events on message and write update process message ActionListener> eventsListener = ActionListener.wrap(events -> { updateProcessMessage.setScheduledEvents(events == null ? null : events.results()); - communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> { - if (e == null) { - handler.accept(null); - } else { - handler.accept(e); - } - }); + communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e)); }, handler); // Step 2. Set the filters on the message and get scheduled events @@ -545,20 +539,18 @@ public void openJob( // Start the process ActionListener stateAliasHandler = ActionListener.wrap( - r -> { - jobManager.getJob( - jobId, - ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true)) - ); - }, + r -> jobManager.getJob( + jobId, + ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true)) + ), e -> { if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) { String msg = "Detected a problem with your setup of machine learning, the state index alias [" + AnomalyDetectorsIndex.jobStateIndexWriteAlias() + "] exists as index but must be an alias."; logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e); - auditor.error(jobId, msg); - setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true)); + // The close handler is responsible for auditing this and setting the job state to failed + closeHandler.accept(new IllegalStateException(msg, e), true); } else { closeHandler.accept(e, true); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index f46220b82944b..222dd37917492 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.job.config.Blocked; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -298,8 +299,8 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams return; } - ActionListener hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> { - if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { + ActionListener getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> { + if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) { // This job has a running datafeed attached to it. // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results. @@ -319,45 +320,84 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams } }); - hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener); + getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener); } private void failTask(JobTask jobTask, String reason) { + String jobId = jobTask.getJobId(); + auditor.error(jobId, reason); JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason); - jobTask.updatePersistentTaskState( - failedState, - ActionListener.wrap( - r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())), - e -> { - logger.error( - new ParameterizedMessage( - "[{}] error while setting task state to failed; marking task as failed", - jobTask.getJobId() - ), - e - ); - jobTask.markAsFailed(e); - } - ) - ); + jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> { + logger.debug("[{}] updated task state to failed", jobId); + stopAssociatedDatafeedForFailedJob(jobId); + }, e -> { + logger.error(new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobId), e); + jobTask.markAsFailed(e); + stopAssociatedDatafeedForFailedJob(jobId); + })); + } + + private void stopAssociatedDatafeedForFailedJob(String jobId) { + + if (autodetectProcessManager.isNodeDying()) { + // The node shutdown caught us at a bad time, and we cannot stop the datafeed + return; + } + + ActionListener getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> { + if (runningDatafeedId == null) { + return; + } + StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId); + request.setForce(true); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + StopDatafeedAction.INSTANCE, + request, + ActionListener.wrap( + // StopDatafeedAction will audit the stopping of the datafeed if it succeeds so we don't need to do that here + r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId), + e -> { + if (autodetectProcessManager.isNodeDying() == false) { + logger.error( + new ParameterizedMessage( + "[{}] failed to stop associated datafeed [{}] after job failure", + jobId, + runningDatafeedId + ), + e + ); + auditor.error(jobId, "failed to stop associated datafeed after job failure"); + } + } + ) + ); + }, e -> { + if (autodetectProcessManager.isNodeDying() == false) { + logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e); + } + }); + + getRunningDatafeed(jobId, getRunningDatafeedListener); } private boolean isMasterNodeVersionOnOrAfter(Version version) { return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version); } - private void hasRunningDatafeedTask(String jobId, ActionListener listener) { + private void getRunningDatafeed(String jobId, ActionListener listener) { ActionListener> datafeedListener = ActionListener.wrap(datafeeds -> { assert datafeeds.size() <= 1; if (datafeeds.isEmpty()) { - listener.onResponse(false); + listener.onResponse(null); return; } String datafeedId = datafeeds.iterator().next(); PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); PersistentTasksCustomMetadata.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); - listener.onResponse(datafeedTask != null); + listener.onResponse(datafeedTask != null ? datafeedId : null); }, listener::onFailure); datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener); @@ -504,7 +544,7 @@ private void openJob(JobTask jobTask) { } } else if (autodetectProcessManager.isNodeDying() == false) { logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2); - failTask(jobTask, "failed to open job"); + failTask(jobTask, "failed to open job: " + e2.getMessage()); } }); }