Skip to content

Commit

Permalink
YARN-11048. Add tests that shows how to delete config values with Mut…
Browse files Browse the repository at this point in the history
…ation API (apache#3799). Contributed by Szilard Nemeth
szilard-nemeth authored and ashutoshcipher committed Dec 22, 2021
1 parent 4ecd77a commit ec06898
Showing 5 changed files with 301 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -344,7 +344,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
if (Math.abs(childrenPctSum) > PRECISION) {
// It is wrong when percent sum != {0, 1}
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". It should be either 0 or 1.0");
} else{
@@ -357,7 +357,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
> PRECISION) && (!allowZeroCapacitySum)) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName()
+ " for label=" + nodeLabel
+ ". It is set to 0, but parent percent != 0, and "
@@ -372,7 +372,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
queueCapacities.getCapacity(nodeLabel)) <= 0f
&& !allowZeroCapacitySum) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". queue=" + getQueueName()
+ " has zero capacity, but child"
Original file line number Diff line number Diff line change
@@ -61,12 +61,13 @@ public QueuePath(String fullPath) {
}

/**
* Concatenate queue path parts into one queue path string.
* @param parts Parts of the full queue pathAutoCreatedQueueTemplate
* @return full path of the given queue parts
* Constructor to create Queue path from queue names.
* The provided queue names will be concatenated by dots, giving a full queue path.
* @param parts Parts of queue path
* @return QueuePath object
*/
public static String concatenatePath(String... parts) {
return String.join(DOT, parts);
public static QueuePath createFromQueues(String... parts) {
return new QueuePath(String.join(DOT, parts));
}

/**
Original file line number Diff line number Diff line change
@@ -2656,8 +2656,7 @@ public Response formatSchedulerConfiguration(@Context HttpServletRequest hsr)
initForWritableEndpoints(callerUGI, true);

ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
if (isConfigurationMutable(scheduler)) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
@@ -2696,8 +2695,7 @@ public synchronized Response validateAndGetSchedulerConfiguration(
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable()) {
if (isConfigurationMutable(scheduler)) {
try {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
@@ -2746,51 +2744,61 @@ public synchronized Response validateAndGetSchedulerConfiguration(
public synchronized Response updateSchedulerConfiguration(SchedConfUpdateInfo
mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {

UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);

ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable()) {
if (isConfigurationMutable(scheduler)) {
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
MutableConfigurationProvider provider = ((MutableConfScheduler)
scheduler).getMutableConfProvider();
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
mutationInfo)) {
throw new org.apache.hadoop.security.AccessControlException("User"
+ " is not admin of all modified queues.");
}
LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
mutationInfo);
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
provider.confirmPendingMutation(logMutation, false);
throw e;
}
provider.confirmPendingMutation(logMutation, true);
return null;
}
callerUGI.doAs((PrivilegedExceptionAction<Void>) () -> {
MutableConfigurationProvider provider = ((MutableConfScheduler)
scheduler).getMutableConfProvider();
LogMutation logMutation = applyMutation(provider, callerUGI, mutationInfo);
return refreshQueues(provider, logMutation);
});
} catch (IOException e) {
LOG.error("Exception thrown when modifying configuration.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
return Response.status(Status.OK).entity("Configuration change " +
"successfully applied.").build();
return Response.status(Status.OK).entity("Configuration change successfully applied.")
.build();
} else {
return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by " +
"MutableConfScheduler.")
.entity(String.format("Configuration change only supported by " +
"%s.", MutableConfScheduler.class.getSimpleName()))
.build();
}
}

private Void refreshQueues(MutableConfigurationProvider provider, LogMutation logMutation)
throws Exception {
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
provider.confirmPendingMutation(logMutation, false);
throw e;
}
provider.confirmPendingMutation(logMutation, true);
return null;
}

private LogMutation applyMutation(MutableConfigurationProvider provider,
UserGroupInformation callerUGI, SchedConfUpdateInfo mutationInfo) throws Exception {
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
mutationInfo)) {
throw new org.apache.hadoop.security.AccessControlException("User"
+ " is not admin of all modified queues.");
}
return provider.logAndApplyMutation(callerUGI,
mutationInfo);
}

private boolean isConfigurationMutable(ResourceScheduler scheduler) {
return scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable();
}

@GET
@Path(RMWSConsts.SCHEDULER_CONF)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
@@ -2803,8 +2811,7 @@ public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
initForWritableEndpoints(callerUGI, true);

ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
if (isConfigurationMutable(scheduler)) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
// We load the cached configuration from configuration store,
@@ -2835,8 +2842,7 @@ public Response getSchedulerConfigurationVersion(@Context
initForWritableEndpoints(callerUGI, true);

ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
if (isConfigurationMutable(scheduler)) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();

Original file line number Diff line number Diff line change
@@ -1098,4 +1098,4 @@ private RMWebServices prepareWebServiceForValidation(
return webService;
}

}
}

Large diffs are not rendered by default.

0 comments on commit ec06898

Please sign in to comment.