Skip to content

Commit

Permalink
[ML] Avoid memory tracker race condition (#69290)
Browse files Browse the repository at this point in the history
This change fixes a race condition that can occur if the
return value of memoryTracker.isRecentlyRefreshed() changes
between two calls that are assumed to return the same value.
The solution is to just call the method once and pass that
value to the other place where it is needed.  Then all related
code makes decisions based on the same view of whether the
memory tracker has been recently refreshed or not.

Fixes #69289
  • Loading branch information
droberts195 authored Feb 19, 2021
1 parent 0cbab23 commit cf7e6b3
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,8 @@ protected AllocatedPersistentTask createTask(
@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public SnapshotUpgradeTaskExecutor(Settings settings,
@Override
public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTaskParams params, ClusterState clusterState) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clu
return AWAITING_MIGRATION;
}
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
if (optionalAssignment.isPresent()) {
return optionalAssignment.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ protected boolean allowsMissingIndices() {
return true;
}

public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState) {
public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState,
boolean isMemoryTrackerRecentlyRefreshed) {
// If we are waiting for an upgrade to complete, we should not assign to a node
if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
return Optional.of(AWAITING_UPGRADE);
Expand All @@ -165,7 +166,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment
if (missingIndices.isPresent()) {
return missingIndices;
}
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId);
Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId, isMemoryTrackerRecentlyRefreshed);
if (staleMemory.isPresent()) {
return staleMemory;
}
Expand Down Expand Up @@ -212,8 +213,7 @@ public Optional<PersistentTasksCustomMetadata.Assignment> checkRequiredIndices(S
return Optional.empty();
}

public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId, boolean isMemoryTrackerRecentlyRefreshed) {
if (isMemoryTrackerRecentlyRefreshed == false) {
boolean scheduledRefresh = memoryTracker.asyncRefresh();
if (scheduledRefresh) {
Expand Down

0 comments on commit cf7e6b3

Please sign in to comment.