From 3e4b0aa5e20fc81d9a6807dce22a30af57ab0d0f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 19 Feb 2021 18:52:30 +0000 Subject: [PATCH] [ML] Avoid memory tracker race condition (#69290) This change fixes a race condition that can occur if the return value of memoryTracker.isRecentlyRefreshed() changes between two calls that are assumed to return the same value. The solution is to just call the method once and pass that value to the other place where it is needed. Then all related code makes decisions based on the same view of whether the memory tracker has been recently refreshed or not. Fixes #69289 --- .../ml/action/TransportStartDataFrameAnalyticsAction.java | 3 ++- .../snapshot/upgrader/SnapshotUpgradeTaskExecutor.java | 3 ++- .../xpack/ml/job/task/OpenJobPersistentTasksExecutor.java | 2 +- .../xpack/ml/task/AbstractJobPersistentTasksExecutor.java | 8 ++++---- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index d2e70f18ef086..b397572a0c9b4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -609,7 +609,8 @@ protected AllocatedPersistentTask createTask( @Override public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); - Optional optionalAssignment = getPotentialAssignment(params, clusterState); + Optional optionalAssignment = + getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed); if (optionalAssignment.isPresent()) { return optionalAssignment.get(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 8e954249d84d3..415a04f5cf804 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -79,7 +79,8 @@ public SnapshotUpgradeTaskExecutor(Settings settings, @Override public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTaskParams params, ClusterState clusterState) { boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); - Optional optionalAssignment = getPotentialAssignment(params, clusterState); + Optional optionalAssignment = + getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed); if (optionalAssignment.isPresent()) { return optionalAssignment.get(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java index 77899e98c98d7..9c21c79f25554 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java @@ -114,7 +114,7 @@ public Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clu return AWAITING_MIGRATION; } boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); - Optional optionalAssignment = getPotentialAssignment(params, clusterState); + Optional optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed); if (optionalAssignment.isPresent()) { return optionalAssignment.get(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java index b613a34d96f24..ad0f51e654d4d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java @@ -152,7 +152,8 @@ protected boolean allowsMissingIndices() { return true; } - public Optional getPotentialAssignment(Params params, ClusterState clusterState) { + public Optional getPotentialAssignment(Params params, ClusterState clusterState, + boolean isMemoryTrackerRecentlyRefreshed) { // If we are waiting for an upgrade to complete, we should not assign to a node if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) { return Optional.of(AWAITING_UPGRADE); @@ -165,7 +166,7 @@ public Optional getPotentialAssignment if (missingIndices.isPresent()) { return missingIndices; } - Optional staleMemory = checkMemoryFreshness(jobId); + Optional staleMemory = checkMemoryFreshness(jobId, isMemoryTrackerRecentlyRefreshed); if (staleMemory.isPresent()) { return staleMemory; } @@ -212,8 +213,7 @@ public Optional checkRequiredIndices(S return Optional.empty(); } - public Optional checkMemoryFreshness(String jobId) { - boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); + public Optional checkMemoryFreshness(String jobId, boolean isMemoryTrackerRecentlyRefreshed) { if (isMemoryTrackerRecentlyRefreshed == false) { boolean scheduledRefresh = memoryTracker.asyncRefresh(); if (scheduledRefresh) {