Skip to content

Commit

Permalink
YARN-11515. [Federation] Improve DefaultRequestInterceptor#init Code. (
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Jul 12, 2023
1 parent 8b88e9f commit 680af87
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
Expand All @@ -107,25 +109,26 @@
*/
public class DefaultClientRequestInterceptor
extends AbstractClientRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
private ApplicationClientProtocol clientRMProxy;

@Override
public void init(String userName) {
super.init(userName);

final Configuration conf = this.getConf();
try {
clientRMProxy =
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
public ApplicationClientProtocol run() throws Exception {
return ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class);
}
});
final Configuration conf = this.getConf();
clientRMProxy = user.doAs(
(PrivilegedExceptionAction<ApplicationClientProtocol>) () ->
ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class));
} catch (Exception e) {
throw new YarnRuntimeException(
"Unable to create the interface to reach the YarnRM", e);
StringBuilder message = new StringBuilder();
message.append("Error while creating Router RMClient Service");
if (user != null) {
message.append(", user: " + user);
}
LOG.error(message.toString(), e);
throw new YarnRuntimeException(message.toString(), e);
}
}

Expand Down Expand Up @@ -355,6 +358,5 @@ public GetNodesToAttributesResponse getNodesToAttributes(
@VisibleForTesting
public void setRMClient(ApplicationClientProtocol clientRM) {
this.clientRMProxy = clientRM;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,38 +80,18 @@ public class DefaultRMAdminRequestInterceptor
public void init(String userName) {
super.init(userName);
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}

final Configuration conf = this.getConf();

rmAdminProxy = user.doAs(
new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
@Override
public ResourceManagerAdministrationProtocol run()
throws Exception {
return ClientRMProxy.createRMProxy(conf,
ResourceManagerAdministrationProtocol.class);
}
});
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
(PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>) () ->
ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class));
} catch (Exception e) {
StringBuilder message = new StringBuilder();
message.append("Error while creating Router RMAdmin Service");
if (user != null) {
message += ", user: " + user;
message.append(", user: " + user);
}

LOG.info(message);
throw new YarnRuntimeException(message, e);
} catch (Exception e) {
throw new YarnRuntimeException(e);
LOG.error(message.toString(), e);
throw new YarnRuntimeException(message.toString(), e);
}
}

Expand All @@ -126,7 +106,6 @@ public void setNextInterceptor(RMAdminRequestInterceptor next) {
@VisibleForTesting
public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
this.rmAdminProxy = rmAdmin;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected SubClusterId getSubClusterId() {

@Override
public void init(String user) {
super.init(user);
webAppAddress = WebAppUtils.getRMWebAppURLWithScheme(getConf());
client = RouterWebServiceUtil.createJerseyClient(getConf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public Response createNewApplication(HttpServletRequest hsr)
validateRunning();

ApplicationId applicationId =
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet());
NewApplication appId =
new NewApplication(applicationId.toString(), new ResourceInfo());
Expand Down Expand Up @@ -275,7 +275,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
AppInfo appInfo = new AppInfo();

appInfo.setAppId(
ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet()).toString());
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");

Expand Down Expand Up @@ -316,7 +316,7 @@ public NodeInfo getNode(String nodeId) {
if (nodeId.contains(subClusterId) || nodeId.contains("test")) {
node = new NodeInfo();
node.setId(nodeId);
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
}
return node;
}
Expand All @@ -328,7 +328,7 @@ public NodesInfo getNodes(String states) {
}
NodeInfo node = new NodeInfo();
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
NodesInfo nodes = new NodesInfo();
nodes.add(node);
return nodes;
Expand All @@ -350,12 +350,12 @@ public ClusterMetricsInfo getClusterMetricsInfo() {
throw new RuntimeException("RM is stopped");
}
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
metrics.setAppsSubmitted(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsCompleted(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsPending(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsRunning(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsFailed(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsKilled(Integer.parseInt(getSubClusterId().getId()));

return metrics;
}
Expand Down

0 comments on commit 680af87

Please sign in to comment.