diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index 283e773c81795..cafa5135e68e6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -115,12 +115,12 @@ private String formatTokenId(TokenIdent id) {
   /**
    * Access to currentKey is protected by this object lock
    */
-  private DelegationKey currentKey;
+  private volatile DelegationKey currentKey;
   
-  private long keyUpdateInterval;
-  private long tokenMaxLifetime;
-  private long tokenRemoverScanInterval;
-  private long tokenRenewInterval;
+  private final long keyUpdateInterval;
+  private final long tokenMaxLifetime;
+  private final long tokenRemoverScanInterval;
+  private final long tokenRenewInterval;
   /**
    * Whether to store a token's tracking ID in its TokenInformation.
    * Can be overridden by a subclass.
@@ -486,17 +486,18 @@ private synchronized void removeExpiredKeys() {
   }
   
   @Override
-  protected synchronized byte[] createPassword(TokenIdent identifier) {
+  protected byte[] createPassword(TokenIdent identifier) {
     int sequenceNum;
     long now = Time.now();
     sequenceNum = incrementDelegationTokenSeqNum();
     identifier.setIssueDate(now);
     identifier.setMaxDate(now + tokenMaxLifetime);
-    identifier.setMasterKeyId(currentKey.getKeyId());
+    DelegationKey delegationCurrentKey = currentKey;
+    identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
     identifier.setSequenceNumber(sequenceNum);
     LOG.info("Creating password for identifier: " + formatTokenId(identifier)
-        + ", currentKey: " + currentKey.getKeyId());
-    byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
+        + ", currentKey: " + delegationCurrentKey.getKeyId());
+    byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
     DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
         + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
     try {
@@ -521,7 +522,6 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
    */
   protected DelegationTokenInformation checkToken(TokenIdent identifier)
       throws InvalidToken {
-    assert Thread.holdsLock(this);
     DelegationTokenInformation info = getTokenInfo(identifier);
     String err;
     if (info == null) {
@@ -541,7 +541,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier)
   }
   
   @Override
-  public synchronized byte[] retrievePassword(TokenIdent identifier)
+  public byte[] retrievePassword(TokenIdent identifier)
       throws InvalidToken {
     return checkToken(identifier).getPassword();
   }
@@ -553,7 +553,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) {
     return null;
   }
 
-  public synchronized String getTokenTrackingId(TokenIdent identifier) {
+  public String getTokenTrackingId(TokenIdent identifier) {
     DelegationTokenInformation info = getTokenInfo(identifier);
     if (info == null) {
       return null;
@@ -567,7 +567,7 @@ public synchronized String getTokenTrackingId(TokenIdent identifier) {
    * @param password Password in the token.
    * @throws InvalidToken InvalidToken.
    */
-  public synchronized void verifyToken(TokenIdent identifier, byte[] password)
+  public void verifyToken(TokenIdent identifier, byte[] password)
       throws InvalidToken {
     byte[] storedPassword = retrievePassword(identifier);
     if (!MessageDigest.isEqual(password, storedPassword)) {
@@ -584,7 +584,7 @@ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
    * @throws InvalidToken if the token is invalid
    * @throws AccessControlException if the user can't renew token
    */
-  public synchronized long renewToken(Token<TokenIdent> token,
+  public long renewToken(Token<TokenIdent> token,
                          String renewer) throws InvalidToken, IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
@@ -646,7 +646,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
    * @throws InvalidToken for invalid token
    * @throws AccessControlException if the user isn't allowed to cancel
    */
-  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+  public TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
     ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
     DataInputStream in = new DataInputStream(buf);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index fb9a2951f598a..687560a7c13dd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -27,6 +27,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Stream;
 
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
@@ -148,7 +149,7 @@ protected static CuratorFramework getCurator() {
   private final int seqNumBatchSize;
   private int currentSeqNum;
   private int currentMaxSeqNum;
-
+  private final ReentrantLock currentSeqNumLock;
   private final boolean isTokenWatcherEnabled;
 
   public ZKDelegationTokenSecretManager(Configuration conf) {
@@ -164,6 +165,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) {
         ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
     isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
         ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
+    this.currentSeqNumLock = new ReentrantLock(true);
     if (CURATOR_TL.get() != null) {
       zkClient =
           CURATOR_TL.get().usingNamespace(
@@ -520,24 +522,28 @@ protected int incrementDelegationTokenSeqNum() {
     // The secret manager will keep a local range of seq num which won't be
     // seen by peers, so only when the range is exhausted it will ask zk for
     // another range again
-    if (currentSeqNum >= currentMaxSeqNum) {
-      try {
-        // after a successful batch request, we can get the range starting point
-        currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
-        currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
-        LOG.info("Fetched new range of seq num, from {} to {} ",
-            currentSeqNum+1, currentMaxSeqNum);
-      } catch (InterruptedException e) {
-        // The ExpirationThread is just finishing.. so dont do anything..
-        LOG.debug(
-            "Thread interrupted while performing token counter increment", e);
-        Thread.currentThread().interrupt();
-      } catch (Exception e) {
-        throw new RuntimeException("Could not increment shared counter !!", e);
+    try {
+      this.currentSeqNumLock.lock();
+      if (currentSeqNum >= currentMaxSeqNum) {
+        try {
+          // after a successful batch request, we can get the range starting point
+          currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
+          currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
+          LOG.info("Fetched new range of seq num, from {} to {} ",
+              currentSeqNum+1, currentMaxSeqNum);
+        } catch (InterruptedException e) {
+          // The ExpirationThread is just finishing.. so dont do anything..
+          LOG.debug(
+                  "Thread interrupted while performing token counter increment", e);
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          throw new RuntimeException("Could not increment shared counter !!", e);
+        }
       }
+      return ++currentSeqNum;
+    } finally {
+      this.currentSeqNumLock.unlock();
     }
-
-    return ++currentSeqNum;
   }
 
   @Override