Skip to content

Commit

Permalink
[ML] Avoid memory tracker race condition (elastic#69290)
Browse files Browse the repository at this point in the history
This change fixes a race condition that can occur if the
return value of memoryTracker.isRecentlyRefreshed() changes
between two calls that are assumed to return the same value.
The solution is to just call the method once and pass that
value to the other place where it is needed.  Then all related
code makes decisions based on the same view of whether the
memory tracker has been recently refreshed or not.

Fixes elastic#69289
  • Loading branch information
droberts195 committed Feb 22, 2021
1 parent 0a1d422 commit 3e4b0aa
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ protected AllocatedPersistentTask createTask(
@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public SnapshotUpgradeTaskExecutor(Settings settings,
@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTaskParams params, ClusterState clusterState) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clu
return AWAITING_MIGRATION;
}
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ protected boolean allowsMissingIndices() {
return true;
}

public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState) {
public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState,
boolean isMemoryTrackerRecentlyRefreshed) {
// If we are waiting for an upgrade to complete, we should not assign to a node
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
return Optional.of(AWAITING_UPGRADE);
Expand All @@ -165,7 +166,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment
if (missingIndices.isPresent()) {
return missingIndices;
}
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId);
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId, isMemoryTrackerRecentlyRefreshed);
if (staleMemory.isPresent()) {
return staleMemory;
}
Expand Down Expand Up @@ -212,8 +213,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> checkRequiredIndices(S
return Optional.empty();
}

public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId, boolean isMemoryTrackerRecentlyRefreshed) {
if (isMemoryTrackerRecentlyRefreshed == false) {
boolean scheduledRefresh = memoryTracker.asyncRefresh();
if (scheduledRefresh) {
Expand Down

0 comments on commit 3e4b0aa

Please sign in to comment.