diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index c0e800e0430d4..342bc04dbf853 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -494,28 +494,23 @@ public void rotateCache( if (namenodeContexts == null || namenodeContexts.size() <= 1) { return namenodeContexts; } - FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0); - /* - * If the first nn in the cache is active, the active nn priority cannot be lowered. - * This happens when other threads have already updated the cache. - */ - if (firstNamenodeContext.getState().equals(ACTIVE)) { - return namenodeContexts; + + for (FederationNamenodeContext namenodeContext : namenodeContexts) { + if (namenodeContext.getState() == ACTIVE) { + return namenodeContexts; + } } - /* - * If the first nn in the cache at this time is not the nn - * that needs to be lowered in priority, there is no need to rotate. - * This happens when other threads have already rotated the cache. - */ - if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) { - List rotatedNnContexts = new ArrayList<>(namenodeContexts); - Collections.rotate(rotatedNnContexts, -1); - String firstNamenodeId = namenodeContexts.get(0).getNamenodeId(); - LOG.info("Rotate cache of pair , put namenode: {} in the " + - "first position of the cache and namenode: {} in the last position of the cache", - nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId()); - return rotatedNnContexts; + + FederationNamenodeContext lastNamenode = namenodeContexts.get(namenodeContexts.size()-1); + + if (lastNamenode.getRpcAddress().equals(namenode.getRpcAddress())) { + return namenodeContexts; } + List rotateNamenodeContexts = + (List) namenodeContexts; + rotateNamenodeContexts.remove(namenode); + rotateNamenodeContexts.add(namenode); + LOG.info("Rotate cache of pair<{}, {}> -> {}", nsId, listObserversFirst, namenodeContexts); return namenodeContexts; }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index b38900c3bc264..c6a56b67540ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -461,10 +461,12 @@ private static IOException toIOException(Exception e) { * @throws NoNamenodesAvailableException Exception that the retry policy * generates for no available namenodes. */ - private RetryDecision shouldRetry(final IOException ioe, final int retryCount, - final String nsId) throws IOException { + private RetryDecision shouldRetry( + final IOException ioe, final int retryCount, final String nsId, + final FederationNamenodeContext namenode, + final boolean listObserverFirst) throws IOException { // check for the case of cluster unavailable state - if (isClusterUnAvailable(nsId)) { + if (isClusterUnAvailable(nsId, namenode, listObserverFirst)) { // we allow to retry once if cluster is unavailable if (retryCount == 0) { return RetryDecision.RETRY; @@ -538,7 +540,7 @@ public Object invokeMethod( ProxyAndInfo client = connection.getClient(); final Object proxy = client.getProxy(); - ret = invoke(nsId, 0, method, proxy, params); + ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params); if (failover && FederationNamenodeServiceState.OBSERVER != namenode.getState()) { // Success on alternate server, update @@ -594,13 +596,16 @@ public Object invokeMethod( se.initCause(ioe); throw se; } else if (ioe instanceof NoNamenodesAvailableException) { + IOException cause = (IOException) ioe.getCause(); if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpNoNamenodes(nsId); } LOG.error("Cannot get available namenode for {} {} error: {}", nsId, rpcAddress, ioe.getMessage()); // Rotate cache so that client can retry the next namenode in the cache - this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver); + if (shouldRotateCache(cause)) { + this.namenodeResolver.rotateCache(nsId, namenode, useObserver); + } // Throw RetriableException so that client can retry throw new RetriableException(ioe); } else { @@ -708,7 +713,9 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) { * @return Response from the remote server * @throws IOException If error occurs. */ - private Object invoke(String nsId, int retryCount, final Method method, + private Object invoke( + String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst, + int retryCount, final Method method, final Object obj, final Object... params) throws IOException { try { return method.invoke(obj, params); @@ -721,14 +728,14 @@ private Object invoke(String nsId, int retryCount, final Method method, IOException ioe = (IOException) cause; // Check if we should retry. - RetryDecision decision = shouldRetry(ioe, retryCount, nsId); + RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst); if (decision == RetryDecision.RETRY) { if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpRetries(); } // retry - return invoke(nsId, ++retryCount, method, obj, params); + return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params); } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { // failover, invoker looks for standby exceptions for failover. if (ioe instanceof StandbyException) { @@ -772,13 +779,22 @@ public static boolean isUnavailableException(IOException ioe) { * Check if the cluster of given nameservice id is available. * * @param nsId nameservice ID. + * @param namenode + * @param listObserverFirst * @return true if the cluster with given nameservice id is available. * @throws IOException if error occurs. */ - private boolean isClusterUnAvailable(String nsId) throws IOException { + private boolean isClusterUnAvailable( + String nsId, FederationNamenodeContext namenode, + boolean listObserverFirst) throws IOException { + // Use observer and the namenode that causes the exception is an observer, + // false is returned so that the oberver can be marked as unavailable,so other observers + // or active namenode which is standby in the cache of the router can be retried. + if (listObserverFirst && namenode.getState() == FederationNamenodeServiceState.OBSERVER) { + return false; + } List nnState = this.namenodeResolver - .getNamenodesForNameserviceId(nsId, false); - + .getNamenodesForNameserviceId(nsId, listObserverFirst); if (nnState != null) { for (FederationNamenodeContext nnContext : nnState) { // Once we find one NN is in active state, we assume this @@ -1830,4 +1846,24 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) { return lastActiveNNRefreshTimes .computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0)); } + + /** + * Determine whether router rotated cache is required when NoNamenodesAvailableException occurs. + * + * @param ioe cause of the NoNamenodesAvailableException. + * @return true if NoNamenodesAvailableException occurs due to + * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception}, + * otherwise false. + */ + private boolean shouldRotateCache(IOException ioe) { + if (isUnavailableException(ioe)) { + return true; + } + if (ioe instanceof RemoteException) { + RemoteException re = (RemoteException) ioe; + ioe = re.unwrapRemoteException(); + ioe = getCleanException(ioe); + } + return isUnavailableException(ioe); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index bf22cf01148a3..70d1f808958ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -132,9 +132,9 @@ public class MiniRouterDFSCluster { /** Mini cluster. */ private MiniDFSCluster cluster; - protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS = + public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); - protected static final long DEFAULT_CACHE_INTERVAL_MS = + public static final long DEFAULT_CACHE_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5); /** Heartbeat interval in milliseconds. */ private long heartbeatInterval; @@ -253,6 +253,19 @@ public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOExceptio return DistributedFileSystem.get(observerReadConf); } + public FileSystem getFileSystemWithConfiguredFailoverProxyProvider() throws IOException { + conf.set(DFS_NAMESERVICES, + conf.get(DFS_NAMESERVICES)+ ",router-service"); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1", + getFileSystemURI().toString()); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + "router-service", ConfiguredFailoverProxyProvider.class.getName()); + DistributedFileSystem.setDefaultUri(conf, "hdfs://router-service"); + + return DistributedFileSystem.get(conf); + } + public DFSClient getClient(UserGroupInformation user) throws IOException, URISyntaxException, InterruptedException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNoNamenodesAvailableLongTime.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNoNamenodesAvailableLongTime.java new file mode 100644 index 0000000000000..f6b6c2256a882 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNoNamenodesAvailableLongTime.java @@ -0,0 +1,415 @@ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.fs.permission.AclEntryType.USER; +import static org.apache.hadoop.fs.permission.FsAction.ALL; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Lists; +import org.junit.After; +import org.junit.Test; + + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** + * When failover occurs, the router may record that the ns has no active namenode + * even if there is actually an active namenode. + * Only when the router updates the cache next time can the memory status be updated, + * causing the router to report NoNamenodesAvailableException for a long time, + * + * @see org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException + */ +public class TestNoNamenodesAvailableLongTime { + + // router load cache interval 10s + private static final long CACHE_FLUSH_INTERVAL_MS = 10000; + private StateStoreDFSCluster cluster; + private FileSystem fileSystem; + private RouterContext routerContext; + private FederationRPCMetrics rpcMetrics; + + @After + public void cleanup() throws IOException { + rpcMetrics = null; + routerContext = null; + if (fileSystem != null) { + fileSystem.close(); + fileSystem = null; + } + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + /** + * Set up state store cluster. + * + * @param numNameservices number of name services + * @param numberOfObserver number of observer + * @param useObserver whether to use observer + */ + private void setupCluster(int numNameservices, int numberOfObserver, boolean useObserver) + throws Exception { + if (!useObserver) { + numberOfObserver = 0; + } + int numberOfNamenode = 2 + numberOfObserver; + cluster = new StateStoreDFSCluster(true, numNameservices, numberOfNamenode, + DEFAULT_HEARTBEAT_INTERVAL_MS, CACHE_FLUSH_INTERVAL_MS); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .metrics() + .admin() + .rpc() + .heartbeat() + .build(); + + // Set router observer related configs + if (useObserver) { + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + } + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4); + + // No need for datanodes + cluster.setNumDatanodesPerNameservice(0); + cluster.addRouterOverrides(routerConf); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + List nnList = cluster.getNamenodes(ns); + cluster.switchToActive(ns, nnList.get(0).getNamenodeId()); + cluster.switchToStandby(ns, nnList.get(1).getNamenodeId()); + for (int i = 2; i < numberOfNamenode; i++) { + cluster.switchToObserver(ns, nnList.get(i).getNamenodeId()); + } + } + } + + cluster.startRouters(); + cluster.waitClusterUp(); + } + + /** + * Initialize the test environment and start the cluster so that + * there is no active namenode record in the router cache, + * but the second non-observer namenode in the router cache is actually active. + */ + private void initEnv(int numberOfObserver, boolean useObserver) throws Exception { + setupCluster(1, numberOfObserver, useObserver); + // Transition all namenodes in the cluster are standby. + transitionActiveToStandby(); + // + allRoutersHeartbeat(); + allRoutersLoadCache(); + + List namenodes = cluster.getNamenodes(); + + // Make sure all namenodes are in standby state + for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) { + assertNotEquals(ACTIVE.ordinal(), namenodeContext.getNamenode().getNameNodeState()); + } + + routerContext = cluster.getRandomRouter(); + + // Get the second namenode in the router cache and make it active + setSecondNonObserverNamenodeInTheRouterCacheActive(numberOfObserver, false); + allRoutersHeartbeat(); + + // Get router metrics + rpcMetrics = routerContext.getRouter().getRpcServer().getRPCMetrics(); + + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", useObserver)); + + // Retries is 2 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail + // when reties > max.attempts), so total access is 3. + routerContext.getConf().setInt("dfs.client.retry.max.attempts", 1); + + if (useObserver) { + fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider(); + } else { + fileSystem = routerContext.getFileSystemWithConfiguredFailoverProxyProvider(); + } + } + + /** + * If NoNamenodesAvailableException occurs due to + * {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception}, + * should rotated Cache. + */ + @Test + public void testShouldRotatedCache() throws Exception { + // 2 namenodes: 1 active, 1 standby. + // But there is no active namenode in router cache. + initEnv(0, false); + // At this time, the router has recorded 2 standby namenodes in memory. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false)); + + Path path = new Path("/test.file"); + // The first create operation will cause NoNamenodesAvailableException and RotatedCache. + // After retrying, create and complete operation will be executed successfully. + fileSystem.create(path); + assertEquals(1, rpcMetrics.getProxyOpNoNamenodes()); + + // At this time, the router has recorded 2 standby namenodes in memory, + // the operation can be successful without waiting for the router load cache. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false)); + } + + /** + * If a request still fails even if it is sent to active, + * then the change operation itself is illegal, + * the cache should not be rotated due to illegal operations. + */ + @Test + public void testShouldNotBeRotatedCache() throws Exception { + testShouldRotatedCache(); + long proxyOpNoNamenodes = rpcMetrics.getProxyOpNoNamenodes(); + Path path = new Path("/test.file"); + /* + * we have put the actually active namenode at the front of the cache by rotating the cache. + * Therefore, the setPermission operation does not cause NoNamenodesAvailableException. + */ + fileSystem.setPermission(path, FsPermission.createImmutable((short)0640)); + assertEquals(proxyOpNoNamenodes, rpcMetrics.getProxyOpNoNamenodes()); + + // At this time, the router has recorded 2 standby namenodes in memory + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false)); + + /* + * Even if the router transfers the illegal request to active, + * NoNamenodesAvailableException will still be generated. + * Therefore, rotated cache is not needed. + */ + List aclSpec = Lists.newArrayList(aclEntry(DEFAULT, USER, "foo", ALL)); + try { + fileSystem.setAcl(path, aclSpec); + }catch (RemoteException e) { + assertTrue(e.getMessage().contains( + "org.apache.hadoop.hdfs.server.federation.router.NoNamenodesAvailableException: " + + "No namenodes available under nameservice ns0")); + assertTrue(e.getMessage().contains( + "org.apache.hadoop.hdfs.protocol.AclException: Invalid ACL: " + + "only directories may have a default ACL. Path: /test.file")); + } + // Retries is 2 (see FailoverOnNetworkExceptionRetry#shouldRetry, will fail + // when reties > max.attempts), so total access is 3. + assertEquals(proxyOpNoNamenodes + 3, rpcMetrics.getProxyOpNoNamenodes()); + proxyOpNoNamenodes = rpcMetrics.getProxyOpNoNamenodes(); + + // So legal operations can be accessed normally without reporting NoNamenodesAvailableException. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false)); + fileSystem.getFileStatus(path); + assertEquals(proxyOpNoNamenodes, rpcMetrics.getProxyOpNoNamenodes()); + + // At this time, the router has recorded 2 standby namenodes in memory, + // the operation can be successful without waiting for the router load cache. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", false)); + } + + /** + * In the observer scenario, NoNamenodesAvailableException occurs, + * the operation can be successful without waiting for the router load cache. + */ + @Test + public void testUseObserver() throws Exception { + // 4 namenodes: 2 observers, 1 active, 1 standby. + // But there is no active namenode in router cache. + initEnv(2, true); + + Path path = new Path("/"); + // At this time, the router has recorded 2 standby namenodes in memory. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true)); + + // The first msync operation will cause NoNamenodesAvailableException and RotatedCache. + // After retrying, msync and getFileInfo operation will be executed successfully. + fileSystem.getFileStatus(path); + assertEquals(1, rpcMetrics.getObserverProxyOps()); + assertEquals(1, rpcMetrics.getProxyOpNoNamenodes()); + + // At this time, the router has recorded 2 standby namenodes in memory, + // the operation can be successful without waiting for the router load cache. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true)); + } + + /** + * In a multi-observer environment, if at least one observer is normal, + * read requests can still succeed even if NoNamenodesAvailableException occurs. + */ + @Test + public void testAtLeastOneObserverNormal() throws Exception { + // 4 namenodes: 2 observers, 1 active, 1 standby. + // But there is no active namenode in router cache. + initEnv(2, true); + // Shutdown one observer. + stopObserver(1); + + /* + * The first msync operation will cause NoNamenodesAvailableException and RotatedCache. + * After retrying, msync operation will be executed successfully. + * Each read request will shuffle the observer, + * if the getFileInfo operation is sent to the downed observer, + * it will cause NoNamenodesAvailableException, + * at this time, the request can be retried to the normal observer, + * no NoNamenodesAvailableException will be generated and the operation will be successful. + */ + fileSystem.getFileStatus(new Path("/")); + assertEquals(1, rpcMetrics.getProxyOpNoNamenodes()); + assertEquals(1, rpcMetrics.getObserverProxyOps()); + + // At this time, the router has recorded 2 standby namenodes in memory, + // the operation can be successful without waiting for the router load cache. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true)); + } + + /** + * If all obervers are down, read requests can succeed, + * even if a NoNamenodesAvailableException occurs. + */ + @Test + public void testAllObserverAbnormality() throws Exception { + // 4 namenodes: 2 observers, 1 active, 1 standby. + // But there is no active namenode in router cache. + initEnv(2, true); + // Shutdown all observers. + stopObserver(2); + + /* + * The first msync operation will cause NoNamenodesAvailableException and RotatedCache. + * After retrying, msync operation will be executed successfully. + * The getFileInfo operation retried 2 namenodes, both causing UnavailableException, + * and continued to retry to the standby namenode, + * causing NoNamenodesAvailableException and RotatedCache, + * and the execution was successful after retrying. + */ + fileSystem.getFileStatus(new Path("/")); + assertEquals(2, rpcMetrics.getProxyOpFailureCommunicate()); + assertEquals(2, rpcMetrics.getProxyOpNoNamenodes()); + + // At this time, the router has recorded 2 standby namenodes in memory, + // the operation can be successful without waiting for the router load cache. + assertTrue(routerCacheNoActiveNamenode(routerContext, "ns0", true)); + } + + /** + * Determine whether cache of the router has an active namenode. + * + * @return true if no active namenode, otherwise false. + */ + private boolean routerCacheNoActiveNamenode( + RouterContext context, String nsId, boolean useObserver) throws IOException { + List namenodes + = context.getRouter().getNamenodeResolver().getNamenodesForNameserviceId(nsId, useObserver); + for (FederationNamenodeContext namenode : namenodes) { + if (namenode.getState() == FederationNamenodeServiceState.ACTIVE){ + return false; + } + } + return true; + } + + /** + * All routers in the cluster force loadcache. + */ + private void allRoutersLoadCache() { + for (MiniRouterDFSCluster.RouterContext context : cluster.getRouters()) { + // Update service cache + context.getRouter().getStateStore().refreshCaches(true); + } + } + + /** + * Set the second non-observer state namenode in the router cache to active. + */ + private void setSecondNonObserverNamenodeInTheRouterCacheActive( + int numberOfObserver, boolean useObserver) throws IOException { + List ns0 = routerContext.getRouter() + .getNamenodeResolver() + .getNamenodesForNameserviceId("ns0", useObserver); + + String nsId = ns0.get(numberOfObserver+1).getNamenodeId(); + cluster.switchToActive("ns0", nsId); + assertEquals(ACTIVE.ordinal(), + cluster.getNamenode("ns0", nsId).getNamenode().getNameNodeState()); + + } + + /** + * All routers in the cluster force heartbeat. + */ + private void allRoutersHeartbeat() throws IOException { + for (RouterContext context : cluster.getRouters()) { + // Manually trigger the heartbeat, but the router does not manually load the cache + Collection heartbeatServices = context + .getRouter().getNamenodeHeartbeatServices(); + for (NamenodeHeartbeatService service : heartbeatServices) { + service.periodicInvoke(); + } + } + } + + /** + * Transition the active namenode in the cluster to standby. + */ + private void transitionActiveToStandby() { + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + List nnList = cluster.getNamenodes(ns); + for (MiniRouterDFSCluster.NamenodeContext namenodeContext : nnList) { + if (namenodeContext.getNamenode().isActiveState()) { + cluster.switchToStandby(ns, namenodeContext.getNamenodeId()); + } + } + } + } + } + + /** + * Shutdown oberver namenode in the cluste. + * + * @param num The number of shutdown oberver. + */ + private void stopObserver(int num) { + int nnIndex; + for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isObserverState()) { + cluster.getCluster().shutdownNameNode(nnIndex); + num--; + if (num == 0) { + break; + } + } + } + } +}