Skip to content

Commit

Permalink
updated all getter and setters with suitable methods present in Capac…
Browse files Browse the repository at this point in the history
…itySchedulerConfiguration

Change-Id: I2d2a9a0d7da17d8fe0e69263743c4c9ecf9b18e0
  • Loading branch information
susheel-gupta committed Jan 31, 2023
1 parent a38eb1f commit 3b56d68
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}

public void setMaximumApplicationMasterResourcePercent(float percent) {
setFloat(PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}

private void throwExceptionForUnexpectedWeight(float weight, String queue,
String label) {
if ((weight < -1e-6 && Math.abs(weight + 1) > 1e-6) || weight > 10000) {
Expand Down Expand Up @@ -723,6 +727,17 @@ public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
return orderingPolicy;
}

public void setOrderingPolicy(String queue,
String appOrderingPolicy, String postfix, boolean value) {
setBoolean(getQueuePrefix(queue) +
"ordering-policy" + DOT + appOrderingPolicy + DOT + postfix, value);
}

public boolean getOrderingPolicy(String queue, String appOrderingPolicy, String postfix) {
return getBoolean(getQueuePrefix(queue) +
"ordering-policy" + DOT + appOrderingPolicy + DOT + postfix, false);
}

public void setUserLimit(String queue, float userLimit) {
setFloat(getQueuePrefix(queue) + USER_LIMIT, userLimit);
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
Expand Down Expand Up @@ -1203,6 +1218,16 @@ public void reinitializeConfigurationProperties() {
configurationProperties = new ConfigurationProperties(props);
}

public void setQueueMaximumAllocationMb(String queue, String value) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + "maximum-allocation-mb", value);
}

public void setQueueMaximumAllocationVcores(String queue, String value) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + "maximum-allocation-vcores", value);
}

public long getQueueMaximumAllocationMb(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
Expand Down Expand Up @@ -1477,6 +1502,14 @@ public List<MappingRule> parseJSONMappingRules() throws IOException {
return new ArrayList<>();
}

public void setMappingRuleFormat(String format) {
set(MAPPING_RULE_FORMAT, format);
}

public void setMappingRuleJson(String json) {
set(MAPPING_RULE_JSON, json);
}

public List<MappingRule> getMappingRules() throws IOException {
String mappingFormat =
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
Expand Down Expand Up @@ -1693,6 +1726,14 @@ public boolean getIntraQueuePreemptionDisabled(String queue,
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
}

public void setDisablePreemptionForPreemtionModes(String preemptionType, boolean value) {
setBoolean(PREEMPTION_CONFIG_PREFIX + preemptionType, value);
}

public boolean getDisablePreemptionForObserveOnly() {
return getBoolean(PREEMPTION_OBSERVE_ONLY, DEFAULT_PREEMPTION_OBSERVE_ONLY);
}

/**
* Get configured node labels in a given queuePath
*/
Expand Down Expand Up @@ -1794,29 +1835,48 @@ public static boolean shouldAppFailFast(Configuration conf) {
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}

public Integer getMaxParallelAppsForQueue(String queue) {
int defaultMaxParallelAppsForQueue =
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
public void setQueueMaxParallelApplications(String value) {
set(PREFIX + MAX_PARALLEL_APPLICATIONS, value);
}

public void setQueueMaxParallelApplications(String queue, String value) {
set(getQueuePrefix(queue) + MAX_PARALLEL_APPLICATIONS, value);
}

public void setUserMaxParallelApplications(String value) {
set(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS, value);
}

public void setUserMaxParallelApplications(String user, int value) {
setInt(getUserPrefix(user) + MAX_PARALLEL_APPLICATIONS, value);
}

public Integer getMaxParallelAppsForQueue(String queue) {
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForQueue != null) ?
Integer.parseInt(maxParallelAppsForQueue)
: defaultMaxParallelAppsForQueue;
: getMaxParallelAppsForQueue();
}

public Integer getMaxParallelAppsForUser(String user) {
int defaultMaxParallelAppsForUser =
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
public Integer getMaxParallelAppsForQueue() {
return getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}

public Integer getMaxParallelAppsForUser(String user) {
String maxParallelAppsForUser = get(getUserPrefix(user)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForUser != null) ?
Integer.parseInt(maxParallelAppsForUser)
: defaultMaxParallelAppsForUser;
: getMaxParallelAppsForUser();
}

public Integer getMaxParallelAppsForUser() {
return getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}

public boolean getAllowZeroCapacitySum(String queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
Expand All @@ -35,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
Expand Down Expand Up @@ -342,14 +342,13 @@ private void performRuleConversion(FairScheduler fs)
}
writer.writeValue(mappingRulesOutputStream, desc);

capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setMappingRuleFormat(MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
if (!rulesToFile) {
String json =
((ByteArrayOutputStream)mappingRulesOutputStream)
.toString(StandardCharsets.UTF_8.displayName());
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
capacitySchedulerConfig.setMappingRuleJson(json);
}
} else {
LOG.info("No rules to convert");
Expand Down Expand Up @@ -377,47 +376,39 @@ private OutputStream getOutputStreamForJson() throws FileNotFoundException {

private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "max-parallel-apps",
capacitySchedulerConfig.setQueueMaxParallelApplications(
String.valueOf(queueMaxAppsDefault));
}
}

private void emitDefaultUserMaxParallelApplications() {
if (userMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "user.max-parallel-apps",
capacitySchedulerConfig.setUserMaxParallelApplications(
String.valueOf(userMaxAppsDefault));
}
}

private void emitUserMaxParallelApplications() {
userMaxApps
.forEach((user, apps) -> {
capacitySchedulerConfig.setInt(
PREFIX + "user." + user + ".max-parallel-apps", apps);
capacitySchedulerConfig.setUserMaxParallelApplications(user, apps);
});
}

private void emitDefaultMaxAMShare() {
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
1.0f);
} else {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
queueMaxAMShareDefault);
}
}
private void emitDisablePreemptionForObserveOnlyMode() {
if (preemptionMode == FSConfigToCSConfigConverterParams
.PreemptionMode.OBSERVE_ONLY) {
capacitySchedulerConfig.
setBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, true);
setDisablePreemptionForPreemtionModes("observe_only", true);
}
}

Expand All @@ -433,13 +424,13 @@ private void generateQueueAcl(String queue,

if (!submitAcls.getGroups().isEmpty() ||
!submitAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
capacitySchedulerConfig.setAcl(queue, QueueACL.SUBMIT_APPLICATIONS,
submitAcls.getAclString());
}

if (!adminAcls.getGroups().isEmpty() ||
!adminAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
capacitySchedulerConfig.setAcl(queue, QueueACL.ADMINISTER_QUEUE,
adminAcls.getAclString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR;

import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -103,11 +99,10 @@ private void emitChildQueues(String queueName, List<FSQueue> children) {
ruleHandler.handleChildQueueCount(queueName, children.size());

if (children.size() > 0) {
String childQueues = children.stream()
List<String> childQueues = children.stream()
.map(child -> getQueueShortName(child.getName()))
.collect(Collectors.joining(","));

capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues);
.collect(Collectors.toList());
capacitySchedulerConfig.setQueues(queueName, childQueues.toArray(new String[0]));
}
}

Expand All @@ -124,14 +119,14 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
if (queueMaxAmShare != 0.0f
&& queueMaxAmShare != queueMaxAMShareDefault
&& queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", queueMaxAmShare);
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
queueName, queueMaxAmShare);
}

if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
&& queueMaxAmShare != queueMaxAMShareDefault) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", 1.0f);
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
queueName, 1.0f);
}
}

Expand All @@ -144,7 +139,7 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
private void emitMaxParallelApps(String queueName, FSQueue queue) {
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
capacitySchedulerConfig.setQueueMaxParallelApplications(queueName,
String.valueOf(queue.getMaxRunningApps()));
}
}
Expand All @@ -164,8 +159,8 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
ruleHandler.handleMaxResources();
}

capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
"100");
capacitySchedulerConfig.setMaximumCapacity(queueName,
100.0f);
}

/**
Expand Down Expand Up @@ -196,11 +191,11 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {

// only emit max allocation if it differs from the parent's setting
if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-mb", String.valueOf(maxMemory));
capacitySchedulerConfig.setQueueMaximumAllocationMb(
queueName, String.valueOf(maxMemory));

capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-vcores", String.valueOf(maxVcores));
capacitySchedulerConfig.setQueueMaximumAllocationVcores(
queueName, String.valueOf(maxVcores));
}
}
}
Expand All @@ -213,17 +208,13 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {
*/
private void emitPreemptionDisabled(String queueName, FSQueue queue) {
if (preemptionEnabled && !queue.isPreemptable()) {
capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption",
"true");
capacitySchedulerConfig.setPreemptionDisabled(queueName, true);
}
}

public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children) {
if (children.isEmpty()) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
PREFIX + queueName + DOT + USER_LIMIT_FACTOR,
-1.0f);
capacitySchedulerConfig.setUserLimitFactor(queueName, -1.0f);
}
}

Expand All @@ -235,8 +226,8 @@ public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children)
*/
private void emitSizeBasedWeight(String queueName) {
if (sizeBasedWeight) {
capacitySchedulerConfig.setBoolean(PREFIX + queueName +
".ordering-policy.fair.enable-size-based-weight", true);
capacitySchedulerConfig.setOrderingPolicy(
queueName, "fair", "enable-size-based-weight", true);
}
}

Expand All @@ -252,19 +243,16 @@ private void emitOrderingPolicy(String queueName, FSQueue queue) {

switch (policy) {
case DominantResourceFairnessPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FAIR_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
break;
case FairSharePolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FAIR_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
if (drfUsed) {
ruleHandler.handleFairAsDrf(queueName);
}
break;
case FifoPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FIFO_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FIFO_POLICY);
break;
default:
String msg = String.format("Unexpected ordering policy " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;

public interface CapacityConverter {
void convertWeightsForChildQueues(FSQueue queue, Configuration csConfig);
void convertWeightsForChildQueues(FSQueue queue, CapacitySchedulerConfiguration csConfig);
}
Loading

0 comments on commit 3b56d68

Please sign in to comment.