From 913d6900b90d5292a41eea2cc6191ea3aef45f59 Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Wed, 30 Aug 2023 15:13:17 +0530 Subject: [PATCH 1/4] HADOOP-18851: Perfm improvement for ZKDT management --- .../AbstractDelegationTokenSecretManager.java | 23 +++++----- .../ZKDelegationTokenSecretManager.java | 43 ++++++++++++------- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 283e773c81795..bfc56c11c7066 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) { /** * Access to currentKey is protected by this object lock */ - private DelegationKey currentKey; + private volatile DelegationKey currentKey; - private long keyUpdateInterval; - private long tokenMaxLifetime; - private long tokenRemoverScanInterval; - private long tokenRenewInterval; + private final long keyUpdateInterval; + private final long tokenMaxLifetime; + private final long tokenRemoverScanInterval; + private final long tokenRenewInterval; /** * Whether to store a token's tracking ID in its TokenInformation. * Can be overridden by a subclass. @@ -486,7 +486,7 @@ private synchronized void removeExpiredKeys() { } @Override - protected synchronized byte[] createPassword(TokenIdent identifier) { + protected byte[] createPassword(TokenIdent identifier) { int sequenceNum; long now = Time.now(); sequenceNum = incrementDelegationTokenSeqNum(); @@ -521,7 +521,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) { */ protected DelegationTokenInformation checkToken(TokenIdent identifier) throws InvalidToken { - assert Thread.holdsLock(this); DelegationTokenInformation info = getTokenInfo(identifier); String err; if (info == null) { @@ -541,7 +540,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier) } @Override - public synchronized byte[] retrievePassword(TokenIdent identifier) + public byte[] retrievePassword(TokenIdent identifier) throws InvalidToken { return checkToken(identifier).getPassword(); } @@ -553,7 +552,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) { return null; } - public synchronized String getTokenTrackingId(TokenIdent identifier) { + public String getTokenTrackingId(TokenIdent identifier) { DelegationTokenInformation info = getTokenInfo(identifier); if (info == null) { return null; @@ -567,7 +566,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) { * @param password Password in the token. * @throws InvalidToken InvalidToken. */ - public synchronized void verifyToken(TokenIdent identifier, byte[] password) + public void verifyToken(TokenIdent identifier, byte[] password) throws InvalidToken { byte[] storedPassword = retrievePassword(identifier); if (!MessageDigest.isEqual(password, storedPassword)) { @@ -584,7 +583,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password) * @throws InvalidToken if the token is invalid * @throws AccessControlException if the user can't renew token */ - public synchronized long renewToken(Token token, + public long renewToken(Token token, String renewer) throws InvalidToken, IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); @@ -646,7 +645,7 @@ public synchronized long renewToken(Token token, * @throws InvalidToken for invalid token * @throws AccessControlException if the user isn't allowed to cancel */ - public synchronized TokenIdent cancelToken(Token token, + public TokenIdent cancelToken(Token token, String canceller) throws IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index fb9a2951f598a..491c6dd26a270 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; @@ -149,6 +150,9 @@ protected static CuratorFramework getCurator() { private int currentSeqNum; private int currentMaxSeqNum; + private final ReentrantLock currentSeqNumLock; + + private final boolean isTokenWatcherEnabled; public ZKDelegationTokenSecretManager(Configuration conf) { @@ -164,6 +168,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) { ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT); + this.currentSeqNumLock = new ReentrantLock(true); if (CURATOR_TL.get() != null) { zkClient = CURATOR_TL.get().usingNamespace( @@ -520,24 +525,32 @@ protected int incrementDelegationTokenSeqNum() { // The secret manager will keep a local range of seq num which won't be // seen by peers, so only when the range is exhausted it will ask zk for // another range again - if (currentSeqNum >= currentMaxSeqNum) { - try { - // after a successful batch request, we can get the range starting point - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched new range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug( - "Thread interrupted while performing token counter increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared counter !!", e); + try{ + this.currentSeqNumLock.lock(); + if (currentSeqNum >= currentMaxSeqNum) { + try { + // after a successful batch request, we can get the range starting point + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug( + "Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException("Could not increment shared counter !!", e); + } } + + return ++currentSeqNum; + + }finally{ + this.currentSeqNumLock.unlock(); } - return ++currentSeqNum; + } @Override From da69485cbdb22310314aa1afe9168c8ae64ac04c Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Wed, 30 Aug 2023 15:13:17 +0530 Subject: [PATCH 2/4] HADOOP-18851: Perfm improvement for ZKDT management --- .../ZKDelegationTokenSecretManager.java | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 491c6dd26a270..817699b0a303e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; @@ -147,12 +148,9 @@ protected static CuratorFramework getCurator() { private CuratorCacheBridge keyCache; private CuratorCacheBridge tokenCache; private final int seqNumBatchSize; - private int currentSeqNum; - private int currentMaxSeqNum; - - private final ReentrantLock currentSeqNumLock; - - + private AtomicInteger currentSeqNum; + private AtomicInteger currentMaxSeqNum; + private final ReentrantReadWriteLock currentSeqNumLock; private final boolean isTokenWatcherEnabled; public ZKDelegationTokenSecretManager(Configuration conf) { @@ -168,7 +166,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) { ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT); - this.currentSeqNumLock = new ReentrantLock(true); + this.currentSeqNumLock = new ReentrantReadWriteLock(true); if (CURATOR_TL.get() != null) { zkClient = CURATOR_TL.get().usingNamespace( @@ -287,10 +285,10 @@ public void startThreads() throws IOException { } // the first batch range should be allocated during this starting window // by calling the incrSharedCount - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize)); + currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize); LOG.info("Fetched initial range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); + currentSeqNum.incrementAndGet(), currentMaxSeqNum); } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); } @@ -525,15 +523,23 @@ protected int incrementDelegationTokenSeqNum() { // The secret manager will keep a local range of seq num which won't be // seen by peers, so only when the range is exhausted it will ask zk for // another range again - try{ - this.currentSeqNumLock.lock(); - if (currentSeqNum >= currentMaxSeqNum) { + try { + this.currentSeqNumLock.readLock().lock(); + if (currentSeqNum.get() >= currentMaxSeqNum.get()) { try { // after a successful batch request, we can get the range starting point - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched new range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); + this.currentSeqNumLock.readLock().unlock(); + try { + this.currentSeqNumLock.writeLock().lock(); + if (currentSeqNum.get() >= currentMaxSeqNum.get()) { + currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize)); + currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize ); + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum.get()+1, currentMaxSeqNum); + } + } finally { + this.currentSeqNumLock.writeLock().unlock(); + } } catch (InterruptedException e) { // The ExpirationThread is just finishing.. so dont do anything.. LOG.debug( @@ -543,14 +549,12 @@ protected int incrementDelegationTokenSeqNum() { throw new RuntimeException("Could not increment shared counter !!", e); } } - - return ++currentSeqNum; - - }finally{ - this.currentSeqNumLock.unlock(); + return currentSeqNum.incrementAndGet(); + } finally { + if( this.currentSeqNumLock.getReadHoldCount() > 0) { + this.currentSeqNumLock.readLock().unlock(); + } } - - } @Override From ab7a3a697b898ac4ec717206bf875cd01d0919f1 Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Wed, 30 Aug 2023 15:13:17 +0530 Subject: [PATCH 3/4] HADOOP-18851: Perfm improvement for ZKDT management --- .../AbstractDelegationTokenSecretManager.java | 7 ++-- .../ZKDelegationTokenSecretManager.java | 41 +++++++------------ 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index bfc56c11c7066..cafa5135e68e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -492,11 +492,12 @@ protected byte[] createPassword(TokenIdent identifier) { sequenceNum = incrementDelegationTokenSeqNum(); identifier.setIssueDate(now); identifier.setMaxDate(now + tokenMaxLifetime); - identifier.setMasterKeyId(currentKey.getKeyId()); + DelegationKey delegationCurrentKey = currentKey; + identifier.setMasterKeyId(delegationCurrentKey.getKeyId()); identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + formatTokenId(identifier) - + ", currentKey: " + currentKey.getKeyId()); - byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); + + ", currentKey: " + delegationCurrentKey.getKeyId()); + byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey()); DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); try { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 817699b0a303e..b8be24f05ad7b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; @@ -148,9 +147,9 @@ protected static CuratorFramework getCurator() { private CuratorCacheBridge keyCache; private CuratorCacheBridge tokenCache; private final int seqNumBatchSize; - private AtomicInteger currentSeqNum; - private AtomicInteger currentMaxSeqNum; - private final ReentrantReadWriteLock currentSeqNumLock; + private int currentSeqNum; + private int currentMaxSeqNum; + private final ReentrantLock currentSeqNumLock; private final boolean isTokenWatcherEnabled; public ZKDelegationTokenSecretManager(Configuration conf) { @@ -166,7 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) { ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT); - this.currentSeqNumLock = new ReentrantReadWriteLock(true); + this.currentSeqNumLock = new ReentrantLock(true); if (CURATOR_TL.get() != null) { zkClient = CURATOR_TL.get().usingNamespace( @@ -285,10 +284,10 @@ public void startThreads() throws IOException { } // the first batch range should be allocated during this starting window // by calling the incrSharedCount - currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize)); - currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize); + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; LOG.info("Fetched initial range of seq num, from {} to {} ", - currentSeqNum.incrementAndGet(), currentMaxSeqNum); + currentSeqNum+1, currentMaxSeqNum); } catch (Exception e) { throw new IOException("Could not start Sequence Counter", e); } @@ -524,22 +523,14 @@ protected int incrementDelegationTokenSeqNum() { // seen by peers, so only when the range is exhausted it will ask zk for // another range again try { - this.currentSeqNumLock.readLock().lock(); - if (currentSeqNum.get() >= currentMaxSeqNum.get()) { + this.currentSeqNumLock.lock(); + if (currentSeqNum >= currentMaxSeqNum) { try { // after a successful batch request, we can get the range starting point - this.currentSeqNumLock.readLock().unlock(); - try { - this.currentSeqNumLock.writeLock().lock(); - if (currentSeqNum.get() >= currentMaxSeqNum.get()) { - currentSeqNum.set(incrSharedCount(delTokSeqCounter, seqNumBatchSize)); - currentMaxSeqNum.set(currentSeqNum.get() + seqNumBatchSize ); - LOG.info("Fetched new range of seq num, from {} to {} ", - currentSeqNum.get()+1, currentMaxSeqNum); - } - } finally { - this.currentSeqNumLock.writeLock().unlock(); - } + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize ; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); } catch (InterruptedException e) { // The ExpirationThread is just finishing.. so dont do anything.. LOG.debug( @@ -549,12 +540,10 @@ protected int incrementDelegationTokenSeqNum() { throw new RuntimeException("Could not increment shared counter !!", e); } } - return currentSeqNum.incrementAndGet(); + return ++currentSeqNum; } finally { - if( this.currentSeqNumLock.getReadHoldCount() > 0) { - this.currentSeqNumLock.readLock().unlock(); + this.currentSeqNumLock.unlock(); } - } } @Override From d5eded867b72eb6d5018ea63ee20bde882f21b29 Mon Sep 17 00:00:00 2001 From: Vikas Kumar Date: Wed, 13 Sep 2023 19:22:09 +0530 Subject: [PATCH 4/4] HADOOP-18851: Perfm improvement for ZKDT management-CodeStyle changes --- .../token/delegation/ZKDelegationTokenSecretManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index b8be24f05ad7b..687560a7c13dd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -528,7 +528,7 @@ protected int incrementDelegationTokenSeqNum() { try { // after a successful batch request, we can get the range starting point currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize ; + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; LOG.info("Fetched new range of seq num, from {} to {} ", currentSeqNum+1, currentMaxSeqNum); } catch (InterruptedException e) { @@ -542,8 +542,8 @@ protected int incrementDelegationTokenSeqNum() { } return ++currentSeqNum; } finally { - this.currentSeqNumLock.unlock(); - } + this.currentSeqNumLock.unlock(); + } } @Override