Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New rbf nonnavia #2

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -146,4 +146,13 @@ List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
* @param routerId Unique string identifier for the router.
*/
void setRouterId(String routerId);

/**
 * Rotate cache, make the current namenode have the lowest priority,
 * to ensure that the current namenode will not be accessed first next time.
 *
 * @param nsId Identifier of the name service whose cache is rotated.
 * @param namenode Context of the namenode that should get the lowest priority.
 */
void rotateCache(String nsId, FederationNamenodeContext namenode);
}
Original file line number Diff line number Diff line change
@@ -478,4 +478,40 @@ private List<MembershipState> getRecentRegistrationForQuery(
public void setRouterId(String router) {
// Keep the identifier supplied by the caller in the routerId field of this instance.
this.routerId = router;
}

/**
 * Rotate cache, make the current namenode have the lowest priority,
 * to ensure that the current namenode will not be accessed first next time.
 *
 * <p>The cached list is returned unchanged when there is no entry or fewer
 * than two namenodes, when the first cached namenode is ACTIVE, or when the
 * first cached namenode is not the one passed in. Otherwise a rotated copy
 * replaces the cached list; the original list object is not modified.
 *
 * @param nsId name service id
 * @param namenode context of the namenode to demote; a null value is ignored
 */
@Override
public synchronized void rotateCache(String nsId, FederationNamenodeContext namenode) {
  if (namenode == null) {
    // Nothing to demote. Bail out here instead of letting the remapping
    // function below fail with a NullPointerException.
    return;
  }
  cacheNS.compute(Pair.of(nsId, false), (ns, namenodeContexts) -> {
    if (namenodeContexts == null || namenodeContexts.size() <= 1) {
      return namenodeContexts;
    }
    FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
    /*
     * If the first nn in the cache is active, the active nn priority cannot be lowered.
     * This happens when other threads have already updated the cache.
     * The constant is the receiver so that a null state cannot throw.
     */
    if (ACTIVE.equals(firstNamenodeContext.getState())) {
      return namenodeContexts;
    }
    /*
     * If the first nn in the cache at this time is not the nn
     * that needs to be lowered in priority, there is no need to rotate.
     * This happens when other threads have already rotated the cache.
     */
    if (!firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
      return namenodeContexts;
    }
    // Rotate a copy one position to the left: the first entry becomes the last.
    List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
    Collections.rotate(rotatedNnContexts, -1);
    return rotatedNnContexts;
  });
}
}
Original file line number Diff line number Diff line change
@@ -599,6 +599,9 @@ public Object invokeMethod(
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
if (this.namenodeResolver != null) {
this.namenodeResolver.rotateCache(nsId, namenode);
}
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
Original file line number Diff line number Diff line change
@@ -1202,4 +1202,8 @@ public void waitClusterUp() throws IOException {
throw new IOException("Cannot wait for the namenodes", e);
}
}

/**
 * Returns the value of the {@code cacheFlushInterval} field.
 *
 * @return the cache flush interval held by this object.
 *         NOTE(review): presumably in milliseconds, confirm where the field is set.
 */
public long getCacheFlushInterval() {
return cacheFlushInterval;
}
}
Original file line number Diff line number Diff line change
@@ -397,6 +397,10 @@ public List<String> getMountPoints(String path) throws IOException {
public void setRouterId(String router) {
// No-op: the given router identifier is neither stored nor used here.
}

@Override
public void rotateCache(String nsId, FederationNamenodeContext namenode) {
// No-op: this implementation performs no rotation for the given arguments.
}

/**
* Mocks the availability of default namespace.
* @param b if true default namespace is unset.
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.STANDBY;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateThrowExceptionRouterRpcServer;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
@@ -42,16 +44,19 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@@ -357,6 +362,82 @@ public void testNoNamenodesAvailable() throws Exception{
assertEquals(originalRouter0Failures, rpcMetrics0.getProxyOpNoNamenodes());
}

/**
 * When failover occurs, the router may record that the ns has no active namenode.
 * Only when the router updates the cache next time can the memory status be updated,
 * causing the router to report NoNamenodesAvailableException for a long time.
 *
 * <p>The test puts every namenode in standby, lets the routers cache that state,
 * then activates the second cached namenode of "ns0" without refreshing the router
 * caches. A read through the router must then succeed after exactly one
 * no-namenodes failure and within less than one cache flush interval.
 */
@Test
public void testNoNamenodesAvailableLongTimeWhenNsFailover() throws Exception {
  setupCluster(false, true);
  transitionClusterNSToStandby(cluster);
  for (RouterContext routerContext : cluster.getRouters()) {
    // Manually trigger the heartbeat
    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
        .getRouter().getNamenodeHeartbeatServices();
    for (NamenodeHeartbeatService service : heartbeatServices) {
      service.periodicInvoke();
    }
    // Update service cache
    routerContext.getRouter().getStateStore().refreshCaches(true);
  }
  // Record the time after the router first updated the cache
  long firstLoadTime = Time.now();
  List<MiniRouterDFSCluster.NamenodeContext> namenodes = cluster.getNamenodes();

  // Make sure all namenodes are in standby state
  for (MiniRouterDFSCluster.NamenodeContext namenodeContext : namenodes) {
    assertEquals(STANDBY.ordinal(), namenodeContext.getNamenode().getNameNodeState());
  }

  Configuration conf = cluster.getRouterClientConf();
  // Set dfs.client.failover.random.order false, to pick 1st router at first
  conf.setBoolean("dfs.client.failover.random.order", false);

  // The client is closed automatically, even when an assertion below fails.
  try (DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf)) {
    for (RouterContext routerContext : cluster.getRouters()) {
      // Get the second namenode in the router cache and make it active
      List<? extends FederationNamenodeContext> ns0 = routerContext.getRouter()
          .getNamenodeResolver()
          .getNamenodesForNameserviceId("ns0", false);

      String nnId = ns0.get(1).getNamenodeId();
      cluster.switchToActive("ns0", nnId);
      // Manually trigger the heartbeat, but the router does not manually load the cache
      Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
          .getRouter().getNamenodeHeartbeatServices();
      for (NamenodeHeartbeatService service : heartbeatServices) {
        service.periodicInvoke();
      }
      assertEquals(ACTIVE.ordinal(),
          cluster.getNamenode("ns0", nnId).getNamenode().getNameNodeState());
    }

    // Get router0 metrics
    FederationRPCMetrics rpcMetrics0 = cluster.getRouters().get(0)
        .getRouter().getRpcServer().getRPCMetrics();
    // Original failures
    long originalRouter0Failures = rpcMetrics0.getProxyOpNoNamenodes();

    /*
     * At this time, the router has recorded 2 standby namenodes in memory,
     * and the first accessed namenode is indeed standby,
     * then a NoNamenodesAvailableException will be reported for the first access,
     * and the next access will be successful.
     */
    routerClient.getFileInfo("/");
    long successReadTime = Time.now();
    assertEquals(originalRouter0Failures + 1, rpcMetrics0.getProxyOpNoNamenodes());

    /*
     * Access the active namenode without waiting for the router to update the cache,
     * even if there are 2 standby states recorded in the router memory.
     */
    assertTrue(successReadTime - firstLoadTime < cluster.getCacheFlushInterval());
  }
}


@Test
public void testAsyncCallerPoolMetrics() throws Exception {
setupCluster(true, false);