Skip to content

Commit

Permalink
HADOOP-18851: Perfm improvement for ZKDT management
Browse files Browse the repository at this point in the history
  • Loading branch information
vikaskr22 committed Sep 12, 2023
1 parent da69485 commit ab7a3a6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,12 @@ protected byte[] createPassword(TokenIdent identifier) {
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
identifier.setMasterKeyId(currentKey.getKeyId());
DelegationKey delegationCurrentKey = currentKey;
identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
+ ", currentKey: " + currentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+ ", currentKey: " + delegationCurrentKey.getKeyId());
byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;

import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
Expand Down Expand Up @@ -148,9 +147,9 @@ protected static CuratorFramework getCurator() {
private CuratorCacheBridge keyCache;
private CuratorCacheBridge tokenCache;
private final int seqNumBatchSize;
private AtomicInteger currentSeqNum;
private AtomicInteger currentMaxSeqNum;
private final ReentrantReadWriteLock currentSeqNumLock;
private int currentSeqNum;
private int currentMaxSeqNum;
private final ReentrantLock currentSeqNumLock;
private final boolean isTokenWatcherEnabled;

public ZKDelegationTokenSecretManager(Configuration conf) {
Expand All @@ -166,7 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
this.currentSeqNumLock = new ReentrantReadWriteLock(true);
this.currentSeqNumLock = new ReentrantLock(true);
if (CURATOR_TL.get() != null) {
zkClient =
CURATOR_TL.get().usingNamespace(
Expand Down Expand Up @@ -285,10 +284,10 @@ public void startThreads() throws IOException {
}
// the first batch range should be allocated during this starting window
// by calling the incrSharedCount
currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize));
currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize);
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched initial range of seq num, from {} to {} ",
currentSeqNum.incrementAndGet(), currentMaxSeqNum);
currentSeqNum+1, currentMaxSeqNum);
} catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e);
}
Expand Down Expand Up @@ -524,22 +523,14 @@ protected int incrementDelegationTokenSeqNum() {
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
try {
this.currentSeqNumLock.readLock().lock();
if (currentSeqNum.get() >= currentMaxSeqNum.get()) {
this.currentSeqNumLock.lock();
if (currentSeqNum >= currentMaxSeqNum) {
try {
// after a successful batch request, we can get the range starting point
this.currentSeqNumLock.readLock().unlock();
try {
this.currentSeqNumLock.writeLock().lock();
if (currentSeqNum.get() >= currentMaxSeqNum.get()) {
currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize));
currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize );
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum.get()+1, currentMaxSeqNum);
}
} finally {
this.currentSeqNumLock.writeLock().unlock();
}
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize ;
LOG.info("Fetched new range of seq num, from {} to {} ",
currentSeqNum+1, currentMaxSeqNum);
} catch (InterruptedException e) {
// The ExpirationThread is just finishing.. so dont do anything..
LOG.debug(
Expand All @@ -549,12 +540,10 @@ protected int incrementDelegationTokenSeqNum() {
throw new RuntimeException("Could not increment shared counter !!", e);
}
}
return currentSeqNum.incrementAndGet();
return ++currentSeqNum;
} finally {
if( this.currentSeqNumLock.getReadHoldCount() > 0) {
this.currentSeqNumLock.readLock().unlock();
this.currentSeqNumLock.unlock();
}
}
}

@Override
Expand Down

0 comments on commit ab7a3a6

Please sign in to comment.