Skip to content

Commit

Permalink
YARN-9768. RM Renew Delegation token thread should timeout and retry.…
Browse files Browse the repository at this point in the history
… Contributed by Manikandan R.
  • Loading branch information
Inigo Goiri committed Jan 21, 2020
1 parent 8cfc367 commit 0696828
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
Expand Down Expand Up @@ -729,6 +730,19 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES =
12800;

public static final String RM_DT_RENEWER_THREAD_TIMEOUT =
RM_PREFIX + "delegation-token-renewer.thread-timeout";
public static final long DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT =
TimeUnit.SECONDS.toMillis(60); // 60 Seconds
public static final String RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
RM_PREFIX + "delegation-token-renewer.thread-retry-interval";
public static final long DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL =
TimeUnit.SECONDS.toMillis(60); // 60 Seconds
public static final String RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
RM_PREFIX + "delegation-token-renewer.thread-retry-max-attempts";
public static final int DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS =
10;

public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,30 @@
<value>86400000</value>
</property>

<property>
<description>
RM DelegationTokenRenewer thread timeout
</description>
<name>yarn.resourcemanager.delegation-token-renewer.thread-timeout</name>
<value>60s</value>
</property>

<property>
<description>
Default maximum number of retries for each RM DelegationTokenRenewer thread
</description>
<name>yarn.resourcemanager.delegation-token-renewer.thread-retry-max-attempts</name>
<value>10</value>
</property>

<property>
<description>
Time interval between each RM DelegationTokenRenewer thread retry attempt
</description>
<name>yarn.resourcemanager.delegation-token-renewer.thread-retry-interval</name>
<value>60s</value>
</property>

<property>
<description>
Thread pool size for RMApplicationHistoryWriter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -36,10 +37,12 @@
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -115,6 +118,12 @@ public class DelegationTokenRenewer extends AbstractService {
private boolean tokenKeepAliveEnabled;
private boolean hasProxyUserPrivileges;
private long credentialsValidTimeRemaining;
private long tokenRenewerThreadTimeout;
private long tokenRenewerThreadRetryInterval;
private int tokenRenewerThreadRetryMaxAttempts;
private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
new HashMap<>();
private boolean delegationTokenRenewerPoolTrackerFlag = true;

// this config is supposedly not used by end-users.
public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
Expand All @@ -140,6 +149,17 @@ protected void serviceInit(Configuration conf) throws Exception {
this.credentialsValidTimeRemaining =
conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
tokenRenewerThreadTimeout =
conf.getTimeDuration(YarnConfiguration.RM_DT_RENEWER_THREAD_TIMEOUT,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_TIMEOUT,
TimeUnit.MILLISECONDS);
tokenRenewerThreadRetryInterval = conf.getTimeDuration(
YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_INTERVAL,
TimeUnit.MILLISECONDS);
tokenRenewerThreadRetryMaxAttempts =
conf.getInt(YarnConfiguration.RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_DT_RENEWER_THREAD_RETRY_MAX_ATTEMPTS);
setLocalSecretManagerAndServiceAddr();
renewerService = createNewThreadPoolService(conf);
pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
Expand Down Expand Up @@ -184,6 +204,11 @@ protected void serviceStart() throws Exception {
serviceStateLock.writeLock().lock();
isServiceStarted = true;
serviceStateLock.writeLock().unlock();

if (delegationTokenRenewerPoolTrackerFlag) {
renewerService.submit(new DelegationTokenRenewerPoolTracker());
}

while(!pendingEventQueue.isEmpty()) {
processDelegationTokenRenewerEvent(pendingEventQueue.take());
}
Expand All @@ -195,7 +220,9 @@ private void processDelegationTokenRenewerEvent(
serviceStateLock.readLock().lock();
try {
if (isServiceStarted) {
renewerService.execute(new DelegationTokenRenewerRunnable(evt));
Future<?> future =
renewerService.submit(new DelegationTokenRenewerRunnable(evt));
futures.put(evt, future);
} else {
pendingEventQueue.add(evt);
}
Expand Down Expand Up @@ -476,7 +503,8 @@ private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
for (Iterator<Map.Entry<String, String>> itor =
tokenConf.iterator(); itor.hasNext(); ) {
Map.Entry<String, String> entry = itor.next();
LOG.info(entry.getKey() + " ===> " + entry.getValue());
LOG.debug("Token conf key is {} and value is {}",
entry.getKey(), entry.getValue());
}
}
} else {
Expand Down Expand Up @@ -894,7 +922,100 @@ public void run() {
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}


@VisibleForTesting
public void setDelegationTokenRenewerPoolTracker(boolean flag) {
delegationTokenRenewerPoolTrackerFlag = flag;
}

/**
* Create a timer task to retry the token renewer event which would be
* scheduled at defined intervals based on the configuration.
*
* @param evt
* @return Timer Task
*/
private TimerTask getTimerTask(AbstractDelegationTokenRenewerAppEvent evt) {
return new TimerTask() {
@Override
public void run() {
LOG.info("Retrying token renewer thread for appid = {} and "
+ "attempt is {}", evt.getApplicationId(),
evt.getAttempt());
evt.incrAttempt();

Collection<Token<?>> tokens =
evt.getCredentials().getAllTokens();
for (Token<?> token : tokens) {
DelegationTokenToRenew dttr = allTokens.get(token);
if (dttr != null) {
removeFailedDelegationToken(dttr);
}
}

DelegationTokenRenewerAppRecoverEvent event =
new DelegationTokenRenewerAppRecoverEvent(
evt.getApplicationId(), evt.getCredentials(),
evt.shouldCancelAtEnd(), evt.getUser(), evt.getTokenConf());
event.setAttempt(evt.getAttempt());
processDelegationTokenRenewerEvent(event);
}
};
}

/**
* Runnable class to set timeout for futures of all threads running in
* renewerService thread pool executor asynchronously.
*
* In case of timeout exception, retries would be attempted with defined
* intervals till no. of retry attempt reaches max attempt.
*/
private final class DelegationTokenRenewerPoolTracker
implements Runnable {

DelegationTokenRenewerPoolTracker() {
}

/**
* Keep traversing <Future> of renewer pool threads and wait for specific
* timeout. In case of timeout exception, retry the event till no. of
* attempts reaches max attempts with specific interval.
*/
@Override
public void run() {
while (true) {
for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
.entrySet()) {
DelegationTokenRenewerEvent evt = entry.getKey();
Future<?> future = entry.getValue();
try {
future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {

// Cancel thread and retry the same event in case of timeout
if (future != null && !future.isDone() && !future.isCancelled()) {
future.cancel(true);
futures.remove(evt);
if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
renewalTimer.schedule(
getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
tokenRenewerThreadRetryInterval);
} else {
LOG.info(
"Exhausted max retry attempts {} in token renewer "
+ "thread for {}",
tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
}
}
} catch (Exception e) {
LOG.info("Problem in submitting renew tasks in token renewer "
+ "thread.", e);
}
}
}
}
}

/*
* This will run as a separate thread and will process individual events. It
* is done in this way to make sure that the token renewal as a part of
Expand Down Expand Up @@ -1016,6 +1137,10 @@ public boolean shouldCancelAtEnd() {
public String getUser() {
return user;
}

private Configuration getTokenConf() {
return tokenConf;
}
}

enum DelegationTokenRenewerEventType {
Expand All @@ -1028,6 +1153,7 @@ private static class DelegationTokenRenewerEvent extends
AbstractEvent<DelegationTokenRenewerEventType> {

private ApplicationId appId;
private int attempt = 1;

public DelegationTokenRenewerEvent(ApplicationId appId,
DelegationTokenRenewerEventType type) {
Expand All @@ -1038,6 +1164,18 @@ public DelegationTokenRenewerEvent(ApplicationId appId,
public ApplicationId getApplicationId() {
return appId;
}

public void incrAttempt() {
attempt++;
}

public int getAttempt() {
return attempt;
}

public void setAttempt(int attempt) {
this.attempt = attempt;
}
}

// only for testing
Expand Down
Loading

0 comments on commit 0696828

Please sign in to comment.