Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11416. FS2CS should use CapacitySchedulerConfiguration in FSQueueConverterBuilder #5320

Merged
merged 10 commits into from
Aug 4, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}

public void setMaximumApplicationMasterResourcePercent(float percent) {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
setFloat(PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}

private void throwExceptionForUnexpectedWeight(float weight, String queue,
String label) {
if ((weight < -1e-6 && Math.abs(weight + 1) > 1e-6) || weight > 10000) {
Expand Down Expand Up @@ -723,6 +727,17 @@ public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
return orderingPolicy;
}

public void setOrderingPolicy(String queue,
String appOrderingPolicy, String postfix, boolean value) {
setBoolean(getQueuePrefix(queue) +
"ordering-policy" + DOT + appOrderingPolicy + DOT + postfix, value);
}

public boolean getOrderingPolicy(String queue, String appOrderingPolicy, String postfix) {
return getBoolean(getQueuePrefix(queue) +
"ordering-policy" + DOT + appOrderingPolicy + DOT + postfix, false);
}
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved

public void setUserLimit(String queue, float userLimit) {
setFloat(getQueuePrefix(queue) + USER_LIMIT, userLimit);
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
Expand Down Expand Up @@ -1203,6 +1218,16 @@ public void reinitializeConfigurationProperties() {
configurationProperties = new ConfigurationProperties(props);
}

public void setQueueMaximumAllocationMb(String queue, String value) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + "maximum-allocation-mb", value);
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
}

public void setQueueMaximumAllocationVcores(String queue, String value) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + "maximum-allocation-vcores", value);
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
}

public long getQueueMaximumAllocationMb(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
Expand Down Expand Up @@ -1477,6 +1502,14 @@ public List<MappingRule> parseJSONMappingRules() throws IOException {
return new ArrayList<>();
}

public void setMappingRuleFormat(String format) {
set(MAPPING_RULE_FORMAT, format);
}

public void setMappingRuleJson(String json) {
set(MAPPING_RULE_JSON, json);
}

public List<MappingRule> getMappingRules() throws IOException {
String mappingFormat =
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
Expand Down Expand Up @@ -1693,6 +1726,14 @@ public boolean getIntraQueuePreemptionDisabled(String queue,
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
}

public void setDisablePreemptionForPreemtionModes(String preemptionType, boolean value) {
setBoolean(PREEMPTION_CONFIG_PREFIX + preemptionType, value);
}
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved

public boolean getDisablePreemptionForObserveOnly() {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
return getBoolean(PREEMPTION_OBSERVE_ONLY, DEFAULT_PREEMPTION_OBSERVE_ONLY);
}

/**
* Get configured node labels in a given queuePath
*/
Expand Down Expand Up @@ -1794,29 +1835,48 @@ public static boolean shouldAppFailFast(Configuration conf) {
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}

public Integer getMaxParallelAppsForQueue(String queue) {
int defaultMaxParallelAppsForQueue =
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
public void setQueueMaxParallelApplications(String value) {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
set(PREFIX + MAX_PARALLEL_APPLICATIONS, value);
}

public void setQueueMaxParallelApplications(String queue, String value) {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
set(getQueuePrefix(queue) + MAX_PARALLEL_APPLICATIONS, value);
}

public void setUserMaxParallelApplications(String value) {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
set(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS, value);
}

public void setUserMaxParallelApplications(String user, int value) {
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved
setInt(getUserPrefix(user) + MAX_PARALLEL_APPLICATIONS, value);
}

public Integer getMaxParallelAppsForQueue(String queue) {
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForQueue != null) ?
Integer.parseInt(maxParallelAppsForQueue)
: defaultMaxParallelAppsForQueue;
: getMaxParallelAppsForQueue();
}

public Integer getMaxParallelAppsForUser(String user) {
int defaultMaxParallelAppsForUser =
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
public Integer getMaxParallelAppsForQueue() {
return getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved

public Integer getMaxParallelAppsForUser(String user) {
String maxParallelAppsForUser = get(getUserPrefix(user)
+ MAX_PARALLEL_APPLICATIONS);

return (maxParallelAppsForUser != null) ?
Integer.parseInt(maxParallelAppsForUser)
: defaultMaxParallelAppsForUser;
: getMaxParallelAppsForUser();
}

public Integer getMaxParallelAppsForUser() {
return getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
}
susheel-gupta marked this conversation as resolved.
Show resolved Hide resolved

public boolean getAllowZeroCapacitySum(String queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
Expand All @@ -35,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
Expand Down Expand Up @@ -342,14 +342,13 @@ private void performRuleConversion(FairScheduler fs)
}
writer.writeValue(mappingRulesOutputStream, desc);

capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setMappingRuleFormat(MAPPING_RULE_FORMAT_JSON);
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
if (!rulesToFile) {
String json =
((ByteArrayOutputStream)mappingRulesOutputStream)
.toString(StandardCharsets.UTF_8.displayName());
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
capacitySchedulerConfig.setMappingRuleJson(json);
}
} else {
LOG.info("No rules to convert");
Expand Down Expand Up @@ -377,47 +376,39 @@ private OutputStream getOutputStreamForJson() throws FileNotFoundException {

private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "max-parallel-apps",
capacitySchedulerConfig.setQueueMaxParallelApplications(
String.valueOf(queueMaxAppsDefault));
}
}

private void emitDefaultUserMaxParallelApplications() {
if (userMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
PREFIX + "user.max-parallel-apps",
capacitySchedulerConfig.setUserMaxParallelApplications(
String.valueOf(userMaxAppsDefault));
}
}

private void emitUserMaxParallelApplications() {
userMaxApps
.forEach((user, apps) -> {
capacitySchedulerConfig.setInt(
PREFIX + "user." + user + ".max-parallel-apps", apps);
capacitySchedulerConfig.setUserMaxParallelApplications(user, apps);
});
}

private void emitDefaultMaxAMShare() {
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
1.0f);
} else {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
queueMaxAMShareDefault);
}
}
private void emitDisablePreemptionForObserveOnlyMode() {
if (preemptionMode == FSConfigToCSConfigConverterParams
.PreemptionMode.OBSERVE_ONLY) {
capacitySchedulerConfig.
setBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, true);
setDisablePreemptionForPreemtionModes("observe_only", true);
}
}

Expand All @@ -433,13 +424,13 @@ private void generateQueueAcl(String queue,

if (!submitAcls.getGroups().isEmpty() ||
!submitAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
capacitySchedulerConfig.setAcl(queue, QueueACL.SUBMIT_APPLICATIONS,
submitAcls.getAclString());
}

if (!adminAcls.getGroups().isEmpty() ||
!adminAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
capacitySchedulerConfig.setAcl(queue, QueueACL.ADMINISTER_QUEUE,
adminAcls.getAclString());
}
}
Expand Down Expand Up @@ -501,7 +492,7 @@ Configuration getYarnSiteConfig() {
}

@VisibleForTesting
Configuration getCapacitySchedulerConfig() {
CapacitySchedulerConfiguration getCapacitySchedulerConfig() {
return capacitySchedulerConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
Expand All @@ -48,7 +43,7 @@ public class FSQueueConverter {
private static final String FIFO_POLICY = "fifo";

private final FSConfigToCSConfigRuleHandler ruleHandler;
private Configuration capacitySchedulerConfig;
private CapacitySchedulerConfiguration capacitySchedulerConfig;
private final boolean preemptionEnabled;
private final boolean sizeBasedWeight;
@SuppressWarnings("unused")
Expand Down Expand Up @@ -104,11 +99,10 @@ private void emitChildQueues(String queueName, List<FSQueue> children) {
ruleHandler.handleChildQueueCount(queueName, children.size());

if (children.size() > 0) {
String childQueues = children.stream()
List<String> childQueues = children.stream()
.map(child -> getQueueShortName(child.getName()))
.collect(Collectors.joining(","));

capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues);
.collect(Collectors.toList());
capacitySchedulerConfig.setQueues(queueName, childQueues.toArray(new String[0]));
}
}

Expand All @@ -125,14 +119,14 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
if (queueMaxAmShare != 0.0f
&& queueMaxAmShare != queueMaxAMShareDefault
&& queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", queueMaxAmShare);
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
queueName, queueMaxAmShare);
}

if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
&& queueMaxAmShare != queueMaxAMShareDefault) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", 1.0f);
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
queueName, 1.0f);
}
}

Expand All @@ -145,7 +139,7 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
private void emitMaxParallelApps(String queueName, FSQueue queue) {
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
capacitySchedulerConfig.setQueueMaxParallelApplications(queueName,
String.valueOf(queue.getMaxRunningApps()));
}
}
Expand All @@ -165,8 +159,8 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
ruleHandler.handleMaxResources();
}

capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
"100");
capacitySchedulerConfig.setMaximumCapacity(queueName,
100.0f);
}

/**
Expand Down Expand Up @@ -197,11 +191,11 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {

// only emit max allocation if it differs from the parent's setting
if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-mb", String.valueOf(maxMemory));
capacitySchedulerConfig.setQueueMaximumAllocationMb(
queueName, String.valueOf(maxMemory));

capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-vcores", String.valueOf(maxVcores));
capacitySchedulerConfig.setQueueMaximumAllocationVcores(
queueName, String.valueOf(maxVcores));
}
}
}
Expand All @@ -214,17 +208,13 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {
*/
private void emitPreemptionDisabled(String queueName, FSQueue queue) {
if (preemptionEnabled && !queue.isPreemptable()) {
capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption",
"true");
capacitySchedulerConfig.setPreemptionDisabled(queueName, true);
}
}

public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children) {
if (children.isEmpty()) {
capacitySchedulerConfig.setFloat(
CapacitySchedulerConfiguration.
PREFIX + queueName + DOT + USER_LIMIT_FACTOR,
-1.0f);
capacitySchedulerConfig.setUserLimitFactor(queueName, -1.0f);
}
}

Expand All @@ -236,8 +226,8 @@ public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children)
*/
private void emitSizeBasedWeight(String queueName) {
if (sizeBasedWeight) {
capacitySchedulerConfig.setBoolean(PREFIX + queueName +
".ordering-policy.fair.enable-size-based-weight", true);
capacitySchedulerConfig.setOrderingPolicy(
queueName, "fair", "enable-size-based-weight", true);
}
}

Expand All @@ -253,19 +243,16 @@ private void emitOrderingPolicy(String queueName, FSQueue queue) {

switch (policy) {
case DominantResourceFairnessPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FAIR_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
break;
case FairSharePolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FAIR_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
if (drfUsed) {
ruleHandler.handleFairAsDrf(queueName);
}
break;
case FifoPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FIFO_POLICY);
capacitySchedulerConfig.setOrderingPolicy(queueName, FIFO_POLICY);
break;
default:
String msg = String.format("Unexpected ordering policy " +
Expand Down
Loading