Skip to content

Commit

Permalink
YARN-10802. Change Capacity Scheduler minimum-user-limit-percent to a…
Browse files Browse the repository at this point in the history
…ccept decimal values. Contributed by Benjamin Teke
  • Loading branch information
szilard-nemeth committed Jun 14, 2021
1 parent ebee2ae commit e31d060
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,8 @@ public void setMaximumCapacityByLabel(String queue, String label,
absoluteResourceCapacity);
}

/**
 * Returns the user limit configured for the given queue.
 *
 * <p>The value is read from the key {@code getQueuePrefix(queue) + USER_LIMIT},
 * with {@code DEFAULT_USER_LIMIT} passed as the fallback value.
 *
 * <p>NOTE(review): this span is a rendered diff, so the removed {@code int}
 * signature/{@code getInt} lookup and the added {@code float}
 * signature/{@code getFloat} lookup both appear below; presumably only the
 * {@code float} pair is live after the commit.
 *
 * @param queue queue whose prefix is used to build the configuration key
 * @return the user limit read from the configuration
 */
public int getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
public float getUserLimit(String queue) {
float userLimit = getFloat(getQueuePrefix(queue) + USER_LIMIT,
DEFAULT_USER_LIMIT);
return userLimit;
}
Expand Down Expand Up @@ -686,8 +686,8 @@ public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
return orderingPolicy;
}

/**
 * Stores the user limit for the given queue under the key
 * {@code getQueuePrefix(queue) + USER_LIMIT} and emits a debug log entry.
 *
 * <p>NOTE(review): this span is a rendered diff, so the removed
 * {@code int}/{@code setInt} lines and the added {@code float}/{@code setFloat}
 * lines both appear below; presumably only the {@code float} pair is live
 * after the commit.
 *
 * @param queue queue whose prefix is used to build the configuration key
 * @param userLimit value to store
 */
public void setUserLimit(String queue, int userLimit) {
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
public void setUserLimit(String queue, float userLimit) {
setFloat(getQueuePrefix(queue) + USER_LIMIT, userLimit);
// The logged value is re-read through getUserLimit(queue) instead of being
// taken from the argument, so the log shows what the configuration returns.
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
getQueuePrefix(queue), getUserLimit(queue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ protected void setupQueueConfigs(Resource clusterResource,
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));

// Validate leaf queue's user's weights.
int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
float queueUL = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
for (Entry<String, Float> e : getUserWeights().entrySet()) {
float val = e.getValue().floatValue();
if (val < 0.0f || val > (100.0f / queueUL)) {
Expand Down Expand Up @@ -367,17 +367,17 @@ public List<CSQueue> getChildQueues() {
}

/**
 * Set user limit - used only for testing.
 * Set user limit.
 *
 * <p>Forwards the value to {@code usersManager.setUserLimit(...)} and then
 * calls {@code usersManager.userLimitNeedsRecompute()}.
 *
 * <p>NOTE(review): this span is a rendered diff; the first summary line and
 * the {@code int} signature are the removed version, the second summary line
 * and the {@code float} signature are the added one. The added summary no
 * longer says "used only for testing" while {@code @VisibleForTesting} is
 * still present - confirm which of the two is intended.
 *
 * @param userLimit new user limit
 */
@VisibleForTesting
void setUserLimit(int userLimit) {
void setUserLimit(float userLimit) {
usersManager.setUserLimit(userLimit);
usersManager.userLimitNeedsRecompute();
}

/**
* Set user limit factor - used only for testing.
* Set user limit factor.
* @param userLimitFactor new user limit factor
*/
@VisibleForTesting
Expand Down Expand Up @@ -444,7 +444,7 @@ public int getNumActiveApplications(String user) {
}

/**
 * Returns the user limit as reported by {@code usersManager.getUserLimit()}.
 *
 * <p>NOTE(review): this span is a rendered diff; the {@code int} signature is
 * the removed line and the {@code float} signature is the added one.
 *
 * @return the value returned by {@code usersManager.getUserLimit()}
 */
@Private
public int getUserLimit() {
public float getUserLimit() {
return usersManager.getUserLimit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PlanQueue extends AbstractManagedParentQueue {

private int maxAppsForReservation;
private int maxAppsPerUserForReservation;
private int userLimit;
private float userLimit;
private float userLimitFactor;
protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues;
Expand All @@ -60,25 +60,27 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
.getAbsoluteCapacity());
}
int userLimit = conf.getUserLimit(queuePath);
float userLimitFactor = conf.getUserLimitFactor(queuePath);
int maxAppsPerUserForReservation =
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
if (userLimitFactor == -1) {
maxAppsPerUserForReservation = maxAppsForReservation;
float configuredUserLimit = conf.getUserLimit(queuePath);
float configuredUserLimitFactor = conf.getUserLimitFactor(queuePath);
int configuredMaxAppsPerUserForReservation =
(int) (maxAppsForReservation * (configuredUserLimit / 100.0f) *
configuredUserLimitFactor);
if (configuredUserLimitFactor == -1) {
configuredMaxAppsPerUserForReservation = maxAppsForReservation;
}
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
maxAppsPerUserForReservation);
updateQuotas(configuredUserLimit, configuredUserLimitFactor,
maxAppsForReservation, configuredMaxAppsPerUserForReservation);

StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Plan Queue: ").append(queueName)
.append("\nwith capacity: [").append(super.getCapacity())
.append("]\nwith max capacity: [").append(super.getMaximumCapacity())
.append("\nwith max reservation apps: [").append(maxAppsForReservation)
.append("]\nwith max reservation apps per user: [")
.append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
.append(userLimit).append("]\nwith user limit factor: [")
.append(userLimitFactor).append("].");
.append(configuredMaxAppsPerUserForReservation)
.append("]\nwith user limit: [")
.append(configuredUserLimit).append("]\nwith user limit factor: [")
.append(configuredUserLimitFactor).append("].");
LOG.info(queueInfo.toString());
}

Expand Down Expand Up @@ -123,12 +125,12 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
}

/**
 * Copies the four supplied quota values into the fields {@code userLimit},
 * {@code userLimitFactor}, {@code maxAppsForReservation} and
 * {@code maxAppsPerUserForReservation}.
 *
 * <p>NOTE(review): this span is a rendered diff. The first signature and its
 * four assignments are the removed version (parameters named like the
 * fields, {@code userLimit} typed {@code int}); the second signature and its
 * four assignments are the added version (parameters prefixed with
 * {@code new}, {@code newUserLimit} typed {@code float}). Both share the
 * single closing brace.
 *
 * @param newUserLimit value assigned to {@code this.userLimit}
 * @param newUserLimitFactor value assigned to {@code this.userLimitFactor}
 * @param newMaxAppsForReservation value assigned to
 *        {@code this.maxAppsForReservation}
 * @param newMaxAppsPerUserForReservation value assigned to
 *        {@code this.maxAppsPerUserForReservation}
 */
private void updateQuotas(int userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) {
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
this.maxAppsForReservation = maxAppsForReservation;
this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
private void updateQuotas(float newUserLimit, float newUserLimitFactor,
int newMaxAppsForReservation, int newMaxAppsPerUserForReservation) {
this.userLimit = newUserLimit;
this.userLimitFactor = newUserLimitFactor;
this.maxAppsForReservation = newMaxAppsForReservation;
this.maxAppsPerUserForReservation = newMaxAppsPerUserForReservation;
}

/**
Expand All @@ -155,7 +157,7 @@ public int getMaxApplicationsPerUserForReservation() {
*
* @return userLimit
*/
public int getUserLimitForReservation() {
public float getUserLimitForReservation() {
return userLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
}

private void updateQuotas(int userLimit, float userLimitFactor,
private void updateQuotas(float userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) {
setUserLimit(userLimit);
setUserLimitFactor(userLimitFactor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class UsersManager implements AbstractUsersManager {
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();

private volatile int userLimit;
private volatile float userLimit;
private volatile float userLimitFactor;

private WriteLock writeLock;
Expand Down Expand Up @@ -320,15 +320,15 @@ public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
* Get configured user-limit.
* @return user limit
*/
public int getUserLimit() {
public float getUserLimit() {
return userLimit;
}

/**
 * Set configured user-limit.
 *
 * <p>This method only assigns the {@code userLimit} field; nothing else is
 * updated here.
 *
 * <p>NOTE(review): this span is a rendered diff; the {@code int} signature is
 * the removed line and the {@code float} signature is the added one.
 *
 * @param userLimit user limit
 */
public void setUserLimit(int userLimit) {
public void setUserLimit(float userLimit) {
this.userLimit = userLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,21 @@ private void renderQueueCapacityInfo(ResponseInfo ri, String label) {

private void renderCommonLeafQueueInfo(ResponseInfo ri) {
ri.
__("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())).
__("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())).
__("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
__("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
__("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
__("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
__("Num Schedulable Applications:",
Integer.toString(lqinfo.getNumActiveApplications())).
__("Num Non-Schedulable Applications:",
Integer.toString(lqinfo.getNumPendingApplications())).
__("Num Containers:",
Integer.toString(lqinfo.getNumContainers())).
__("Max Applications:",
Integer.toString(lqinfo.getMaxApplications())).
__("Max Applications Per User:",
Integer.toString(lqinfo.getMaxApplicationsPerUser())).
__("Configured Minimum User Limit Percent:",
lqinfo.getUserLimit() + "%").
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
__("Accessible Node Labels:",
StringUtils.join(",", lqinfo.getNodeLabels())).
__("Ordering Policy: ", lqinfo.getOrderingPolicyDisplayName()).
__("Preemption:",
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected int numContainers;
protected int maxApplications;
protected int maxApplicationsPerUser;
protected int userLimit;
protected float userLimit;
protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor;
protected float configuredMaxAMResourceLimit;
Expand Down Expand Up @@ -130,7 +130,7 @@ public int getMaxApplicationsPerUser() {
return maxApplicationsPerUser;
}

/**
 * Returns the {@code userLimit} field of this leaf-queue info object.
 *
 * <p>NOTE(review): this span is a rendered diff; the {@code int} signature is
 * the removed line and the {@code float} signature is the added one.
 *
 * @return the stored user limit
 */
public int getUserLimit() {
public float getUserLimit() {
return userLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1731,6 +1731,111 @@ public void testUserLimits() throws Exception {
1, a.getAbstractUsersManager().getNumActiveUsers());
}

/**
 * Exercises a leaf queue whose user limit is a non-integer value (50.1).
 *
 * <p>Two applications from two different users are submitted to queue A on a
 * two-node cluster. app_0 asks for two 3 GB containers and app_1 for two
 * 1 GB containers. The test then drives three container assignments and
 * checks the memory consumed by the queue and by each application after
 * every step, and finally checks the number of active users.
 */
@Test
public void testDecimalUserLimits() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
// Unset maxCapacity by setting it to 1.0
a.setMaxCapacity(1.0f);

// The scheduler context reports a 16 GB / 32 vcore cluster.
when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32));

// Users
final String user0 = "user_0";
final String user1 = "user_1";

// Submit applications: one attempt per user, both to queue A
final ApplicationAttemptId appAttemptId0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app0, user0);

final ApplicationAttemptId appAttemptId1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app1 =
new FiCaSchedulerApp(appAttemptId1, user1, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app1, user1); // different user

// Setup some nodes: two mock nodes with 8 GB each
String host0 = "127.0.0.1";
FiCaSchedulerNode node0 =
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
String host1 = "127.0.0.2";
FiCaSchedulerNode node1 =
TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);

// Cluster resource used for all scheduling calls below:
// numNodes * 8 GB memory and numNodes * 16 vcores.
final int numNodes = 2;
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

// Setup resource-requests: app_0 wants 2 x 3 GB, app_1 wants 2 x 1 GB,
// both at the same priority and for any location.
Priority priority = TestUtils.createMockPriority(1);
app0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
priority, recordFactory)));

app1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));

// Lookup maps handed to applyCSAssignment for every assignment below.
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
app1);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
node0, node1.getNodeID(), node1);

// Start testing...

// Set user-limit to a decimal value and refresh the queue limits
a.setUserLimit(50.1f);
a.setUserLimitFactor(2);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

// There're two active users
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

// 1 container to user_0
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(0, app1.getCurrentConsumption().getMemorySize());

// Allocate another container. Since the user limit is 50.1% it isn't
// reached, so app_0 will get another container.
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(6*GB, a.getUsedResources().getMemorySize());
assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(0, app1.getCurrentConsumption().getMemorySize());

// Third assignment, this time on node1: app_1 is expected to receive its
// first 1 GB container while app_0 stays at 6 GB.
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(7*GB, a.getUsedResources().getMemorySize());
assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(GB, app1.getCurrentConsumption().getMemorySize());

// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
1, a.getAbstractUsersManager().getNumActiveUsers());
}

@Test
public void testUserSpecificUserLimits() throws Exception {
// Mock the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,19 +1164,27 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

// Extra cases for testing maxApplicationsPerUser
int halfPercent = 50;
int oneAndQuarterPercent = 125;
float halfPercent = 50f;
float oneAndQuarterPercent = 125f;
float thirdPercent = 33.3f;
a.getUsersManager().setUserLimit(halfPercent);
b.getUsersManager().setUserLimit(oneAndQuarterPercent);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals(a.getMaxApplications() * halfPercent / 100,
assertEquals((int) (a.getMaxApplications() * halfPercent / 100),
a.getMaxApplicationsPerUser());
// Q_B's limit per user shouldn't be greater
// than the whole queue's application limit
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());

b.getUsersManager().setUserLimit(thirdPercent);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

assertEquals((int) (b.getMaxApplications() * thirdPercent / 100),
b.getMaxApplicationsPerUser());

float userLimitFactorQueueA = 0.9f;
float userLimitFactorQueueB = 1.1f;
a.getUsersManager().setUserLimit(halfPercent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private class LeafQueueInfo extends QueueInfo {
int numContainers;
int maxApplications;
int maxApplicationsPerUser;
int userLimit;
float userLimit;
float userLimitFactor;
long defaultApplicationLifetime;
long maxApplicationLifetime;
Expand Down Expand Up @@ -352,7 +352,7 @@ public void verifySubQueueXML(Element qElem, String q,
WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
lqi.maxApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
lqi.userLimit = WebServicesTestUtils.getXmlFloat(qElem, "userLimit");
lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
lqi.defaultApplicationLifetime =
Expand Down Expand Up @@ -477,7 +477,7 @@ private void verifySubQueue(JSONObject info, String q,
lqi.numContainers = info.getInt("numContainers");
lqi.maxApplications = info.getInt("maxApplications");
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit");
lqi.userLimit = (float) info.getDouble("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
lqi.defaultApplicationLifetime =
info.getLong("defaultApplicationLifetime");
Expand Down Expand Up @@ -553,7 +553,7 @@ private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
(float)info.maxApplicationsPerUser, info.userLimitFactor);

assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
info.userLimit);
info.userLimit, 1e-3f);
assertEquals("userLimitFactor doesn't match",
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);

Expand Down

0 comments on commit e31d060

Please sign in to comment.