Skip to content

Commit

Permalink
HDFS-13369. Fix for FSCK Report broken with RequestHedgingProxyProvid…
Browse files Browse the repository at this point in the history
…er (#4917)

Contributed-by: navinko <[email protected]>
  • Loading branch information
navinko authored Sep 30, 2022
1 parent e22f5e7 commit 4891bf5
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,28 @@ public static void setCallIdAndRetryCount(int cid, int rc,
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
setCallIdAndRetryCountUnprotected(cid, rc, externalHandler);
}

public static void setCallIdAndRetryCountUnprotected(Integer cid, int rc,
Object externalHandler) {
callId.set(cid);
retryCount.set(rc);
EXTERNAL_CALL_HANDLER.set(externalHandler);
}

public static int getCallId() {
return callId.get() != null ? callId.get() : nextCallId();
}

public static int getRetryCount() {
return retryCount.get() != null ? retryCount.get() : 0;
}

public static Object getExternalHandler() {
return EXTERNAL_CALL_HANDLER.get();
}

private final ConcurrentMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<>();
private final Object putLock = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
Expand All @@ -27,20 +26,24 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;

import org.apache.hadoop.io.retry.MultiException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A FailoverProxyProvider implementation that technically does not "failover"
* per-se. It constructs a wrapper proxy that sends the request to ALL
Expand All @@ -55,7 +58,7 @@ public class RequestHedgingProxyProvider<T> extends
public static final Logger LOG =
LoggerFactory.getLogger(RequestHedgingProxyProvider.class);

class RequestHedgingInvocationHandler implements InvocationHandler {
class RequestHedgingInvocationHandler implements RpcInvocationHandler {

final Map<String, ProxyInfo<T>> targetProxies;
// Proxy of the active nn
Expand Down Expand Up @@ -123,11 +126,18 @@ public RequestHedgingInvocationHandler(
}
executor = Executors.newFixedThreadPool(proxies.size());
completionService = new ExecutorCompletionService<>(executor);
// Set the callId and other informations from current thread.
final int callId = Client.getCallId();
final int retryCount = Client.getRetryCount();
final Object externalHandler = Client.getExternalHandler();
for (final Map.Entry<String, ProxyInfo<T>> pEntry : targetProxies
.entrySet()) {
Callable<Object> c = new Callable<Object>() {
@Override
public Object call() throws Exception {
// Call Id and other informations from parent thread.
Client.setCallIdAndRetryCount(callId, retryCount,
externalHandler);
LOG.trace("Invoking method {} on proxy {}", method,
pEntry.getValue().proxyInfo);
return method.invoke(pEntry.getValue().proxy, args);
Expand All @@ -136,7 +146,9 @@ public Object call() throws Exception {
proxyMap.put(completionService.submit(c), pEntry.getValue());
numAttempts++;
}

// Current thread's callId will not be cleared as RPC happens in
// separate threads. Reset the CallId information Forcefully.
Client.setCallIdAndRetryCountUnprotected(null, 0, null);
Map<String, Exception> badResults = new HashMap<>();
while (numAttempts > 0) {
Future<Object> callResultFuture = completionService.take();
Expand Down Expand Up @@ -189,6 +201,18 @@ public Object call() throws Exception {
throw unwrappedException;
}
}

@Override
public void close() throws IOException {
}

@Override
public ConnectionId getConnectionId() {
if (currentUsedProxy == null) {
return null;
}
return RPC.getConnectionIdForProxy(currentUsedProxy.proxy);
}
}

/** A proxy wrapping {@link RequestHedgingInvocationHandler}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -101,6 +103,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
RequestHedgingProxyProvider<ClientProtocol> provider =
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
createFactory(badMock, goodMock));
Assert.assertTrue(Proxy.getInvocationHandler(
provider.getProxy().proxy) instanceof RpcInvocationHandler);
long[] stats = provider.getProxy().proxy.getStats();
Assert.assertTrue(stats.length == 1);
Mockito.verify(badMock).getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public void testFormatShouldBeIgnoredForNonFileBasedDirs() throws Exception {
String localhost = "127.0.0.1";
InetSocketAddress nnAddr1 = new InetSocketAddress(localhost, 8020);
InetSocketAddress nnAddr2 = new InetSocketAddress(localhost, 8020);
HATestUtil.setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2);
HATestUtil.setFailoverConfigurations(conf, logicalName, null,
nnAddr1, nnAddr2);

conf.set(DFS_NAMENODE_NAME_DIR_KEY,
new File(DFS_BASE_DIR, "name").getAbsolutePath());
Expand Down
Loading

0 comments on commit 4891bf5

Please sign in to comment.