Skip to content

Commit

Permalink
YARN-11224. [Federation] Add getAppQueue, updateAppQueue REST APIs fo…
Browse files Browse the repository at this point in the history
…r Router. (#4747)
  • Loading branch information
slfan1989 authored Aug 17, 2022
1 parent e40b3a3 commit cd72f7e
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1312,14 +1312,52 @@ public Response updateApplicationPriority(AppPriority targetPriority,
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppQueue(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppQueue Failed.", e);
}

return null;
}

@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

if (targetQueue == null) {
throw new IllegalArgumentException("Parameter error, the targetQueue is null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateAppQueue(targetQueue, hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to update app queue by appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateAppQueue Failed.", e);
}

return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import javax.ws.rs.core.Response.Status;

import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
Expand Down Expand Up @@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -621,4 +622,43 @@ public AppPriority getAppPriority(HttpServletRequest hsr, String appId)

return new AppPriority(priority.getPriority());
}

@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
String queue = applicationMap.get(applicationId).getQueue();
return new AppQueue(queue);
}

@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId)
throws AuthorizationException, YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) {
return Response.status(Status.BAD_REQUEST).build();
}

ApplicationReport appReport = applicationMap.get(applicationId);
String originalQueue = appReport.getQueue();
appReport.setQueue(targetQueue.getQueue());
applicationMap.put(applicationId, appReport);
LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.",
appId, originalQueue, targetQueue);

AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
return Response.status(Status.OK).entity(targetAppQueue).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand Down Expand Up @@ -900,4 +901,52 @@ public void testGetAppPriority() throws IOException, InterruptedException,
Assert.assertNotNull(appPriority);
Assert.assertEquals(priority, appPriority.getPriority());
}

@Test
public void testUpdateAppQueue() throws IOException, InterruptedException,
YarnException {

String oldQueue = "oldQueue";
String newQueue = "newQueue";

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(oldQueue);

// Submit the application
Assert.assertNotNull(interceptor.submitApplication(context, null));

// Set New Queue for application
Response response = interceptor.updateAppQueue(new AppQueue(newQueue),
null, appId.toString());

Assert.assertNotNull(response);
AppQueue appQueue = (AppQueue) response.getEntity();
Assert.assertEquals(newQueue, appQueue.getQueue());

// Get AppQueue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
Assert.assertNotNull(queue);
Assert.assertEquals(newQueue, queue.getQueue());
}

@Test
public void testGetAppQueue() throws IOException, InterruptedException, YarnException {
String queueName = "queueName";

// Submit application to multiSubCluster
ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
context.setApplicationId(appId.toString());
context.setQueue(queueName);

Assert.assertNotNull(interceptor.submitApplication(context, null));

// Get Queue by application
AppQueue queue = interceptor.getAppQueue(null, appId.toString());
Assert.assertNotNull(queue);
Assert.assertEquals(queueName, queue.getQueue());
}
}

0 comments on commit cd72f7e

Please sign in to comment.