Skip to content

Commit

Permalink
YARN-7707. BackPort [GPG] Policy generator framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Jul 5, 2023
1 parent 6042d59 commit 0048e13
Show file tree
Hide file tree
Showing 18 changed files with 1,777 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4376,6 +4376,37 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";

/** Common prefix shared by all GPG policy generator configuration keys. */
public static final String FEDERATION_GPG_POLICY_PREFIX =
FEDERATION_GPG_PREFIX + "policy.generator.";

/** The interval at which the policy generator runs, default is one hour. */
public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
/** Default policy generator interval: one hour, expressed in milliseconds. */
public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL_MS = TimeUnit.HOURS.toMillis(1);

/**
* The configured policy generator class, runs NoOpGlobalPolicy by
* default.
*/
public static final String GPG_GLOBAL_POLICY_CLASS = FEDERATION_GPG_POLICY_PREFIX + "class";
/** Fully-qualified class name of the default global policy, NoOpGlobalPolicy. */
public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
"org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." +
"NoOpGlobalPolicy";

/**
* Whether or not the policy generator is running in read only (won't modify
* policies), default is false.
*/
public static final String GPG_POLICY_GENERATOR_READONLY =
FEDERATION_GPG_POLICY_PREFIX + "readonly";
/** Default for the read-only switch: the policy generator may modify policies. */
public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = false;

/**
* Which sub-clusters the policy generator should blacklist.
* No default-value constant accompanies this key in this section.
*/
public static final String GPG_POLICY_GENERATOR_BLACKLIST =
FEDERATION_GPG_POLICY_PREFIX + "blacklist";

/**
* Connection and Read timeout from the Router to RM.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5408,4 +5408,36 @@
</description>
</property>

<property>
<description>
The interval at which the policy generator runs, default is one hour
</description>
<name>yarn.federation.gpg.policy.generator.interval-ms</name>
<value>1h</value>
</property>

<property>
<description>
The configured policy generator class, runs NoOpGlobalPolicy by default
</description>
<name>yarn.federation.gpg.policy.generator.class</name>
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
</property>

<property>
<description>
Whether or not the policy generator is running in read only (won't modify policies), default is false
</description>
<name>yarn.federation.gpg.policy.generator.readonly</name>
<value>false</value>
</property>

<property>
<description>
Which sub-clusters the policy generator should blacklist, default is none
</description>
<name>yarn.federation.gpg.policy.generator.blacklist</name>
<value></value>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
Expand Down Expand Up @@ -311,6 +312,18 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(final String queue)
}
}

/**
 * Writes the given policy configuration to the state store.
 *
 * @param policyConf the policy configuration to store
 * @throws YarnException if the request is invalid or the write fails
 */
public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
    throws YarnException {
  // Wrap the configuration in the request record the state store expects.
  SetSubClusterPolicyConfigurationRequest request =
      SetSubClusterPolicyConfigurationRequest.newInstance(policyConf);
  stateStore.setPolicyConfiguration(request);
}

/**
* Get the policies that is represented as
* {@link SubClusterPolicyConfiguration} for all currently active queues in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
Expand All @@ -72,6 +78,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
Expand Down Expand Up @@ -106,6 +118,12 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/schedulerInfo1.json</exclude>
<exclude>src/test/resources/schedulerInfo2.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public interface GPGContext {
/**
* Gets the FederationStateStoreFacade held by this context.
*
* @return the state store facade
*/
FederationStateStoreFacade getStateStoreFacade();

/**
* Sets the FederationStateStoreFacade held by this context.
*
* @param facade the state store facade
*/
void setStateStoreFacade(FederationStateStoreFacade facade);

/**
* Gets the GPGPolicyFacade held by this context.
*
* @return the policy facade
*/
GPGPolicyFacade getPolicyFacade();

/**
* Sets the GPGPolicyFacade held by this context.
*
* @param facade the policy facade
*/
void setPolicyFacade(GPGPolicyFacade facade);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class GPGContextImpl implements GPGContext {

private FederationStateStoreFacade facade;
private GPGPolicyFacade policyFacade;

@Override
public FederationStateStoreFacade getStateStoreFacade() {
Expand All @@ -38,4 +39,13 @@ public void setStateStoreFacade(
this.facade = federationStateStoreFacade;
}

/** Returns the policy facade currently stored in this context. */
@Override
public GPGPolicyFacade getPolicyFacade() {
  return this.policyFacade;
}

/** Stores the given policy facade in this context, replacing any previous one. */
@Override
public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade) {
  this.policyFacade = gpgPolicyfacade;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* A utility class for the GPG Policy Generator to read and write policies
* into the FederationStateStore. Policy specific logic is abstracted away in
* this class, so the PolicyGenerator can avoid dealing with policy
* construction, reinitialization, and serialization.
*
* There are only two exposed methods:
*
* {@link #getPolicyManager(String)}
* Gets the PolicyManager via queue name. Null if there is no policy
* configured for the specified queue. The PolicyManager can be used to
* extract the {@link FederationRouterPolicy} and
* {@link FederationAMRMProxyPolicy}, as well as any policy specific parameters
*
* {@link #setPolicyManager(FederationPolicyManager)}
* Sets the PolicyManager. If the policy configuration is the same, no change
* occurs. Otherwise, the internal cache is updated and the new configuration
* is written into the FederationStateStore
*
* This class assumes that the GPG is the only service
* writing policies. Thus, the only FederationStateStore reads occur the first
* time a queue policy is retrieved - after that, the GPG only writes to the
* FederationStateStore.
*
* The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
* cache. The primary uses of these caches are to serve reads and to
* identify when the PolicyGenerator has actually changed the policy,
* so that unnecessary FederationStateStore policy writes can be avoided.
*/

public class GPGPolicyFacade {

  private static final Logger LOG =
      LoggerFactory.getLogger(GPGPolicyFacade.class);

  /** Facade through which all FederationStateStore reads and writes go. */
  private final FederationStateStoreFacade stateStore;

  /** Policy managers cached by queue name. */
  private final Map<String, FederationPolicyManager> policyManagerMap;
  /** Policy configurations cached by queue name. */
  private final Map<String, SubClusterPolicyConfiguration> policyConfMap;

  /** When true, changed policies are logged but never written or cached. */
  private final boolean readOnly;

  /**
   * Creates a facade with empty caches.
   *
   * @param stateStore the facade used to reach the FederationStateStore
   * @param conf configuration from which the read-only switch
   *             ({@link YarnConfiguration#GPG_POLICY_GENERATOR_READONLY})
   *             is read
   */
  public GPGPolicyFacade(FederationStateStoreFacade stateStore,
      Configuration conf) {
    this.stateStore = stateStore;
    // Plain HashMaps: this class does no locking of its own.
    this.policyManagerMap = new HashMap<>();
    this.policyConfMap = new HashMap<>();
    this.readOnly =
        conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
            YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
  }

  /**
   * Provides a utility for the policy generator to read the policy manager
   * from the FederationStateStore. Because the policy generator should be the
   * only component updating the policy, this implementation does not use the
   * reinitialization feature.
   *
   * A cached manager is returned directly. Otherwise the configuration is
   * taken from the configuration cache or, failing that, from the state
   * store; a manager is instantiated from it and both caches are updated.
   *
   * @param queueName the name of the queue we want the policy manager for.
   * @return the policy manager responsible for the queue policy, or null if
   *         no policy configuration exists for the queue.
   * @throws YarnException exceptions from yarn servers.
   */
  public FederationPolicyManager getPolicyManager(String queueName)
      throws YarnException {
    FederationPolicyManager cachedManager = policyManagerMap.get(queueName);
    if (cachedManager != null) {
      return cachedManager;
    }

    // No cached manager: pull the configuration from the
    // FederationStateStore (if needed) to create and cache one.
    try {
      SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
      if (conf == null) {
        conf = stateStore.getPolicyConfiguration(queueName);
      }
      // If configuration is still null, it does not exist in the
      // FederationStateStore
      if (conf == null) {
        LOG.info("Read null policy for queue {}", queueName);
        return null;
      }

      FederationPolicyManager policyManager =
          FederationPolicyUtils.instantiatePolicyManager(conf.getType());
      policyManager.setQueue(queueName);

      // TODO there is currently no way to cleanly deserialize a policy
      // manager sub type from just the configuration
      if (policyManager instanceof WeightedLocalityPolicyManager) {
        WeightedPolicyInfo wpinfo =
            WeightedPolicyInfo.fromByteBuffer(conf.getParams());
        WeightedLocalityPolicyManager wlpmanager =
            (WeightedLocalityPolicyManager) policyManager;
        LOG.info("Updating policy for queue {} to configured weights router: "
            + "{}, amrmproxy: {}", queueName,
            wpinfo.getRouterPolicyWeights(),
            wpinfo.getAMRMPolicyWeights());
        wlpmanager.setWeightedPolicyInfo(wpinfo);
      } else {
        LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
            + "initialization may be incomplete ", policyManager.getClass());
      }

      policyManagerMap.put(queueName, policyManager);
      policyConfMap.put(queueName, conf);
      return policyManager;
    } catch (YarnException e) {
      // Pass the exception as the final argument so the cause is logged too.
      LOG.error("Error reading SubClusterPolicyConfiguration from state "
          + "store for queue: {}", queueName, e);
      throw e;
    }
  }

  /**
   * Provides a utility for the policy generator to write a policy manager
   * into the FederationStateStore. The facade keeps a cache and will only write
   * into the FederationStateStore if the policy configuration has changed.
   * In read-only mode a changed policy is logged and then dropped: nothing is
   * written and the caches are left untouched.
   *
   * @param policyManager The policy manager we want to update into the state
   *                      store. It contains policy information as well as
   *                      the queue name we will update for. A null manager
   *                      is ignored with a warning.
   * @throws YarnException exceptions from yarn servers.
   */
  public void setPolicyManager(FederationPolicyManager policyManager)
      throws YarnException {
    if (policyManager == null) {
      LOG.warn("Attempting to set null policy manager");
      return;
    }

    // Extract the configuration from the policy manager
    String queue = policyManager.getQueue();
    SubClusterPolicyConfiguration conf;
    try {
      conf = policyManager.serializeConf();
    } catch (FederationPolicyInitializationException e) {
      LOG.warn("Error serializing policy for queue {}", queue, e);
      throw e;
    }
    if (conf == null) {
      // State store does not currently support setting a policy back to null
      // because it reads the queue name to set from the policy!
      LOG.warn("Skip setting policy to null for queue {} into state store",
          queue);
      return;
    }

    // Compare with the configuration cache; only a changed configuration is
    // written to the store and recorded in the caches.
    if (confCacheEqual(queue, conf)) {
      LOG.info("Setting unchanged policy - state store write skipped");
      return;
    }
    if (readOnly) {
      LOG.info("[read-only] Skipping policy update for queue {}", queue);
      return;
    }

    try {
      LOG.info("Updating policy for queue {} into state store", queue);
      stateStore.setPolicyConfiguration(conf);
      // Update the caches only after the store write has succeeded.
      policyConfMap.put(queue, conf);
      policyManagerMap.put(queue, policyManager);
    } catch (YarnException e) {
      LOG.warn("Error writing SubClusterPolicyConfiguration to state "
          + "store for queue: {}", queue, e);
      throw e;
    }
  }

  /**
   * Compares a policy configuration against the one cached for a queue.
   *
   * @param queue the queue to check the cached policy configuration for
   * @param conf the new policy configuration
   * @return whether or not the conf is equal to the cached conf; two absent
   *         (null) configurations count as equal
   */
  private boolean confCacheEqual(String queue,
      SubClusterPolicyConfiguration conf) {
    SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
    if (conf == null || cachedConf == null) {
      // With at least one side null, they match only if both are null.
      return conf == cachedConf;
    }
    return conf.equals(cachedConf);
  }
}
Loading

0 comments on commit 0048e13

Please sign in to comment.