From 5dcd9125389d320278133895c215e822ad908253 Mon Sep 17 00:00:00 2001 From: zhangjian232 Date: Fri, 25 Aug 2023 11:56:03 +0800 Subject: [PATCH 1/7] fix:NoNamenodesAvailableException long time when ns failover --- .../resolver/ActiveNamenodeResolver.java | 8 +++ .../resolver/MembershipNamenodeResolver.java | 25 ++++++- .../federation/router/RouterRpcClient.java | 3 + .../federation/MiniRouterDFSCluster.java | 2 +- .../hdfs/server/federation/MockResolver.java | 4 ++ .../TestRouterClientRejectOverload.java | 72 +++++++++++++++++++ 6 files changed, 111 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index cae1f478604d6..fc5f9ac00959c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -146,4 +146,12 @@ List getNamenodesForNameserviceId( * @param routerId Unique string identifier for the router. */ void setRouterId(String routerId); + + /** + * Shuffle cache, to ensure that the current nn will not be accessed first next time + * + * @param nsId name service id + * @param namenode namenode contexts + */ + void shuffleCache(String nsId, FederationNamenodeContext namenode); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index db1dcdf1818f3..4fc8a62dea454 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -83,14 +83,12 @@ public class MembershipNamenodeResolver /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ private Map> cacheBP; - public MembershipNamenodeResolver( Configuration conf, StateStoreService store) throws IOException { this.stateStore = store; this.cacheNS = new ConcurrentHashMap<>(); this.cacheBP = new ConcurrentHashMap<>(); - if (this.stateStore != null) { // Request cache updates from the state store this.stateStore.registerCacheExternal(this); @@ -478,4 +476,27 @@ private List getRecentRegistrationForQuery( public void setRouterId(String router) { this.routerId = router; } + + /** + * Shuffle cache, to ensure that the current nn will not be accessed first next time. + * + * + * @param nsId name service id + * @param namenode namenode contexts + */ + @Override + public synchronized void shuffleCache(String nsId, FederationNamenodeContext namenode) { + cacheNS.compute(Pair.of(nsId, false), (ns, namenodeContexts) -> { + if (namenodeContexts != null + && namenodeContexts.size() > 0 + && !namenodeContexts.get(0).getState().equals(ACTIVE) + && namenodeContexts.get(0).getRpcAddress().equals(namenode.getRpcAddress())) { + List rotatedNnContexts = new ArrayList<>(namenodeContexts); + Collections.rotate(rotatedNnContexts, -1); + return rotatedNnContexts; + } else { + return namenodeContexts; + } + }); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 321d97e5dac4b..b376e51b56eb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -599,6 +599,9 @@ public Object invokeMethod( } LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); + if (this.namenodeResolver != null) { + this.namenodeResolver.shuffleCache(nsId, namenode); + } // Throw RetriableException so that client can retry throw new RetriableException(ioe); } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 2c70395870496..96a8140fb3d09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -134,7 +134,7 @@ public class MiniRouterDFSCluster { protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); - protected static final long DEFAULT_CACHE_INTERVAL_MS = + public static final long DEFAULT_CACHE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); /** Heartbeat interval in milliseconds. */ private long heartbeatInterval; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 4aaa8e7569e88..245df8e6fa491 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -397,6 +397,10 @@ public List getMountPoints(String path) throws IOException { public void setRouterId(String router) { } + @Override + public void shuffleCache(String nsId, FederationNamenodeContext namenode) { + } + /** * Mocks the availability of default namespace. * @param b if true default namespace is unset. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index 8d776546801ba..055ad65332089 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE; +import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToActive; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_CACHE_INTERVAL_MS; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -42,16 +45,19 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.test.GenericTestUtils; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -357,6 +363,72 @@ public void testNoNamenodesAvailable() throws Exception{ assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes()); } + /** + * When failover occurs, the router may record that the ns has no active namenode. + * Only when the router updates the cache next time can the memory status be updated, + * causing the router to report NoNamenodesAvailableException for a long time + */ + @Test + public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { + setupCluster(false, true); + transitionClusterNSToStandby(cluster); + for (RouterContext routerContext : cluster.getRouters()) { + // Manually trigger the heartbeat + Collection heartbeatServices = routerContext + .getRouter().getNamenodeHeartbeatServices(); + for (NamenodeHeartbeatService service : heartbeatServices) { + service.periodicInvoke(); + } + // Update service cache + routerContext.getRouter().getStateStore().refreshCaches(true); + } + // Record the time after the router first updated the cache + long firstLoadTime = Time.now(); + List namenodes = cluster.getNamenodes(); + + // Make sure all namenodes are in standby state + for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) { + assertTrue(namenodeContext.getNamenode().getNameNodeState() == STANDBY.ordinal()); + } + + Configuration conf = cluster.getRouterClientConf(); + // Set dfs.client.failover.random.order false, to pick 1st router at first + conf.setBoolean("dfs.client.failover.random.order", false); + + DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf); + + for (RouterContext routerContext : cluster.getRouters()) { + // Get the second namenode in the router cache and make it active + List ns0 = routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId("ns0", false); + String nsId = ns0.get(1).getNamenodeId(); + cluster.switchToActive("ns0", nsId); + // Manually trigger the heartbeat, but the router does not manually load the cache + Collection heartbeatServices = routerContext + .getRouter().getNamenodeHeartbeatServices(); + for (NamenodeHeartbeatService service : heartbeatServices) { + service.periodicInvoke(); + } + assertTrue(cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState() == ACTIVE.ordinal()); + } + + // Get router0 metrics + FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0) + .getRouter().getRpcServer().getRPCMetrics(); + // Original failures + long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes(); + + // At this time, the router has recorded 2 standby namenodes in memory, and the first accessed namenode is indeed standby, + // then an NoNamenodesAvailableException will be reported for the first access, and the next access will be successful + routerClient.getFileInfo("/"); + long successReadTime = Time.now(); + assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes()); + + // access the active namenode without waiting for the router to update the cache, + // even if there are 2 standby states recorded in the router memory + assertTrue(successReadTime - firstLoadTime < DEFAULT_CACHE_INTERVAL_MS); + } + + @Test public void testAsyncCallerPoolMetrics() throws Exception { setupCluster(true, false); From 29c98dcc951470f37ab9d9cd11f7036aa7b10484 Mon Sep 17 00:00:00 2001 From: zhangjian232 Date: Sat, 26 Aug 2023 10:56:33 +0800 Subject: [PATCH 2/7] reformat and check > 1 when shuffle --- .../resolver/ActiveNamenodeResolver.java | 2 +- .../resolver/MembershipNamenodeResolver.java | 28 +++++++++++++------ .../TestRouterClientRejectOverload.java | 27 ++++++++++++------ 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index fc5f9ac00959c..a07e1ae8e1184 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -148,7 +148,7 @@ List getNamenodesForNameserviceId( void setRouterId(String routerId); /** - * Shuffle cache, to ensure that the current nn will not be accessed first next time + * Shuffle cache, to ensure that the current nn will not be accessed first next time. * * @param nsId name service id * @param namenode namenode contexts diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 4fc8a62dea454..9232293060205 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -83,12 +83,14 @@ public class MembershipNamenodeResolver /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ private Map> cacheBP; + public MembershipNamenodeResolver( Configuration conf, StateStoreService store) throws IOException { this.stateStore = store; this.cacheNS = new ConcurrentHashMap<>(); this.cacheBP = new ConcurrentHashMap<>(); + if (this.stateStore != null) { // Request cache updates from the state store this.stateStore.registerCacheExternal(this); @@ -480,20 +482,30 @@ public void setRouterId(String router) { /** * Shuffle cache, to ensure that the current nn will not be accessed first next time. * - * * @param nsId name service id * @param namenode namenode contexts */ @Override public synchronized void shuffleCache(String nsId, FederationNamenodeContext namenode) { cacheNS.compute(Pair.of(nsId, false), (ns, namenodeContexts) -> { - if (namenodeContexts != null - && namenodeContexts.size() > 0 - && !namenodeContexts.get(0).getState().equals(ACTIVE) - && namenodeContexts.get(0).getRpcAddress().equals(namenode.getRpcAddress())) { - List rotatedNnContexts = new ArrayList<>(namenodeContexts); - Collections.rotate(rotatedNnContexts, -1); - return rotatedNnContexts; + if (namenodeContexts != null && namenodeContexts.size() > 1) { + FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0); + /* + * 1.If the first nn in the cache is active, the active nn priority cannot be lowered. + * This happens when other threads have already updated the cache. + * + * 2.If the first nn in the cache at this time is not the nn + * that needs to be lowered in priority, there is no need to rotate. + * This happens when other threads have already rotated the cache. + */ + if (!firstNamenodeContext.getState().equals(ACTIVE) + && firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) { + List rotatedNnContexts = new ArrayList<>(namenodeContexts); + Collections.rotate(rotatedNnContexts, -1); + return rotatedNnContexts; + } else { + return namenodeContexts; + } } else { return namenodeContexts; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index 055ad65332089..ac7e0d99f78bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -366,7 +366,7 @@ public void testNoNamenodesAvailable() throws Exception{ /** * When failover occurs, the router may record that the ns has no active namenode. * Only when the router updates the cache next time can the memory status be updated, - * causing the router to report NoNamenodesAvailableException for a long time + * causing the router to report NoNamenodesAvailableException for a long time. */ @Test public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { @@ -388,7 +388,8 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { // Make sure all namenodes are in standby state for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) { - assertTrue(namenodeContext.getNamenode().getNameNodeState() == STANDBY.ordinal()); + assertEquals(STANDBY.ordinal(), + namenodeContext.getNamenode().getNameNodeState()); } Configuration conf = cluster.getRouterClientConf(); @@ -399,7 +400,10 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { for (RouterContext routerContext : cluster.getRouters()) { // Get the second namenode in the router cache and make it active - List ns0 = routerContext.getRouter().getNamenodeResolver().getNamenodesForNameserviceId("ns0", false); + List ns0 = routerContext.getRouter() + .getNamenodeResolver() + .getNamenodesForNameserviceId("ns0", false); + String nsId = ns0.get(1).getNamenodeId(); cluster.switchToActive("ns0", nsId); // Manually trigger the heartbeat, but the router does not manually load the cache @@ -408,7 +412,8 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { for (NamenodeHeartbeatService service : heartbeatServices) { service.periodicInvoke(); } - assertTrue(cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState() == ACTIVE.ordinal()); + assertEquals(ACTIVE.ordinal(), + cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState()); } // Get router0 metrics @@ -417,14 +422,20 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { // Original failures long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes(); - // At this time, the router has recorded 2 standby namenodes in memory, and the first accessed namenode is indeed standby, - // then an NoNamenodesAvailableException will be reported for the first access, and the next access will be successful + /* + * At this time, the router has recorded 2 standby namenodes in memory, + * and the first accessed namenode is indeed standby, + * then an NoNamenodesAvailableException will be reported for the first access, + * and the next access will be successful. + */ routerClient.getFileInfo("/"); long successReadTime = Time.now(); assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes()); - // access the active namenode without waiting for the router to update the cache, - // even if there are 2 standby states recorded in the router memory + /* + * access the active namenode without waiting for the router to update the cache, + * even if there are 2 standby states recorded in the router memory. + */ assertTrue(successReadTime - firstLoadTime < DEFAULT_CACHE_INTERVAL_MS); } From 209f5883112aa247eefc1c254f14f4d64be55975 Mon Sep 17 00:00:00 2001 From: zhangjian232 Date: Wed, 30 Aug 2023 15:03:16 +0800 Subject: [PATCH 3/7] reverse if and modify method name --- .../resolver/ActiveNamenodeResolver.java | 5 ++- .../resolver/MembershipNamenodeResolver.java | 45 ++++++++++--------- .../federation/router/RouterRpcClient.java | 2 +- .../federation/MiniRouterDFSCluster.java | 6 ++- .../hdfs/server/federation/MockResolver.java | 2 +- .../TestRouterClientRejectOverload.java | 6 +-- 6 files changed, 36 insertions(+), 30 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index a07e1ae8e1184..1648152c80aea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -148,10 +148,11 @@ List getNamenodesForNameserviceId( void setRouterId(String routerId); /** - * Shuffle cache, to ensure that the current nn will not be accessed first next time. + * Rotate cache, make the current namenode have the lowest priority, + * to ensure that the current namenode will not be accessed first next time. * * @param nsId name service id * @param namenode namenode contexts */ - void shuffleCache(String nsId, FederationNamenodeContext namenode); + void rotateCache(String nsId, FederationNamenodeContext namenode); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 9232293060205..96d41c01e8a0f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -480,33 +480,36 @@ public void setRouterId(String router) { } /** - * Shuffle cache, to ensure that the current nn will not be accessed first next time. + * Rotate cache, make the current namenode have the lowest priority, + * to ensure that the current namenode will not be accessed first next time. * * @param nsId name service id * @param namenode namenode contexts */ @Override - public synchronized void shuffleCache(String nsId, FederationNamenodeContext namenode) { + public synchronized void rotateCache(String nsId, FederationNamenodeContext namenode) { cacheNS.compute(Pair.of(nsId, false), (ns, namenodeContexts) -> { - if (namenodeContexts != null && namenodeContexts.size() > 1) { - FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0); - /* - * 1.If the first nn in the cache is active, the active nn priority cannot be lowered. - * This happens when other threads have already updated the cache. - * - * 2.If the first nn in the cache at this time is not the nn - * that needs to be lowered in priority, there is no need to rotate. - * This happens when other threads have already rotated the cache. - */ - if (!firstNamenodeContext.getState().equals(ACTIVE) - && firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) { - List rotatedNnContexts = new ArrayList<>(namenodeContexts); - Collections.rotate(rotatedNnContexts, -1); - return rotatedNnContexts; - } else { - return namenodeContexts; - } - } else { + if (namenodeContexts == null || namenodeContexts.size() <= 1) { + return namenodeContexts; + } + FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0); + /* + * If the first nn in the cache is active, the active nn priority cannot be lowered. + * This happens when other threads have already updated the cache. + */ + if(firstNamenodeContext.getState().equals(ACTIVE)){ + return namenodeContexts; + } + /* + * If the first nn in the cache at this time is not the nn + * that needs to be lowered in priority, there is no need to rotate. + * This happens when other threads have already rotated the cache. + */ + if(firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())){ + List rotatedNnContexts = new ArrayList<>(namenodeContexts); + Collections.rotate(rotatedNnContexts, -1); + return rotatedNnContexts; + }else { return namenodeContexts; } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index b376e51b56eb0..628231a19b625 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -600,7 +600,7 @@ public Object invokeMethod( LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); if (this.namenodeResolver != null) { - this.namenodeResolver.shuffleCache(nsId, namenode); + this.namenodeResolver.rotateCache(nsId, namenode); } // Throw RetriableException so that client can retry throw new RetriableException(ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 96a8140fb3d09..61205128e8b46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -134,7 +134,7 @@ public class MiniRouterDFSCluster { protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); - public static final long DEFAULT_CACHE_INTERVAL_MS = + protected static final long DEFAULT_CACHE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); /** Heartbeat interval in milliseconds. */ private long heartbeatInterval; @@ -1202,4 +1202,8 @@ public void waitClusterUp() throws IOException { throw new IOException("Cannot wait for the namenodes", e); } } + + public long getCacheFlushInterval() { + return cacheFlushInterval; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 245df8e6fa491..fef8a20b9fa68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -398,7 +398,7 @@ public void setRouterId(String router) { } @Override - public void shuffleCache(String nsId, FederationNamenodeContext namenode) { + public void rotateCache(String nsId, FederationNamenodeContext namenode) { } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java index ac7e0d99f78bb..176ac4b078250 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToActive; -import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_CACHE_INTERVAL_MS; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -388,8 +387,7 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { // Make sure all namenodes are in standby state for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) { - assertEquals(STANDBY.ordinal(), - namenodeContext.getNamenode().getNameNodeState()); + assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState()); } Configuration conf = cluster.getRouterClientConf(); @@ -436,7 +434,7 @@ public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception { * access the active namenode without waiting for the router to update the cache, * even if there are 2 standby states recorded in the router memory. */ - assertTrue(successReadTime - firstLoadTime < DEFAULT_CACHE_INTERVAL_MS); + assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval()); } From cbea4b46aaf7ccdae7811669e0451dff1b7e975e Mon Sep 17 00:00:00 2001 From: zhangjian232 Date: Wed, 30 Aug 2023 16:59:03 +0800 Subject: [PATCH 4/7] refresh the cache for the key Pair.of(nsId, true) at times. --- .../federation/resolver/ActiveNamenodeResolver.java | 3 ++- .../federation/resolver/MembershipNamenodeResolver.java | 9 +++++---- .../hdfs/server/federation/router/RouterRpcClient.java | 2 +- .../hadoop/hdfs/server/federation/MockResolver.java | 3 ++- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index 1648152c80aea..de89a152c2bbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -153,6 +153,7 @@ List getNamenodesForNameserviceId( * * @param nsId name service id * @param namenode namenode contexts + * @param listObserversFirst Observer read case, observer NN will be ranked first */ - void rotateCache(String nsId, FederationNamenodeContext namenode); + void rotateCache(String nsId, FederationNamenodeContext namenode, boolean listObserversFirst); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 96d41c01e8a0f..f37e192c6ddd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -485,10 +485,12 @@ public void setRouterId(String router) { * * @param nsId name service id * @param namenode namenode contexts + * @param listObserversFirst Observer read case, observer NN will be ranked first */ @Override - public synchronized void rotateCache(String nsId, FederationNamenodeContext namenode) { - cacheNS.compute(Pair.of(nsId, false), (ns, namenodeContexts) -> { + public synchronized void rotateCache( + String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { + cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> { if (namenodeContexts == null || namenodeContexts.size() <= 1) { return namenodeContexts; } @@ -509,9 +511,8 @@ public synchronized void rotateCache(String nsId, FederationNamenodeContext name List rotatedNnContexts = new ArrayList<>(namenodeContexts); Collections.rotate(rotatedNnContexts, -1); return rotatedNnContexts; - }else { - return namenodeContexts; } + return namenodeContexts; }); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 628231a19b625..436f624ddc6b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -600,7 +600,7 @@ public Object invokeMethod( LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); if (this.namenodeResolver != null) { - this.namenodeResolver.rotateCache(nsId, namenode); + this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); } // Throw RetriableException so that client can retry throw new RetriableException(ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index fef8a20b9fa68..6d11c057e01cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -398,7 +398,8 @@ public void setRouterId(String router) { } @Override - public void rotateCache(String nsId, FederationNamenodeContext namenode) { + public void rotateCache( + String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { } /** From cb4e08cb364c80d53a83a62f79dee19060636d17 Mon Sep 17 00:00:00 2001 From: zhangjian232 Date: Wed, 30 Aug 2023 17:03:51 +0800 Subject: [PATCH 5/7] check style --- .../federation/resolver/MembershipNamenodeResolver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index f37e192c6ddd7..3c5c8eae7fd4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -499,7 +499,7 @@ public synchronized void rotateCache( * If the first nn in the cache is active, the active nn priority cannot be lowered. * This happens when other threads have already updated the cache. */ - if(firstNamenodeContext.getState().equals(ACTIVE)){ + if (firstNamenodeContext.getState().equals(ACTIVE)) { return namenodeContexts; } /* @@ -507,7 +507,7 @@ public synchronized void rotateCache( * that needs to be lowered in priority, there is no need to rotate. * This happens when other threads have already rotated the cache. */ - if(firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())){ + if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) { List rotatedNnContexts = new ArrayList<>(namenodeContexts); Collections.rotate(rotatedNnContexts, -1); return rotatedNnContexts; From d338de4aa57d30349cb7bb1fa32c1d93dfc851e0 Mon Sep 17 00:00:00 2001 From: Jian Zhang <1361320460@qq.com> Date: Sat, 2 Sep 2023 12:44:26 +0800 Subject: [PATCH 6/7] add log and javadoc add log and javadoc --- .../federation/resolver/MembershipNamenodeResolver.java | 2 +- .../hdfs/server/federation/router/RouterRpcClient.java | 1 + .../hadoop/hdfs/server/federation/MiniRouterDFSCluster.java | 5 +++++ .../apache/hadoop/hdfs/server/federation/MockResolver.java | 2 +- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 3c5c8eae7fd4d..b17a165175c39 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -489,7 +489,7 @@ public void setRouterId(String router) { */ @Override public synchronized void rotateCache( - String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { + String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> { if (namenodeContexts == null || namenodeContexts.size() <= 1) { return namenodeContexts; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 436f624ddc6b1..9a0e1bc2c68f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -601,6 +601,7 @@ public Object invokeMethod( nsId, rpcAddress, ioe.getMessage()); if (this.namenodeResolver != null) { this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); + LOG.info("Rotate cache of pair: ", nsId, shouldUseObserver); } // Throw RetriableException so that client can retry throw new RetriableException(ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 61205128e8b46..bf22cf01148a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -1203,6 +1203,11 @@ public void waitClusterUp() throws IOException { } } + /** + * Get cache flush interval in milliseconds. + * + * @return Cache flush interval in milliseconds. + */ public long getCacheFlushInterval() { return cacheFlushInterval; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 6d11c057e01cd..554879856ac1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -399,7 +399,7 @@ public void setRouterId(String router) { @Override public void rotateCache( - String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { + String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { } /** From 4f3ab96d7f1235fdc7bd5adf79cba150338131b3 Mon Sep 17 00:00:00 2001 From: Jian Zhang <1361320460@qq.com> Date: Mon, 4 Sep 2023 17:48:38 +0800 Subject: [PATCH 7/7] Remove unneeded synchronization and judgment, add log --- .../federation/resolver/MembershipNamenodeResolver.java | 6 +++++- .../hdfs/server/federation/router/RouterRpcClient.java | 6 ++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index b17a165175c39..c0e800e0430d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -488,7 +488,7 @@ public void setRouterId(String router) { * @param listObserversFirst Observer read case, observer NN will be ranked first */ @Override - public synchronized void rotateCache( + public void rotateCache( String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) { cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> { if (namenodeContexts == null || namenodeContexts.size() <= 1) { @@ -510,6 +510,10 @@ public synchronized void rotateCache( if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) { List rotatedNnContexts = new ArrayList<>(namenodeContexts); Collections.rotate(rotatedNnContexts, -1); + String firstNamenodeId = namenodeContexts.get(0).getNamenodeId(); + LOG.info("Rotate cache of pair , put namenode: {} in the " + + "first position of the cache and namenode: {} in the last position of the cache", + nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId()); return rotatedNnContexts; } return namenodeContexts; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 9a0e1bc2c68f0..b38900c3bc264 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -599,10 +599,8 @@ public Object invokeMethod( } LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); - if (this.namenodeResolver != null) { - this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); - LOG.info("Rotate cache of pair: ", nsId, shouldUseObserver); - } + // Rotate cache so that client can retry the next namenode in the cache + this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); // Throw RetriableException so that client can retry throw new RetriableException(ioe); } else {