From 9a2e0306b5d5c4c64b0e6539da83df57c35402d4 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 6 Feb 2019 14:39:24 -0600 Subject: [PATCH] ML: update set_upgrade_mode, add logging (#38372) * ML: update set_upgrade_mode, add logging * Attempt to fix datafeed isolation Also renamed a few methods/variables for clarity and added some comments --- .../xpack/core/ml/job/messages/Messages.java | 1 + .../xpack/ml/MlLifeCycleService.java | 2 +- .../action/TransportSetUpgradeModeAction.java | 4 ++ .../xpack/ml/datafeed/DatafeedManager.java | 32 +++++++------ .../test/ml/set_upgrade_mode.yml | 47 +++++++++++++++++-- 5 files changed, 67 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index a877b72bee0da..77ae8cb26eae9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -79,6 +79,7 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; + public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated"; public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''"; public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}"; public static final String JOB_AUDIT_DELETED = "Job deleted"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index 302d9a7611d96..8005912107ad9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -45,7 +45,7 @@ public synchronized void stop() { // datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds // could fail if they send data to a dead autodetect process. if (datafeedManager != null) { - datafeedManager.isolateAllDatafeedsOnThisNode(); + datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown(); } NativeController nativeController = NativeControllerHolder.getNativeController(environment); if (nativeController != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index edc31f1e896b7..d16f9e18421d8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe .sorted(Comparator.comparing(PersistentTask::getTaskName)) .collect(Collectors.toList()); + logger.info("Un-assigning persistent tasks : " + + datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]"))); + TypedChainTaskExecutor> chainTaskExecutor = new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, @@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData, ActionListener> listener) { Set datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData); + logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString()); TypedChainTaskExecutor isolateDatafeedsExecutor = new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 409d15182d96a..53568c3705a8d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -64,7 +64,6 @@ public class DatafeedManager { private final DatafeedJobBuilder datafeedJobBuilder; private final TaskRunner taskRunner = new TaskRunner(); private final AutodetectProcessManager autodetectProcessManager; - private volatile boolean isolated; public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, Supplier currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { @@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) { * This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves * the datafeed tasks in the "started" state, so that they get restarted on a different node. */ - public void isolateAllDatafeedsOnThisNode() { - isolated = true; + public void isolateAllDatafeedsOnThisNodeBeforeShutdown() { Iterator iter = runningDatafeedsOnThisNode.values().iterator(); while (iter.hasNext()) { Holder next = iter.next(); next.isolateDatafeed(); - next.setRelocating(); + // TODO: it's not ideal that this "isolate" method does something a bit different to the one below + next.setNodeIsShuttingDown(); iter.remove(); } } public void isolateDatafeed(long allocationId) { + // This calls get() rather than remove() because we expect that the persistent task will + // be removed shortly afterwards and that operation needs to be able to find the holder Holder holder = runningDatafeedsOnThisNode.get(allocationId); if (holder != null) { holder.isolateDatafeed(); @@ -195,7 +196,7 @@ protected void doRun() { holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e); return; } - if (isolated == false) { + if (holder.isIsolated() == false) { if (next != null) { doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder); } else { @@ -298,7 +299,7 @@ public class Holder { private final ProblemTracker problemTracker; private final Consumer finishHandler; volatile Scheduler.Cancellable cancellable; - private volatile boolean isRelocating; + private volatile boolean isNodeShuttingDown; Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, ProblemTracker problemTracker, Consumer finishHandler) { @@ -324,7 +325,7 @@ boolean isIsolated() { } public void stop(String source, TimeValue timeout, Exception e) { - if (isRelocating) { + if (isNodeShuttingDown) { return; } @@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) { if (cancellable != null) { cancellable.cancel(); } - auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); + auditor.info(datafeedJob.getJobId(), + Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); - if (autoCloseJob) { + if (autoCloseJob && isIsolated() == false) { closeJob(); } if (acquired) { @@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) { } /** - * This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called - * immediately prior to shutting down a node. Then the datafeed task can remain "started", and be - * relocated to a different node. Calling this method at any other time will ruin the datafeed. + * This stops a datafeed WITHOUT updating the corresponding persistent task. When called it + * will stop the datafeed from sending data to its job as quickly as possible. The caller + * must do something sensible with the corresponding persistent task. If the node is shutting + * down the task will automatically get reassigned. Otherwise the caller must take action to + * remove or reassign the persistent task, or the datafeed will be left in limbo. */ public void isolateDatafeed() { datafeedJob.isolate(); } - public void setRelocating() { - isRelocating = true; + public void setNodeIsShuttingDown() { + isNodeShuttingDown = true; } private Long executeLookBack(long startTime, Long endTime) throws Exception { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index be1e0203a92c7..9b33af5f48bb0 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -6,6 +6,10 @@ setup: indices.create: index: airline-data body: + settings: + index: + number_of_replicas: 0 + number_of_shards: 1 mappings: properties: time: @@ -53,10 +57,9 @@ setup: job_id: set-upgrade-mode-job - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser - ml.start_datafeed: - datafeed_id: set-upgrade-mode-job-datafeed + cluster.health: + index: airline-data + wait_for_status: green --- teardown: @@ -70,6 +73,10 @@ teardown: --- "Test setting upgrade_mode to false when it is already false": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + - do: ml.set_upgrade_mode: enabled: false @@ -92,6 +99,22 @@ teardown: --- "Setting upgrade_mode to enabled": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + + - do: + cat.tasks: {} + - match: + $body: | + /.+job.+/ + + - do: + cat.tasks: {} + - match: + $body: | + /.+datafeed.+/ + - do: ml.info: {} - match: { upgrade_mode: false } @@ -125,6 +148,22 @@ teardown: --- "Setting upgrade mode to disabled from enabled": + - do: + ml.start_datafeed: + datafeed_id: set-upgrade-mode-job-datafeed + + - do: + cat.tasks: {} + - match: + $body: | + /.+job.+/ + + - do: + cat.tasks: {} + - match: + $body: | + /.+datafeed.+/ + - do: ml.set_upgrade_mode: enabled: true