Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
szilard-nemeth committed Dec 14, 2021
1 parent 843f66f commit 5bbf36f
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ public class CapacityScheduler extends

private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;

// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;

private PreemptionManager preemptionManager = new PreemptionManager();

private volatile boolean isLazyPreemptionEnabled = false;
Expand Down Expand Up @@ -227,27 +224,14 @@ public Configuration getConf() {
private ResourceCalculator calculator;
private boolean usePortForNodeName;

private boolean scheduleAsynchronously;
@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private AsyncSchedulingConfiguration asyncSchedulingConf;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
private boolean multiNodePlacementEnabled;

private boolean printedVerboseLoggingForAsyncScheduling;
private boolean appShouldFailFast;

/**
* EXPERT
*/
private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;

private CSMaxRunningAppsEnforcer maxRunningEnforcer;

public CapacityScheduler() {
Expand Down Expand Up @@ -376,27 +360,7 @@ private ResourceCalculator initResourceCalculator() {
}

private void initAsyncSchedulingProperties() {
scheduleAsynchronously = this.conf.getScheduleAynschronously();
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);

// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);

if (scheduleAsynchronously) {
asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(this));
}
resourceCommitterService = new ResourceCommitterService(this);
asyncMaxPendingBacklogs = this.conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
}
this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf, this);
}

private void initMultiNodePlacement() {
Expand All @@ -419,8 +383,8 @@ private void printSchedulerInitialized() {
getResourceCalculator().getClass(),
getMinimumResourceCapability(),
getMaximumResourceCapability(),
scheduleAsynchronously,
asyncScheduleInterval,
asyncSchedulingConf.isScheduleAsynchronously(),
asyncSchedulingConf.getAsyncScheduleInterval(),
multiNodePlacementEnabled,
assignMultipleEnabled,
maxAssignPerHeartbeat,
Expand All @@ -431,15 +395,7 @@ private void startSchedulerThreads() {
writeLock.lock();
try {
activitiesManager.start();
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}

resourceCommitterService.start();
}
asyncSchedulingConf.startThreads();
} finally {
writeLock.unlock();
}
Expand All @@ -465,14 +421,7 @@ public void serviceStop() throws Exception {
writeLock.lock();
try {
this.activitiesManager.stop();
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
asyncSchedulingConf.serviceStopInvoked();
} finally {
writeLock.unlock();
}
Expand Down Expand Up @@ -539,7 +488,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
}

long getAsyncScheduleInterval() {
return asyncScheduleInterval;
return asyncSchedulingConf.getAsyncScheduleInterval();
}

private final static Random random = new Random(System.currentTimeMillis());
Expand Down Expand Up @@ -692,7 +641,7 @@ public void run() {
} else {
// Don't run schedule if we have some pending backlogs already
if (cs.getAsyncSchedulingPendingBacklogs()
> cs.asyncMaxPendingBacklogs) {
> cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) {
Thread.sleep(1);
} else{
schedule(cs);
Expand Down Expand Up @@ -1479,7 +1428,7 @@ protected void nodeUpdate(RMNode rmNode) {
}

// Try to do scheduling
if (!scheduleAsynchronously) {
if (!asyncSchedulingConf.isScheduleAsynchronously()) {
writeLock.lock();
try {
// reset allocation and reservation stats before we start doing any
Expand Down Expand Up @@ -2291,8 +2240,8 @@ private void addNode(RMNode nodeManager) {
"Added node " + nodeManager.getNodeAddress() + " clusterResource: "
+ clusterResource);

if (scheduleAsynchronously && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
if (asyncSchedulingConf.isScheduleAsynchronously() && getNumClusterNodes() == 1) {
for (AsyncScheduleThread t : asyncSchedulingConf.asyncSchedulerThreads) {
t.beginSchedule();
}
}
Expand Down Expand Up @@ -2340,11 +2289,7 @@ private void removeNode(RMNode nodeInfo) {
new ResourceLimits(clusterResource));
int numNodes = nodeTracker.nodeCount();

if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
asyncSchedulingConf.nodeRemoved(numNodes);

LOG.info(
"Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
Expand Down Expand Up @@ -3092,9 +3037,9 @@ public void submitResourceCommitRequest(Resource cluster,
return;
}

if (scheduleAsynchronously) {
if (asyncSchedulingConf.isScheduleAsynchronously()) {
// Submit to a commit thread and commit it async-ly
resourceCommitterService.addNewCommitRequest(request);
asyncSchedulingConf.resourceCommitterService.addNewCommitRequest(request);
} else{
// Otherwise do it sync-ly.
tryCommit(cluster, request, true);
Expand Down Expand Up @@ -3339,10 +3284,7 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
}

public int getAsyncSchedulingPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
return asyncSchedulingConf.getPendingBacklogs();
}

@Override
Expand Down Expand Up @@ -3483,7 +3425,7 @@ public boolean isMultiNodePlacementEnabled() {
}

public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
return asyncSchedulingConf.getNumAsyncSchedulerThreads();
}

@VisibleForTesting
Expand All @@ -3503,4 +3445,109 @@ public boolean placementConstraintEnabled() {
public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm;
}

@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulingConf.getAsyncSchedulerThreads();
}

private static class AsyncSchedulingConfiguration {
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;

@VisibleForTesting
protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;

private long asyncScheduleInterval;
private static final String ASYNC_SCHEDULER_INTERVAL =
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;

private final boolean scheduleAsynchronously;

AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf,
CapacityScheduler cs) {
this.scheduleAsynchronously = conf.getScheduleAynschronously();
if (this.scheduleAsynchronously) {
this.asyncScheduleInterval = conf.getLong(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL);
// number of threads for async scheduling
int maxAsyncSchedulingThreads = conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
1);
maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
this.asyncMaxPendingBacklogs = conf.getInt(
CapacitySchedulerConfiguration.
SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
CapacitySchedulerConfiguration.
DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);

this.asyncSchedulerThreads = new ArrayList<>();
for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
}
this.resourceCommitterService = new ResourceCommitterService(cs);
}
}
public boolean isScheduleAsynchronously() {
return scheduleAsynchronously;
}
public long getAsyncScheduleInterval() {
return asyncScheduleInterval;
}
public long getAsyncMaxPendingBacklogs() {
return asyncMaxPendingBacklogs;
}

public void startThreads() {
if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null");
for (Thread t : asyncSchedulerThreads) {
t.start();
}

resourceCommitterService.start();
}
}

public void serviceStopInvoked() throws InterruptedException {
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) {
t.interrupt();
t.join(THREAD_JOIN_TIMEOUT_MS);
}
resourceCommitterService.interrupt();
resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
}
}

public void nodeRemoved(int numNodes) {
if (scheduleAsynchronously && numNodes == 0) {
for (AsyncScheduleThread t : asyncSchedulerThreads) {
t.suspendSchedule();
}
}
}

public int getPendingBacklogs() {
if (scheduleAsynchronously) {
return resourceCommitterService.getPendingBacklogs();
}
return 0;
}

public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
}

@VisibleForTesting
public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulerThreads;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";

@Private
public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL =
SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms";
@Private
public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5;

@Private
public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public RMNodeLabelsManager createNodeLabelManager() {

CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
for (CapacityScheduler.AsyncScheduleThread thread :
cs.asyncSchedulerThreads) {
cs.getAsyncSchedulerThreads()) {
Assert.assertTrue(thread.getName()
.startsWith("AsyncCapacitySchedulerThread"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(

if (numThreads > 0) {
// disable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.suspendSchedule();
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(

if (numThreads > 0) {
// enable async scheduling threads
for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
t.beginSchedule();
}

Expand Down

0 comments on commit 5bbf36f

Please sign in to comment.