Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ML: update set_upgrade_mode, add logging (#38372) #38538

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class Messages {
public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]";
public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time";
public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped";
public static final String JOB_AUDIT_DATAFEED_ISOLATED = "Datafeed isolated";
public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''";
public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}";
public static final String JOB_AUDIT_DELETED = "Job deleted";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public synchronized void stop() {
// datafeeds, so they get reallocated. We have to do this first, otherwise the datafeeds
// could fail if they send data to a dead autodetect process.
if (datafeedManager != null) {
datafeedManager.isolateAllDatafeedsOnThisNode();
datafeedManager.isolateAllDatafeedsOnThisNodeBeforeShutdown();
}
NativeController nativeController = NativeControllerHolder.getNativeController(environment);
if (nativeController != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe
.sorted(Comparator.comparing(PersistentTask::getTaskName))
.collect(Collectors.toList());

logger.info("Un-assigning persistent tasks : " +
datafeedAndJobTasks.stream().map(PersistentTask::getId).collect(Collectors.joining(", ", "[ ", " ]")));

TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
r -> true,
Expand All @@ -287,6 +290,7 @@ private void isolateDatafeeds(PersistentTasksCustomMetaData tasksCustomMetaData,
ActionListener<List<IsolateDatafeedAction.Response>> listener) {
Set<String> datafeedsToIsolate = MlTasks.startedDatafeedIds(tasksCustomMetaData);

logger.info("Isolating datafeeds: " + datafeedsToIsolate.toString());
TypedChainTaskExecutor<IsolateDatafeedAction.Response> isolateDatafeedsExecutor =
new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class DatafeedManager {
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
Expand Down Expand Up @@ -130,18 +129,20 @@ public void stopAllDatafeedsOnThisNode(String reason) {
* This is used before the JVM is killed. It differs from stopAllDatafeedsOnThisNode in that it leaves
* the datafeed tasks in the "started" state, so that they get restarted on a different node.
*/
public void isolateAllDatafeedsOnThisNode() {
isolated = true;
public void isolateAllDatafeedsOnThisNodeBeforeShutdown() {
Iterator<Holder> iter = runningDatafeedsOnThisNode.values().iterator();
while (iter.hasNext()) {
Holder next = iter.next();
next.isolateDatafeed();
next.setRelocating();
// TODO: it's not ideal that this "isolate" method does something a bit different to the one below
next.setNodeIsShuttingDown();
iter.remove();
}
}

public void isolateDatafeed(long allocationId) {
// This calls get() rather than remove() because we expect that the persistent task will
// be removed shortly afterwards and that operation needs to be able to find the holder
Holder holder = runningDatafeedsOnThisNode.get(allocationId);
if (holder != null) {
holder.isolateDatafeed();
Expand Down Expand Up @@ -195,7 +196,7 @@ protected void doRun() {
holder.stop("general_lookback_failure", TimeValue.timeValueSeconds(20), e);
return;
}
if (isolated == false) {
if (holder.isIsolated() == false) {
if (next != null) {
doDatafeedRealtime(next, holder.datafeedJob.getJobId(), holder);
} else {
Expand Down Expand Up @@ -298,7 +299,7 @@ public class Holder {
private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler;
volatile Scheduler.Cancellable cancellable;
private volatile boolean isRelocating;
private volatile boolean isNodeShuttingDown;

Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob,
ProblemTracker problemTracker, Consumer<Exception> finishHandler) {
Expand All @@ -324,7 +325,7 @@ boolean isIsolated() {
}

public void stop(String source, TimeValue timeout, Exception e) {
if (isRelocating) {
if (isNodeShuttingDown) {
return;
}

Expand All @@ -344,11 +345,12 @@ public void stop(String source, TimeValue timeout, Exception e) {
if (cancellable != null) {
cancellable.cancel();
}
auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
if (autoCloseJob) {
if (autoCloseJob && isIsolated() == false) {
closeJob();
}
if (acquired) {
Expand All @@ -361,16 +363,18 @@ public void stop(String source, TimeValue timeout, Exception e) {
}

/**
* This stops a datafeed WITHOUT updating the corresponding persistent task. It must ONLY be called
* immediately prior to shutting down a node. Then the datafeed task can remain "started", and be
* relocated to a different node. Calling this method at any other time will ruin the datafeed.
* This stops a datafeed WITHOUT updating the corresponding persistent task. When called it
* will stop the datafeed from sending data to its job as quickly as possible. The caller
* must do something sensible with the corresponding persistent task. If the node is shutting
* down the task will automatically get reassigned. Otherwise the caller must take action to
* remove or reassign the persistent task, or the datafeed will be left in limbo.
*/
public void isolateDatafeed() {
datafeedJob.isolate();
}

public void setRelocating() {
isRelocating = true;
public void setNodeIsShuttingDown() {
isNodeShuttingDown = true;
}

private Long executeLookBack(long startTime, Long endTime) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ setup:
indices.create:
index: airline-data
body:
settings:
index:
number_of_replicas: 0
number_of_shards: 1
mappings:
properties:
time:
Expand Down Expand Up @@ -53,10 +57,9 @@ setup:
job_id: set-upgrade-mode-job

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed
cluster.health:
index: airline-data
wait_for_status: green

---
teardown:
Expand All @@ -70,6 +73,10 @@ teardown:

---
"Test setting upgrade_mode to false when it is already false":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
ml.set_upgrade_mode:
enabled: false
Expand All @@ -92,6 +99,22 @@ teardown:

---
"Setting upgrade_mode to enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/

- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/

- do:
ml.info: {}
- match: { upgrade_mode: false }
Expand Down Expand Up @@ -125,6 +148,22 @@ teardown:

---
"Setting upgrade mode to disabled from enabled":
- do:
ml.start_datafeed:
datafeed_id: set-upgrade-mode-job-datafeed

- do:
cat.tasks: {}
- match:
$body: |
/.+job.+/

- do:
cat.tasks: {}
- match:
$body: |
/.+datafeed.+/

- do:
ml.set_upgrade_mode:
enabled: true
Expand Down