Skip to content

Commit

Permalink
HADOOP-18851: Performance improvement for DelegationTokenSecretManage…
Browse files Browse the repository at this point in the history
…r. (apache#6001). Contributed by Vikas Kumar.

Signed-off-by: Wei-Chiu Chuang <[email protected]>
Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
vikaskr22 authored Sep 15, 2023
1 parent 23360b3 commit e283375
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) {
/**
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
private volatile DelegationKey currentKey;

private long keyUpdateInterval;
private long tokenMaxLifetime;
private long tokenRemoverScanInterval;
private long tokenRenewInterval;
private final long keyUpdateInterval;
private final long tokenMaxLifetime;
private final long tokenRemoverScanInterval;
private final long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
Expand Down Expand Up @@ -486,17 +486,18 @@ private synchronized void removeExpiredKeys() {
}

@Override
protected synchronized byte[] createPassword(TokenIdent identifier) {
protected byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentKey.getKeyId());
DelegationKey delegationCurrentKey = currentKey;
identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
+ ", currentKey: " + currentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+ ", currentKey: " + delegationCurrentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
Expand All @@ -521,7 +522,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
*/
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier);
String err;
if (info == null) {
Expand All @@ -541,7 +541,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
}

@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
public byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
return checkToken(identifier).getPassword();
}
Expand All @@ -553,7 +553,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
return null;
}

public synchronized String getTokenTrackingId(TokenIdent identifier) {
public String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
Expand All @@ -567,7 +567,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
* @param password Password in the token.
* @throws InvalidToken InvalidToken.
*/
public synchronized void verifyToken(TokenIdent identifier, byte[] password)
public void verifyToken(TokenIdent identifier, byte[] password)
throws InvalidToken {
byte[] storedPassword = retrievePassword(identifier);
if (!MessageDigest.isEqual(password, storedPassword)) {
Expand All @@ -584,7 +584,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
* @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token
*/
public synchronized long renewToken(Token<TokenIdent> token,
public long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down Expand Up @@ -646,7 +646,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
* @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel
*/
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
public TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
Expand Down Expand Up @@ -148,7 +149,7 @@ protected static CuratorFramework getCurator() {
private final int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;

private final ReentrantLock currentSeqNumLock;
private final boolean isTokenWatcherEnabled;

public ZKDelegationTokenSecretManager(Configuration conf) {
Expand All @@ -164,6 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
this.currentSeqNumLock = new ReentrantLock(true);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -520,24 +522,28 @@ protected int incrementDelegationTokenSeqNum() {
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
try {
this.currentSeqNumLock.lock();
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}
return ++currentSeqNum;
} finally {
this.currentSeqNumLock.unlock();
}

return ++currentSeqNum;
}

@Override
Expand Down

0 comments on commit e283375

Please sign in to comment.