Skip to content

Commit

Permalink
YARN-11614. [Federation] Add Federation PolicyManager Validation Rule…
Browse files Browse the repository at this point in the history
…s. (apache#6271) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored and jiajunmao committed Feb 6, 2024
1 parent d66d960 commit 4ccb033
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -822,4 +822,12 @@ public static boolean isRouterWebProxyEnable(Configuration conf) {
return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
}

public static boolean checkPolicyManagerValid(String policyManager,
List<String> supportWeightList) throws YarnException {
if (supportWeightList.contains(policyManager)) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
Expand All @@ -92,6 +95,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
Expand All @@ -107,6 +111,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid;

public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {

private static final Logger LOG =
Expand All @@ -115,6 +121,10 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
private static final String COMMA = ",";
private static final String COLON = ":";

private static final List<String> SUPPORT_WEIGHT_MANAGERS =
new ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(),
PriorityBroadcastPolicyManager.class.getName(), WeightedHomePolicyManager.class.getName()));

private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
private FederationStateStoreFacade federationFacade;
private final Clock clock = new MonotonicClock();
Expand Down Expand Up @@ -924,6 +934,13 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
RouterServerUtil.logAndThrowException("Missing Queue information.", null);
}

String policyManagerClassName = request.getPolicyManagerClassName();
if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
RouterServerUtil.logAndThrowException(policyManagerClassName +
" does not support the use of queue weights.", null);
}

String amRmWeight = federationQueueWeight.getAmrmWeight();
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);

Expand All @@ -935,9 +952,6 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(

try {
long startTime = clock.getTime();
// Step1, get parameters.
String policyManagerClassName = request.getPolicyManagerClassName();


// Step2, parse amRMPolicyWeights.
Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
Expand Down Expand Up @@ -1346,6 +1360,12 @@ private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeig
RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null);
}

if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
RouterServerUtil.logAndThrowException(policyManagerClassName +
"does not support the use of queue weights.", null);
}

String amRmWeight = federationQueueWeight.getAmrmWeight();
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,15 +643,27 @@ public void testSaveFederationQueuePolicyErrorRequest() throws Exception {
LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.",
() -> interceptor.saveFederationQueuePolicy(request));

// routerWeight / amrmWeight
// The sum of the routerWeight is not equal to 1.
// PolicyManager needs to support weight
FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance(
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
SaveFederationQueuePolicyRequest request2 =
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-");
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2,
"TestPolicyManager");
LambdaTestUtils.intercept(YarnException.class,
"The sum of ratios for all subClusters must be equal to 1.",
"TestPolicyManager does not support the use of queue weights.",
() -> interceptor.saveFederationQueuePolicy(request2));

// routerWeight / amrmWeight
// The sum of the routerWeight is not equal to 1.
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
FederationQueueWeight federationQueueWeight3 = FederationQueueWeight.newInstance(
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
SaveFederationQueuePolicyRequest request3 =
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight3,
policyTypeName);
LambdaTestUtils.intercept(YarnException.class,
"The sum of ratios for all subClusters must be equal to 1.",
() -> interceptor.saveFederationQueuePolicy(request3));
}

@Test
Expand Down

0 comments on commit 4ccb033

Please sign in to comment.