Skip to content

Commit

Permalink
YARN-11614. [Federation] Add Federation PolicyManager Validation Rules.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Nov 13, 2023
1 parent a32097a commit a5798bf
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnExce
}
}

public static void checkPolicyManagerValid(String policyManager) throws YarnException {
switch (policyManager) {
case "org.apache.hadoop.yarn.server.federation.policies.manager.HashBroadcastPolicyManager":
throw new YarnException("HashBroadcastPolicyManager does not support the use of queue weights.");
case "org.apache.hadoop.yarn.server.federation.policies.manager.HomePolicyManager":
throw new YarnException("HomePolicyManager does not support the use of queue weights.");
case "org.apache.hadoop.yarn.server.federation.policies.manager.RejectAllPolicyManager":
throw new YarnException("RejectAllPolicyManager does not support the " +
"use of queue weights.");
case "org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager":
throw new YarnException("UniformBroadcastPolicyManager does not support the " +
"use of queue weights.");
case "org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager":
case "org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager":
break;
default:
throw new YarnException("Unknown PolicyManager");
}
}

/**
* Determines whether the given value is a number.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkHeadRoomAlphaValid;
import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkSubClusterQueueWeightRatioValid;
import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkPolicyManagerValid;

public class RouterCLI extends Configured implements Tool {

Expand Down Expand Up @@ -634,6 +635,8 @@ protected SaveFederationQueuePolicyRequest parsePolicy(String policy) throws Yar
FederationQueueWeight.newInstance(routerWeight, amrmWeight, headroomalpha);
String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
checkPolicyManagerValid(policyManager);

SaveFederationQueuePolicyRequest request = SaveFederationQueuePolicyRequest.newInstance(
queue, federationQueueWeight, policyManager);

Expand All @@ -654,6 +657,11 @@ protected SaveFederationQueuePolicyRequest parsePolicy(String policy) throws Yar
*/
protected int parseXml2PoliciesAndBatchSavePolicies(String policiesXml) {
try {

String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
checkPolicyManagerValid(policyManager);

List<FederationQueueWeight> federationQueueWeightsList = parsePoliciesByXml(policiesXml);
MemoryPageUtils<FederationQueueWeight> memoryPageUtils = new MemoryPageUtils<>(20);
federationQueueWeightsList.forEach(federationQueueWeight ->
Expand Down Expand Up @@ -726,7 +734,6 @@ protected List<FederationQueueWeight> parsePoliciesByXml(String policiesXml)

String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);

LOG.debug("Queue: {}, AmrmPolicyWeights: {}, RouterWeight: {}, HeadroomAlpha: {}.",
queueName, amrmWeight, routerWeight, headroomAlpha);

Expand Down

0 comments on commit a5798bf

Please sign in to comment.