Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18851: Performance improvement for DelegationTokenSecretManager. #6001

Merged
merged 5 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) {
/**
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
private volatile DelegationKey currentKey;

private long keyUpdateInterval;
private long tokenMaxLifetime;
private long tokenRemoverScanInterval;
private long tokenRenewInterval;
private final long keyUpdateInterval;
private final long tokenMaxLifetime;
private final long tokenRemoverScanInterval;
private final long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
Expand Down Expand Up @@ -486,7 +486,7 @@ private synchronized void removeExpiredKeys() {
}

@Override
protected synchronized byte[] createPassword(TokenIdent identifier) {
protected byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
sequenceNum = incrementDelegationTokenSeqNum();
Expand Down Expand Up @@ -521,7 +521,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
*/
vikaskr22 marked this conversation as resolved.
Show resolved Hide resolved
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier);
String err;
if (info == null) {
Expand All @@ -541,7 +540,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
}

@Override
public synchronized byte[] retrievePassword(TokenIdent identifier)
public byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
return checkToken(identifier).getPassword();
}
Expand All @@ -553,7 +552,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
return null;
}

public synchronized String getTokenTrackingId(TokenIdent identifier) {
public String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
Expand All @@ -567,7 +566,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
* @param password Password in the token.
* @throws InvalidToken InvalidToken.
*/
public synchronized void verifyToken(TokenIdent identifier, byte[] password)
public void verifyToken(TokenIdent identifier, byte[] password)
throws InvalidToken {
byte[] storedPassword = retrievePassword(identifier);
if (!MessageDigest.isEqual(password, storedPassword)) {
Expand All @@ -584,7 +583,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
* @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token
*/
public synchronized long renewToken(Token<TokenIdent> token,
public long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down Expand Up @@ -646,7 +645,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
* @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel
*/
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
public TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
Expand Down Expand Up @@ -149,6 +150,9 @@ protected static CuratorFramework getCurator() {
private int currentSeqNum;
private int currentMaxSeqNum;

private final ReentrantLock currentSeqNumLock;


jojochuang marked this conversation as resolved.
Show resolved Hide resolved
private final boolean isTokenWatcherEnabled;

public ZKDelegationTokenSecretManager(Configuration conf) {
Expand All @@ -164,6 +168,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
this.currentSeqNumLock = new ReentrantLock(true);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -520,24 +525,32 @@ protected int incrementDelegationTokenSeqNum() {
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
try{
jojochuang marked this conversation as resolved.
Show resolved Hide resolved
this.currentSeqNumLock.lock();
vikaskr22 marked this conversation as resolved.
Show resolved Hide resolved
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
"Thread interrupted while performing token counter increment", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}

return ++currentSeqNum;

}finally{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code style:

  1. extra blank.
  2. leave a blank space before or after brace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorporated. Thanks.

this.currentSeqNumLock.unlock();
}

return ++currentSeqNum;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extra blank here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated. Thanks.

}

@Override
Expand Down