diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index c23c21c6dfb67..7445ca1127567 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -37,6 +37,12 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
 
 import java.io.FileNotFoundException;
@@ -49,6 +55,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -68,6 +75,9 @@
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -791,6 +801,46 @@ <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
     return invokeOnNs(method, clazz, io, nss);
   }
 
+  /**
+   * Invokes the method at the default namespace; if the default namespace
+   * is not available, invokes it at the other available namespaces.
+   * If a namespace is unavailable, retry with the other namespaces.
+   * Asynchronous version of the invokeAtAvailableNs method.
+   * @param <T> expected return type.
+   * @param method the remote method.
+   * @return the response received after invoking the method.
+   * @throws IOException if no namespace is available or the invocation fails.
+   */
+  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+      throws IOException {
+    String nsId = subclusterResolver.getDefaultNamespace();
+    // If the default NS is not present, return the result from the first namespace.
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    // If no namespace is available, throw IOException.
+    IOException io = new IOException("No namespace available.");
+
+    asyncComplete(null);
+    if (!nsId.isEmpty()) {
+      asyncTry(() -> {
+        getRPCClient().invokeSingle(nsId, method, clazz);
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
+        if (!clientProto.isUnavailableSubclusterException(ioe)) {
+          LOG.debug("{} exception cannot be retried",
+              ioe.getClass().getSimpleName());
+          throw ioe;
+        }
+        nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+        invokeOnNsAsync(method, clazz, io, nss);
+      }, IOException.class);
+    } else {
+      // There is no default NS; try the other namespaces directly.
+      invokeOnNsAsync(method, clazz, io, nss);
+    }
+    return asyncReturn(clazz);
+  }
+
   /**
    * Invoke the method sequentially on available namespaces,
    * throw no namespace available exception, if no namespaces are available.
@@ -824,6 +874,61 @@ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
     throw ioe;
   }
 
+  /**
+   * Invoke the method sequentially on available namespaces,
+   * throw no namespace available exception, if no namespaces are available.
+   * Asynchronous version of the invokeOnNs method.
+   * @param method the remote method.
+   * @param clazz Class for the return type.
+   * @param ioe IOException to throw when no namespace is available.
+   * @param nss List of namespaces in the federation.
+   * @return the response received after invoking the method.
+   * @throws IOException if the invocation fails on every namespace.
+   */
+  <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
+      Set<FederationNamespaceInfo> nss) throws IOException {
+    if (nss.isEmpty()) {
+      throw ioe;
+    }
+
+    asyncComplete(null);
+    Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
+    asyncForEach(nsIterator, (foreach, fnInfo) -> {
+      String nsId = fnInfo.getNameserviceId();
+      LOG.debug("Invoking {} on namespace {}", method, nsId);
+      asyncTry(() -> {
+        getRPCClient().invokeSingle(nsId, method, clazz);
+        asyncApply(result -> {
+          if (result != null) {
+            foreach.breakNow();
+            return result;
+          }
+          return null;
+        });
+      });
+
+      asyncCatch((CatchFunction<T, IOException>)(ret, ex) -> {
+        LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex);
+        // Ignore the exception and try another namespace if the tried
+        // namespace is unavailable; otherwise rethrow the received exception.
+        if (!clientProto.isUnavailableSubclusterException(ex)) {
+          throw ex;
+        }
+        return null;
+      }, IOException.class);
+    });
+
+    asyncApply(obj -> {
+      if (obj == null) {
+        // Couldn't get a response from any of the namespaces, throw ioe.
+        throw ioe;
+      }
+      return obj;
+    });
+
+    return asyncReturn(clazz);
+  }
+
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
@@ -875,6 +980,10 @@ public HdfsFileStatus create(String src, FsPermission masked,
    */
   RemoteLocation getCreateLocation(final String src) throws IOException {
     final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    if (isAsync()) {
+      getCreateLocationAsync(src, locations);
+      return asyncReturn(RemoteLocation.class);
+    }
     return getCreateLocation(src, locations);
   }
 
@@ -911,6 +1020,44 @@ RemoteLocation getCreateLocation(
     return createLocation;
   }
 
+  /**
+   * Get the location to create a file. It checks if the file already exists
+   * in one of the locations.
+   * Asynchronous version of the getCreateLocation method.
+   *
+   * @param src Path of the file to check.
+   * @param locations Prefetched locations for the file.
+   * @return The remote location for this file.
+   * @throws IOException If the file has no creation location.
+   */
+  RemoteLocation getCreateLocationAsync(
+      final String src, final List<RemoteLocation> locations)
+      throws IOException {
+
+    if (locations == null || locations.isEmpty()) {
+      throw new IOException("Cannot get locations to create " + src);
+    }
+
+    RemoteLocation createLocation = locations.get(0);
+    if (locations.size() > 1) {
+      asyncTry(() -> {
+        getExistingLocationAsync(src, locations);
+        asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> {
+          if (existingLocation != null) {
+            LOG.debug("{} already exists in {}.", src, existingLocation);
+            return existingLocation;
+          }
+          return createLocation;
+        });
+      });
+      asyncCatch((o, e) -> createLocation, FileNotFoundException.class);
+    } else {
+      asyncComplete(createLocation);
+    }
+
+    return asyncReturn(RemoteLocation.class);
+  }
+
   /**
    * Gets the remote location where the file exists.
    * @param src the name of file.
    * @param locations all the remote locations.
@@ -932,6 +1079,31 @@ private RemoteLocation getExistingLocation(String src,
     return null;
   }
 
+  /**
+   * Gets the remote location where the file exists.
+   * Asynchronous version of the getExistingLocation method.
+   * @param src the name of file.
+   * @param locations all the remote locations.
+   * @return the remote location of the file if it exists, else null.
+   * @throws IOException in case of any exception.
+   */
+  private RemoteLocation getExistingLocationAsync(String src,
+      List<RemoteLocation> locations) throws IOException {
+    RemoteMethod method = new RemoteMethod("getFileInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+    getRPCClient().invokeConcurrent(
+        locations, method, true, false, HdfsFileStatus.class);
+    asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
+      for (RemoteLocation loc : locations) {
+        if (results.get(loc) != null) {
+          return loc;
+        }
+      }
+      return null;
+    });
+    return asyncReturn(RemoteLocation.class);
+  }
+
   @Override // ClientProtocol
   public LastBlockWithStatus append(String src, final String clientName,
       final EnumSetWritable<CreateFlag> flag) throws IOException {
@@ -1186,6 +1358,38 @@ public DatanodeInfo[] getDatanodeReport(
     return toArray(datanodes, DatanodeInfo.class);
   }
 
+  /**
+   * Get the datanode report with a timeout.
+   * Asynchronous version of the getDatanodeReport method.
+   * @param type Type of the datanode.
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getDatanodeReportAsync(
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getDatanodeReport",
+        new Class<?>[] {DatanodeReportType.class}, type);
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
+        timeOutMs, DatanodeInfo[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
+        DatanodeInfo[]>) results -> {
+      updateDnMap(results, datanodesMap);
+      // Map -> Array
+      Collection<DatanodeInfo> datanodes = datanodesMap.values();
+      return toArray(datanodes, DatanodeInfo.class);
+    });
+    return asyncReturn(DatanodeInfo[].class);
+  }
+
   @Override // ClientProtocol
   public DatanodeStorageReport[] getDatanodeStorageReport(
       DatanodeReportType type) throws IOException {
@@ -1204,6 +1408,11 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
     return getDatanodeStorageReportMap(type, true, -1);
   }
 
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
+      DatanodeReportType type) throws IOException {
+    return getDatanodeStorageReportMapAsync(type, true, -1);
+  }
+
   /**
    * Get the list of datanodes per subcluster.
    *
@@ -1236,6 +1445,42 @@ public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMap(
     return ret;
   }
 
+  /**
+   * Get the list of datanodes per subcluster.
+   * Asynchronous version of the getDatanodeStorageReportMap method.
+   * @param type Type of the datanodes to get.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data results
+   *          successfully received are returned.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return nsId to datanode list.
+   * @throws IOException If the method cannot be invoked remotely.
+   */
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
+
+    Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
+        new Class<?>[] {DatanodeReportType.class}, type);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(
+        nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
+        Map<String, DatanodeStorageReport[]>>) results -> {
+      for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
+          results.entrySet()) {
+        FederationNamespaceInfo ns = entry.getKey();
+        String nsId = ns.getNameserviceId();
+        DatanodeStorageReport[] result = entry.getValue();
+        ret.put(nsId, result);
+      }
+      return ret;
+    });
+    return asyncReturn(ret.getClass());
+  }
+
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
@@ -2051,6 +2296,37 @@ public DatanodeInfo[] getSlowDatanodeReport(boolean requireResponse, long timeOutMs)
     return toArray(datanodes, DatanodeInfo.class);
   }
 
+  /**
+   * Get the slow running datanodes report with a timeout.
+   * Asynchronous version of the getSlowDatanodeReport method.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
+        timeOutMs, DatanodeInfo[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
+        DatanodeInfo[]>) results -> {
+      updateDnMap(results, datanodesMap);
+      // Map -> Array
+      Collection<DatanodeInfo> datanodes = datanodesMap.values();
+      return toArray(datanodes, DatanodeInfo.class);
+    });
+
+    return asyncReturn(DatanodeInfo[].class);
+  }
+
   private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
       Map<String, DatanodeInfo> datanodesMap) {
     for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
new file mode 100644
index 0000000000000..72dc6815442d8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the async functionality of {@link RouterRpcServer}.
+ */
+public class TestRouterAsyncRpcServer {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer asyncRouterRpcServer;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC client threads to make it easy to overload the Router
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // Decrease the DN report cache expiry to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start the routers
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    RouterRpcServer routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    asyncRouterRpcServer = Mockito.spy(routerRpcServer);
+    Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // Clear the client context
+    CallerContext.setCurrent(null);
+    boolean delete = routerFs.delete(new Path("/testdir"), true);
+    assertTrue(delete);
+    if (routerFs != null) {
+      routerFs.close();
+    }
+  }
+
+  /**
+   * Test that the async RPC server can invoke a method at an available Namenode.
+   */
+  @Test
+  public void testInvokeAtAvailableNsAsync() throws Exception {
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class);
+    BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class);
+    assertEquals(8, storagePolicies.length);
+  }
+
+  /**
+   * Test getting the location to create a file asynchronously.
+   */
+  @Test
+  public void testGetCreateLocationAsync() throws Exception {
+    final List<RemoteLocation> locations =
+        asyncRouterRpcServer.getLocationsForPath("/testdir", true);
+    asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
+    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+    assertNotNull(remoteLocation);
+    assertEquals(ns0, remoteLocation.getNameserviceId());
+  }
+
+  /**
+   * Test getting the datanode report asynchronously.
+   */
+  @Test
+  public void testGetDatanodeReportAsync() throws Exception {
+    asyncRouterRpcServer.getDatanodeReportAsync(
+        HdfsConstants.DatanodeReportType.ALL, true, 0);
+    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+    assertEquals(3, datanodeInfos.length);
+
+    // Get the namespace where each datanode is located
+    asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+    Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
+    assertEquals(1, map.size());
+    assertEquals(3, map.get(ns0).length);
+
+    DatanodeInfo[] slowDatanodeReport1 =
+        asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
+
+    asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
+    DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
+    assertEquals(slowDatanodeReport1, slowDatanodeReport2);
+  }
+}
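
Usage note: every *Async method in this patch follows the same convention, visible in the tests above: the method call only registers a chain of callbacks (asyncTry/asyncApply/asyncCatch) and returns a placeholder, and the caller later materializes the typed result with syncReturn. The sketch below is a minimal illustration of that contract, not part of the patch; the class name AsyncReportSketch and its method are hypothetical, and it assumes the thread-held future model that the AsyncUtil calls in the diff imply.

// Hypothetical caller-side sketch; placed in the same package so the
// RouterRpcServer internals used by the tests are reachable.
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public final class AsyncReportSketch {

  private AsyncReportSketch() {
  }

  /**
   * Drives getDatanodeReportAsync() to completion on the calling thread,
   * mirroring the pattern used by TestRouterAsyncRpcServer.
   */
  static DatanodeInfo[] datanodeReport(RouterRpcServer server) throws Exception {
    // Registers the concurrent invocation and queues the Map -> array
    // conversion; the value returned here is only a typed placeholder.
    server.getDatanodeReportAsync(HdfsConstants.DatanodeReportType.ALL, true, 0);
    // Blocks until the async chain completes and unwraps the real result.
    return syncReturn(DatanodeInfo[].class);
  }
}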