Skip to content

Commit

Permalink
YARN-11531. [Federation] Code cleanup for NodeManager#amrmproxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Jul 14, 2023
1 parent 680af87 commit f9b0924
Show file tree
Hide file tree
Showing 14 changed files with 710 additions and 858 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface AMRMProxyApplicationContext {
* Gets the NMContext object.
* @return the NMContext.
*/
Context getNMCotext();
Context getNMContext();

/**
* Gets the credentials of this application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class AMRMProxyApplicationContextImpl implements
* @param nmContext NM context
* @param conf configuration
* @param applicationAttemptId attempt id
* @param user user name of the application
* @param user username of the application
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
* @param credentials application credentials
Expand Down Expand Up @@ -144,7 +144,7 @@ public synchronized int getLocalAMRMTokenKeyId() {
}

@Override
public Context getNMCotext() {
public Context getNMContext() {
return nmContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ public void processApplicationStartRequest(StartContainerRequest request)
* Initializes the request interceptor pipeline for the specified application.
*
* @param applicationAttemptId attempt id
* @param user user name
* @param user username
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
* @param recoveredDataMap the recovered states for AMRMProxy from NMSS
Expand Down Expand Up @@ -741,7 +741,7 @@ private List<String> getInterceptorClassNames(Configuration conf) {
YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);

List<String> interceptorClassNames = new ArrayList<String>();
List<String> interceptorClassNames = new ArrayList<>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public class AMRMProxyTokenSecretManager extends

private NMStateStoreService nmStateStore;

private final Set<ApplicationAttemptId> appAttemptSet =
new HashSet<ApplicationAttemptId>();
private final Set<ApplicationAttemptId> appAttemptSet = new HashSet<>();

/**
* Create an {@link AMRMProxyTokenSecretManager}.
Expand Down Expand Up @@ -226,8 +225,7 @@ public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
.getMasterKey().getKeyId());
byte[] password = this.createPassword(identifier);
appAttemptSet.add(appAttemptId);
return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
password, identifier.getKind(), new Text());
return new Token<>(identifier.getBytes(), password, identifier.getKind(), new Text());
} finally {
this.writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
* @return the NMSS instance
*/
public NMStateStoreService getNMStateStore() {
if (this.appContext == null || this.appContext.getNMCotext() == null) {
if (this.appContext == null || this.appContext.getNMContext() == null) {
return null;
}
return this.appContext.getNMCotext().getNMStateStore();
return this.appContext.getNMContext().getNMStateStore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,17 @@ public void init(AMRMProxyApplicationContext appContext) {
private ApplicationMasterProtocol createRMClient(
AMRMProxyApplicationContext appContext, final Configuration conf)
throws IOException, InterruptedException {
if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
return user.doAs(
new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
@Override
public DistributedSchedulingAMProtocol run() throws Exception {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf,
DistributedSchedulingAMProtocol.class);
}
});
if (appContext.getNMContext().isDistributedSchedulingEnabled()) {
return user.doAs((PrivilegedExceptionAction<DistributedSchedulingAMProtocol>) () -> {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf, DistributedSchedulingAMProtocol.class);
});
} else {
return user.doAs(
new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() throws Exception {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
}
(PrivilegedExceptionAction<ApplicationMasterProtocol>) () -> {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
});
}
}
Expand Down Expand Up @@ -144,7 +136,7 @@ public AllocateResponse allocate(final AllocateRequest request)
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
"request to the real YARN RM");
Expand All @@ -161,7 +153,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
throws YarnException, IOException {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
DistributedSchedulingAllocateResponse allocateResponse =
((DistributedSchedulingAMProtocol)rmClient)
Expand Down Expand Up @@ -197,7 +189,7 @@ public void setNextInterceptor(RequestInterceptor next) {
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulingAMProtocol) {
this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
this.rmClient = rmClient;
} else {
this.rmClient = new DistributedSchedulingAMProtocol() {
@Override
Expand Down Expand Up @@ -254,7 +246,7 @@ public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
ArrayList<String> services = new ArrayList<>();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : HAUtil.getRMHAIds(conf)) {
// Set RM_ID to get the corresponding RM_ADDRESS
Expand Down
Loading

0 comments on commit f9b0924

Please sign in to comment.