Skip to content

Commit

Permalink
YARN-11531. [Federation] Code cleanup for NodeManager#amrmproxy. (#5841)
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Jul 19, 2023
1 parent b6b2590 commit 84dd624
Show file tree
Hide file tree
Showing 15 changed files with 707 additions and 857 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public abstract class FederationMethodWrapper {

/**
* List of parameters: static and dynamic values, matchings types.
* List of parameters: static and dynamic values, matching types.
*/
private Object[] params;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public interface AMRMProxyApplicationContext {
* Gets the NMContext object.
* @return the NMContext.
*/
Context getNMCotext();
Context getNMContext();

/**
* Gets the credentials of this application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public synchronized int getLocalAMRMTokenKeyId() {
}

@Override
public Context getNMCotext() {
public Context getNMContext() {
return nmContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ private List<String> getInterceptorClassNames(Configuration conf) {
YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);

List<String> interceptorClassNames = new ArrayList<String>();
List<String> interceptorClassNames = new ArrayList<>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public class AMRMProxyTokenSecretManager extends

private NMStateStoreService nmStateStore;

private final Set<ApplicationAttemptId> appAttemptSet =
new HashSet<ApplicationAttemptId>();
private final Set<ApplicationAttemptId> appAttemptSet = new HashSet<>();

/**
* Create an {@link AMRMProxyTokenSecretManager}.
Expand Down Expand Up @@ -226,8 +225,7 @@ public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
.getMasterKey().getKeyId());
byte[] password = this.createPassword(identifier);
appAttemptSet.add(appAttemptId);
return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
password, identifier.getKind(), new Text());
return new Token<>(identifier.getBytes(), password, identifier.getKind(), new Text());
} finally {
this.writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
* @return the NMSS instance
*/
public NMStateStoreService getNMStateStore() {
if (this.appContext == null || this.appContext.getNMCotext() == null) {
if (this.appContext == null || this.appContext.getNMContext() == null) {
return null;
}
return this.appContext.getNMCotext().getNMStateStore();
return this.appContext.getNMContext().getNMStateStore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,17 @@ public void init(AMRMProxyApplicationContext appContext) {
private ApplicationMasterProtocol createRMClient(
AMRMProxyApplicationContext appContext, final Configuration conf)
throws IOException, InterruptedException {
if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
return user.doAs(
new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
@Override
public DistributedSchedulingAMProtocol run() throws Exception {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf,
DistributedSchedulingAMProtocol.class);
}
});
if (appContext.getNMContext().isDistributedSchedulingEnabled()) {
return user.doAs((PrivilegedExceptionAction<DistributedSchedulingAMProtocol>) () -> {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf, DistributedSchedulingAMProtocol.class);
});
} else {
return user.doAs(
new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() throws Exception {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
}
(PrivilegedExceptionAction<ApplicationMasterProtocol>) () -> {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
});
}
}
Expand Down Expand Up @@ -144,7 +136,7 @@ public AllocateResponse allocate(final AllocateRequest request)
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
"request to the real YARN RM");
Expand All @@ -161,7 +153,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
throws YarnException, IOException {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
DistributedSchedulingAllocateResponse allocateResponse =
((DistributedSchedulingAMProtocol)rmClient)
Expand Down Expand Up @@ -197,7 +189,7 @@ public void setNextInterceptor(RequestInterceptor next) {
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulingAMProtocol) {
this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
this.rmClient = rmClient;
} else {
this.rmClient = new DistributedSchedulingAMProtocol() {
@Override
Expand Down Expand Up @@ -254,7 +246,7 @@ public static Text getTokenService(Configuration conf, String address,
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
ArrayList<String> services = new ArrayList<>();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : HAUtil.getRMHAIds(conf)) {
// Set RM_ID to get the corresponding RM_ADDRESS
Expand Down
Loading

0 comments on commit 84dd624

Please sign in to comment.