diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java index 9b8944049a6dd..169f8a3e16ef8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java @@ -28,7 +28,7 @@ public abstract class FederationMethodWrapper { /** - * List of parameters: static and dynamic values, matchings types. + * List of parameters: static and dynamic values, matching types. */ private Object[] params; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index 86499c84b59c2..15a9fafa633ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -67,7 +67,7 @@ public interface AMRMProxyApplicationContext { * Gets the NMContext object. * @return the NMContext. */ - Context getNMCotext(); + Context getNMContext(); /** * Gets the credentials of this application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 881e9425f013c..9eb117424a9a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -144,7 +144,7 @@ public synchronized int getLocalAMRMTokenKeyId() { } @Override - public Context getNMCotext() { + public Context getNMContext() { return nmContext; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 82873e07cdb09..c0253b334d92f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -741,7 +741,7 @@ private List getInterceptorClassNames(Configuration conf) { YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); - List interceptorClassNames = new ArrayList(); + List interceptorClassNames = new ArrayList<>(); Collection tempList = StringUtils.getStringCollection(configuredInterceptorClassNames); for (String item : tempList) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java index ea4e9794013b0..ee0893d87e550 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -72,8 +72,7 @@ public class AMRMProxyTokenSecretManager extends private NMStateStoreService nmStateStore; - private final Set appAttemptSet = - new HashSet(); + private final Set appAttemptSet = new HashSet<>(); /** * Create an {@link AMRMProxyTokenSecretManager}. @@ -226,8 +225,7 @@ public Token createAndGetAMRMToken( .getMasterKey().getKeyId()); byte[] password = this.createPassword(identifier); appAttemptSet.add(appAttemptId); - return new Token(identifier.getBytes(), - password, identifier.getKind(), new Text()); + return new Token<>(identifier.getBytes(), password, identifier.getKind(), new Text()); } finally { this.writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index d1fc341614471..9848e4678df9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -159,9 +159,9 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( * @return the NMSS instance */ public NMStateStoreService getNMStateStore() { - if (this.appContext == null || this.appContext.getNMCotext() == null) { + if (this.appContext == null || this.appContext.getNMContext() == null) { return null; } - return this.appContext.getNMCotext().getNMStateStore(); + return this.appContext.getNMContext().getNMStateStore(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index d05946be05b7e..a70742a2a53d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -95,25 +95,17 @@ public void init(AMRMProxyApplicationContext appContext) { private ApplicationMasterProtocol createRMClient( AMRMProxyApplicationContext appContext, final Configuration conf) throws IOException, InterruptedException { - if (appContext.getNMCotext().isDistributedSchedulingEnabled()) { - return user.doAs( - new PrivilegedExceptionAction() { - @Override - public DistributedSchedulingAMProtocol run() throws Exception { - setAMRMTokenService(conf); - return ServerRMProxy.createRMProxy(conf, - DistributedSchedulingAMProtocol.class); - } - }); + if (appContext.getNMContext().isDistributedSchedulingEnabled()) { + return user.doAs((PrivilegedExceptionAction) () -> { + setAMRMTokenService(conf); + return ServerRMProxy.createRMProxy(conf, DistributedSchedulingAMProtocol.class); + }); } else { return user.doAs( - new PrivilegedExceptionAction() { - @Override - public ApplicationMasterProtocol run() throws Exception { - setAMRMTokenService(conf); - return ClientRMProxy.createRMProxy(conf, - ApplicationMasterProtocol.class); - } + (PrivilegedExceptionAction) () -> { + setAMRMTokenService(conf); + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); }); } } @@ -144,7 +136,7 @@ public AllocateResponse allocate(final AllocateRequest request) registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequest request) throws YarnException, IOException { - if (getApplicationContext().getNMCotext() + if (getApplicationContext().getNMContext() .isDistributedSchedulingEnabled()) { LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" + "request to the real YARN RM"); @@ -161,7 +153,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( throws YarnException, IOException { LOG.debug("Forwarding allocateForDistributedScheduling request" + "to the real YARN RM"); - if (getApplicationContext().getNMCotext() + if (getApplicationContext().getNMContext() .isDistributedSchedulingEnabled()) { DistributedSchedulingAllocateResponse allocateResponse = ((DistributedSchedulingAMProtocol)rmClient) @@ -197,7 +189,7 @@ public void setNextInterceptor(RequestInterceptor next) { @VisibleForTesting public void setRMClient(final ApplicationMasterProtocol rmClient) { if (rmClient instanceof DistributedSchedulingAMProtocol) { - this.rmClient = (DistributedSchedulingAMProtocol)rmClient; + this.rmClient = rmClient; } else { this.rmClient = new DistributedSchedulingAMProtocol() { @Override @@ -254,7 +246,7 @@ public static Text getTokenService(Configuration conf, String address, String defaultAddr, int defaultPort) { if (HAUtil.isHAEnabled(conf)) { // Build a list of service addresses to form the service name - ArrayList services = new ArrayList(); + ArrayList services = new ArrayList<>(); YarnConfiguration yarnConf = new YarnConfiguration(conf); for (String rmId : HAUtil.getRMHAIds(conf)) { // Set RM_ID to get the corresponding RM_ADDRESS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 32c5bf217e233..a53efebb7f6e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -124,7 +123,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn - * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in + * Registry. Otherwise, if NM recovery is enabled, the UAM token are stored in * local NMSS instead under this directory name. */ public static final String NMSS_SECONDARY_SC_PREFIX = @@ -150,7 +149,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private AMRMClientRelayer homeRMRelayer; private SubClusterId homeSubClusterId; - private AMHeartbeatRequestHandler homeHeartbeartHandler; + private AMHeartbeatRequestHandler homeHeartbeatHandler; /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), @@ -162,20 +161,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * first time. AM heart beats to them are also handled asynchronously for * performance reasons. */ - private UnmanagedAMPoolManager uamPool; + private final UnmanagedAMPoolManager uamPool; /** * The rmProxy relayers for secondary sub-clusters that keep track of all * pending requests. */ - private Map secondaryRelayers; + private final Map secondaryRelayers; /** * Stores the AllocateResponses that are received asynchronously from all the * sub-cluster resource managers, including home RM, but not merged and * returned back to AM yet. */ - private Map> asyncResponseSink; + private final Map> asyncResponseSink; /** * Remembers the last allocate response from all known sub-clusters. This is @@ -183,15 +182,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * cluster-wide info (e.g. AvailableResource, NumClusterNodes) in the allocate * response back to AM. */ - private Map lastSCResponse; + private final Map lastSCResponse; /** * The async UAM registration result that is not consumed yet. */ - private Map uamRegistrations; + private final Map uamRegistrations; // For unit test synchronization - private Map> uamRegisterFutures; + private final Map> uamRegisterFutures; /** Thread pool used for asynchronous operations. */ private ExecutorService threadpool; @@ -216,7 +215,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * the container, so that we know which sub-cluster to forward later requests * about existing containers to. */ - private Map containerIdToSubClusterIdMap; + private final Map containerIdToSubClusterIdMap; /** * The original registration request that was sent by the AM. This instance is @@ -259,7 +258,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private boolean waitUamRegisterDone; - private MonotonicClock clock = new MonotonicClock(); + private final MonotonicClock clock = new MonotonicClock(); /** * Creates an instance of the FederationInterceptor class. @@ -324,11 +323,11 @@ public void init(AMRMProxyApplicationContext appContext) { ApplicationMasterProtocol.class, appOwner), appId, this.homeSubClusterId.toString()); - this.homeHeartbeartHandler = - createHomeHeartbeartHandler(conf, appId, this.homeRMRelayer); - this.homeHeartbeartHandler.setUGI(appOwner); - this.homeHeartbeartHandler.setDaemon(true); - this.homeHeartbeartHandler.start(); + this.homeHeartbeatHandler = + createHomeHeartbeatHandler(conf, appId, this.homeRMRelayer); + this.homeHeartbeatHandler.setUGI(appOwner); + this.homeHeartbeatHandler.setDaemon(true); + this.homeHeartbeatHandler.start(); // set lastResponseId to -1 before application master registers this.lastAllocateResponse = @@ -852,7 +851,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( this.homeRMRelayer.finishApplicationMaster(request); // Stop the home heartbeat thread - this.homeHeartbeartHandler.shutdown(); + this.homeHeartbeatHandler.shutdown(); if (failedToUnRegister) { homeResponse.setIsUnregistered(false); @@ -868,9 +867,7 @@ public FinishApplicationMasterResponse finishApplicationMaster( private boolean checkRequestFinalApplicationStatusSuccess( FinishApplicationMasterRequest request) { if (request != null && request.getFinalApplicationStatus() != null) { - if (request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) { - return true; - } + return request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED); } return false; } @@ -907,7 +904,7 @@ public void shutdown() { } // Stop the home heartbeat thread - this.homeHeartbeartHandler.shutdown(); + this.homeHeartbeatHandler.shutdown(); this.homeRMRelayer.shutdown(); // Shutdown needs to clean up app @@ -946,12 +943,12 @@ protected ApplicationAttemptId getAttemptId() { } @VisibleForTesting - protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() { - return this.homeHeartbeartHandler; + protected AMHeartbeatRequestHandler getHomeHeartbeatHandler() { + return this.homeHeartbeatHandler; } /** - * Create the UAM pool manager for secondary sub-clsuters. For unit test to + * Create the UAM pool manager for secondary sub-clusters. For unit test to * override. * * @param threadPool the thread pool to use @@ -964,7 +961,7 @@ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( } @VisibleForTesting - protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + protected AMHeartbeatRequestHandler createHomeHeartbeatHandler( Configuration conf, ApplicationId appId, AMRMClientRelayer rmProxyRelayer) { return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer); @@ -1052,49 +1049,41 @@ protected void reAttachUAMAndMergeRegisterResponse( final Token amrmToken = entry.getValue(); completionService - .submit(new Callable() { - @Override - public RegisterApplicationMasterResponse call() throws Exception { - RegisterApplicationMasterResponse response = null; - try { - // Create a config loaded with federation on and subclusterId - // for each UAM - YarnConfiguration config = new YarnConfiguration(getConf()); - FederationProxyProviderUtil.updateConfForFederation(config, - subClusterId.getId()); - - ApplicationSubmissionContext originalSubmissionContext = - federationFacade.getApplicationSubmissionContext(appId); - - uamPool.reAttachUAM(subClusterId.getId(), config, appId, - amRegistrationResponse.getQueue(), - getApplicationContext().getUser(), homeSubClusterId.getId(), - amrmToken, subClusterId.toString(), originalSubmissionContext); - - secondaryRelayers.put(subClusterId.getId(), - uamPool.getAMRMClientRelayer(subClusterId.getId())); - - response = uamPool.registerApplicationMaster( - subClusterId.getId(), amRegistrationRequest); - - // Set sub-cluster to be timed out initially - lastSCResponseTime.put(subClusterId, - clock.getTime() - subClusterTimeOut); - - if (response != null - && response.getContainersFromPreviousAttempts() != null) { - cacheAllocatedContainers( - response.getContainersFromPreviousAttempts(), - subClusterId); - } - LOG.info("UAM {} reattached for {}", subClusterId, appId); - } catch (Throwable e) { - LOG.error( - "Reattaching UAM " + subClusterId + " failed for " + appId, - e); + .submit(() -> { + RegisterApplicationMasterResponse response = null; + try { + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + ApplicationSubmissionContext originalSubmissionContext = + federationFacade.getApplicationSubmissionContext(appId); + + uamPool.reAttachUAM(subClusterId.getId(), config, appId, + amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), homeSubClusterId.getId(), + amrmToken, subClusterId.toString(), originalSubmissionContext); + + secondaryRelayers.put(subClusterId.getId(), + uamPool.getAMRMClientRelayer(subClusterId.getId())); + + response = uamPool.registerApplicationMaster(subClusterId.getId(), + amRegistrationRequest); + + // Set sub-cluster to be timed out initially + lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut); + + if (response != null && response.getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers(response.getContainersFromPreviousAttempts(), + subClusterId); } - return response; + LOG.info("UAM {} reattached for {}", subClusterId, appId); + } catch (Throwable e) { + LOG.error("Reattaching UAM {} failed for {}.", subClusterId, appId, e); } + return response; }); } @@ -1115,7 +1104,7 @@ public RegisterApplicationMasterResponse call() throws Exception { } private SubClusterId getSubClusterForNode(String nodeName) { - SubClusterId subClusterId = null; + SubClusterId subClusterId; try { subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName); } catch (YarnException e) { @@ -1139,8 +1128,7 @@ private SubClusterId getSubClusterForNode(String nodeName) { */ private Map splitAllocateRequest( AllocateRequest request) throws YarnException { - Map requestMap = - new HashMap(); + Map requestMap = new HashMap<>(); // Create heart beat request for home sub-cluster resource manager findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, @@ -1230,8 +1218,8 @@ private Map splitAllocateRequest( * * @param requests contains the heart beat requests to send to the resource * manager keyed by the sub-cluster id - * @throws YarnException - * @throws IOException + * @throws YarnException exceptions from yarn servers. + * @throws IOException an I/O exception of some sort has occurred. */ private void sendRequestsToResourceManagers( Map requests) @@ -1255,7 +1243,7 @@ private void sendRequestsToResourceManagers( if (subClusterId.equals(this.homeSubClusterId)) { // Request for the home sub-cluster resource manager - this.homeHeartbeartHandler.allocateAsync(entry.getValue(), + this.homeHeartbeatHandler.allocateAsync(entry.getValue(), new HeartbeatCallBack(this.homeSubClusterId, false)); } else { if (!this.uamPool.hasUAMId(subClusterId.getId())) { @@ -1280,7 +1268,7 @@ private List registerAndAllocateWithNewSubClusters( // list and create and register Unmanaged AM instance for the new ones List newSubClusters = new ArrayList<>(); - requests.keySet().stream().forEach(subClusterId -> { + requests.keySet().forEach(subClusterId -> { String id = subClusterId.getId(); if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) { newSubClusters.add(subClusterId); @@ -1455,10 +1443,7 @@ private void removeFinishedContainersFromCache( List finishedContainers) { for (ContainerStatus container : finishedContainers) { LOG.debug("Completed container {}", container); - if (containerIdToSubClusterIdMap - .containsKey(container.getContainerId())) { - containerIdToSubClusterIdMap.remove(container.getContainerId()); - } + containerIdToSubClusterIdMap.remove(container.getContainerId()); } } @@ -1697,7 +1682,7 @@ private void cacheAllocatedContainers(List containers, private static AllocateRequest findOrCreateAllocateRequestForSubCluster( SubClusterId subClusterId, AllocateRequest originalAMRequest, Map requestMap) { - AllocateRequest newRequest = null; + AllocateRequest newRequest; if (requestMap.containsKey(subClusterId)) { newRequest = requestMap.get(subClusterId); } else { @@ -1715,14 +1700,14 @@ private static AllocateRequest findOrCreateAllocateRequestForSubCluster( private static AllocateRequest createAllocateRequest() { AllocateRequest request = RECORD_FACTORY.newRecordInstance(AllocateRequest.class); - request.setAskList(new ArrayList()); - request.setReleaseList(new ArrayList()); + request.setAskList(new ArrayList<>()); + request.setReleaseList(new ArrayList<>()); ResourceBlacklistRequest blackList = ResourceBlacklistRequest.newInstance(null, null); - blackList.setBlacklistAdditions(new ArrayList()); - blackList.setBlacklistRemovals(new ArrayList()); + blackList.setBlacklistAdditions(new ArrayList<>()); + blackList.setBlacklistRemovals(new ArrayList<>()); request.setResourceBlacklistRequest(blackList); - request.setUpdateRequests(new ArrayList()); + request.setUpdateRequests(new ArrayList<>()); return request; } @@ -1738,9 +1723,7 @@ protected Set getTimedOutSCs(boolean verbose) { long duration = this.clock.getTime() - entry.getValue(); if (duration > this.subClusterTimeOut) { if (verbose) { - LOG.warn( - "Subcluster {} doesn't have a successful heartbeat" - + " for {} seconds for {}", + LOG.warn("Subcluster {} doesn't have a successful heartbeat for {} seconds for {}", entry.getKey(), (double) duration / 1000, this.attemptId); } timedOutSCs.add(entry.getKey()); @@ -1810,8 +1793,8 @@ public Map> getAsyncResponseSink() { * Async callback handler for heart beat response from all sub-clusters. */ private class HeartbeatCallBack implements AsyncCallback { - private SubClusterId subClusterId; - private boolean isUAM; + private final SubClusterId subClusterId; + private final boolean isUAM; HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) { this.subClusterId = subClusterId; @@ -1823,7 +1806,7 @@ public void callback(AllocateResponse response) { org.apache.hadoop.yarn.api.records.Token amrmToken = response.getAMRMToken(); synchronized (asyncResponseSink) { - List responses = null; + List responses; if (asyncResponseSink.containsKey(subClusterId)) { responses = asyncResponseSink.get(subClusterId); } else { @@ -1846,8 +1829,7 @@ public void callback(AllocateResponse response) { try { policyInterpreter.notifyOfResponse(subClusterId, response); } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for sub-cluster " - + subClusterId, e); + LOG.warn("notifyOfResponse for policy failed for sub-cluster {}.", subClusterId, e); } // Save the new AMRMToken for the UAM if present @@ -1866,11 +1848,9 @@ public void callback(AllocateResponse response) { AMRMTokenIdentifier identifier = new AMRMTokenIdentifier(); identifier.readFields(new DataInputStream( new ByteArrayInputStream(newToken.getIdentifier()))); - LOG.info( - "Received new UAM amrmToken with keyId {} and " - + "service {} from {} for {}, written to Registry", - identifier.getKeyId(), newToken.getService(), subClusterId, - attemptId); + LOG.info("Received new UAM amrmToken with keyId {} and service {} from {} for {}, " + + "written to Registry", identifier.getKeyId(), newToken.getService(), + subClusterId, attemptId); } catch (IOException e) { } } @@ -1881,7 +1861,7 @@ public void callback(AllocateResponse response) { newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); } catch (IOException e) { LOG.error("Error storing UAM token as AMRMProxy " - + "context entry in NMSS for " + attemptId, e); + + "context entry in NMSS for {}.", attemptId, e); } } } @@ -1893,8 +1873,8 @@ public void callback(AllocateResponse response) { * FinishApplicationMasterResponse instances. */ private static class FinishApplicationMasterResponseInfo { - private FinishApplicationMasterResponse response; - private String subClusterId; + private final FinishApplicationMasterResponse response; + private final String subClusterId; FinishApplicationMasterResponseInfo( FinishApplicationMasterResponse response, String subClusterId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index 45dcba0ba3221..516468cda24e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -87,11 +87,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor { public void init(AMRMProxyApplicationContext applicationContext) { super.init(applicationContext); - initLocal(applicationContext.getNMCotext().getNodeStatusUpdater() + initLocal(applicationContext.getNMContext().getNodeStatusUpdater() .getRMIdentifier(), applicationContext.getApplicationAttemptId(), - applicationContext.getNMCotext().getContainerAllocator(), - applicationContext.getNMCotext().getNMTokenSecretManager(), + applicationContext.getNMContext().getContainerAllocator(), + applicationContext.getNMContext().getNMTokenSecretManager(), applicationContext.getUser()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 62b59617fd4a6..db442797108a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -82,7 +82,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorCompletionService; @@ -103,8 +102,7 @@ public abstract class BaseAMRMProxyTest { private MockAMRMProxyService amrmProxyService; // Thread pool used for asynchronous operations - private static ExecutorService threadpool = Executors - .newCachedThreadPool(); + private final ExecutorService threadpool = Executors.newCachedThreadPool(); private Configuration conf; private AsyncDispatcher dispatcher; private Context nmContext; @@ -230,47 +228,37 @@ protected List getCompletedContainerIds( protected List runInParallel(List testContexts, final Function func) { ExecutorCompletionService completionService = - new ExecutorCompletionService(this.getThreadPool()); - LOG.info("Sending requests to endpoints asynchronously. Number of test contexts=" - + testContexts.size()); - for (int index = 0; index < testContexts.size(); index++) { - final T testContext = testContexts.get(index); - - LOG.info("Adding request to threadpool for test context: " - + testContext.toString()); - - completionService.submit(new Callable() { - @Override - public R call() throws Exception { - LOG.info("Sending request. Test context:" - + testContext.toString()); - - R response = null; - try { - response = func.invoke(testContext); - LOG.info("Successfully sent request for context: " - + testContext.toString()); - } catch (Throwable ex) { - LOG.error("Failed to process request for context: " - + testContext); - response = null; - } - - return response; + new ExecutorCompletionService<>(this.getThreadPool()); + LOG.info("Sending requests to endpoints asynchronously. Number of test contexts = {}.", + testContexts.size()); + for (final T testContext : testContexts) { + LOG.info("Adding request to threadpool for test context: {}.", testContext.toString()); + + completionService.submit(() -> { + LOG.info("Sending request. Test context: {}.", testContext); + + R response; + try { + response = func.invoke(testContext); + LOG.info("Successfully sent request for context: {}.", testContext); + } catch (Throwable ex) { + LOG.error("Failed to process request for context: {}.", testContext); + response = null; } + + return response; }); } - ArrayList responseList = new ArrayList(); - LOG.info("Waiting for responses from endpoints. Number of contexts=" - + testContexts.size()); + ArrayList responseList = new ArrayList<>(); + LOG.info("Waiting for responses from endpoints. Number of contexts = {}.", testContexts.size()); for (int i = 0; i < testContexts.size(); ++i) { try { final Future future = completionService.take(); final R response = future.get(3000, TimeUnit.MILLISECONDS); responseList.add(response); } catch (Throwable e) { - LOG.error("Failed to process request " + e.getMessage()); + LOG.error("Failed to process request {}", e.getMessage()); } } @@ -291,29 +279,19 @@ protected RegisterApplicationMasterResponse registerApplicationMaster( final int testAppId) throws Exception, YarnException, IOException { final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); - return ugi - .getUser() - .doAs( - new PrivilegedExceptionAction() { - @Override - public RegisterApplicationMasterResponse run() - throws Exception { - getAMRMProxyService().initApp( - ugi.getAppAttemptId(), - ugi.getUser().getUserName()); - - final RegisterApplicationMasterRequest req = - Records - .newRecord(RegisterApplicationMasterRequest.class); - req.setHost(Integer.toString(testAppId)); - req.setRpcPort(testAppId); - req.setTrackingUrl(""); - - RegisterApplicationMasterResponse response = - getAMRMProxyService().registerApplicationMaster(req); - return response; - } - }); + return ugi.getUser().doAs((PrivilegedExceptionAction) () -> { + getAMRMProxyService().initApp(ugi.getAppAttemptId(), ugi.getUser().getUserName()); + + final RegisterApplicationMasterRequest req = + Records.newRecord(RegisterApplicationMasterRequest.class); + req.setHost(Integer.toString(testAppId)); + req.setRpcPort(testAppId); + req.setTrackingUrl(""); + + RegisterApplicationMasterResponse response = + getAMRMProxyService().registerApplicationMaster(req); + return response; + }); } /** @@ -327,37 +305,30 @@ protected List> registerApplication final ArrayList testContexts) { List> responses = runInParallel(testContexts, - new Function>() { - @Override - public RegisterApplicationMasterResponseInfo invoke( - T testContext) { - RegisterApplicationMasterResponseInfo response = null; - try { - int index = testContexts.indexOf(testContext); - response = - new RegisterApplicationMasterResponseInfo( - registerApplicationMaster(index), testContext); - Assert.assertNotNull(response.getResponse()); - Assert.assertEquals(Integer.toString(index), response - .getResponse().getQueue()); - - LOG.info("Successfully registered application master with test context: " - + testContext); - } catch (Throwable ex) { - response = null; - LOG.error("Failed to register application master with test context: " - + testContext); - } - - return response; - } - }); + testContext -> { + RegisterApplicationMasterResponseInfo response; + try { + int index = testContexts.indexOf(testContext); + response = new RegisterApplicationMasterResponseInfo<>( + registerApplicationMaster(index), testContext); + Assert.assertNotNull(response.getResponse()); + Assert.assertEquals(Integer.toString(index), response.getResponse().getQueue()); + + LOG.info("Successfully registered application master with test context: {}.", + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to register application master with test context: {}.", + testContext); + } + + return response; + }); - Assert.assertEquals( - "Number of responses received does not match with request", + Assert.assertEquals("Number of responses received does not match with request", testContexts.size(), responses.size()); - Set contextResponses = new TreeSet(); + Set contextResponses = new TreeSet<>(); for (RegisterApplicationMasterResponseInfo item : responses) { contextResponses.add(item.getTestContext()); } @@ -410,37 +381,28 @@ protected List> finishApplicationMast final ArrayList testContexts) { List> responses = runInParallel(testContexts, - new Function>() { - @Override - public FinishApplicationMasterResponseInfo invoke( - T testContext) { - FinishApplicationMasterResponseInfo response = null; - try { - response = - new FinishApplicationMasterResponseInfo( - finishApplicationMaster( - testContexts.indexOf(testContext), - FinalApplicationStatus.SUCCEEDED), - testContext); - Assert.assertNotNull(response.getResponse()); - - LOG.info("Successfully finished application master with test contexts: " - + testContext); - } catch (Throwable ex) { - response = null; - LOG.error("Failed to finish application master with test context: " - + testContext); - } - - return response; + testContext -> { + FinishApplicationMasterResponseInfo response; + try { + response = new FinishApplicationMasterResponseInfo<>( + finishApplicationMaster(testContexts.indexOf(testContext), + FinalApplicationStatus.SUCCEEDED), testContext); + Assert.assertNotNull(response.getResponse()); + + LOG.info("Successfully finished application master with test contexts: {}.", + testContext); + } catch (Throwable ex) { + response = null; + LOG.error("Failed to finish application master with test context: {}.", + testContext); } + return response; }); - Assert.assertEquals( - "Number of responses received does not match with request", + Assert.assertEquals("Number of responses received does not match with request", testContexts.size(), responses.size()); - Set contextResponses = new TreeSet(); + Set contextResponses = new TreeSet<>(); for (FinishApplicationMasterResponseInfo item : responses) { Assert.assertNotNull(item); Assert.assertNotNull(item.getResponse()); @@ -455,27 +417,19 @@ public FinishApplicationMasterResponseInfo invoke( } protected AllocateResponse allocate(final int testAppId) - throws Exception, YarnException, IOException { + throws Exception { final AllocateRequest req = Records.newRecord(AllocateRequest.class); req.setResponseId(testAppId); return allocate(testAppId, req); } - protected AllocateResponse allocate(final int testAppId, - final AllocateRequest request) throws Exception, YarnException, - IOException { + protected AllocateResponse allocate(final int testAppId, final AllocateRequest request) + throws Exception { final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId); - return ugi.getUser().doAs( - new PrivilegedExceptionAction() { - @Override - public AllocateResponse run() throws Exception { - AllocateResponse response = - getAMRMProxyService().allocate(request); - return response; - } - }); + return ugi.getUser().doAs((PrivilegedExceptionAction) + () -> getAMRMProxyService().allocate(request)); } protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) { @@ -490,44 +444,37 @@ protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) { } protected List createResourceRequests(String[] hosts, - int memory, int vCores, int priority, int containers) - throws Exception { + int memory, int vCores, int priority, int containers) { return createResourceRequests(hosts, memory, vCores, priority, containers, null); } protected List createResourceRequests(String[] hosts, - int memory, int vCores, int priority, int containers, - String labelExpression) throws Exception { - List reqs = new ArrayList(); + int memory, int vCores, int priority, int containers, String labelExpression) { + List reqs = new ArrayList<>(); for (String host : hosts) { - ResourceRequest hostReq = - createResourceRequest(host, memory, vCores, priority, - containers, labelExpression); + ResourceRequest hostReq = createResourceRequest(host, memory, vCores, priority, + containers, labelExpression); reqs.add(hostReq); - ResourceRequest rackReq = - createResourceRequest("/default-rack", memory, vCores, priority, - containers, labelExpression); + ResourceRequest rackReq = createResourceRequest("/default-rack", memory, vCores, priority, + containers, labelExpression); reqs.add(rackReq); } - ResourceRequest offRackReq = - createResourceRequest(ResourceRequest.ANY, memory, vCores, - priority, containers, labelExpression); + ResourceRequest offRackReq = createResourceRequest(ResourceRequest.ANY, memory, vCores, + priority, containers, labelExpression); reqs.add(offRackReq); return reqs; } protected ResourceRequest createResourceRequest(String resource, - int memory, int vCores, int priority, int containers) - throws Exception { - return createResourceRequest(resource, memory, vCores, priority, - containers, null); + int memory, int vCores, int priority, int containers) { + return createResourceRequest(resource, memory, vCores, priority, containers, null); } protected ResourceRequest createResourceRequest(String resource, int memory, int vCores, int priority, int containers, - String labelExpression) throws Exception { + String labelExpression) { ResourceRequest req = Records.newRecord(ResourceRequest.class); req.setResourceName(resource); req.setNumContainers(containers); @@ -548,8 +495,8 @@ protected ResourceRequest createResourceRequest(String resource, /** * Returns an ApplicationId with the specified identifier * - * @param testAppId - * @return + * @param testAppId testApplication. + * @return ApplicationId. */ protected ApplicationId getApplicationId(int testAppId) { return ApplicationId.newInstance(123456, testAppId); @@ -559,8 +506,8 @@ protected ApplicationId getApplicationId(int testAppId) { * Return an instance of ApplicationAttemptId using specified identifier. This * identifier will be used for the ApplicationId too. * - * @param testAppId - * @return + * @param testAppId testApplicationId. + * @return ApplicationAttemptId. */ protected ApplicationAttemptId getApplicationAttemptId(int testAppId) { return ApplicationAttemptId.newInstance(getApplicationId(testAppId), @@ -571,8 +518,8 @@ protected ApplicationAttemptId getApplicationAttemptId(int testAppId) { * Return an instance of ApplicationAttemptId using specified identifier and * application id * - * @param testAppId - * @return + * @param testAppId testApplicationId. + * @return ApplicationAttemptId. */ protected ApplicationAttemptId getApplicationAttemptId(int testAppId, ApplicationId appId) { @@ -580,8 +527,8 @@ protected ApplicationAttemptId getApplicationAttemptId(int testAppId, } protected static class RegisterApplicationMasterResponseInfo { - private RegisterApplicationMasterResponse response; - private T testContext; + private final RegisterApplicationMasterResponse response; + private final T testContext; RegisterApplicationMasterResponseInfo( RegisterApplicationMasterResponse response, T testContext) { @@ -599,8 +546,8 @@ public T getTestContext() { } protected static class FinishApplicationMasterResponseInfo { - private FinishApplicationMasterResponse response; - private T testContext; + private final FinishApplicationMasterResponse response; + private final T testContext; FinishApplicationMasterResponseInfo( FinishApplicationMasterResponse response, T testContext) { @@ -618,8 +565,8 @@ public T getTestContext() { } protected static class ApplicationUserInfo { - private UserGroupInformation user; - private ApplicationAttemptId attemptId; + private final UserGroupInformation user; + private final ApplicationAttemptId attemptId; ApplicationUserInfo(UserGroupInformation user, ApplicationAttemptId attemptId) { @@ -654,12 +601,12 @@ protected void serviceStart() throws Exception { * actual service, the initialization is called by the * ContainerManagerImpl::StartContainers method * - * @param applicationId - * @param user + * @param applicationId ApplicationAttemptId + * @param user username. */ public void initApp(ApplicationAttemptId applicationId, String user) { super.initializePipeline(applicationId, user, - new Token(), null, null, false, null); + new Token<>(), null, null, false, null); } public void stopApp(ApplicationId applicationId) { @@ -672,7 +619,7 @@ public void stopApp(ApplicationId applicationId) { * invoked asynchronously at a later point. */ protected interface Function { - public R invoke(T input); + R invoke(T input); } protected class NullContext implements Context { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java index 134edb7d6e4d1..f4c10eb31a34b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java @@ -86,7 +86,7 @@ public void testAllocateRequestWithNullValues() throws Exception { finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests()); Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 10a68cfb33a25..30ed6b61a481c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; @@ -58,6 +57,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { /** * Test if the pipeline is created properly. + * + * @throws Exception There was an error registerApplicationMaster. */ @Test public void testRequestInterceptorChainCreation() throws Exception { @@ -73,8 +74,7 @@ public void testRequestInterceptorChainCreation() throws Exception { root.getClass().getName()); break; case 3: - Assert.assertEquals(MockRequestInterceptor.class.getName(), root - .getClass().getName()); + Assert.assertEquals(MockRequestInterceptor.class.getName(), root.getClass().getName()); break; } @@ -82,8 +82,7 @@ public void testRequestInterceptorChainCreation() throws Exception { index++; } - Assert.assertEquals( - "The number of interceptors in chain does not match", + Assert.assertEquals("The number of interceptors in chain does not match", Integer.toString(4), Integer.toString(index)); } @@ -91,7 +90,7 @@ public void testRequestInterceptorChainCreation() throws Exception { /** * Tests registration of a single application master. * - * @throws Exception + * @throws Exception There was an error registerApplicationMaster. */ @Test public void testRegisterOneApplicationMaster() throws Exception { @@ -99,8 +98,7 @@ public void testRegisterOneApplicationMaster() throws Exception { // manager return it as the queue name. Assert that we received the queue // name int testAppId = 1; - RegisterApplicationMasterResponse response1 = - registerApplicationMaster(testAppId); + RegisterApplicationMasterResponse response1 = registerApplicationMaster(testAppId); Assert.assertNotNull(response1); Assert.assertEquals(Integer.toString(testAppId), response1.getQueue()); } @@ -108,7 +106,7 @@ public void testRegisterOneApplicationMaster() throws Exception { /** * Tests the case when interceptor pipeline initialization fails. * - * @throws IOException + * @throws IOException There was an error registerApplicationMaster. */ @Test public void testInterceptorInitFailure() throws IOException { @@ -127,9 +125,8 @@ public void testInterceptorInitFailure() throws IOException { Map pipelines = getAMRMProxyService().getPipelines(); ApplicationId id = getApplicationId(testAppId); - Assert.assertTrue( - "The interceptor pipeline should be removed if initialization fails", - pipelines.get(id) == null); + Assert.assertNull("The interceptor pipeline should be removed if initialization fails", + pipelines.get(id)); } } @@ -137,28 +134,24 @@ public void testInterceptorInitFailure() throws IOException { * Tests the registration of multiple application master serially one at a * time. * - * @throws Exception + * @throws Exception There was an error registerApplicationMaster. */ @Test public void testRegisterMultipleApplicationMasters() throws Exception { for (int testAppId = 0; testAppId < 3; testAppId++) { - RegisterApplicationMasterResponse response = - registerApplicationMaster(testAppId); + RegisterApplicationMasterResponse response = registerApplicationMaster(testAppId); Assert.assertNotNull(response); - Assert - .assertEquals(Integer.toString(testAppId), response.getQueue()); + Assert.assertEquals(Integer.toString(testAppId), response.getQueue()); } } /** * Tests the registration of multiple application masters using multiple * threads in parallel. - * - * @throws Exception + * */ @Test - public void testRegisterMultipleApplicationMastersInParallel() - throws Exception { + public void testRegisterMultipleApplicationMastersInParallel() { int numberOfRequests = 5; ArrayList testContexts = CreateTestRequestIdentifiers(numberOfRequests); @@ -167,10 +160,10 @@ public void testRegisterMultipleApplicationMastersInParallel() private ArrayList CreateTestRequestIdentifiers( int numberOfRequests) { - ArrayList testContexts = new ArrayList(); + ArrayList testContexts = new ArrayList<>(); LOG.info("Creating " + numberOfRequests + " contexts for testing"); for (int ep = 0; ep < numberOfRequests; ep++) { - testContexts.add("test-endpoint-" + Integer.toString(ep)); + testContexts.add("test-endpoint-" + ep); LOG.info("Created test context: " + testContexts.get(ep)); } return testContexts; @@ -190,7 +183,7 @@ public void testFinishOneApplicationMasterWithSuccess() throws Exception { FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); } @Test @@ -219,7 +212,7 @@ public void testFinishOneApplicationMasterWithFailure() throws Exception { } @Test - public void testFinishInvalidApplicationMaster() throws Exception { + public void testFinishInvalidApplicationMaster() { try { // Try to finish an application master that was not registered. finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); @@ -248,11 +241,10 @@ public void testFinishMultipleApplicationMasters() throws Exception { finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); // Assert that the application has been removed from the collection - Assert.assertTrue(this.getAMRMProxyService() - .getPipelines().size() == index); + Assert.assertEquals(this.getAMRMProxyService().getPipelines().size(), index); } try { @@ -280,10 +272,10 @@ public void testFinishMultipleApplicationMasters() throws Exception { public void testFinishMultipleApplicationMastersInParallel() throws Exception { int numberOfRequests = 5; - ArrayList testContexts = new ArrayList(); - LOG.info("Creating " + numberOfRequests + " contexts for testing"); + ArrayList testContexts = new ArrayList<>(); + LOG.info("Creating {} contexts for testing", numberOfRequests); for (int i = 0; i < numberOfRequests; i++) { - testContexts.add("test-endpoint-" + Integer.toString(i)); + testContexts.add("test-endpoint-" + i); LOG.info("Created test context: " + testContexts.get(i)); RegisterApplicationMasterResponse registerResponse = @@ -313,11 +305,11 @@ public void testAllocateRequestWithNullValues() throws Exception { FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); } @Test - public void testAllocateRequestWithoutRegistering() throws Exception { + public void testAllocateRequestWithoutRegistering() { try { // Try to allocate an application master without registering. @@ -381,51 +373,41 @@ public void testAllocateAndReleaseContainersForMultipleAM() public void testAllocateAndReleaseContainersForMultipleAMInParallel() throws Exception { int numberOfApps = 6; - ArrayList tempAppIds = new ArrayList(); + ArrayList tempAppIds = new ArrayList<>(); for (int i = 0; i < numberOfApps; i++) { - tempAppIds.add(new Integer(i)); + tempAppIds.add(i); } - final ArrayList appIds = tempAppIds; List responses = - runInParallel(appIds, new Function() { - @Override - public Integer invoke(Integer testAppId) { - try { - RegisterApplicationMasterResponse registerResponse = - registerApplicationMaster(testAppId); - Assert.assertNotNull("response is null", registerResponse); - List containers = - getContainersAndAssert(testAppId, 10); - releaseContainersAndAssert(testAppId, containers); - - LOG.info("Successfully registered application master with appId: " - + testAppId); - } catch (Throwable ex) { - LOG.error( - "Failed to register application master with appId: " - + testAppId, ex); - testAppId = null; - } - - return testAppId; + runInParallel(tempAppIds, testAppId -> { + try { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull("response is null", registerResponse); + List containers = + getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + + LOG.info("Successfully registered application master with appId: {}", testAppId); + } catch (Throwable ex) { + LOG.error("Failed to register application master with appId: {}", testAppId, ex); + testAppId = null; } + + return testAppId; }); - Assert.assertEquals( - "Number of responses received does not match with request", - appIds.size(), responses.size()); + Assert.assertEquals("Number of responses received does not match with request", + tempAppIds.size(), responses.size()); for (Integer testAppId : responses) { Assert.assertNotNull(testAppId); - finishApplicationMaster(testAppId.intValue(), - FinalApplicationStatus.SUCCEEDED); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); } } @Test - public void testMultipleAttemptsSameNode() - throws YarnException, IOException, Exception { + public void testMultipleAttemptsSameNode() throws Exception { String user = "hadoop"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -444,7 +426,7 @@ public void testMultipleAttemptsSameNode() applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); getAMRMProxyService().initializePipeline(applicationAttemptId, user, - new Token(), null, null, false, null); + new Token<>(), null, null, false, null); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -461,13 +443,10 @@ private List getContainersAndAssert(int appId, allocateRequest.setResponseId(1); List containers = - new ArrayList(numberOfResourceRequests); - List askList = - new ArrayList(numberOfResourceRequests); + new ArrayList<>(numberOfResourceRequests); + List askList = new ArrayList<>(numberOfResourceRequests); for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) { - askList.add(createResourceRequest( - "test-node-" + Integer.toString(testAppId), 6000, 2, - testAppId % 5, 1)); + askList.add(createResourceRequest("test-node-" + testAppId, 6000, 2, testAppId % 5, 1)); } allocateRequest.setAskList(askList); @@ -495,11 +474,9 @@ private List getContainersAndAssert(int appId, containers.addAll(allocateResponse.getAllocatedContainers()); - LOG.info("Number of allocated containers in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers() - .size())); - LOG.info("Total number of allocated containers: " - + Integer.toString(containers.size())); + LOG.info("Number of allocated containers in this request: {}.", + allocateResponse.getAllocatedContainers().size()); + LOG.info("Total number of allocated containers: {}.", containers.size()); Thread.sleep(10); } @@ -517,8 +494,7 @@ private void releaseContainersAndAssert(int appId, Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(1); - List relList = - new ArrayList(containers.size()); + List relList = new ArrayList<>(containers.size()); for (Container container : containers) { relList.add(container.getId()); } @@ -556,23 +532,21 @@ private void releaseContainersAndAssert(int appId, allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); - LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers() - .size())); - LOG.info("Total number of containers received: " - + Integer.toString(containersForReleasedContainerIds.size())); + LOG.info("Number of containers received in this request: {}.", + allocateResponse.getAllocatedContainers().size()); + LOG.info("Total number of containers received: {}.", + containersForReleasedContainerIds.size()); Thread.sleep(10); } - Assert.assertEquals(relList.size(), - containersForReleasedContainerIds.size()); + Assert.assertEquals(relList.size(), containersForReleasedContainerIds.size()); } /** * Test AMRMProxy restart with recovery. */ @Test - public void testRecovery() throws YarnException, Exception { + public void testRecovery() throws Exception { Configuration conf = createConfiguration(); // Use the MockRequestInterceptorAcrossRestart instead for the chain @@ -602,7 +576,7 @@ public void testRecovery() throws YarnException, Exception { // At the time of kill, app1 just registerAM, app2 already did one allocate. // Both application should be recovered createAndStartAMRMProxyService(conf); - Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2); + Assert.assertEquals(2, getAMRMProxyService().getPipelines().size()); allocateResponse = allocate(testAppId1); Assert.assertNotNull(allocateResponse); @@ -610,7 +584,7 @@ public void testRecovery() throws YarnException, Exception { FinishApplicationMasterResponse finishResponse = finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); allocateResponse = allocate(testAppId2); Assert.assertNotNull(allocateResponse); @@ -619,7 +593,7 @@ public void testRecovery() throws YarnException, Exception { finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED); Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); + Assert.assertTrue(finishResponse.getIsUnregistered()); int testAppId3 = 3; try { @@ -664,23 +638,21 @@ public void testAppRecoveryFailure() throws YarnException, Exception { @Test public void testCheckIfAppExistsInStateStore() - throws IOException, YarnException { + throws IOException { ApplicationId appId = ApplicationId.newInstance(0, 0); Configuration conf = createConfiguration(); conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); createAndStartAMRMProxyService(conf); - Assert.assertEquals(false, - getAMRMProxyService().checkIfAppExistsInStateStore(appId)); + Assert.assertFalse(getAMRMProxyService().checkIfAppExistsInStateStore(appId)); Configuration distConf = createConfiguration(); conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); createAndStartAMRMProxyService(distConf); - Assert.assertEquals(true, - getAMRMProxyService().checkIfAppExistsInStateStore(appId)); + Assert.assertTrue(getAMRMProxyService().checkIfAppExistsInStateStore(appId)); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 9e3e73f7f9995..b540e7650ebe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -203,7 +203,7 @@ private List getContainersAndAssert(int numberOfResourceRequests, List containers = new ArrayList<>(numberOfResourceRequests); List askList = new ArrayList<>(numberOfResourceRequests); for (int id = 0; id < numberOfResourceRequests; id++) { - askList.add(createResourceRequest("test-node-" + Integer.toString(id), + askList.add(createResourceRequest("test-node-" + id, 6000, 2, id % 5, 1)); } @@ -217,7 +217,7 @@ private List getContainersAndAssert(int numberOfResourceRequests, containers.addAll(allocateResponse.getAllocatedContainers()); LOG.info("Number of allocated containers in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + allocateResponse.getAllocatedContainers().size()); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -236,10 +236,9 @@ private List getContainersAndAssert(int numberOfResourceRequests, interceptor.drainAllAsyncQueue(false); containers.addAll(allocateResponse.getAllocatedContainers()); - LOG.info("Number of allocated containers in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); - LOG.info("Total number of allocated containers: " - + Integer.toString(containers.size())); + LOG.info("Number of allocated containers in this request: {}.", + allocateResponse.getAllocatedContainers().size()); + LOG.info("Total number of allocated containers: {}.", containers.size()); Thread.sleep(10); } Assert.assertEquals(numberOfAllocationExcepted, containers.size()); @@ -250,7 +249,7 @@ private void releaseContainersAndAssert(List containers) throws Exception { Assert.assertTrue(containers.size() > 0); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - List relList = new ArrayList(containers.size()); + List relList = new ArrayList<>(containers.size()); for (Container container : containers) { relList.add(container.getId()); } @@ -267,8 +266,7 @@ private void releaseContainersAndAssert(List containers) // The release containers returned by the mock resource managers will be // aggregated and returned back to us, and we can check if total request size // and returned size are the same - List containersForReleasedContainerIds = - new ArrayList(); + List containersForReleasedContainerIds = new ArrayList<>(); List newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); @@ -306,7 +304,7 @@ private void releaseContainersAndAssert(List containers) private void checkAMRMToken(Token amrmToken) { if (amrmToken != null) { // The token should be the one issued by home MockRM - Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0))); + Assert.assertEquals(Integer.toString(0), amrmToken.getKind()); } } @@ -314,69 +312,66 @@ private void checkAMRMToken(Token amrmToken) { public void testMultipleSubClusters() throws Exception { UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); - - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - - // Allocate the first batch of containers, with sc1 and sc2 active - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance("SC-2")); - - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); - - // Allocate the second batch of containers, with sc1 and sc3 active - deRegisterSubCluster(SubClusterId.newInstance("SC-2")); - registerSubCluster(SubClusterId.newInstance("SC-3")); - - numberOfContainers = 1; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - - // Allocate the third batch of containers with only in home sub-cluster - // active - deRegisterSubCluster(SubClusterId.newInstance("SC-1")); - deRegisterSubCluster(SubClusterId.newInstance("SC-3")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - - numberOfContainers = 2; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - - // Release all containers - releaseContainersAndAssert(containers); - - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - - FinishApplicationMasterResponse finishResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); - - return null; - } + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the second batch of containers, with sc1 and sc3 active + deRegisterSubCluster(SubClusterId.newInstance("SC-2")); + registerSubCluster(SubClusterId.newInstance("SC-3")); + + numberOfContainers = 1; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the third batch of containers with only in home sub-cluster + // active + deRegisterSubCluster(SubClusterId.newInstance("SC-1")); + deRegisterSubCluster(SubClusterId.newInstance("SC-3")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + numberOfContainers = 2; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + return null; }); } @@ -387,55 +382,52 @@ public Object run() throws Exception { public void testReregister() throws Exception { UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); - - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - // Allocate the first batch of containers - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - interceptor.setShouldReRegisterNext(); + // Allocate the first batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + interceptor.setShouldReRegisterNext(); - interceptor.setShouldReRegisterNext(); + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // Release all containers - releaseContainersAndAssert(containers); + interceptor.setShouldReRegisterNext(); - interceptor.setShouldReRegisterNext(); + // Release all containers + releaseContainersAndAssert(containers); - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + interceptor.setShouldReRegisterNext(); - FinishApplicationMasterResponse finishResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finishResponse); - Assert.assertTrue(finishResponse.getIsUnregistered()); - return null; - } + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + return null; }); } @@ -498,7 +490,7 @@ public class ConcurrentRegisterAMCallable implements Callable { @Override public RegisterApplicationMasterResponse call() throws Exception { - RegisterApplicationMasterResponse response = null; + RegisterApplicationMasterResponse response; try { // Use port number 1001 to let mock RM block in the register call response = interceptor.registerApplicationMaster( @@ -536,110 +528,107 @@ protected void testRecover(final RegistryOperations registryObj) throws Exception { UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - interceptor = new TestableFederationInterceptor(); - interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, - getConf(), attemptId, "test-user", null, null, null, registryObj)); - interceptor.cleanupRegistry(); - - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(testAppId); - registerReq.setTrackingUrl(""); - - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + interceptor = new TestableFederationInterceptor(); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj)); + interceptor.cleanupRegistry(); - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); - // Allocate one batch of containers - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - // Make sure all async hb threads are done - interceptor.drainAllAsyncQueue(true); + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - // Prepare for Federation Interceptor restart and recover - Map recoveredDataMap = - recoverDataMapForAppAttempt(nmStateStore, attemptId); - String scEntry = - FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1"; - if (registryObj == null) { - Assert.assertTrue(recoveredDataMap.containsKey(scEntry)); - } else { - // When AMRMPRoxy HA is enabled, NMSS should not have the UAM token, - // it should be in Registry - Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); - } - - // Preserve the mock RM instances - MockResourceManagerFacade homeRM = interceptor.getHomeRM(); - ConcurrentHashMap secondaries = - interceptor.getSecondaryRMs(); - - // Create a new interceptor instance and recover - interceptor = new TestableFederationInterceptor(homeRM, secondaries); - interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, - getConf(), attemptId, "test-user", null, null, null, registryObj)); - interceptor.recover(recoveredDataMap); - - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // SC1 should be initialized to be timed out - Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); - - // The first allocate call expects a fail-over exception and re-register - try { - AllocateRequest allocateRequest = - Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(lastResponseId); - AllocateResponse allocateResponse = - interceptor.allocate(allocateRequest); - lastResponseId = allocateResponse.getResponseId(); - Assert.fail("Expecting an ApplicationMasterNotRegisteredException " - + " after FederationInterceptor restarts and recovers"); - } catch (ApplicationMasterNotRegisteredException e) { - } - interceptor.registerApplicationMaster(registerReq); - lastResponseId = 0; + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // Release all containers - releaseContainersAndAssert(containers); - - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - - FinishApplicationMasterResponse finishResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finishResponse); - Assert.assertTrue(finishResponse.getIsUnregistered()); - - // After the application succeeds, the registry/NMSS entry should be - // cleaned up - if (registryObj != null) { - Assert.assertEquals(0, - interceptor.getRegistryClient().getAllApplications().size()); - } else { - recoveredDataMap = - recoverDataMapForAppAttempt(nmStateStore, attemptId); - Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); - } - return null; + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + + // Prepare for Federation Interceptor restart and recover + Map recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + String scEntry = + FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1"; + if (registryObj == null) { + Assert.assertTrue(recoveredDataMap.containsKey(scEntry)); + } else { + // When AMRMPRoxy HA is enabled, NMSS should not have the UAM token, + // it should be in Registry + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); } + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Create a new interceptor instance and recover + interceptor = new TestableFederationInterceptor(homeRM, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registryObj)); + interceptor.recover(recoveredDataMap); + + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // SC1 should be initialized to be timed out + Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); + + // The first allocate call expects a fail-over exception and re-register + try { + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + AllocateResponse allocateResponse = + interceptor.allocate(allocateRequest); + lastResponseId = allocateResponse.getResponseId(); + Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + + " after FederationInterceptor restarts and recovers"); + } catch (ApplicationMasterNotRegisteredException e) { + } + interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + // After the application succeeds, the registry/NMSS entry should be + // cleaned up + if (registryObj != null) { + Assert.assertEquals(0, + interceptor.getRegistryClient().getAllApplications().size()); + } else { + recoveredDataMap = + recoverDataMapForAppAttempt(nmStateStore, attemptId); + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); + } + return null; }); } @@ -774,53 +763,48 @@ public void testAllocateResponse() throws Exception { public void testSubClusterTimeOut() throws Exception { UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - // Register the application first time - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + // Register the application first time + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-1")); - getContainersAndAssert(1, 1); + getContainersAndAssert(1, 1); - AllocateResponse allocateResponse = - interceptor.generateBaseAllocationResponse(); - Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); - Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); + AllocateResponse allocateResponse = interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); - // Let all SC timeout (home and SC-1), without an allocate from AM - Thread.sleep(800); + // Let all SC timeout (home and SC-1), without an allocate from AM + Thread.sleep(800); - // Should not be considered timeout, because there's no recent AM - // heartbeat - allocateResponse = interceptor.generateBaseAllocationResponse(); - Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); - Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); + // Should not be considered timeout, because there's no recent AM + // heartbeat + allocateResponse = interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(2, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size()); - // Generate a duplicate heartbeat from AM, so that it won't really - // trigger a heartbeat to all SC - AllocateRequest allocateRequest = - Records.newRecord(AllocateRequest.class); - // Set to lastResponseId - 1 so that it will be considered a duplicate - // heartbeat and thus not forwarded to all SCs - allocateRequest.setResponseId(lastResponseId - 1); - interceptor.allocate(allocateRequest); - - // Should be considered timeout - allocateResponse = interceptor.generateBaseAllocationResponse(); - Assert.assertEquals(0, allocateResponse.getNumClusterNodes()); - Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size()); - return null; - } + // Generate a duplicate heartbeat from AM, so that it won't really + // trigger a heartbeat to all SC + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + // Set to lastResponseId - 1 so that it will be considered a duplicate + // heartbeat and thus not forwarded to all SCs + allocateRequest.setResponseId(lastResponseId - 1); + interceptor.allocate(allocateRequest); + + // Should be considered timeout + allocateResponse = interceptor.generateBaseAllocationResponse(); + Assert.assertEquals(0, allocateResponse.getNumClusterNodes()); + Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size()); + return null; }); } @@ -834,87 +818,81 @@ public void testSecondAttempt() throws Exception { UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - // Register the application - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + // Register the application + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - // Allocate one batch of containers - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - for (Container c : containers) { - LOG.info("Allocated container " + c.getId()); - } - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + for (Container c : containers) { + LOG.info("Allocated container {}.", c.getId()); + } + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // Make sure all async hb threads are done - interceptor.drainAllAsyncQueue(true); + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); - // Preserve the mock RM instances for secondaries - ConcurrentHashMap secondaries = - interceptor.getSecondaryRMs(); + // Preserve the mock RM instances for secondaries + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); - // Increase the attemptId and create a new interceptor instance for it - attemptId = ApplicationAttemptId.newInstance( - attemptId.getApplicationId(), attemptId.getAttemptId() + 1); + // Increase the attemptId and create a new interceptor instance for it + attemptId = ApplicationAttemptId.newInstance( + attemptId.getApplicationId(), attemptId.getAttemptId() + 1); - interceptor = new TestableFederationInterceptor(null, secondaries); - interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, - getConf(), attemptId, "test-user", null, null, null, registry)); - return null; - } + interceptor = new TestableFederationInterceptor(null, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registry)); + return null; }); // Update the ugi with new attemptId ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - lastResponseId = 0; + ugi.doAs((PrivilegedExceptionAction) () -> { + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; + + int numberOfContainers = 3; + // Should re-attach secondaries and get the three running containers + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // SC1 should be initialized to be timed out + Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); + Assert.assertEquals(numberOfContainers, + registerResponse.getContainersFromPreviousAttempts().size()); - int numberOfContainers = 3; - // Should re-attach secondaries and get the three running containers - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // SC1 should be initialized to be timed out - Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); - Assert.assertEquals(numberOfContainers, - registerResponse.getContainersFromPreviousAttempts().size()); - - // Release all containers - releaseContainersAndAssert( - registerResponse.getContainersFromPreviousAttempts()); - - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - - FinishApplicationMasterResponse finishResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finishResponse); - Assert.assertTrue(finishResponse.getIsUnregistered()); - - // After the application succeeds, the registry entry should be deleted - if (interceptor.getRegistryClient() != null) { - Assert.assertEquals(0, - interceptor.getRegistryClient().getAllApplications().size()); - } - return null; + // Release all containers + releaseContainersAndAssert( + registerResponse.getContainersFromPreviousAttempts()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + // After the application succeeds, the registry entry should be deleted + if (interceptor.getRegistryClient() != null) { + Assert.assertEquals(0, + interceptor.getRegistryClient().getAllApplications().size()); } + return null; }); } @@ -924,22 +902,17 @@ public void testMergeAllocateResponse() { ContainerStatus cStatus = Records.newRecord(ContainerStatus.class); cStatus.setContainerId(cid); Container container = - Container.newInstance(cid, null, null, null, null, null); - + Container.newInstance(cid, null, null, null, null, null); AllocateResponse homeResponse = Records.newRecord(AllocateResponse.class); homeResponse.setAllocatedContainers(Collections.singletonList(container)); - homeResponse.setCompletedContainersStatuses( - Collections.singletonList(cStatus)); - homeResponse.setUpdatedNodes( - Collections.singletonList(Records.newRecord(NodeReport.class))); - homeResponse.setNMTokens( - Collections.singletonList(Records.newRecord(NMToken.class))); - homeResponse.setUpdatedContainers( - Collections.singletonList( - Records.newRecord(UpdatedContainer.class))); - homeResponse.setUpdateErrors(Collections - .singletonList(Records.newRecord(UpdateContainerError.class))); + homeResponse.setCompletedContainersStatuses(Collections.singletonList(cStatus)); + homeResponse.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class))); + homeResponse.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class))); + homeResponse.setUpdatedContainers(Collections.singletonList( + Records.newRecord(UpdatedContainer.class))); + homeResponse.setUpdateErrors(Collections.singletonList( + Records.newRecord(UpdateContainerError.class))); homeResponse.setAvailableResources(Records.newRecord(Resource.class)); homeResponse.setPreemptionMessage(createDummyPreemptionMessage( ContainerId.newContainerId(attemptId, 0))); @@ -947,15 +920,12 @@ public void testMergeAllocateResponse() { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setAllocatedContainers(Collections.singletonList(container)); response.setCompletedContainersStatuses(Collections.singletonList(cStatus)); - response.setUpdatedNodes( - Collections.singletonList(Records.newRecord(NodeReport.class))); - response.setNMTokens( - Collections.singletonList(Records.newRecord(NMToken.class))); - response.setUpdatedContainers( - Collections.singletonList( - Records.newRecord(UpdatedContainer.class))); - response.setUpdateErrors(Collections - .singletonList(Records.newRecord(UpdateContainerError.class))); + response.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class))); + response.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class))); + response.setUpdatedContainers(Collections.singletonList( + Records.newRecord(UpdatedContainer.class))); + response.setUpdateErrors(Collections.singletonList( + Records.newRecord(UpdateContainerError.class))); response.setAvailableResources(Records.newRecord(Resource.class)); response.setPreemptionMessage(createDummyPreemptionMessage( ContainerId.newContainerId(attemptId, 1))); @@ -964,14 +934,10 @@ public void testMergeAllocateResponse() { response, SubClusterId.newInstance("SC-1")); Assert.assertEquals(2, - homeResponse.getPreemptionMessage().getContract() - .getContainers().size()); - Assert.assertEquals(2, - homeResponse.getAllocatedContainers().size()); - Assert.assertEquals(2, - homeResponse.getUpdatedNodes().size()); - Assert.assertEquals(2, - homeResponse.getCompletedContainersStatuses().size()); + homeResponse.getPreemptionMessage().getContract().getContainers().size()); + Assert.assertEquals(2, homeResponse.getAllocatedContainers().size()); + Assert.assertEquals(2, homeResponse.getUpdatedNodes().size()); + Assert.assertEquals(2, homeResponse.getCompletedContainersStatuses().size()); } private PreemptionMessage createDummyPreemptionMessage( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptorSecure.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptorSecure.java index 1069fc8e32b5a..0241f4e1aaee5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptorSecure.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptorSecure.java @@ -247,10 +247,10 @@ private void registerSubCluster(SubClusterId subClusterId) private List getContainersAndAssert(int numberOfResourceRequests, int numberOfAllocationExcepted) throws Exception { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - List containers = new ArrayList(numberOfResourceRequests); - List askList = new ArrayList(numberOfResourceRequests); + List containers = new ArrayList<>(numberOfResourceRequests); + List askList = new ArrayList<>(numberOfResourceRequests); for (int id = 0; id < numberOfResourceRequests; id++) { - askList.add(createResourceRequest("test-node-" + Integer.toString(id), 6000, 2, id % 5, 1)); + askList.add(createResourceRequest("test-node-" + id, 6000, 2, id % 5, 1)); } allocateRequest.setAskList(askList); @@ -280,9 +280,9 @@ private List getContainersAndAssert(int numberOfResourceRequests, interceptor.drainAllAsyncQueue(false); containers.addAll(allocateResponse.getAllocatedContainers()); - LOG.info("Number of allocated containers in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); - LOG.info("Total number of allocated containers: {}", containers.size()); + LOG.info("Number of allocated containers in this request: {}.", + allocateResponse.getAllocatedContainers().size()); + LOG.info("Total number of allocated containers: {}.", containers.size()); Thread.sleep(10); } Assert.assertEquals(numberOfAllocationExcepted, containers.size()); @@ -293,7 +293,7 @@ private void releaseContainersAndAssert(List containers) throws Exception { Assert.assertTrue(containers.size() > 0); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - List relList = new ArrayList(containers.size()); + List relList = new ArrayList<>(containers.size()); for (Container container : containers) { relList.add(container.getId()); } @@ -308,9 +308,9 @@ private void releaseContainersAndAssert(List containers) // The release request will be split and handled by the corresponding UAM. // The release containers returned by the mock resource managers will be - // aggregated and returned back to us and we can check if total request size + // aggregated and returned back to us, and we can check if total request size // and returned size are the same - List containersForReleasedContainerIds = new ArrayList(); + List containersForReleasedContainerIds = new ArrayList<>(); List newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); @@ -331,8 +331,9 @@ private void releaseContainersAndAssert(List containers) newlyFinished = getCompletedContainerIds(allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); - LOG.info("Number of containers received in this request: ", newlyFinished.size()); - LOG.info("Total number of containers received: ", containersForReleasedContainerIds.size()); + LOG.info("Number of containers received in this request: {}.", newlyFinished.size()); + LOG.info("Total number of containers received: {}.", + containersForReleasedContainerIds.size()); Thread.sleep(10); } @@ -342,7 +343,7 @@ private void releaseContainersAndAssert(List containers) private void checkAMRMToken(Token amrmToken) { if (amrmToken != null) { // The token should be the one issued by home MockRM - Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0))); + Assert.assertEquals(Integer.toString(0), amrmToken.getKind()); } } @@ -360,106 +361,103 @@ protected void testRecoverWithRPCClientRM( final RegistryOperations registryObj) throws Exception { UserGroupInformation ugi = this.getUGIWithToken(attemptId); - ugi.doAs(new PrivilegedExceptionAction() { - @Override - public Object run() throws Exception { - interceptor = new TestableFederationInterceptor(mockHomeRm); - interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId, - "test-user", null, null, null, registryObj)); - interceptor.cleanupRegistry(); - - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(testAppId); - registerReq.setTrackingUrl(""); - - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); - lastResponseId = 0; - - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - - // Allocate one batch of containers - registerSubCluster(SubClusterId.newInstance(SC_ID1)); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - - int numberOfContainers = 3; - List containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - - // Make sure all async hb threads are done - interceptor.drainAllAsyncQueue(true); - - // Prepare for Federation Interceptor restart and recover - Map recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); - String scEntry = FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1"; - if (registryObj == null) { - Assert.assertTrue(recoveredDataMap.containsKey(scEntry)); - } else { - // When AMRMPRoxy HA is enabled, NMSS should not have the UAM token, - // it should be in Registry - Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); - } - - // Preserve the mock RM instances - MockResourceManagerFacade homeRM = interceptor.getHomeRM(); - ConcurrentHashMap secondaries = - interceptor.getSecondaryRMs(); - - // Create a new interceptor instance and recover - interceptor = new TestableFederationInterceptor(homeRM, secondaries); - interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId, - "test-user", null, null, null, registryObj)); - interceptor.setClientRPC(true); - interceptor.recover(recoveredDataMap); - interceptor.setClientRPC(false); - - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - // SC-1 should be initialized to be timed out - Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); - - // The first allocate call expects a fail-over exception and re-register - try { - AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(lastResponseId); - AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); - lastResponseId = allocateResponse.getResponseId(); - Assert.fail("Expecting an ApplicationMasterNotRegisteredException " - + " after FederationInterceptor restarts and recovers"); - } catch (ApplicationMasterNotRegisteredException e) { - } - interceptor.registerApplicationMaster(registerReq); - lastResponseId = 0; - - // Release all containers - releaseContainersAndAssert(containers); - - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - - FinishApplicationMasterResponse finishResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finishResponse); - Assert.assertEquals(true, finishResponse.getIsUnregistered()); - - // After the application succeeds, the registry/NMSS entry should be - // cleaned up - if (registryObj != null) { - Assert.assertEquals(0, interceptor.getRegistryClient().getAllApplications().size()); - } else { - recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); - Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); - } - return null; + ugi.doAs((PrivilegedExceptionAction) () -> { + interceptor = new TestableFederationInterceptor(mockHomeRm); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId, + "test-user", null, null, null, registryObj)); + interceptor.cleanupRegistry(); + + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance(SC_ID1)); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + + // Prepare for Federation Interceptor restart and recover + Map recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); + String scEntry = FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1"; + if (registryObj == null) { + Assert.assertTrue(recoveredDataMap.containsKey(scEntry)); + } else { + // When AMRMPRoxy HA is enabled, NMSS should not have the UAM token, + // it should be in Registry + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); } + + // Preserve the mock RM instances + MockResourceManagerFacade homeRM = interceptor.getHomeRM(); + ConcurrentHashMap secondaries = + interceptor.getSecondaryRMs(); + + // Create a new interceptor instance and recover + interceptor = new TestableFederationInterceptor(homeRM, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId, + "test-user", null, null, null, registryObj)); + interceptor.setClientRPC(true); + interceptor.recover(recoveredDataMap); + interceptor.setClientRPC(false); + + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // SC-1 should be initialized to be timed out + Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); + + // The first allocate call expects a fail-over exception and re-register + try { + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); + lastResponseId = allocateResponse.getResponseId(); + Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + + " after FederationInterceptor restarts and recovers"); + } catch (ApplicationMasterNotRegisteredException e) { + } + interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertTrue(finishResponse.getIsUnregistered()); + + // After the application succeeds, the registry/NMSS entry should be + // cleaned up + if (registryObj != null) { + Assert.assertEquals(0, interceptor.getRegistryClient().getAllApplications().size()); + } else { + recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); + Assert.assertFalse(recoveredDataMap.containsKey(scEntry)); + } + return null; }); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 070131ef39aac..5172e12b64e7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -77,7 +77,7 @@ protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( } @Override - protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + protected AMHeartbeatRequestHandler createHomeHeartbeatHandler( Configuration conf, ApplicationId appId, AMRMClientRelayer rmProxyRelayer) { return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer); @@ -142,7 +142,7 @@ protected MockResourceManagerFacade getSecondaryRM(String scId) { } /** - * Drain all aysnc heartbeat threads, comes in two favors: + * Drain all async heartbeat threads, comes in two favors: * * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to * pick up all pending heartbeat requests. Not necessarily wait for all @@ -159,9 +159,9 @@ protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish) LOG.info("waiting to drain home heartbeat handler"); if (waitForAsyncHBThreadFinish) { - getHomeHeartbeartHandler().drainHeartbeatThread(); + getHomeHeartbeatHandler().drainHeartbeatThread(); } else { - while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) { + while (getHomeHeartbeatHandler().getRequestQueueSize() > 0) { try { Thread.sleep(10); } catch (InterruptedException e) { @@ -291,12 +291,9 @@ public TestableAMRequestHandlerThread(Configuration conf, public void run() { try { getUGIWithToken(getAttemptId()) - .doAs(new PrivilegedExceptionAction() { - @Override - public Object run() { - TestableAMRequestHandlerThread.super.run(); - return null; - } + .doAs((PrivilegedExceptionAction) () -> { + TestableAMRequestHandlerThread.super.run(); + return null; }); } catch (Exception e) { }