Skip to content

Commit

Permalink
YARN-11069. Dynamic Queue ACL handling in Legacy and Flexible Auto Cr…
Browse files Browse the repository at this point in the history
…eated Queues. Contributed by Tamas Domok
  • Loading branch information
tomicooler authored and brumi1024 committed Mar 25, 2022
1 parent 08a77a7 commit da09d68
Show file tree
Hide file tree
Showing 16 changed files with 4,045 additions and 365 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public PrivilegedEntity(EntityType type, String name) {
this.name = name;
}

public PrivilegedEntity(String name) {
this.type = EntityType.QUEUE;
this.name = name;
}

public EntityType getType() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -473,32 +477,33 @@ private RMAppImpl createAndPopulateNewRMApp(
if (scheduler instanceof CapacityScheduler) {
String queueName = placementContext == null ?
submissionContext.getQueue() : placementContext.getFullQueuePath();

String appName = submissionContext.getApplicationName();
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);

if (csqueue == null && placementContext != null) {
//could be an auto created queue through queue mapping. Validate
// parent queue exists and has valid acls
String parentQueueName = placementContext.getParentQueue();
csqueue = ((CapacityScheduler) scheduler).getQueue(parentQueueName);
CapacityScheduler cs = (CapacityScheduler) scheduler;
CSQueue csqueue = cs.getQueue(queueName);
PrivilegedEntity privilegedEntity = new PrivilegedEntity(
csqueue == null ? queueName : csqueue.getQueuePath());

YarnAuthorizationProvider dynamicAuthorizer = null;
if (csqueue == null) {
List<Permission> permissions =
cs.getCapacitySchedulerQueueManager().getPermissionsForDynamicQueue(
new QueuePath(queueName), cs.getConfiguration());
if (!permissions.isEmpty()) {
dynamicAuthorizer = new ConfiguredYarnAuthorizer();
dynamicAuthorizer.setPermission(permissions, userUgi);
}
}

if (csqueue != null
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit "
+ applicationId + " to queue "
+ submissionContext.getQueue()));
if (csqueue != null || dynamicAuthorizer != null) {
String appName = submissionContext.getApplicationName();
if (!checkPermission(createAccessRequest(privilegedEntity, userUgi, applicationId,
appName, QueueACL.SUBMIT_APPLICATIONS), dynamicAuthorizer) &&
!checkPermission(createAccessRequest(privilegedEntity, userUgi, applicationId,
appName, QueueACL.ADMINISTER_QUEUE), dynamicAuthorizer)) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit "
+ applicationId + " to queue "
+ submissionContext.getQueue()));
}
}
}
if (scheduler instanceof FairScheduler) {
Expand Down Expand Up @@ -572,6 +577,23 @@ private RMAppImpl createAndPopulateNewRMApp(
return application;
}

private boolean checkPermission(AccessRequest accessRequest,
YarnAuthorizationProvider dynamicAuthorizer) {
return authorizer.checkPermission(accessRequest) ||
(dynamicAuthorizer != null && dynamicAuthorizer.checkPermission(accessRequest));
}

private static AccessRequest createAccessRequest(PrivilegedEntity privilegedEntity,
UserGroupInformation userUgi,
ApplicationId applicationId,
String appName,
QueueACL submitApplications) {
return new AccessRequest(privilegedEntity, userUgi,
SchedulerUtils.toAccessType(submitApplications),
applicationId.toString(), appName, Server.getRemoteAddress(),
null);
}

private List<ResourceRequest> validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,11 @@ protected void setupQueueConfigs(Resource clusterResource) throws
writeLock.lock();
try {
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
this.acls = configuration.getAcls(getQueuePath());

if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
setDynamicQueueProperties();
setDynamicQueueACLProperties();
}

// Collect and set the Node label configuration
Expand All @@ -369,8 +372,6 @@ protected void setupQueueConfigs(Resource clusterResource) throws

authorizer = YarnAuthorizationProvider.getInstance(configuration);

this.acls = configuration.getAcls(getQueuePath());

this.userWeights = getUserWeightsFromHierarchy();

this.reservationsContinueLooking =
Expand Down Expand Up @@ -426,6 +427,9 @@ protected void setDynamicQueueProperties() {
}
}

protected void setDynamicQueueACLProperties() {
}

private UserWeights getUserWeightsFromHierarchy() {
UserWeights unionInheritedWeights = UserWeights.createEmpty();
CSQueue parentQ = parent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;

public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractLeafQueue.class);
Expand Down Expand Up @@ -1697,6 +1699,19 @@ protected void setDynamicQueueProperties() {
super.setDynamicQueueProperties();
}

@Override
protected void setDynamicQueueACLProperties() {
super.setDynamicQueueACLProperties();

if (parent instanceof AbstractManagedParentQueue) {
acls.putAll(queueContext.getConfiguration().getACLsForLegacyAutoCreatedLeafQueue(
parent.getQueuePath()));
} else if (parent instanceof ParentQueue) {
acls.putAll(getACLsForFlexibleAutoCreatedLeafQueue(
((ParentQueue) parent).getAutoCreatedQueueTemplate()));
}
}

private void updateSchedulerHealthForCompletedContainer(
RMContainer rmContainer, ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,90 @@ private static String getAclKey(AccessType acl) {
return "acl_" + StringUtils.toLowerCase(acl.toString());
}

/**
* Creates a mapping of queue ACLs for a Legacy Auto Created Leaf Queue.
*
* @param parentQueuePath the parent's queue path
* @return A mapping of the queue ACLs.
*/
public Map<AccessType, AccessControlList> getACLsForLegacyAutoCreatedLeafQueue(
String parentQueuePath) {
final String prefix =
getQueuePrefix(getAutoCreatedQueueTemplateConfPrefix(
parentQueuePath));

Map<String, String> properties = new HashMap<>();
for (QueueACL acl : QueueACL.values()) {
final String key = getAclKey(acl);
final String value = get(prefix + key);
if (value != null) {
properties.put(key, get(prefix + key));
}
}
return getACLsFromProperties(properties);
}

/**
* Creates a mapping of queue ACLs for a Flexible Auto Created Parent Queue.
* The .parent-template is preferred to .template ACLs.
*
* @param aqc The AQC templates to use.
* @return A mapping of the queue ACLs.
*/
public static Map<AccessType, AccessControlList> getACLsForFlexibleAutoCreatedParentQueue(
AutoCreatedQueueTemplate aqc) {
return getACLsFromProperties(aqc.getParentOnlyProperties(),
aqc.getTemplateProperties());
}

/**
* Creates a mapping of queue ACLs for a Flexible Auto Created Leaf Queue.
* The .leaf-template is preferred to .template ACLs.
*
* @param aqc The AQC templates to use.
* @return A mapping of the queue ACLs.
*/
public static Map<AccessType, AccessControlList> getACLsForFlexibleAutoCreatedLeafQueue(
AutoCreatedQueueTemplate aqc) {
return getACLsFromProperties(aqc.getLeafOnlyProperties(),
aqc.getTemplateProperties());
}

/**
* Transforms the string ACL properties to AccessType and AccessControlList mapping.
*
* @param properties The ACL properties.
* @return A mapping of the queue ACLs.
*/
private static Map<AccessType, AccessControlList> getACLsFromProperties(
Map<String, String> properties) {
return getACLsFromProperties(properties, new HashMap<>());
}

/**
* Transforms the string ACL properties to AccessType and AccessControlList mapping.
*
* @param properties The ACL properties.
* @param fallbackProperties The fallback properties to use.
* @return A mapping of the queue ACLs.
*/
private static Map<AccessType, AccessControlList> getACLsFromProperties(
Map<String, String> properties, Map<String, String> fallbackProperties) {
Map<AccessType, AccessControlList> acls = new HashMap<>();
for (QueueACL acl : QueueACL.values()) {
String aclStr = properties.get(getAclKey(acl));
if (aclStr == null) {
aclStr = fallbackProperties.get(getAclKey(acl));
if (aclStr == null) {
aclStr = NONE_ACL;
}
}
acls.put(SchedulerUtils.toAccessType(acl),
new AccessControlList(aclStr));
}
return acls;
}

@Override
public Map<ReservationACL, AccessControlList> getReservationAcls(String
queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand All @@ -52,6 +52,9 @@

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue;

/**
*
* Context of the Queues in Capacity Scheduler.
Expand Down Expand Up @@ -596,6 +599,44 @@ public List<String> determineMissingParents(
return parentsToCreate;
}

public List<Permission> getPermissionsForDynamicQueue(
QueuePath queuePath,
CapacitySchedulerConfiguration csConf) {
List<Permission> permissions = new ArrayList<>();

try {
PrivilegedEntity privilegedEntity = new PrivilegedEntity(queuePath.getFullPath());

CSQueue parentQueue = getQueueByFullName(queuePath.getParent());
if (parentQueue == null) {
for (String missingParent : determineMissingParents(queuePath)) {
String parentOfMissingParent = new QueuePath(missingParent).getParent();
permissions.add(new Permission(new PrivilegedEntity(missingParent),
getACLsForFlexibleAutoCreatedParentQueue(
new AutoCreatedQueueTemplate(csConf,
new QueuePath(parentOfMissingParent)))));
}
}

if (parentQueue instanceof AbstractManagedParentQueue) {
// An AbstractManagedParentQueue must have been found for Legacy AQC
permissions.add(new Permission(privilegedEntity,
csConf.getACLsForLegacyAutoCreatedLeafQueue(queuePath.getParent())));
} else {
// Every other case must be a Flexible Leaf Queue
permissions.add(new Permission(privilegedEntity,
getACLsForFlexibleAutoCreatedLeafQueue(
new AutoCreatedQueueTemplate(csConf, new QueuePath(queuePath.getParent())))));
}

} catch (SchedulerDynamicEditException e) {
LOG.debug("Could not determine missing parents for queue {} reason {}",
queuePath.getFullPath(), e.getMessage());
}

return permissions;
}

/**
* Get {@code ConfiguredNodeLabels} which contains the configured node labels
* for all queues.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue;

@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
Expand Down Expand Up @@ -188,6 +190,16 @@ protected void setupQueueConfigs(Resource clusterResource)
}
}

@Override
protected void setDynamicQueueACLProperties() {
super.setDynamicQueueACLProperties();

if (parent instanceof ParentQueue) {
acls.putAll(getACLsForFlexibleAutoCreatedParentQueue(
((ParentQueue) parent).getAutoCreatedQueueTemplate()));
}
}

private static float PRECISION = 0.0005f; // 0.05% precision

// Check weight configuration, throw exception when configuration is invalid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {

CapacitySchedulerConfiguration conf = cs.getConfiguration();
queueAcls = new QueueAclsInfo();
queueAcls.addAll(getSortedQueueAclInfoList(queueName, conf));
queueAcls.addAll(getSortedQueueAclInfoList(parent, queueName, conf));

queuePriority = parent.getPriority().getPriority();
if (parent instanceof ParentQueue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public class CapacitySchedulerQueueInfo {

CapacitySchedulerConfiguration conf = cs.getConfiguration();
queueAcls = new QueueAclsInfo();
queueAcls.addAll(getSortedQueueAclInfoList(queuePath, conf));
queueAcls.addAll(getSortedQueueAclInfoList(q, queuePath, conf));

queuePriority = q.getPriority().getPriority();
if (q instanceof ParentQueue) {
Expand All @@ -183,11 +183,11 @@ public class CapacitySchedulerQueueInfo {
leafQueueTemplate = new LeafQueueTemplateInfo(conf, queuePath);
}

public static ArrayList<QueueAclInfo> getSortedQueueAclInfoList(String queuePath,
CapacitySchedulerConfiguration conf) {
public static ArrayList<QueueAclInfo> getSortedQueueAclInfoList(
CSQueue queue, String queuePath, CapacitySchedulerConfiguration conf) {
ArrayList<QueueAclInfo> queueAclsInfo = new ArrayList<>();
for (Map.Entry<AccessType, AccessControlList> e : conf
.getAcls(queuePath).entrySet()) {
for (Map.Entry<AccessType, AccessControlList> e :
((AbstractCSQueue) queue).getACLs().entrySet()) {
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
e.getValue().getAclString());
queueAclsInfo.add(queueAcl);
Expand Down
Loading

0 comments on commit da09d68

Please sign in to comment.