Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: NoNamenodesAvailableException persists for a long time after ns failover #1

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,14 @@ List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
* @param routerId Unique string identifier for the router.
*/
void setRouterId(String routerId);

/**
 * Rotate the cache so that the given namenode has the lowest priority,
 * ensuring that this namenode will not be the first one accessed next time.
 *
 * @param nsId name service id
 * @param namenode context of the namenode whose priority should be lowered
 * @param listObserversFirst Observer read case, observer NN will be ranked first
 */
void rotateCache(String nsId, FederationNamenodeContext namenode, boolean listObserversFirst);
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,4 +478,45 @@ private List<MembershipState> getRecentRegistrationForQuery(
public void setRouterId(String router) {
// Remember the supplied router identifier in the routerId field of this instance.
this.routerId = router;
}

/**
 * Rotate the cached namenode list of a name service so that the given namenode
 * gets the lowest priority and will not be the first one accessed next time.
 *
 * <p>The cached list is left untouched when:
 * <ul>
 *   <li>there is no cached list for the key, or it holds at most one entry;</li>
 *   <li>the first cached namenode is ACTIVE;</li>
 *   <li>the first cached namenode is not the given namenode
 *       (compared by RPC address).</li>
 * </ul>
 * Otherwise a rotated copy of the list replaces the cached value; the cached
 * list instance itself is never modified in place.
 *
 * @param nsId name service id
 * @param namenode context of the namenode whose priority should be lowered
 * @param listObserversFirst Observer read case, observer NN will be ranked first
 */
@Override
public void rotateCache(
    String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
  cacheNS.compute(Pair.of(nsId, listObserversFirst), (ns, namenodeContexts) -> {
    // Nothing to rotate without at least two cached namenodes.
    if (namenodeContexts == null || namenodeContexts.size() <= 1) {
      return namenodeContexts;
    }
    FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
    /*
     * If the first nn in the cache is active, the active nn priority cannot be lowered.
     * This happens when other threads have already updated the cache.
     * The constant is the receiver so that a missing state cannot raise an NPE.
     */
    if (ACTIVE.equals(firstNamenodeContext.getState())) {
      return namenodeContexts;
    }
    /*
     * If the first nn in the cache at this time is not the nn
     * that needs to be lowered in priority, there is no need to rotate.
     * This happens when other threads have already rotated the cache.
     */
    if (!firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
      return namenodeContexts;
    }
    // Rotate a copy to the left by one: the old head becomes the tail.
    List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
    Collections.rotate(rotatedNnContexts, -1);
    // Report the namenode that is first AFTER the rotation; reading it from the
    // un-rotated list would name the demoted namenode for both positions.
    String firstNamenodeId = rotatedNnContexts.get(0).getNamenodeId();
    LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
        "first position of the cache and namenode: {} in the last position of the cache",
        nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
    return rotatedNnContexts;
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,8 @@ public Object invokeMethod(
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Rotate cache so that client can retry the next namenode in the cache
this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1202,4 +1202,13 @@ public void waitClusterUp() throws IOException {
throw new IOException("Cannot wait for the namenodes", e);
}
}

/**
 * Get cache flush interval in milliseconds.
 *
 * <p>Plain accessor: returns the current value of the
 * {@code cacheFlushInterval} field without modifying any state.
 *
 * @return Cache flush interval in milliseconds.
 */
public long getCacheFlushInterval() {
return cacheFlushInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,11 @@ public List<String> getMountPoints(String path) throws IOException {
public void setRouterId(String router) {
// Intentionally empty: this implementation does not store or use the router id.
}

@Override
public void rotateCache(
String nsId, FederationNamenodeContext namenode, boolean listObserversFirst) {
// Intentionally a no-op: this implementation performs no rotation for any input.
}

/**
* Mocks the availability of default namespace.
* @param b if true default namespace is unset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
Expand All @@ -42,16 +44,19 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -357,6 +362,82 @@ public void testNoNamenodesAvailable() throws Exception{
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
}

/**
 * When failover occurs, the router may record that the ns has no active namenode.
 * Only when the router updates the cache next time can the memory status be updated,
 * causing the router to report NoNamenodesAvailableException for a long time.
 *
 * <p>This test puts every namenode in standby, lets the routers cache that view,
 * then activates the second cached namenode of ns0 without refreshing the router
 * caches. It verifies that exactly one additional "no namenodes" failure is
 * recorded on router0 and that the read succeeds before a cache flush interval
 * has elapsed.
 */
@Test
public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
  setupCluster(false, true);
  transitionClusterNSToStandby(cluster);
  for (RouterContext routerContext : cluster.getRouters()) {
    // Manually trigger the heartbeat
    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
        .getRouter().getNamenodeHeartbeatServices();
    for (NamenodeHeartbeatService service : heartbeatServices) {
      service.periodicInvoke();
    }
    // Update service cache
    routerContext.getRouter().getStateStore().refreshCaches(true);
  }
  // Record the time after the router first updated the cache.
  // A monotonic clock is used because this value is only used to measure an
  // elapsed interval; wall-clock time may jump if the system clock is adjusted.
  long firstLoadTime = Time.monotonicNow();
  List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();

  // Make sure all namenodes are in standby state
  for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
    assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
  }

  Configuration conf = cluster.getRouterClientConf();
  // Set dfs.client.failover.random.order false, to pick 1st router at first
  conf.setBoolean("dfs.client.failover.random.order", false);

  // Close the client when the test finishes so that it does not leak resources.
  try (DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf)) {
    for (RouterContext routerContext : cluster.getRouters()) {
      // Get the second namenode in the router cache and make it active
      List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
          .getNamenodeResolver()
          .getNamenodesForNameserviceId("ns0", false);

      String nnId = ns0.get(1).getNamenodeId();
      cluster.switchToActive("ns0", nnId);
      // Manually trigger the heartbeat, but the router does not manually load the cache
      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
          .getRouter().getNamenodeHeartbeatServices();
      for (NamenodeHeartbeatService service : heartbeatServices) {
        service.periodicInvoke();
      }
      assertEquals(ACTIVE.ordinal(),
          cluster.getNamenode("ns0", nnId).getNamenode().getNameNodeState());
    }

    // Get router0 metrics
    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
        .getRouter().getRpcServer().getRPCMetrics();
    // Original failures
    long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();

    /*
     * At this time, the router has recorded 2 standby namenodes in memory,
     * and the first accessed namenode is indeed standby,
     * then a NoNamenodesAvailableException will be reported for the first access,
     * and the next access will be successful.
     */
    routerClient.getFileInfo("/");
    long successReadTime = Time.monotonicNow();
    assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());

    /*
     * Access the active namenode without waiting for the router to update the cache,
     * even if there are 2 standby states recorded in the router memory.
     */
    assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval());
  }
}


@Test
public void testAsyncCallerPoolMetrics() throws Exception {
setupCluster(true, false);
Expand Down