diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 06c3fa4c64a2c..be7cc89f5da78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -729,6 +730,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
12800;
+ public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
+ RM_PREFIX + "delegation-token-renewer.thread-timeout";
+ public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT =
+ TimeUnit.SECONDS.toMillis(60); // 60 Seconds
+ public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
+ RM_PREFIX + "delegation-token-renewer.thread-retry-interval";
+ public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
+ TimeUnit.SECONDS.toMillis(60); // 60 Seconds
+ public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
+ RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
+ public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
+ 10;
+
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index c96a7e4cfe518..5277be40b0982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -957,6 +957,30 @@
86400000
+
+
+ RM DelegationTokenRenewer thread timeout
+
+ yarn.resourcemanager.delegation-token-renewer.thread-timeout
+ 60s
+
+
+
+
+ Default maximum number of retries for each RM DelegationTokenRenewer thread
+
+ yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts
+ 10
+
+
+
+
+ Time interval between each RM DelegationTokenRenewer thread retry attempt
+
+ yarn.resourcemanager.delegation-token-renewer.thread-retry-interval
+ 60s
+
+
Thread pool size for RMApplicationHistoryWriter.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index d3ed5032363f6..fd8935debbcaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -36,10 +37,12 @@
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -115,6 +118,12 @@ public class DelegationTokenRenewer extends AbstractService {
private boolean tokenKeepAliveEnabled;
private boolean hasProxyUserPrivileges;
private long credentialsValidTimeRemaining;
+ private long tokenRenewerThreadTimeout;
+ private long tokenRenewerThreadRetryInterval;
+ private int tokenRenewerThreadRetryMaxAttempts;
+ private final Map> futures =
+ new HashMap<>();
+ private boolean delegationTokenRenewerPoolTrackerFlag = true;
// this config is supposedly not used by end-users.
public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
@@ -140,6 +149,17 @@ protected void serviceInit(Configuration conf) throws Exception {
this.credentialsValidTimeRemaining =
conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
+ tokenRenewerThreadTimeout =
+ conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ tokenRenewerThreadRetryInterval = conf.getTimeDuration(
+ YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ tokenRenewerThreadRetryMaxAttempts =
+ conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue();
@@ -184,6 +204,11 @@ protected void serviceStart() throws Exception {
serviceStateLock.writeLock().lock();
isServiceStarted = true;
serviceStateLock.writeLock().unlock();
+
+ if (delegationTokenRenewerPoolTrackerFlag) {
+ renewerService.submit(new DelegationTokenRenewerPoolTracker());
+ }
+
while(!pendingEventQueue.isEmpty()) {
processDelegationTokenRenewerEvent(pendingEventQueue.take());
}
@@ -195,7 +220,9 @@ private void processDelegationTokenRenewerEvent(
serviceStateLock.readLock().lock();
try {
if (isServiceStarted) {
- renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+ Future> future =
+ renewerService.submit(new DelegationTokenRenewerRunnable(evt));
+ futures.put(evt, future);
} else {
pendingEventQueue.add(evt);
}
@@ -476,7 +503,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
for (Iterator> itor =
tokenConf.iterator(); itor.hasNext(); ) {
Map.Entry entry = itor.next();
- LOG.info(entry.getKey() + " ===> " + entry.getValue());
+ LOG.debug("Token conf key is {} and value is {}",
+ entry.getKey(), entry.getValue());
}
}
} else {
@@ -894,7 +922,100 @@ public void run() {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
-
+
+ @VisibleForTesting
+ public void setDelegationTokenRenewerPoolTracker(boolean flag) {
+ delegationTokenRenewerPoolTrackerFlag = flag;
+ }
+
+ /**
+ * Create a timer task to retry the token renewer event which would be
+ * scheduled at defined intervals based on the configuration.
+ *
+ * @param evt
+ * @return Timer Task
+ */
+ private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) {
+ return new TimerTask() {
+ @Override
+ public void run() {
+ LOG.info("Retrying token renewer thread for appid = {} and "
+ + "attempt is {}", evt.getApplicationId(),
+ evt.getAttempt());
+ evt.incrAttempt();
+
+ Collection> tokens =
+ evt.getCredentials().getAllTokens();
+ for (Token> token : tokens) {
+ DelegationTokenToRenew dttr = allTokens.get(token);
+ if (dttr != null) {
+ removeFailedDelegationToken(dttr);
+ }
+ }
+
+ DelegationTokenRenewerAppRecoverEvent event =
+ new DelegationTokenRenewerAppRecoverEvent(
+ evt.getApplicationId(), evt.getCredentials(),
+ evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf());
+ event.setAttempt(evt.getAttempt());
+ processDelegationTokenRenewerEvent(event);
+ }
+ };
+ }
+
+ /**
+ * Runnable class to set timeout for futures of all threads running in
+ * renewerService thread pool executor asynchronously.
+ *
+ * In case of timeout exception, retries would be attempted with defined
+ * intervals till no. of retry attempt reaches max attempt.
+ */
+ private final class DelegationTokenRenewerPoolTracker
+ implements Runnable {
+
+ DelegationTokenRenewerPoolTracker() {
+ }
+
+ /**
+ * Keep traversing of renewer pool threads and wait for specific
+ * timeout. In case of timeout exception, retry the event till no. of
+ * attempts reaches max attempts with specific interval.
+ */
+ @Override
+ public void run() {
+ while (true) {
+ for (Map.Entry> entry : futures
+ .entrySet()) {
+ DelegationTokenRenewerEvent evt = entry.getKey();
+ Future> future = entry.getValue();
+ try {
+ future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+
+ // Cancel thread and retry the same event in case of timeout
+ if (future != null && !future.isDone() && !future.isCancelled()) {
+ future.cancel(true);
+ futures.remove(evt);
+ if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
+ renewalTimer.schedule(
+ getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
+ tokenRenewerThreadRetryInterval);
+ } else {
+ LOG.info(
+ "Exhausted max retry attempts {} in token renewer "
+ + "thread for {}",
+ tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
+ }
+ }
+ } catch (Exception e) {
+ LOG.info("Problem in submitting renew tasks in token renewer "
+ + "thread.", e);
+ }
+ }
+ }
+ }
+ }
+
/*
* This will run as a separate thread and will process individual events. It
* is done in this way to make sure that the token renewal as a part of
@@ -1016,6 +1137,10 @@ public boolean shouldCancelAtEnd() {
public String getUser() {
return user;
}
+
+ private Configuration getTokenConf() {
+ return tokenConf;
+ }
}
enum DelegationTokenRenewerEventType {
@@ -1028,6 +1153,7 @@ private static class DelegationTokenRenewerEvent extends
AbstractEvent {
private ApplicationId appId;
+ private int attempt = 1;
public DelegationTokenRenewerEvent(ApplicationId appId,
DelegationTokenRenewerEventType type) {
@@ -1038,6 +1164,18 @@ public DelegationTokenRenewerEvent(ApplicationId appId,
public ApplicationId getApplicationId() {
return appId;
}
+
+ public void incrAttempt() {
+ attempt++;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ public void setAttempt(int attempt) {
+ this.attempt = attempt;
+ }
}
// only for testing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 5f6d4402967c4..0205460efa4ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -42,6 +43,7 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -93,6 +95,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -230,6 +233,7 @@ public void setUp() throws Exception {
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ delegationTokenRenewer.setDelegationTokenRenewerPoolTracker(false);
delegationTokenRenewer.setRMContext(mockContext);
delegationTokenRenewer.init(conf);
delegationTokenRenewer.start();
@@ -632,6 +636,7 @@ public void testDTKeepAlive1 () throws Exception {
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ localDtr.setDelegationTokenRenewerPoolTracker(false);
localDtr.setRMContext(mockContext);
localDtr.init(lconf);
localDtr.start();
@@ -712,6 +717,7 @@ public void testDTKeepAlive2() throws Exception {
InetSocketAddress sockAddr =
InetSocketAddress.createUnresolved("localhost", 1234);
when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+ localDtr.setDelegationTokenRenewerPoolTracker(false);
localDtr.setRMContext(mockContext);
localDtr.init(lconf);
localDtr.start();
@@ -1612,4 +1618,173 @@ protected Token>[] obtainSystemTokensForUser(String user,
// Ensure incrTokenSequenceNo has been called for token renewal as well.
Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
}
-}
+
+ /**
+ * Test case to ensure token renewer threads are timed out by inducing
+ * artificial delay.
+ *
+ * Because of time out, retries would be attempted till it reaches max retry
+ * attempt and finally asserted using used threads count.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 30000)
+ public void testTokenThreadTimeout() throws Exception {
+ Configuration yarnConf = new YarnConfiguration();
+ yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+ true);
+ yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ yarnConf.setClass(YarnConfiguration.RM_STORE, MemoryRMStateStore.class,
+ RMStateStore.class);
+ yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
+ TimeUnit.SECONDS);
+ yarnConf.setTimeDuration(
+ YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
+ TimeUnit.SECONDS);
+ yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+ 3);
+ UserGroupInformation.setConfiguration(yarnConf);
+
+ Text userText = new Text("user1");
+ DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
+ new Text("renewer1"), userText);
+ final Token originalToken =
+ new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
+ new Text("service1"));
+
+ Credentials credentials = new Credentials();
+ credentials.addToken(userText, originalToken);
+
+ AtomicBoolean renewDelay = new AtomicBoolean(false);
+
+ // -1 is because of thread allocated to pool tracker runnable tasks
+ AtomicInteger threadCounter = new AtomicInteger(-1);
+ renewDelay.set(true);
+ DelegationTokenRenewer renewer = createNewDelegationTokenRenewerForTimeout(
+ yarnConf, threadCounter, renewDelay);
+
+ MockRM rm = new TestSecurityMockRM(yarnConf) {
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return renewer;
+ }
+ };
+
+ rm.start();
+ rm.submitApp(200, "name", "user",
+ new HashMap(), false, "default", 1,
+ credentials);
+
+ int attempts = yarnConf.getInt(
+ YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
+
+ GenericTestUtils.waitFor(() -> threadCounter.get() >= attempts, 2000,
+ 30000);
+
+ // Ensure no. of threads has been used in renewer service thread pool is
+ // higher than the configured max retry attempts
+ assertTrue(threadCounter.get() >= attempts);
+ rm.close();
+ }
+
+ /**
+ * Test case to ensure token renewer threads are running as usual and finally
+ * asserted only 1 thread has been used.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 30000)
+ public void testTokenThreadTimeoutWithoutDelay() throws Exception {
+ Configuration yarnConf = new YarnConfiguration();
+ yarnConf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+ true);
+ yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ yarnConf.set(YarnConfiguration.RM_STORE,
+ MemoryRMStateStore.class.getName());
+ yarnConf.setTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT, 5,
+ TimeUnit.SECONDS);
+ yarnConf.setTimeDuration(
+ YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL, 5,
+ TimeUnit.SECONDS);
+ yarnConf.setInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
+ 3);
+ UserGroupInformation.setConfiguration(yarnConf);
+
+ Text userText = new Text("user1");
+ DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(userText,
+ new Text("renewer1"), userText);
+ final Token originalToken =
+ new Token<>(dtId.getBytes(), "password1".getBytes(), dtId.getKind(),
+ new Text("service1"));
+
+ Credentials credentials = new Credentials();
+ credentials.addToken(userText, originalToken);
+
+ AtomicBoolean renewDelay = new AtomicBoolean(false);
+
+ // -1 is because of thread allocated to pool tracker runnable tasks
+ AtomicInteger threadCounter = new AtomicInteger(-1);
+ DelegationTokenRenewer renwer = createNewDelegationTokenRenewerForTimeout(
+ yarnConf, threadCounter, renewDelay);
+
+ MockRM rm = new TestSecurityMockRM(yarnConf) {
+ @Override
+ protected DelegationTokenRenewer createDelegationTokenRenewer() {
+ return renwer;
+ }
+ };
+
+ rm.start();
+ rm.submitApp(200, "name", "user",
+ new HashMap(), false, "default", 1,
+ credentials);
+
+ GenericTestUtils.waitFor(() -> threadCounter.get() == 1, 2000, 40000);
+
+ // Ensure only one thread has been used in renewer service thread pool.
+ assertEquals(threadCounter.get(), 1);
+ rm.close();
+ }
+
+ private DelegationTokenRenewer createNewDelegationTokenRenewerForTimeout(
+ Configuration config, final AtomicInteger renewerCounter,
+ final AtomicBoolean renewDelay) {
+ DelegationTokenRenewer renew = new DelegationTokenRenewer() {
+ @Override
+ protected ThreadPoolExecutor createNewThreadPoolService(
+ Configuration configuration) {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 3L,
+ TimeUnit.SECONDS, new LinkedBlockingQueue()) {
+ @Override
+ public Future> submit(Runnable r) {
+ renewerCounter.incrementAndGet();
+ return super.submit(r);
+ }
+ };
+ return pool;
+ }
+
+ @Override
+ protected void renewToken(final DelegationTokenToRenew dttr)
+ throws IOException {
+ try {
+ if (renewDelay.get()) {
+ // Delay for 4 times than the configured timeout
+ Thread.sleep(config.getTimeDuration(
+ YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
+ YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
+ TimeUnit.MILLISECONDS) * 4);
+ }
+ super.renewToken(dttr);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep Interrupted", e);
+ }
+ }
+ };
+ renew.setDelegationTokenRenewerPoolTracker(true);
+ return renew;
+ }
+}
\ No newline at end of file