Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onn nonnava #3

Open
wants to merge 10 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -494,28 +494,23 @@ public void rotateCache(
if (namenodeContexts == null || namenodeContexts.size() <= 1) {
return namenodeContexts;
}
FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
/*
* If the first nn in the cache is active, the active nn priority cannot be lowered.
* This happens when other threads have already updated the cache.
*/
if (firstNamenodeContext.getState().equals(ACTIVE)) {
return namenodeContexts;

for (FederationNamenodeContext namenodeContext : namenodeContexts) {
if (namenodeContext.getState() == ACTIVE) {
return namenodeContexts;
}
}
/*
* If the first nn in the cache at this time is not the nn
* that needs to be lowered in priority, there is no need to rotate.
* This happens when other threads have already rotated the cache.
*/
if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
Collections.rotate(rotatedNnContexts, -1);
String firstNamenodeId = namenodeContexts.get(0).getNamenodeId();
LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
"first position of the cache and namenode: {} in the last position of the cache",
nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
return rotatedNnContexts;

FederationNamenodeContext lastNamenode = namenodeContexts.get(namenodeContexts.size()-1);

if (lastNamenode.getRpcAddress().equals(namenode.getRpcAddress())) {
return namenodeContexts;
}
List<FederationNamenodeContext> rotateNamenodeContexts =
(List<FederationNamenodeContext>) namenodeContexts;
rotateNamenodeContexts.remove(namenode);
rotateNamenodeContexts.add(namenode);
LOG.info("Rotate cache of pair<{}, {}> -> {}", nsId, listObserversFirst, namenodeContexts);
return namenodeContexts;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,12 @@ private static IOException toIOException(Exception e) {
* @throws NoNamenodesAvailableException Exception that the retry policy
* generates for no available namenodes.
*/
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
final String nsId) throws IOException {
private RetryDecision shouldRetry(
final IOException ioe, final int retryCount, final String nsId,
final FederationNamenodeContext namenode,
final boolean listObserverFirst) throws IOException {
// check for the case of cluster unavailable state
if (isClusterUnAvailable(nsId)) {
if (isClusterUnAvailable(nsId, namenode, listObserverFirst)) {
// we allow to retry once if cluster is unavailable
if (retryCount == 0) {
return RetryDecision.RETRY;
Expand Down Expand Up @@ -538,7 +540,7 @@ public Object invokeMethod(
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();

ret = invoke(nsId, 0, method, proxy, params);
ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params);
if (failover &&
FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
// Success on alternate server, update
Expand Down Expand Up @@ -594,13 +596,16 @@ public Object invokeMethod(
se.initCause(ioe);
throw se;
} else if (ioe instanceof NoNamenodesAvailableException) {
IOException cause = (IOException) ioe.getCause();
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes(nsId);
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Rotate cache so that client can retry the next namenode in the cache
this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
if (shouldRotateCache(cause)) {
this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
}
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
Expand Down Expand Up @@ -708,7 +713,9 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) {
* @return Response from the remote server
* @throws IOException If error occurs.
*/
private Object invoke(String nsId, int retryCount, final Method method,
private Object invoke(
String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst,
int retryCount, final Method method,
final Object obj, final Object... params) throws IOException {
try {
return method.invoke(obj, params);
Expand All @@ -721,14 +728,14 @@ private Object invoke(String nsId, int retryCount, final Method method,
IOException ioe = (IOException) cause;

// Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
if (decision == RetryDecision.RETRY) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpRetries();
}

// retry
return invoke(nsId, ++retryCount, method, obj, params);
return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
Expand Down Expand Up @@ -772,13 +779,22 @@ public static boolean isUnavailableException(IOException ioe) {
* Check if the cluster of given nameservice id is available.
*
* @param nsId nameservice ID.
* @param namenode
* @param listObserverFirst
* @return true if the cluster with given nameservice id is available.
* @throws IOException if error occurs.
*/
private boolean isClusterUnAvailable(String nsId) throws IOException {
private boolean isClusterUnAvailable(
String nsId, FederationNamenodeContext namenode,
boolean listObserverFirst) throws IOException {
// Use observer and the namenode that causes the exception is an observer,
// false is returned so that the oberver can be marked as unavailable,so other observers
// or active namenode which is standby in the cache of the router can be retried.
if (listObserverFirst && namenode.getState() == FederationNamenodeServiceState.OBSERVER) {
return false;
}
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
.getNamenodesForNameserviceId(nsId, false);

.getNamenodesForNameserviceId(nsId, listObserverFirst);
if (nnState != null) {
for (FederationNamenodeContext nnContext : nnState) {
// Once we find one NN is in active state, we assume this
Expand Down Expand Up @@ -1830,4 +1846,24 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
return lastActiveNNRefreshTimes
.computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0));
}

/**
* Determine whether router rotated cache is required when NoNamenodesAvailableException occurs.
*
* @param ioe cause of the NoNamenodesAvailableException.
* @return true if NoNamenodesAvailableException occurs due to
* {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
* otherwise false.
*/
private boolean shouldRotateCache(IOException ioe) {
if (isUnavailableException(ioe)) {
return true;
}
if (ioe instanceof RemoteException) {
RemoteException re = (RemoteException) ioe;
ioe = re.unwrapRemoteException();
ioe = getCleanException(ioe);
}
return isUnavailableException(ioe);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public class MiniRouterDFSCluster {
/** Mini cluster. */
private MiniDFSCluster cluster;

protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
protected static final long DEFAULT_CACHE_INTERVAL_MS =
public static final long DEFAULT_CACHE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
Expand Down Expand Up @@ -253,6 +253,19 @@ public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOExceptio
return DistributedFileSystem.get(observerReadConf);
}

public FileSystem getFileSystemWithConfiguredFailoverProxyProvider() throws IOException {
conf.set(DFS_NAMESERVICES,
conf.get(DFS_NAMESERVICES)+ ",router-service");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
getFileSystemURI().toString());
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", ConfiguredFailoverProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(conf, "hdfs://router-service");

return DistributedFileSystem.get(conf);
}

public DFSClient getClient(UserGroupInformation user)
throws IOException, URISyntaxException, InterruptedException {

Expand Down
Loading