Skip to content

Commit

Permalink
YARN-11227. [Federation] Add getAppTimeout, getAppTimeouts, updateApp…
Browse files Browse the repository at this point in the history
…licationTimeout REST APIs for Router. (#4715)
  • Loading branch information
slfan1989 authored Aug 10, 2022
1 parent ffa9ed9 commit 133e8aa
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;

/**
Expand All @@ -45,6 +46,12 @@ public AppTimeoutInfo() {
remainingTimeInSec = -1;
}

public AppTimeoutInfo(ApplicationTimeout applicationTimeout) {
this.expiryTime = applicationTimeout.getExpiryTime();
this.remainingTimeInSec = applicationTimeout.getRemainingTime();
this.timeoutType = applicationTimeout.getTimeoutType();
}

public ApplicationTimeoutType getTimeoutType() {
return timeoutType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1342,41 +1342,96 @@ public Response listReservation(String queue, String reservationId,
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
String type) throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

if (type == null || type.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the type is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeout(hsr, appId, type);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeout Failed.", e);
}
return null;
}

@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppTimeouts(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the getAppTimeouts appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getAppTimeouts Failed.", e);
}
return null;
}

@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
HttpServletRequest hsr, String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
throw new NotImplementedException("Code is not implemented");

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

if (appTimeout == null) {
throw new IllegalArgumentException("Parameter error, the appTimeout is null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.updateApplicationTimeout(appTimeout, hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the updateApplicationTimeout appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("updateApplicationTimeout Failed.", e);
}
return null;
}

@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {

if (appId == null || appId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}

try {
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);

DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
return interceptor.getAppAttempts(hsr, appId);
} catch (IllegalArgumentException e) {
RouterServerUtil.logAndThrowRunTimeException(e,
"Unable to get the AppAttempt appId: %s.", appId);
} catch (YarnException e) {
RouterServerUtil.logAndThrowRunTimeException("getContainer Failed.", e);
RouterServerUtil.logAndThrowRunTimeException("getAppAttempts Failed.", e);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
Expand Down Expand Up @@ -54,6 +53,8 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
Expand All @@ -73,6 +74,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
Expand All @@ -93,7 +96,7 @@ public class MockDefaultRequestInterceptorREST
// This property allows us to write tests for specific scenario as YARN RM
// down e.g. network issue, failover.
private boolean isRunning = true;
private HashSet<ApplicationId> applicationMap = new HashSet<>();
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";

private void validateRunning() throws ConnectException {
Expand Down Expand Up @@ -123,7 +126,22 @@ public Response submitApplication(ApplicationSubmissionContextInfo newApp,

ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
LOG.info("Application submitted: " + appId);
applicationMap.add(appId);

// Initialize appReport
ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0, null, null, null,
false, Priority.newInstance(newApp.getPriority()), null, null);

// Initialize appTimeoutsMap
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
ApplicationTimeoutType timeoutType = ApplicationTimeoutType.LIFETIME;
ApplicationTimeout appTimeOut =
ApplicationTimeout.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", 10);
appTimeoutsMap.put(timeoutType, appTimeOut);
appReport.setApplicationTimeouts(appTimeoutsMap);

applicationMap.put(appId, appReport);
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
.entity(getSubClusterId()).build();
}
Expand All @@ -136,7 +154,7 @@ public AppInfo getApp(HttpServletRequest hsr, String appId,
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

Expand Down Expand Up @@ -171,7 +189,7 @@ public Response updateAppState(AppState targetState, HttpServletRequest hsr,
validateRunning();

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.remove(applicationId)) {
if (applicationMap.remove(applicationId) == null) {
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
Expand Down Expand Up @@ -244,7 +262,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

Expand Down Expand Up @@ -428,7 +446,7 @@ public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

Expand All @@ -454,7 +472,7 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.contains(applicationId)) {
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

Expand All @@ -463,4 +481,102 @@ public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
return infos;
}

@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr,
String appId, String type) throws AuthorizationException {

if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
ApplicationTimeoutType paramType = ApplicationTimeoutType.valueOf(type);

if (paramType == null) {
throw new NotFoundException("application timeout type not found");
}

if (!timeouts.containsKey(paramType)) {
throw new NotFoundException("timeout with id: " + appId + " not found");
}

ApplicationTimeout applicationTimeout = timeouts.get(paramType);

AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(applicationTimeout.getExpiryTime());
timeoutInfo.setTimeoutType(applicationTimeout.getTimeoutType());
timeoutInfo.setRemainingTime(applicationTimeout.getRemainingTime());

return timeoutInfo;
}

@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {

if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);

if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();

AppTimeoutsInfo timeoutsInfo = new AppTimeoutsInfo();

for (ApplicationTimeout timeout : timeouts.values()) {
AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(timeout.getExpiryTime());
timeoutInfo.setTimeoutType(timeout.getTimeoutType());
timeoutInfo.setRemainingTime(timeout.getRemainingTime());
timeoutsInfo.add(timeoutInfo);
}

return timeoutsInfo;
}

@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletRequest hsr,
String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {

if (!isRunning) {
throw new RuntimeException("RM is stopped");
}

ApplicationId applicationId = ApplicationId.fromString(appId);

if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}

ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();

ApplicationTimeoutType paramTimeoutType = appTimeout.getTimeoutType();
if (!timeouts.containsKey(paramTimeoutType)) {
throw new NotFoundException("TimeOutType with id: " + appId + " not found");
}

ApplicationTimeout applicationTimeout = timeouts.get(paramTimeoutType);
applicationTimeout.setTimeoutType(appTimeout.getTimeoutType());
applicationTimeout.setExpiryTime(appTimeout.getExpireTime());
applicationTimeout.setRemainingTime(appTimeout.getRemainingTimeInSec());

AppTimeoutInfo result = new AppTimeoutInfo(applicationTimeout);

return Response.status(Status.OK).entity(result).build();
}
}
Loading

0 comments on commit 133e8aa

Please sign in to comment.