diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index abd40a8062b53..7f82f7e0b5bb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -187,9 +187,6 @@ public class CapacityScheduler extends
 
   private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;
 
-  // timeout to join when we stop this service
-  protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
-
   private PreemptionManager preemptionManager = new PreemptionManager();
 
   private volatile boolean isLazyPreemptionEnabled = false;
@@ -227,10 +224,7 @@ public Configuration getConf() {
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
-  private boolean scheduleAsynchronously;
-  @VisibleForTesting
-  protected List<AsyncScheduleThread> asyncSchedulerThreads;
-  private ResourceCommitterService resourceCommitterService;
+  private AsyncSchedulingConfiguration asyncSchedulingConf;
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
   private boolean multiNodePlacementEnabled;
@@ -238,16 +232,6 @@ public Configuration getConf() {
   private boolean printedVerboseLoggingForAsyncScheduling;
   private boolean appShouldFailFast;
 
-  /**
-   * EXPERT
-   */
-  private long asyncScheduleInterval;
-  private static final String ASYNC_SCHEDULER_INTERVAL =
-      CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
-      + ".scheduling-interval-ms";
-  private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
-  private long asyncMaxPendingBacklogs;
-
   private CSMaxRunningAppsEnforcer maxRunningEnforcer;
 
   public CapacityScheduler() {
@@ -376,27 +360,7 @@ private ResourceCalculator initResourceCalculator() {
   }
 
   private void initAsyncSchedulingProperties() {
-    scheduleAsynchronously = this.conf.getScheduleAynschronously();
-    asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
-        DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-
-    // number of threads for async scheduling
-    int maxAsyncSchedulingThreads = this.conf.getInt(
-        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
-    maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
-
-    if (scheduleAsynchronously) {
-      asyncSchedulerThreads = new ArrayList<>();
-      for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
-        asyncSchedulerThreads.add(new AsyncScheduleThread(this));
-      }
-      resourceCommitterService = new ResourceCommitterService(this);
-      asyncMaxPendingBacklogs = this.conf.getInt(
-          CapacitySchedulerConfiguration.
-              SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
-          CapacitySchedulerConfiguration.
-              DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
-    }
+    this.asyncSchedulingConf = new AsyncSchedulingConfiguration(conf, this);
   }
 
   private void initMultiNodePlacement() {
@@ -419,8 +383,8 @@ private void printSchedulerInitialized() {
         getResourceCalculator().getClass(),
         getMinimumResourceCapability(),
         getMaximumResourceCapability(),
-        scheduleAsynchronously,
-        asyncScheduleInterval,
+        asyncSchedulingConf.isScheduleAsynchronously(),
+        asyncSchedulingConf.getAsyncScheduleInterval(),
         multiNodePlacementEnabled,
         assignMultipleEnabled,
         maxAssignPerHeartbeat,
@@ -431,15 +395,7 @@ private void startSchedulerThreads() {
     writeLock.lock();
     try {
       activitiesManager.start();
-      if (scheduleAsynchronously) {
-        Preconditions.checkNotNull(asyncSchedulerThreads,
-            "asyncSchedulerThreads is null");
-        for (Thread t : asyncSchedulerThreads) {
-          t.start();
-        }
-
-        resourceCommitterService.start();
-      }
+      asyncSchedulingConf.startThreads();
     } finally {
       writeLock.unlock();
     }
@@ -465,14 +421,7 @@ public void serviceStop() throws Exception {
     writeLock.lock();
     try {
       this.activitiesManager.stop();
-      if (scheduleAsynchronously && asyncSchedulerThreads != null) {
-        for (Thread t : asyncSchedulerThreads) {
-          t.interrupt();
-          t.join(THREAD_JOIN_TIMEOUT_MS);
-        }
-        resourceCommitterService.interrupt();
-        resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
-      }
+      asyncSchedulingConf.serviceStopInvoked();
     } finally {
       writeLock.unlock();
     }
@@ -539,7 +488,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
   }
 
   long getAsyncScheduleInterval() {
-    return asyncScheduleInterval;
+    return asyncSchedulingConf.getAsyncScheduleInterval();
   }
 
   private final static Random random = new Random(System.currentTimeMillis());
@@ -692,7 +641,7 @@ public void run() {
           } else {
             // Don't run schedule if we have some pending backlogs already
             if (cs.getAsyncSchedulingPendingBacklogs()
-                > cs.asyncMaxPendingBacklogs) {
+                > cs.asyncSchedulingConf.getAsyncMaxPendingBacklogs()) {
               Thread.sleep(1);
             } else{
               schedule(cs);
@@ -1479,7 +1428,7 @@ protected void nodeUpdate(RMNode rmNode) {
     }
 
     // Try to do scheduling
-    if (!scheduleAsynchronously) {
+    if (!asyncSchedulingConf.isScheduleAsynchronously()) {
       writeLock.lock();
       try {
         // reset allocation and reservation stats before we start doing any
@@ -2291,8 +2240,8 @@ private void addNode(RMNode nodeManager) {
           "Added node " + nodeManager.getNodeAddress() + " clusterResource: "
               + clusterResource);
 
-      if (scheduleAsynchronously && getNumClusterNodes() == 1) {
-        for (AsyncScheduleThread t : asyncSchedulerThreads) {
+      if (asyncSchedulingConf.isScheduleAsynchronously() && getNumClusterNodes() == 1) {
+        for (AsyncScheduleThread t : asyncSchedulingConf.asyncSchedulerThreads) {
           t.beginSchedule();
         }
       }
@@ -2340,11 +2289,7 @@ private void removeNode(RMNode nodeInfo) {
           new ResourceLimits(clusterResource));
       int numNodes = nodeTracker.nodeCount();
 
-      if (scheduleAsynchronously && numNodes == 0) {
-        for (AsyncScheduleThread t : asyncSchedulerThreads) {
-          t.suspendSchedule();
-        }
-      }
+      asyncSchedulingConf.nodeRemoved(numNodes);
 
       LOG.info(
           "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: "
@@ -3092,9 +3037,9 @@ public void submitResourceCommitRequest(Resource cluster,
       return;
     }
 
-    if (scheduleAsynchronously) {
+    if (asyncSchedulingConf.isScheduleAsynchronously()) {
       // Submit to a commit thread and commit it async-ly
-      resourceCommitterService.addNewCommitRequest(request);
+      asyncSchedulingConf.resourceCommitterService.addNewCommitRequest(request);
     } else{
       // Otherwise do it sync-ly.
       tryCommit(cluster, request, true);
@@ -3339,10 +3284,7 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
   }
 
   public int getAsyncSchedulingPendingBacklogs() {
-    if (scheduleAsynchronously) {
-      return resourceCommitterService.getPendingBacklogs();
-    }
-    return 0;
+    return asyncSchedulingConf.getPendingBacklogs();
   }
 
   @Override
@@ -3483,7 +3425,7 @@ public boolean isMultiNodePlacementEnabled() {
   }
 
   public int getNumAsyncSchedulerThreads() {
-    return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
+    return asyncSchedulingConf.getNumAsyncSchedulerThreads();
   }
 
   @VisibleForTesting
@@ -3503,4 +3445,116 @@ public boolean placementConstraintEnabled() {
   public void setQueueManager(CapacitySchedulerQueueManager qm) {
     this.queueManager = qm;
   }
+
+  @VisibleForTesting
+  public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
+    return asyncSchedulingConf.getAsyncSchedulerThreads();
+  }
+
+  /**
+   * Groups the async-scheduling settings with the scheduler threads and
+   * commit service that are only created when async scheduling is enabled.
+   */
+  private static class AsyncSchedulingConfiguration {
+    // timeout to join when we stop this service
+    private static final long THREAD_JOIN_TIMEOUT_MS = 1000;
+
+    @VisibleForTesting
+    protected List<AsyncScheduleThread> asyncSchedulerThreads;
+    private ResourceCommitterService resourceCommitterService;
+
+    private final long asyncScheduleInterval;
+    private long asyncMaxPendingBacklogs;
+
+    private final boolean scheduleAsynchronously;
+
+    AsyncSchedulingConfiguration(CapacitySchedulerConfiguration conf,
+        CapacityScheduler cs) {
+      this.scheduleAsynchronously = conf.getScheduleAynschronously();
+      // Read the interval even when async scheduling is disabled: the code
+      // this class replaces always loaded it, and the value is still logged
+      // at startup and returned by getAsyncScheduleInterval() in that case.
+      this.asyncScheduleInterval = conf.getLong(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_INTERVAL,
+          CapacitySchedulerConfiguration.
+              DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL);
+      if (this.scheduleAsynchronously) {
+        // number of threads for async scheduling
+        int maxAsyncSchedulingThreads = conf.getInt(
+            CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+            1);
+        maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
+        this.asyncMaxPendingBacklogs = conf.getInt(
+            CapacitySchedulerConfiguration.
+                SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
+            CapacitySchedulerConfiguration.
+                DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
+
+        this.asyncSchedulerThreads = new ArrayList<>();
+        for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
+          asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
+        }
+        this.resourceCommitterService = new ResourceCommitterService(cs);
+      }
+    }
+
+    public boolean isScheduleAsynchronously() {
+      return scheduleAsynchronously;
+    }
+
+    public long getAsyncScheduleInterval() {
+      return asyncScheduleInterval;
+    }
+
+    public long getAsyncMaxPendingBacklogs() {
+      return asyncMaxPendingBacklogs;
+    }
+
+    public void startThreads() {
+      if (scheduleAsynchronously) {
+        Preconditions.checkNotNull(asyncSchedulerThreads,
+            "asyncSchedulerThreads is null");
+        for (Thread t : asyncSchedulerThreads) {
+          t.start();
+        }
+
+        resourceCommitterService.start();
+      }
+    }
+
+    public void serviceStopInvoked() throws InterruptedException {
+      if (scheduleAsynchronously && asyncSchedulerThreads != null) {
+        for (Thread t : asyncSchedulerThreads) {
+          t.interrupt();
+          t.join(THREAD_JOIN_TIMEOUT_MS);
+        }
+        resourceCommitterService.interrupt();
+        resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
+      }
+    }
+
+    public void nodeRemoved(int numNodes) {
+      if (scheduleAsynchronously && numNodes == 0) {
+        for (AsyncScheduleThread t : asyncSchedulerThreads) {
+          t.suspendSchedule();
+        }
+      }
+    }
+
+    public int getPendingBacklogs() {
+      if (scheduleAsynchronously) {
+        return resourceCommitterService.getPendingBacklogs();
+      }
+      return 0;
+    }
+
+    public int getNumAsyncSchedulerThreads() {
+      return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
+    }
+
+    @VisibleForTesting
+    public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
+      return asyncSchedulerThreads;
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 2716ddebbdc6d..628c58576e68d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -273,6 +273,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS =
       SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-pending-backlogs";
 
+  @Private
+  public static final String SCHEDULE_ASYNCHRONOUSLY_INTERVAL =
+      SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".scheduling-interval-ms";
+  @Private
+  public static final long DEFAULT_SCHEDULE_ASYNCHRONOUSLY_INTERVAL = 5;
+
   @Private
   public static final String APP_FAIL_FAST = PREFIX + "application.fail-fast";
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 98214a030c954..b36e0edc73503 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -153,7 +153,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
 
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     for (CapacityScheduler.AsyncScheduleThread thread :
-        cs.asyncSchedulerThreads) {
+        cs.getAsyncSchedulerThreads()) {
       Assert.assertTrue(thread.getName()
           .startsWith("AsyncCapacitySchedulerThread"));
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index b71fe063927ac..b8209a54952e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -237,7 +237,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
 
     if (numThreads > 0) {
       // disable async scheduling threads
-      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+      for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
         t.suspendSchedule();
       }
     }
@@ -268,7 +268,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
 
     if (numThreads > 0) {
       // enable async scheduling threads
-      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+      for (CapacityScheduler.AsyncScheduleThread t : cs.getAsyncSchedulerThreads()) {
         t.beginSchedule();