From 915f8d3992781cca6c4821e017c4e1e4e319c66b Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 10 Oct 2024 20:29:32 +0800 Subject: [PATCH 01/10] HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. --- .../federation/router/RouterRpcServer.java | 269 ++++++++++++++++++ 1 file changed, 269 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c23c21c6dfb67..30c6cdcf2f16a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -37,6 +37,13 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; +import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient.isExpectedClass; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn; +import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry; import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI; import java.io.FileNotFoundException; @@ 
-49,6 +56,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -68,6 +76,8 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; +import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -791,6 +801,46 @@ T invokeAtAvailableNs(RemoteMethod method, Class clazz) return invokeOnNs(method, clazz, io, nss); } + /** + * Invokes the method at default namespace, if default namespace is not + * available then at the other available namespaces. + * If the namespace is unavailable, retry with other namespaces. + * Asynchronous version of invokeAtAvailableNs method. + * @param expected return type. + * @param method the remote method. + * @return the response received after invoking method. + * @throws IOException + */ + T invokeAtAvailableNsAsync(RemoteMethod method, Class clazz) + throws IOException { + String nsId = subclusterResolver.getDefaultNamespace(); + // If default Ns is not present return result from first namespace. + Set nss = namenodeResolver.getNamespaces(); + // If no namespace is available, throw IOException. 
+ IOException io = new IOException("No namespace available."); + + asyncComplete(null); + if (!nsId.isEmpty()) { + asyncTry(() -> { + rpcClient.invokeSingle(nsId, method, clazz); + }); + + asyncCatch((AsyncCatchFunction)(res, ioe) -> { + if (!clientProto.isUnavailableSubclusterException(ioe)) { + LOG.debug("{} exception cannot be retried", + ioe.getClass().getSimpleName()); + throw ioe; + } + nss.removeIf(n -> n.getNameserviceId().equals(nsId)); + invokeOnNs(method, clazz, io, nss); + }, IOException.class); + } else { + // If not have default NS. + invokeOnNsAsync(method, clazz, io, nss); + } + return asyncReturn(clazz); + } + /** * Invoke the method sequentially on available namespaces, * throw no namespace available exception, if no namespaces are available. @@ -824,6 +874,60 @@ T invokeOnNs(RemoteMethod method, Class clazz, IOException ioe, throw ioe; } + /** + * Invoke the method sequentially on available namespaces, + * throw no namespace available exception, if no namespaces are available. + * Asynchronous version of invokeOnNs method. + * @param method the remote method. + * @param clazz Class for the return type. + * @param ioe IOException . + * @param nss List of name spaces in the federation + * @return the response received after invoking method. 
+ * @throws IOException + */ + T invokeOnNsAsync(RemoteMethod method, Class clazz, IOException ioe, + Set nss) throws IOException { + if (nss.isEmpty()) { + throw ioe; + } + + asyncComplete(null); + Iterator nsIterator = nss.iterator(); + asyncForEach(nsIterator, (foreach, fnInfo) -> { + String nsId = fnInfo.getNameserviceId(); + LOG.debug("Invoking {} on namespace {}", method, nsId); + asyncTry(() -> { + rpcClient.invokeSingle(nsId, method, clazz); + asyncApply(result -> { + if (result != null && isExpectedClass(clazz, result)) { + foreach.breakNow(); + return result; + } + return null; + }); + }); + + asyncCatch((AsyncCatchFunction)(ret, ex) -> { + LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex); + // Ignore the exception and try on other namespace, if the tried + // namespace is unavailable, else throw the received exception. + if (!clientProto.isUnavailableSubclusterException(ex)) { + throw ex; + } + }, IOException.class); + }); + + asyncApply(obj -> { + if (obj == null) { + // Couldn't get a response from any of the namespace, throw ioe. + throw ioe; + } + return obj; + }); + + return asyncReturn(clazz); + } + @Override // ClientProtocol public Token getDelegationToken(Text renewer) throws IOException { @@ -875,6 +979,10 @@ public HdfsFileStatus create(String src, FsPermission masked, */ RemoteLocation getCreateLocation(final String src) throws IOException { final List locations = getLocationsForPath(src, true); + if (isAsync()) { + getCreateLocationAsync(src, locations); + return asyncReturn(RemoteLocation.class); + } return getCreateLocation(src, locations); } @@ -911,6 +1019,43 @@ RemoteLocation getCreateLocation( return createLocation; } + /** + * Get the location to create a file. It checks if the file already existed + * in one of the locations. + * + * @param src Path of the file to check. + * @param locations Prefetched locations for the file. + * @return The remote location for this file. 
+ * @throws IOException If the file has no creation location. + */ + RemoteLocation getCreateLocationAsync( + final String src, final List locations) + throws IOException { + + if (locations == null || locations.isEmpty()) { + throw new IOException("Cannot get locations to create " + src); + } + + RemoteLocation createLocation = locations.get(0); + if (locations.size() > 1) { + asyncTry(() -> { + getExistingLocationAsync(src, locations); + asyncApply((ApplyFunction) existingLocation -> { + if (existingLocation != null) { + LOG.debug("{} already exists in {}.", src, existingLocation); + return existingLocation; + } + return createLocation; + }); + }); + asyncCatch((o, e) -> createLocation, FileNotFoundException.class); + } else { + asyncComplete(createLocation); + } + + return asyncReturn(RemoteLocation.class); + } + /** * Gets the remote location where the file exists. * @param src the name of file. @@ -932,6 +1077,31 @@ private RemoteLocation getExistingLocation(String src, return null; } + /** + * Gets the remote location where the file exists. + * Asynchronous version of getExistingLocation method. + * @param src the name of file. + * @param locations all the remote locations. + * @return the remote location of the file if it exists, else null. + * @throws IOException in case of any exception. 
+ */ + private RemoteLocation getExistingLocationAsync(String src, + List locations) throws IOException { + RemoteMethod method = new RemoteMethod("getFileInfo", + new Class[] {String.class}, new RemoteParam()); + rpcClient.invokeConcurrent( + locations, method, true, false, HdfsFileStatus.class); + asyncApply((ApplyFunction, Object>) results -> { + for (RemoteLocation loc : locations) { + if (results.get(loc) != null) { + return loc; + } + } + return null; + }); + return asyncReturn(null); + } + @Override // ClientProtocol public LastBlockWithStatus append(String src, final String clientName, final EnumSetWritable flag) throws IOException { @@ -1186,6 +1356,38 @@ public DatanodeInfo[] getDatanodeReport( return toArray(datanodes, DatanodeInfo.class); } + /** + * Get the datanode report with a timeout. + * Asynchronous version of the getDatanodeReport method. + * @param type Type of the datanode. + * @param requireResponse If we require all the namespaces to report. + * @param timeOutMs Time out for the reply in milliseconds. + * @return List of datanodes. + * @throws IOException If it cannot get the report. 
+ */ + public DatanodeInfo[] getDatanodeReportAsync( + DatanodeReportType type, boolean requireResponse, long timeOutMs) + throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + Map datanodesMap = new LinkedHashMap<>(); + RemoteMethod method = new RemoteMethod("getDatanodeReport", + new Class[] {DatanodeReportType.class}, type); + + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, requireResponse, false, + timeOutMs, DatanodeInfo[].class); + + asyncApply((ApplyFunction, + DatanodeInfo[]>) results -> { + updateDnMap(results, datanodesMap); + // Map -> Array + Collection datanodes = datanodesMap.values(); + return toArray(datanodes, DatanodeInfo.class); + }); + return asyncReturn(DatanodeInfo[].class); + } + @Override // ClientProtocol public DatanodeStorageReport[] getDatanodeStorageReport( DatanodeReportType type) throws IOException { @@ -1236,6 +1438,42 @@ public Map getDatanodeStorageReportMap( return ret; } + /** + * Get the list of datanodes per subcluster. + * Asynchronous version of getDatanodeStorageReportMap method. + * @param type Type of the datanodes to get. + * @param requireResponse If true an exception will be thrown if all calls do + * not complete. If false exceptions are ignored and all data results + * successfully received are returned. + * @param timeOutMs Time out for the reply in milliseconds. + * @return nsId to datanode list. + * @throws IOException If the method cannot be invoked remotely. 
+ */ + public Map getDatanodeStorageReportMapAsync( + DatanodeReportType type, boolean requireResponse, long timeOutMs) + throws IOException { + + Map ret = new LinkedHashMap<>(); + RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", + new Class[] {DatanodeReportType.class}, type); + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent( + nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class); + + asyncApply((ApplyFunction, + Map>) results -> { + for (Entry entry : + results.entrySet()) { + FederationNamespaceInfo ns = entry.getKey(); + String nsId = ns.getNameserviceId(); + DatanodeStorageReport[] result = entry.getValue(); + ret.put(nsId, result); + } + return ret; + }); + return asyncReturn(ret.getClass()); + } + @Override // ClientProtocol public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException { @@ -2051,6 +2289,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu return toArray(datanodes, DatanodeInfo.class); } + /** + * Get the slow running datanodes report with a timeout. + * Asynchronous version of the getSlowDatanodeReport method. + * + * @param requireResponse If we require all the namespaces to report. + * @param timeOutMs Time out for the reply in milliseconds. + * @return List of datanodes. + * @throws IOException If it cannot get the report. 
+ */ + public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs) + throws IOException { + checkOperation(OperationCategory.UNCHECKED); + + Map datanodesMap = new LinkedHashMap<>(); + RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); + + Set nss = namenodeResolver.getNamespaces(); + rpcClient.invokeConcurrent(nss, method, requireResponse, false, + timeOutMs, DatanodeInfo[].class); + + asyncApply((ApplyFunction, + DatanodeInfo[]>) results -> { + updateDnMap(results, datanodesMap); + // Map -> Array + Collection datanodes = datanodesMap.values(); + return toArray(datanodes, DatanodeInfo.class); + }); + + return asyncReturn(DatanodeInfo[].class); + } + private void updateDnMap(Map results, Map datanodesMap) { for (Entry entry : From 49d758770a01e77bc4009a130ecc0f602e58f17a Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 11 Oct 2024 11:22:01 +0800 Subject: [PATCH 02/10] use CatchFunction. --- .../hdfs/server/federation/router/RouterRpcServer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 30c6cdcf2f16a..bc723cad66c14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil; import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction; import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction; +import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction; import 
org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader; import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; @@ -832,7 +833,7 @@ T invokeAtAvailableNsAsync(RemoteMethod method, Class clazz) throw ioe; } nss.removeIf(n -> n.getNameserviceId().equals(nsId)); - invokeOnNs(method, clazz, io, nss); + invokeOnNsAsync(method, clazz, io, nss); }, IOException.class); } else { // If not have default NS. @@ -907,13 +908,14 @@ T invokeOnNsAsync(RemoteMethod method, Class clazz, IOException ioe, }); }); - asyncCatch((AsyncCatchFunction)(ret, ex) -> { + asyncCatch((CatchFunction)(ret, ex) -> { LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex); // Ignore the exception and try on other namespace, if the tried // namespace is unavailable, else throw the received exception. if (!clientProto.isUnavailableSubclusterException(ex)) { throw ex; } + return null; }, IOException.class); }); From 7d6bff243ca8443bc27a19d142549c025460f04e Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 11 Oct 2024 13:49:05 +0800 Subject: [PATCH 03/10] fix blank lines. 
--- .../hdfs/server/federation/router/RouterRpcServer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index bc723cad66c14..9252b96d9800c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1379,9 +1379,9 @@ public DatanodeInfo[] getDatanodeReportAsync( Set nss = namenodeResolver.getNamespaces(); rpcClient.invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); - + asyncApply((ApplyFunction, - DatanodeInfo[]>) results -> { + DatanodeInfo[]>) results -> { updateDnMap(results, datanodesMap); // Map -> Array Collection datanodes = datanodesMap.values(); @@ -2293,7 +2293,7 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOu /** * Get the slow running datanodes report with a timeout. - * Asynchronous version of the getSlowDatanodeReport method. + * Asynchronous version of the getSlowDatanodeReport method. * * @param requireResponse If we require all the namespaces to report. * @param timeOutMs Time out for the reply in milliseconds. 
@@ -2310,7 +2310,7 @@ public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long t Set nss = namenodeResolver.getNamespaces(); rpcClient.invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); - + asyncApply((ApplyFunction, DatanodeInfo[]>) results -> { updateDnMap(results, datanodesMap); From c5f26e25549ba4ba3c16ea228927b7679b37a969 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 11 Oct 2024 14:15:08 +0800 Subject: [PATCH 04/10] fix return value of getExistingLocationAsync --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 9252b96d9800c..54140d77952d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1101,7 +1101,7 @@ private RemoteLocation getExistingLocationAsync(String src, } return null; }); - return asyncReturn(null); + return asyncReturn(RemoteLocation.class); } @Override // ClientProtocol From 5123f66cbbb10667892b7816763efe082c1f2c60 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Fri, 11 Oct 2024 16:31:50 +0800 Subject: [PATCH 05/10] fix javadoc --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 54140d77952d1..cc63d9036b2f3 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1024,6 +1024,7 @@ RemoteLocation getCreateLocation( /** * Get the location to create a file. It checks if the file already existed * in one of the locations. + * Asynchronous version of getCreateLocation method. * * @param src Path of the file to check. * @param locations Prefetched locations for the file. From cb4d99a8f0727ece662a62208c43e22f313e4063 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 14 Oct 2024 11:10:16 +0800 Subject: [PATCH 06/10] use getRPCClient() instead of rpcClient in async methods,. --- .../server/federation/router/RouterRpcServer.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index cc63d9036b2f3..febaedec21706 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -823,7 +823,7 @@ T invokeAtAvailableNsAsync(RemoteMethod method, Class clazz) asyncComplete(null); if (!nsId.isEmpty()) { asyncTry(() -> { - rpcClient.invokeSingle(nsId, method, clazz); + getRPCClient().invokeSingle(nsId, method, clazz); }); asyncCatch((AsyncCatchFunction)(res, ioe) -> { @@ -898,7 +898,7 @@ T invokeOnNsAsync(RemoteMethod method, Class clazz, IOException ioe, String nsId = fnInfo.getNameserviceId(); LOG.debug("Invoking {} on namespace {}", method, nsId); asyncTry(() -> { - rpcClient.invokeSingle(nsId, method, clazz); + 
getRPCClient().invokeSingle(nsId, method, clazz); asyncApply(result -> { if (result != null && isExpectedClass(clazz, result)) { foreach.breakNow(); @@ -1092,7 +1092,7 @@ private RemoteLocation getExistingLocationAsync(String src, List locations) throws IOException { RemoteMethod method = new RemoteMethod("getFileInfo", new Class[] {String.class}, new RemoteParam()); - rpcClient.invokeConcurrent( + getRPCClient().invokeConcurrent( locations, method, true, false, HdfsFileStatus.class); asyncApply((ApplyFunction, Object>) results -> { for (RemoteLocation loc : locations) { @@ -1378,7 +1378,7 @@ public DatanodeInfo[] getDatanodeReportAsync( new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, requireResponse, false, + getRPCClient().invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); asyncApply((ApplyFunction, @@ -1460,7 +1460,7 @@ public Map getDatanodeStorageReportMapAsync( RemoteMethod method = new RemoteMethod("getDatanodeStorageReport", new Class[] {DatanodeReportType.class}, type); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent( + getRPCClient().invokeConcurrent( nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class); asyncApply((ApplyFunction, @@ -2309,7 +2309,7 @@ public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long t RemoteMethod method = new RemoteMethod("getSlowDatanodeReport"); Set nss = namenodeResolver.getNamespaces(); - rpcClient.invokeConcurrent(nss, method, requireResponse, false, + getRPCClient().invokeConcurrent(nss, method, requireResponse, false, timeOutMs, DatanodeInfo[].class); asyncApply((ApplyFunction, From ee52b3cb56fabcc8aef74fa2718e54bcf47a60e4 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Wed, 30 Oct 2024 11:08:29 +0800 Subject: [PATCH 07/10] =?UTF-8?q?remove=20unnecessary=20condition=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; 
charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index febaedec21706..807c2ce3b19bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -900,7 +900,7 @@ T invokeOnNsAsync(RemoteMethod method, Class clazz, IOException ioe, asyncTry(() -> { getRPCClient().invokeSingle(nsId, method, clazz); asyncApply(result -> { - if (result != null && isExpectedClass(clazz, result)) { + if (result != null) { foreach.breakNow(); return result; } From e0ca34bcbd126566bbff2f1f1c641b95e949beda Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 31 Oct 2024 11:03:03 +0800 Subject: [PATCH 08/10] add getDatanodeStorageReportMapAsync with one parameter. 
--- .../hdfs/server/federation/router/RouterRpcServer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 807c2ce3b19bf..3e093e6d0bc45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1409,6 +1409,11 @@ public Map getDatanodeStorageReportMap( return getDatanodeStorageReportMap(type, true, -1); } + public Map getDatanodeStorageReportMapAsync( + DatanodeReportType type) throws IOException { + return getDatanodeStorageReportMapAsync(type, true, -1); + } + /** * Get the list of datanodes per subcluster. * From a703b8956784a56eedf73274458a273a9a70a3cc Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Mon, 4 Nov 2024 10:48:10 +0800 Subject: [PATCH 09/10] fix unused import --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 3e093e6d0bc45..7445ca1127567 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -37,7 +37,6 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption; -import static org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient.isExpectedClass; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch; import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete; From bee8a8d1f98794ef8cb81d4bccdad0ac223e72a9 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Fri, 8 Nov 2024 10:58:09 +0800 Subject: [PATCH 10/10] add UT. --- .../router/TestRouterAsyncRpcServer.java | 191 ++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java new file mode 100644 index 0000000000000..72dc6815442d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.ipc.CallerContext; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT; +import static 
org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Used to test the async functionality of {@link RouterRpcServer}. + */ +public class TestRouterAsyncRpcServer { + private static Configuration routerConf; + /** Federated HDFS cluster. */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + + /** Random Router for this federated cluster. */ + private MiniRouterDFSCluster.RouterContext router; + private FileSystem routerFs; + private RouterRpcServer asyncRouterRpcServer; + + @BeforeClass + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 1, 2, + DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.setNumDatanodesPerNameservice(3); + + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + // Start routers with only an RPC service + routerConf = new RouterConfigBuilder() + .rpc() + .build(); + + // Reduce the number of RPC clients threads to overload the Router easy + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1); + routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration( + RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS); + cluster.addRouterOverrides(routerConf); + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + ns0 = cluster.getNameservices().get(0); + } + + @AfterClass + 
public static void shutdownCluster() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + router = cluster.getRandomRouter(); + routerFs = router.getFileSystem(); + RouterRpcServer routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPool(); + RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient( + routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(), + routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + asyncRouterRpcServer = Mockito.spy(routerRpcServer); + Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient); + + // Create mock locations + MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver(); + resolver.addLocation("/", ns0, "/"); + FsPermission permission = new FsPermission("705"); + routerFs.mkdirs(new Path("/testdir"), permission); + } + + @After + public void tearDown() throws IOException { + // clear client context + CallerContext.setCurrent(null); + boolean delete = routerFs.delete(new Path("/testdir")); + assertTrue(delete); + if (routerFs != null) { + routerFs.close(); + } + } + + /** + * Test that the async RPC server can invoke a method at an available Namenode. + */ + @Test + public void testInvokeAtAvailableNsAsync() throws Exception { + RemoteMethod method = new RemoteMethod("getStoragePolicies"); + asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class); + BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class); + assertEquals(8, storagePolicies.length); + } + + /** + * Test get create location async. 
+ */ + @Test + public void testGetCreateLocationAsync() throws Exception { + final List locations = + asyncRouterRpcServer.getLocationsForPath("/testdir", true); + asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations); + RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); + assertNotNull(remoteLocation); + assertEquals(ns0, remoteLocation.getNameserviceId()); + } + + /** + * Test get datanode report async. + */ + @Test + public void testGetDatanodeReportAsync() throws Exception { + asyncRouterRpcServer.getDatanodeReportAsync( + HdfsConstants.DatanodeReportType.ALL, true, 0); + DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class); + assertEquals(3, datanodeInfos.length); + + // Get the namespace where the datanode is located + asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL); + Map map = syncReturn(Map.class); + assertEquals(1, map.size()); + assertEquals(3, map.get(ns0).length); + + DatanodeInfo[] slowDatanodeReport1 = + asyncRouterRpcServer.getSlowDatanodeReport(true, 0); + + asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0); + DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class); + assertEquals(slowDatanodeReport1, slowDatanodeReport2); + } +}