diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 874ee9d08d9ad..ef06299fcfd8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4432,6 +4432,31 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";
+ // The application cleaner class to use
+ public static final String GPG_APPCLEANER_CLASS =
+ FEDERATION_GPG_PREFIX + "application.cleaner.class";
+ public static final String DEFAULT_GPG_APPCLEANER_CLASS =
+ "org.apache.hadoop.yarn.server.globalpolicygenerator"
+ + ".applicationcleaner.DefaultApplicationCleaner";
+
+ // The interval at which the application cleaner runs, -1 means disabled
+ public static final String GPG_APPCLEANER_INTERVAL_MS =
+ FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
+ public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(-1);
+
+ /**
+ * Specifications on how (many times) to contact Router for apps. We need to
+ * do this because Router might return partial application list because some
+ * sub-cluster RM is not responsive (e.g. failing over).
+ *
+ * Should have three values separated by comma: minimal success retries,
+ * maximum total retry, retry interval (ms).
+ */
+ public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+ FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
+ public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
+ "3,10,600000";
+
public static final String FEDERATION_GPG_POLICY_PREFIX =
FEDERATION_GPG_PREFIX + "policy.generator.";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f82540b8f46cb..9697f7aa88c8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5538,6 +5538,14 @@
LINEAR
+
+
+ The Application Cleaner implementation class for GPG to use.
+
+ yarn.federation.gpg.application.cleaner.class
+ org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner
+
+
Flag to enable cross-origin (CORS) support in the GPG. This flag
requires the CORS filter initializer to be added to the filter initializers
@@ -5546,6 +5554,14 @@
false
+
+
+ The interval at which the application cleaner runs, -1 means disabled.
+
+ yarn.federation.gpg.application.cleaner.interval-ms
+ -1s
+
+
The http address of the GPG web application.
@@ -5556,6 +5572,18 @@
0.0.0.0:8069
+
+
+ Specifications on how (many times) to contact Router for apps. We need to
+ do this because Router might return partial application list because some
+ sub-cluster RM is not responsive (e.g. failing over).
+ Should have three values separated by comma: minimal success retries,
+ maximum total retry, retry interval (ms).
+
+ yarn.federation.gpg.application.cleaner.contact.router.spec
+ 3,10,600000
+
+
The https address of the GPG web application.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 26136b11de6f4..d4c259b51605e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -83,6 +83,9 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
@@ -884,6 +887,33 @@ public void addApplicationHomeSubCluster(ApplicationId applicationId,
}
}
+ /**
+ * Get the {@code ApplicationHomeSubCluster} list representing the mapping of
+ * all submitted applications to it's home sub-cluster.
+ *
+ * @return the mapping of all submitted application to it's home sub-cluster
+ * @throws YarnException if the request is invalid/fails
+ */
+ public List getApplicationsHomeSubCluster() throws YarnException {
+ GetApplicationsHomeSubClusterResponse response = stateStore.getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest.newInstance());
+ return response.getAppsHomeSubClusters();
+ }
+
+ /**
+ * Delete the mapping of home {@code SubClusterId} of a previously submitted
+ * {@code ApplicationId}. Currently response is empty if the operation is
+ * successful, if not an exception reporting reason for a failure.
+ *
+ * @param applicationId the application to delete the home sub-cluster of
+ * @throws YarnException if the request is invalid/fails
+ */
+ public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
+ throws YarnException {
+ stateStore.deleteApplicationHomeSubCluster(
+ DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
+ }
+
/**
* Update ApplicationHomeSubCluster to FederationStateStore.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index a802e37979bb7..02344a51493c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -40,6 +40,7 @@
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
/**
* GPGUtils contains utility functions for the GPG.
@@ -58,11 +59,12 @@ private GPGUtils() {
* @param webAddr WebAddress.
* @param path url path.
* @param returnType return type.
+ * @param selectParam query parameters.
* @param conf configuration.
* @return response entity.
*/
public static T invokeRMWebService(String webAddr, String path, final Class returnType,
- Configuration conf) {
+ Configuration conf, String selectParam) {
Client client = Client.create();
T obj;
@@ -72,6 +74,11 @@ public static T invokeRMWebService(String webAddr, String path, final Class<
String scheme = YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : HTTP_PREFIX;
String webAddress = scheme + socketAddress.getHostName() + ":" + socketAddress.getPort();
WebResource webResource = client.resource(webAddress);
+
+ if (selectParam != null) {
+ webResource = webResource.queryParam(RMWSConsts.DESELECTS, selectParam);
+ }
+
ClientResponse response = null;
try {
response = webResource.path(RM_WEB_SERVICE_PATH).path(path)
@@ -92,6 +99,21 @@ public static T invokeRMWebService(String webAddr, String path, final Class<
}
}
+ /**
+ * Performs an invocation of the remote RMWebService.
+ *
+ * @param Generic T.
+ * @param webAddr WebAddress.
+ * @param path url path.
+ * @param returnType return type.
+ * @param config configuration.
+ * @return response entity.
+ */
+ public static T invokeRMWebService(String webAddr,
+ String path, final Class returnType, Configuration config) {
+ return invokeRMWebService(webAddr, path, returnType, config, null);
+ }
+
/**
* Creates a uniform weighting of 1.0 for each sub cluster.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index 81a999d76a28e..ba8ce856cdaa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
@@ -84,6 +85,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
+ private ApplicationCleaner applicationCleaner;
private PolicyGenerator policyGenerator;
private String webAppAddress;
private JvmPauseMonitor pauseMonitor;
@@ -125,6 +127,12 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+
+ this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
+ YarnConfiguration.GPG_APPCLEANER_CLASS,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, ApplicationCleaner.class);
+ this.applicationCleaner.init(conf, this.gpgContext);
+
this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
@@ -149,7 +157,7 @@ protected void serviceStart() throws Exception {
super.serviceStart();
- // Scheduler SubClusterCleaner service
+ // Schedule SubClusterCleaner service
Configuration config = getConfig();
long scCleanerIntervalMs = config.getTimeDuration(
YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
@@ -161,6 +169,18 @@ protected void serviceStart() throws Exception {
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
+ // Schedule ApplicationCleaner service
+ long appCleanerIntervalMs = config.getTimeDuration(
+ YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);
+
+ if (appCleanerIntervalMs > 0) {
+ this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
+ 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled application cleaner with interval: {}",
+ DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
+ }
+
// Schedule PolicyGenerator
// We recommend using yarn.federation.gpg.policy.generator.interval
// instead of yarn.federation.gpg.policy.generator.interval-ms
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
new file mode 100644
index 0000000000000..cd3f7618558e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The ApplicationCleaner is a runnable that cleans up old applications from
+ * table applicationsHomeSubCluster in FederationStateStore.
+ */
+public abstract class ApplicationCleaner implements Runnable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ApplicationCleaner.class);
+
+ private Configuration conf;
+ private GPGContext gpgContext;
+
+ private int minRouterSuccessCount;
+ private int maxRouterRetry;
+ private long routerQueryIntevalMillis;
+
+ public void init(Configuration config, GPGContext context)
+ throws YarnException {
+
+ this.gpgContext = context;
+ this.conf = config;
+
+ String routerSpecString =
+ this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
+ YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
+ String[] specs = routerSpecString.split(",");
+ if (specs.length != 3) {
+ throw new YarnException("Expect three comma separated values in "
+ + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
+ + routerSpecString);
+ }
+ this.minRouterSuccessCount = Integer.parseInt(specs[0]);
+ this.maxRouterRetry = Integer.parseInt(specs[1]);
+ this.routerQueryIntevalMillis = Long.parseLong(specs[2]);
+
+ if (this.minRouterSuccessCount > this.maxRouterRetry) {
+ throw new YarnException("minRouterSuccessCount "
+ + this.minRouterSuccessCount
+ + " should not be larger than maxRouterRetry" + this.maxRouterRetry);
+ }
+ if (this.minRouterSuccessCount <= 0) {
+ throw new YarnException("minRouterSuccessCount "
+ + this.minRouterSuccessCount + " should be positive");
+ }
+
+ LOG.info(
+ "Initialized AppCleaner with Router query with min success {}, "
+ + "max retry {}, retry interval {}",
+ this.minRouterSuccessCount, this.maxRouterRetry,
+ DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
+ }
+
+ public GPGContext getGPGContext() {
+ return this.gpgContext;
+ }
+
+ /**
+ * Query router for applications.
+ *
+ * @return the set of applications
+ * @throws YarnRuntimeException when router call fails
+ */
+ public Set getAppsFromRouter() throws YarnRuntimeException {
+ String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
+
+ LOG.info(String.format("Contacting router at: %s", webAppAddress));
+ AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
+ DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
+
+ Set appSet = new HashSet<>();
+ for (AppInfo appInfo : appsInfo.getApps()) {
+ appSet.add(ApplicationId.fromString(appInfo.getAppId()));
+ }
+ return appSet;
+ }
+
+ /**
+ * Get the list of known applications in the cluster from Router.
+ *
+ * @return the list of known applications
+ * @throws YarnException if get app fails
+ */
+ public Set getRouterKnownApplications() throws YarnException {
+ int successCount = 0, totalAttemptCount = 0;
+ Set resultSet = new HashSet<>();
+ while (totalAttemptCount < this.maxRouterRetry) {
+ try {
+ Set routerApps = getAppsFromRouter();
+ resultSet.addAll(routerApps);
+ LOG.info("Attempt {}: {} known apps from Router, {} in total",
+ totalAttemptCount, routerApps.size(), resultSet.size());
+
+ successCount++;
+ if (successCount >= this.minRouterSuccessCount) {
+ return resultSet;
+ }
+
+ // Wait for the next attempt
+ try {
+ Thread.sleep(this.routerQueryIntevalMillis);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep interrupted after attempt {}.", totalAttemptCount);
+ }
+ } catch (Exception e) {
+ LOG.warn("Router query attempt {} failed.", totalAttemptCount, e);
+ } finally {
+ totalAttemptCount++;
+ }
+ }
+ throw new YarnException("Only " + successCount
+ + " success Router queries after " + totalAttemptCount + " retries");
+ }
+
+ @Override
+ public abstract void run();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
new file mode 100644
index 0000000000000..857d2e645d4c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The default ApplicationCleaner that cleans up old applications from table
+ * applicationsHomeSubCluster in FederationStateStore.
+ */
+public class DefaultApplicationCleaner extends ApplicationCleaner {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultApplicationCleaner.class);
+
+ @Override
+ public void run() {
+ Date now = new Date();
+ LOG.info("Application cleaner run at time {}", now);
+
+ FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
+ Set candidates = new HashSet<>();
+ try {
+ List response =
+ facade.getApplicationsHomeSubCluster();
+ for (ApplicationHomeSubCluster app : response) {
+ candidates.add(app.getApplicationId());
+ }
+ LOG.info("{} app entries in FederationStateStore", candidates.size());
+
+ Set routerApps = getRouterKnownApplications();
+ LOG.info("{} known applications from Router", routerApps.size());
+
+ candidates.removeAll(routerApps);
+ LOG.info("Deleting {} applications from statestore", candidates.size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString)
+ .collect(Collectors.joining(",")));
+ }
+ for (ApplicationId appId : candidates) {
+ try {
+ facade.deleteApplicationHomeSubCluster(appId);
+ } catch (Exception e) {
+ LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Application cleaner started at time {} fails. ", now, e);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
new file mode 100644
index 0000000000000..dd302c81f45ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
index df28192a0c668..1f0fbd11a741c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -159,7 +159,7 @@ protected Map> getInfos(
clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
}
Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
- e.getValue(), e.getKey(), getConf());
+ e.getValue(), e.getKey(), conf);
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
}
}
@@ -181,7 +181,7 @@ protected Map getSchedulerInfo(
for (SubClusterInfo sci : activeSubClusters.values()) {
SchedulerTypeInfo sti = GPGUtils
.invokeRMWebService(sci.getRMWebServiceAddress(),
- RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, getConf());
+ RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, conf);
if(sti != null){
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
new file mode 100644
index 0000000000000..2d63c48236fb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for DefaultApplicationCleaner in GPG.
+ */
+public class TestDefaultApplicationCleaner {
+ private Configuration conf;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreFacade facade;
+ private ApplicationCleaner appCleaner;
+ private GPGContext gpgContext;
+
+ private List appIds;
+ // The list of applications returned by mocked router
+ private Set routerAppIds;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new YarnConfiguration();
+
+ // No Router query retry
+ conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(conf);
+
+ facade = FederationStateStoreFacade.getInstance();
+ facade.reinitialize(stateStore, conf);
+
+ gpgContext = new GPGContextImpl();
+ gpgContext.setStateStoreFacade(facade);
+
+ appCleaner = new TestableDefaultApplicationCleaner();
+ appCleaner.init(conf, gpgContext);
+
+ routerAppIds = new HashSet<>();
+
+ appIds = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ ApplicationId appId = ApplicationId.newInstance(0, i);
+ appIds.add(appId);
+
+ SubClusterId subClusterId =
+ SubClusterId.newInstance("SUBCLUSTER-" + i);
+
+ stateStore.addApplicationHomeSubCluster(
+ AddApplicationHomeSubClusterRequest.newInstance(
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
+ }
+ }
+
+ @After
+ public void breakDown() {
+ if (stateStore != null) {
+ stateStore.close();
+ stateStore = null;
+ }
+ }
+
+ @Test
+ public void testFederationStateStoreAppsCleanUp() throws YarnException {
+ // Set first app to be still known by Router
+ ApplicationId appId = appIds.get(0);
+ routerAppIds.add(appId);
+
+ // Another random app not in stateStore known by Router
+ appId = ApplicationId.newInstance(100, 200);
+ routerAppIds.add(appId);
+
+ appCleaner.run();
+
+ // Only one app should be left
+ Assert.assertEquals(1,
+ stateStore
+ .getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest.newInstance())
+ .getAppsHomeSubClusters().size());
+ }
+
+ /**
+ * Testable version of DefaultApplicationCleaner.
+ */
+ public class TestableDefaultApplicationCleaner
+ extends DefaultApplicationCleaner {
+ @Override
+ public Set getAppsFromRouter() throws YarnRuntimeException {
+ return routerAppIds;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
index 72e97f8a75087..446eeee2cd922 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -299,7 +299,7 @@ public void testCallRM() {
String webAppAddress = getServiceAddress(NetUtils.createSocketAddr(rmAddress));
SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.SCHEDULER,
- SchedulerTypeInfo.class, this.conf);
+ SchedulerTypeInfo.class, conf);
Assert.assertNotNull(sti);
SchedulerInfo schedulerInfo = sti.getSchedulerInfo();