Skip to content

Commit

Permalink
YARN-7077. Fix CheckStyle.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Jun 15, 2023
1 parent 4099297 commit 46a61d9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public GPGPolicyFacade(FederationStateStoreFacade stateStore,
*
* @param queueName the name of the queue we want the policy manager for.
* @return the policy manager responsible for the queue policy.
* @throws YarnException exceptions from yarn servers.
*/
public FederationPolicyManager getPolicyManager(String queueName)
throws YarnException {
Expand Down Expand Up @@ -155,6 +156,7 @@ public FederationPolicyManager getPolicyManager(String queueName)
* @param policyManager The policy manager we want to update into the state
* store. It contains policy information as well as
* the queue name we will update for.
* @throws YarnException exceptions from yarn servers.
*/
public void setPolicyManager(FederationPolicyManager policyManager)
throws YarnException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import com.sun.jersey.api.client.Client;
Expand All @@ -45,10 +44,15 @@ private GPGUtils() {
}

/**
* Performs an invocation of the the remote RMWebService.
* Performs an invocation of the remote RMWebService.
*
* @param <T> Generic T.
* @param webAddr WebAddress.
* @param path url path.
* @param returnType return type.
* @return response entity.
*/
public static <T> T invokeRMWebService(Configuration conf, String webAddr,
String path, final Class<T> returnType) {
public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType) {
Client client = Client.create();
T obj = null;

Expand All @@ -66,6 +70,9 @@ public static <T> T invokeRMWebService(Configuration conf, String webAddr,

/**
* Creates a uniform weighting of 1.0 for each sub cluster.
*
* @param ids subClusterId set
* @return weight of subCluster.
*/
public static Map<SubClusterIdInfo, Float> createUniformWeights(
Set<SubClusterId> ids) {
Expand All @@ -76,5 +83,4 @@ public static Map<SubClusterIdInfo, Float> createUniformWeights(
}
return weights;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public Configuration getConf() {
* Delegating this responsibility to the PolicyGenerator enables us to avoid
* duplicate calls to the same * endpoints as the GlobalPolicy is invoked
* once per queue.
*
* @return a map of the object type and RM path.
*/
protected Map<Class, String> registerPaths() {
// Default register nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ public class PolicyGenerator implements Runnable, Configurable {

// Global policy instance
@VisibleForTesting
protected GlobalPolicy policy;
private GlobalPolicy policy;

/**
* The PolicyGenerator periodically reads SubCluster load and updates
* policies into the FederationStateStore.
*
* @param conf Configuration.
* @param context GPG Context.
*/
public PolicyGenerator(Configuration conf, GPGContext context) {
setConf(conf);
Expand Down Expand Up @@ -157,7 +160,7 @@ protected Map<SubClusterId, Map<Class, Object>> getInfos(
clusterInfo.put(sci.getSubClusterId(), new HashMap<Class, Object>());
}
Object ret = GPGUtils
.invokeRMWebService(conf, sci.getRMWebServiceAddress(),
.invokeRMWebService(sci.getRMWebServiceAddress(),
e.getValue(), e.getKey());
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
}
Expand All @@ -178,7 +181,7 @@ protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
new HashMap<>();
for (SubClusterInfo sci : activeSubClusters.values()) {
SchedulerTypeInfo sti = GPGUtils
.invokeRMWebService(conf, sci.getRMWebServiceAddress(),
.invokeRMWebService(sci.getRMWebServiceAddress(),
RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
if(sti != null){
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
Expand Down Expand Up @@ -258,4 +261,11 @@ private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
multimap.get(key).add(value);
}

public GlobalPolicy getPolicy() {
return policy;
}

public void setPolicy(GlobalPolicy policy) {
this.policy = policy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ private <T> T readJSON(String pathname, Class<T> classy)
@Test
public void testPolicyGenerator() throws YarnException {
policyGenerator = new TestablePolicyGenerator();
policyGenerator.policy = mock(GlobalPolicy.class);
policyGenerator.setPolicy(mock(GlobalPolicy.class));
policyGenerator.run();
verify(policyGenerator.policy, times(1))
verify(policyGenerator.getPolicy(), times(1))
.updatePolicy("default", clusterInfos, null);
verify(policyGenerator.policy, times(1))
verify(policyGenerator.getPolicy(), times(1))
.updatePolicy("default2", clusterInfos, null);
}

Expand All @@ -188,11 +188,11 @@ public void testBlacklist() throws YarnException {
new HashMap<>(clusterInfos);
blacklistedCMI.remove(subClusterIds.get(0));
policyGenerator = new TestablePolicyGenerator();
policyGenerator.policy = mock(GlobalPolicy.class);
policyGenerator.setPolicy(mock(GlobalPolicy.class));
policyGenerator.run();
verify(policyGenerator.policy, times(1))
verify(policyGenerator.getPolicy(), times(1))
.updatePolicy("default", blacklistedCMI, null);
verify(policyGenerator.policy, times(0))
verify(policyGenerator.getPolicy(), times(0))
.updatePolicy("default", clusterInfos, null);
}

Expand All @@ -206,11 +206,11 @@ public void testBlacklistTwo() throws YarnException {
blacklistedCMI.remove(subClusterIds.get(0));
blacklistedCMI.remove(subClusterIds.get(1));
policyGenerator = new TestablePolicyGenerator();
policyGenerator.policy = mock(GlobalPolicy.class);
policyGenerator.setPolicy(mock(GlobalPolicy.class));
policyGenerator.run();
verify(policyGenerator.policy, times(1))
verify(policyGenerator.getPolicy(), times(1))
.updatePolicy("default", blacklistedCMI, null);
verify(policyGenerator.policy, times(0))
verify(policyGenerator.getPolicy(), times(0))
.updatePolicy("default", clusterInfos, null);
}

Expand All @@ -230,12 +230,12 @@ public void testExistingPolicy() throws YarnException {
GetSubClusterPolicyConfigurationResponse.newInstance(testConf));

policyGenerator = new TestablePolicyGenerator();
policyGenerator.policy = mock(GlobalPolicy.class);
policyGenerator.setPolicy(mock(GlobalPolicy.class));
policyGenerator.run();

ArgumentCaptor<FederationPolicyManager> argCaptor =
ArgumentCaptor.forClass(FederationPolicyManager.class);
verify(policyGenerator.policy, times(1))
verify(policyGenerator.getPolicy(), times(1))
.updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
assertEquals(argCaptor.getValue().getClass(), manager.getClass());
assertEquals(argCaptor.getValue().serializeConf(), manager.serializeConf());
Expand Down Expand Up @@ -294,7 +294,7 @@ public void testCallRM() {

String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
SchedulerTypeInfo sti = GPGUtils
.invokeRMWebService(conf, rmAddress, RMWSConsts.SCHEDULER,
.invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
SchedulerTypeInfo.class);

Assert.assertNotNull(sti);
Expand Down

0 comments on commit 46a61d9

Please sign in to comment.