Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. #7108

Merged
merged 10 commits into from
Nov 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;

import java.io.FileNotFoundException;
Expand All @@ -49,6 +55,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -68,6 +75,9 @@
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
return invokeOnNs(method, clazz, io, nss);
}

/**
* Invokes the method at default namespace, if default namespace is not
* available then at the other available namespaces.
* If the namespace is unavailable, retry with other namespaces.
* Asynchronous version of invokeAtAvailableNs method.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
// If no namespace is available, throw IOException.
IOException io = new IOException("No namespace available.");

asyncComplete(null);
if (!nsId.isEmpty()) {
asyncTry(() -> {
getRPCClient().invokeSingle(nsId, method, clazz);
});

asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
if (!clientProto.isUnavailableSubclusterException(ioe)) {
LOG.debug("{} exception cannot be retried",
ioe.getClass().getSimpleName());
throw ioe;
}
nss.removeIf(n -> n.getNameserviceId().equals(nsId));
invokeOnNsAsync(method, clazz, io, nss);
}, IOException.class);
} else {
// If not have default NS.
invokeOnNsAsync(method, clazz, io, nss);
}
return asyncReturn(clazz);
}

/**
* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
Expand Down Expand Up @@ -824,6 +874,61 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
throw ioe;
}

/**
* Invoke the method sequentially on available namespaces,
* throw no namespace available exception, if no namespaces are available.
* Asynchronous version of invokeOnNs method.
* @param method the remote method.
* @param clazz Class for the return type.
* @param ioe IOException .
* @param nss List of name spaces in the federation
* @return the response received after invoking method.
* @throws IOException
*/
<T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
Set<FederationNamespaceInfo> nss) throws IOException {
if (nss.isEmpty()) {
throw ioe;
}

asyncComplete(null);
Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
asyncForEach(nsIterator, (foreach, fnInfo) -> {
String nsId = fnInfo.getNameserviceId();
LOG.debug("Invoking {} on namespace {}", method, nsId);
asyncTry(() -> {
getRPCClient().invokeSingle(nsId, method, clazz);
asyncApply(result -> {
if (result != null) {
foreach.breakNow();
return result;
}
return null;
});
});

asyncCatch((CatchFunction<T, IOException>)(ret, ex) -> {
LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex);
// Ignore the exception and try on other namespace, if the tried
// namespace is unavailable, else throw the received exception.
if (!clientProto.isUnavailableSubclusterException(ex)) {
throw ex;
}
return null;
}, IOException.class);
});

asyncApply(obj -> {
if (obj == null) {
// Couldn't get a response from any of the namespace, throw ioe.
throw ioe;
}
return obj;
});

return asyncReturn(clazz);
}

@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
Expand Down Expand Up @@ -875,6 +980,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
*/
RemoteLocation getCreateLocation(final String src) throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);
if (isAsync()) {
getCreateLocationAsync(src, locations);
return asyncReturn(RemoteLocation.class);
}
return getCreateLocation(src, locations);
}

Expand Down Expand Up @@ -911,6 +1020,44 @@ RemoteLocation getCreateLocation(
return createLocation;
}

/**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
* Asynchronous version of getCreateLocation method.
*
* @param src Path of the file to check.
* @param locations Prefetched locations for the file.
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
RemoteLocation getCreateLocationAsync(
final String src, final List<RemoteLocation> locations)
throws IOException {

if (locations == null || locations.isEmpty()) {
throw new IOException("Cannot get locations to create " + src);
}

RemoteLocation createLocation = locations.get(0);
if (locations.size() > 1) {
asyncTry(() -> {
getExistingLocationAsync(src, locations);
asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> {
if (existingLocation != null) {
LOG.debug("{} already exists in {}.", src, existingLocation);
return existingLocation;
}
return createLocation;
});
});
asyncCatch((o, e) -> createLocation, FileNotFoundException.class);
} else {
asyncComplete(createLocation);
}

return asyncReturn(RemoteLocation.class);
}

/**
* Gets the remote location where the file exists.
* @param src the name of file.
Expand All @@ -932,6 +1079,31 @@ private RemoteLocation getExistingLocation(String src,
return null;
}

/**
* Gets the remote location where the file exists.
* Asynchronous version of getExistingLocation method.
* @param src the name of file.
* @param locations all the remote locations.
* @return the remote location of the file if it exists, else null.
* @throws IOException in case of any exception.
*/
private RemoteLocation getExistingLocationAsync(String src,
List<RemoteLocation> locations) throws IOException {
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
getRPCClient().invokeConcurrent(
locations, method, true, false, HdfsFileStatus.class);
asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
for (RemoteLocation loc : locations) {
if (results.get(loc) != null) {
return loc;
}
}
return null;
});
return asyncReturn(RemoteLocation.class);
}

@Override // ClientProtocol
public LastBlockWithStatus append(String src, final String clientName,
final EnumSetWritable<CreateFlag> flag) throws IOException {
Expand Down Expand Up @@ -1186,6 +1358,38 @@ public DatanodeInfo[] getDatanodeReport(
return toArray(datanodes, DatanodeInfo.class);
}

/**
* Get the datanode report with a timeout.
* Asynchronous version of the getDatanodeReport method.
* @param type Type of the datanode.
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getDatanodeReportAsync(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeReport",
new Class<?>[] {DatanodeReportType.class}, type);

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
});
return asyncReturn(DatanodeInfo[].class);
}

@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
Expand All @@ -1204,6 +1408,11 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
return getDatanodeStorageReportMap(type, true, -1);
}

public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
DatanodeReportType type) throws IOException {
return getDatanodeStorageReportMapAsync(type, true, -1);
}

/**
* Get the list of datanodes per subcluster.
*
Expand Down Expand Up @@ -1236,6 +1445,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
return ret;
}

/**
* Get the list of datanodes per subcluster.
* Asynchronous version of getDatanodeStorageReportMap method.
* @param type Type of the datanodes to get.
* @param requireResponse If true an exception will be thrown if all calls do
* not complete. If false exceptions are ignored and all data results
* successfully received are returned.
* @param timeOutMs Time out for the reply in milliseconds.
* @return nsId to datanode list.
* @throws IOException If the method cannot be invoked remotely.
*/
public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
DatanodeReportType type, boolean requireResponse, long timeOutMs)
throws IOException {

Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
new Class<?>[] {DatanodeReportType.class}, type);
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
getRPCClient().invokeConcurrent(
nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
Map<String, DatanodeStorageReport[]>>) results -> {
for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
results.entrySet()) {
FederationNamespaceInfo ns = entry.getKey();
String nsId = ns.getNameserviceId();
DatanodeStorageReport[] result = entry.getValue();
ret.put(nsId, result);
}
return ret;
});
return asyncReturn(ret.getClass());
}

@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
Expand Down Expand Up @@ -2051,6 +2296,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu
return toArray(datanodes, DatanodeInfo.class);
}

/**
* Get the slow running datanodes report with a timeout.
* Asynchronous version of the getSlowDatanodeReport method.
*
* @param requireResponse If we require all the namespaces to report.
* @param timeOutMs Time out for the reply in milliseconds.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs)
throws IOException {
checkOperation(OperationCategory.UNCHECKED);

Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");

Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
timeOutMs, DatanodeInfo[].class);

asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
DatanodeInfo[]>) results -> {
updateDnMap(results, datanodesMap);
// Map -> Array
Collection<DatanodeInfo> datanodes = datanodesMap.values();
return toArray(datanodes, DatanodeInfo.class);
});

return asyncReturn(DatanodeInfo[].class);
}

private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
Map<String, DatanodeInfo> datanodesMap) {
for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
Expand Down
Loading
Loading