From 4ef2322b6d7c1d1ae1cd9e62042f2db0ae42fc7c Mon Sep 17 00:00:00 2001 From: huhaiyang Date: Mon, 6 Nov 2023 11:20:25 +0800 Subject: [PATCH] HDFS-17243. Add the parameter storage type for getBlocks method (#6238). Contributed by Haiyang Hu. Reviewed-by: He Xiaoqiao Reviewed-by: Tao Li Signed-off-by: Shuyan Zhang --- .../router/RouterNamenodeProtocol.java | 7 +- .../federation/router/RouterRpcServer.java | 4 +- .../federation/router/TestRouterRpc.java | 6 +- ...amenodeProtocolServerSideTranslatorPB.java | 4 +- .../NamenodeProtocolTranslatorPB.java | 11 +- .../hdfs/server/balancer/Dispatcher.java | 2 +- .../server/balancer/NameNodeConnector.java | 5 +- .../server/blockmanagement/BlockManager.java | 7 +- .../hdfs/server/namenode/FSNamesystem.java | 9 +- .../server/namenode/NameNodeRpcServer.java | 4 +- .../server/protocol/NamenodeProtocol.java | 4 +- .../src/main/proto/NamenodeProtocol.proto | 1 + .../org/apache/hadoop/hdfs/TestGetBlocks.java | 110 ++++++++++++++++-- .../hdfs/server/balancer/TestBalancer.java | 2 +- .../balancer/TestBalancerWithHANameNodes.java | 2 +- 15 files changed, 142 insertions(+), 36 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java index 278d282fd7e6f..a5a047d115cd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterNamenodeProtocol.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -53,7 +54,7 @@ public RouterNamenodeProtocol(RouterRpcServer server) { @Override public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, - long minBlockSize, long hotBlockTimeInterval) throws IOException { + long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException { rpcServer.checkOperation(OperationCategory.READ); // Get the namespace where the datanode is located @@ -79,8 +80,8 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, if (nsId != null) { RemoteMethod method = new RemoteMethod( NamenodeProtocol.class, "getBlocks", new Class[] - {DatanodeInfo.class, long.class, long.class, long.class}, - datanode, size, minBlockSize, hotBlockTimeInterval); + {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class}, + datanode, size, minBlockSize, hotBlockTimeInterval, storageType); return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class); } return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index cae61b7d927dd..2aa2eae5305d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1612,9 +1612,9 @@ public DatanodeInfo[] getSlowDatanodeReport() throws IOException { @Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, - long minBlockSize, long hotBlockTimeInterval) throws IOException { + long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException { return nnProto.getBlocks(datanode, size, minBlockSize, - hotBlockTimeInterval); + hotBlockTimeInterval, storageType); } @Override // NamenodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index d44b40b052385..93e905b4eafff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -1385,9 +1385,11 @@ public void testProxyGetBlocks() throws Exception { // Verify that checking that datanode works BlocksWithLocations routerBlockLocations = - routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0); + routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0, + null); BlocksWithLocations nnBlockLocations = - nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0); + nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0, + null); BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks(); BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks(); assertEquals(nnBlocks.length, routerBlocks.length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java index e89a6b62b507d..f4025366391c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java @@ -89,7 +89,9 @@ public GetBlocksResponseProto getBlocks(RpcController unused, BlocksWithLocations blocks; try { blocks = impl.getBlocks(dnInfo, request.getSize(), - request.getMinBlockSize(), request.getTimeInterval()); + request.getMinBlockSize(), request.getTimeInterval(), + request.hasStorageType() ? + PBHelperClient.convertStorageType(request.getStorageType()): null); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java index fd40e0ecef34c..87518aa1e231a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto; @@ -101,11 +102,15 @@ public Object getUnderlyingProxyObject() { @Override public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize, long timeInterval) + minBlockSize, long timeInterval, StorageType storageType) throws IOException { - GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder() + GetBlocksRequestProto.Builder builder = GetBlocksRequestProto.newBuilder() .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size) - .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build(); + .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval); + if (storageType != null) { + builder.setStorageType(PBHelperClient.convertStorageType(storageType)); + } + GetBlocksRequestProto req = builder.build(); return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req) .getBlocks())); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 98a6d8449b629..6ad0e4d22a854 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -839,7 +839,7 @@ private long getBlockList() throws IOException, IllegalArgumentException { final long size = Math.min(getBlocksSize, blocksToReceive); final BlocksWithLocations newBlksLocs = nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize, - hotBlockTimeInterval); + hotBlockTimeInterval, storageType); if (LOG.isTraceEnabled()) { LOG.trace("getBlocks(" + getDatanodeInfo() + ", " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 34be025203d47..e8274fdbe779b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; import org.apache.hadoop.ha.HAServiceProtocol; @@ -255,7 +256,7 @@ public URI getNameNodeUri() { /** @return blocks with locations. */ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize, long timeInterval) throws IOException { + minBlockSize, long timeInterval, StorageType storageType) throws IOException { if (getBlocksRateLimiter != null) { getBlocksRateLimiter.acquire(); } @@ -274,7 +275,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long } else { nnProxy = namenode; } - return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval); + return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval, storageType); } finally { if (isRequestStandby) { LOG.info("Request #getBlocks to Standby NameNode success. " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 2351bb4782873..2d216be945772 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1720,8 +1720,8 @@ private boolean isHotBlock(BlockInfo blockInfo, long time) { /** Get all blocks with location information from a datanode. */ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode, - final long size, final long minBlockSize, final long timeInterval) throws - UnregisteredNodeException { + final long size, final long minBlockSize, final long timeInterval, + final StorageType storageType) throws UnregisteredNodeException { final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode); if (node == null) { blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" + @@ -1735,10 +1735,11 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode, return new BlocksWithLocations(new BlockWithLocations[0]); } - // skip stale storage + // skip stale storage, then choose specific storage type. DatanodeStorageInfo[] storageInfos = Arrays .stream(node.getStorageInfos()) .filter(s -> !s.areBlockContentsStale()) + .filter(s -> storageType == null || s.getStorageType().equals(storageType)) .toArray(DatanodeStorageInfo[]::new); // starting from a random block diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 3d360c6d0dd2a..7918daf6b9db8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1946,10 +1946,13 @@ public boolean isInStandbyState() { * * @param datanode on which blocks are located * @param size total size of blocks - * @param minimumBlockSize + * @param minimumBlockSize each block should be of this minimum Block Size + * @param timeInterval prefer to get blocks which are belong to + * the cold files accessed before the time interval + * @param storageType the given storage type {@link StorageType} */ public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long - minimumBlockSize, long timeInterval) throws IOException { + minimumBlockSize, long timeInterval, StorageType storageType) throws IOException { OperationCategory checkOp = isGetBlocksCheckOperationEnabled ? OperationCategory.READ : OperationCategory.UNCHECKED; @@ -1958,7 +1961,7 @@ public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long try { checkOperation(checkOp); return getBlockManager().getBlocksWithLocations(datanode, size, - minimumBlockSize, timeInterval); + minimumBlockSize, timeInterval, storageType); } finally { readUnlock("getBlocks"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4a041dbec2758..f02688d1629f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -649,7 +649,7 @@ private static UserGroupInformation getRemoteUser() throws IOException { ///////////////////////////////////////////////////// @Override // NamenodeProtocol public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize, long timeInterval) + minBlockSize, long timeInterval, StorageType storageType) throws IOException { String operationName = "getBlocks"; if(size <= 0) { @@ -663,7 +663,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long checkNNStartup(); namesystem.checkSuperuserPrivilege(operationName); namesystem.checkNameNodeSafeMode("Cannot execute getBlocks"); - return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval); + return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval, storageType); } @Override // NamenodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java index 44ffb85f79ece..03ddc5ef8b1e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; @@ -76,6 +77,7 @@ public interface NamenodeProtocol { * @param minBlockSize each block should be of this minimum Block Size * @param hotBlockTimeInterval prefer to get blocks which are belong to * the cold files accessed before the time interval + * @param storageType the given storage type {@link StorageType} * @return BlocksWithLocations a list of blocks & their locations * @throws IOException if size is less than or equal to 0 or datanode does not exist @@ -83,7 +85,7 @@ public interface NamenodeProtocol { @Idempotent @ReadOnly BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize, long hotBlockTimeInterval) throws IOException; + minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException; /** * Get the current block keys diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto index 32cdade055eee..29a9aa01b68d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto @@ -48,6 +48,7 @@ message GetBlocksRequestProto { // For more info refer HDFS-13356 optional uint64 minBlockSize = 3 [default = 10485760]; optional uint64 timeInterval = 4 [default = 0]; + optional StorageTypeProto storageType = 5; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java index cad7d0fb5026a..5abb8adc14e8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -36,6 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.SafeModeAction; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -239,26 +241,29 @@ public void testGetBlocks() throws Exception { DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy(); // Should return all 13 blocks, as minBlockSize is not passed - locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0, + null).getBlocks(); assertEquals(blkLocsSize, locs.length); assertEquals(locs[0].getStorageIDs().length, replicationFactor); assertEquals(locs[1].getStorageIDs().length, replicationFactor); // Should return 12 blocks, as minBlockSize is blkSize - locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0, + null).getBlocks(); assertEquals(blkLocsSize - 1, locs.length); assertEquals(locs[0].getStorageIDs().length, replicationFactor); assertEquals(locs[1].getStorageIDs().length, replicationFactor); // get blocks of size BlockSize from dataNodes[0] locs = namenode.getBlocks(dataNodes[0], blkSize, - blkSize, 0).getBlocks(); + blkSize, 0, null).getBlocks(); assertEquals(locs.length, 1); assertEquals(locs[0].getStorageIDs().length, replicationFactor); // get blocks of size 1 from dataNodes[0] - locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], 1, 1, 0, + null).getBlocks(); assertEquals(locs.length, 1); assertEquals(locs[0].getStorageIDs().length, replicationFactor); @@ -283,7 +288,8 @@ public void testGetBlocks() throws Exception { // Namenode should refuse to provide block locations to the balancer // while in safemode. - locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks(); + locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0, + null).getBlocks(); assertEquals(blkLocsSize, locs.length); assertFalse(fs.isInSafeMode()); LOG.info("Entering safe mode"); @@ -310,7 +316,8 @@ private void getBlocksWithException(NamenodeProtocol namenode, // Namenode should refuse should fail LambdaTestUtils.intercept(exClass, - msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0)); + msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0, + null)); } /** @@ -450,7 +457,7 @@ public void testGetBlocksWithHotBlockTimeInterval() throws Exception { .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks(); BlockWithLocations[] locsAll = namenode.getBlocks( - dataNodes[0], fileLen*2, 0, hotInterval).getBlocks(); + dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks(); assertEquals(locsAll.length, 4); for(int i = 0; i < blockNum; i++) { @@ -461,7 +468,7 @@ public void testGetBlocksWithHotBlockTimeInterval() throws Exception { } BlockWithLocations[] locs2 = namenode.getBlocks( - dataNodes[0], fileLen*2, 0, hotInterval).getBlocks(); + dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks(); for(int i = 0; i < 2; i++) { assertTrue(belongToFile(locs2[i], locatedBlocksOld)); } @@ -508,7 +515,7 @@ public void testReadSkipStaleStorage() throws Exception { // check blocks count equals to blockNum BlockWithLocations[] blocks = namenode.getBlocks( - dataNodes[0], fileLen*2, 0, 0).getBlocks(); + dataNodes[0], fileLen*2, 0, 0, null).getBlocks(); assertEquals(blockNum, blocks.length); // calculate the block count on storage[0] @@ -524,13 +531,94 @@ public void testReadSkipStaleStorage() throws Exception { // set storage[0] stale storageInfos[0].setBlockContentsStale(true); blocks = namenode.getBlocks( - dataNodes[0], fileLen*2, 0, 0).getBlocks(); + dataNodes[0], fileLen*2, 0, 0, null).getBlocks(); assertEquals(blockNum - count, blocks.length); // set all storage stale bm0.getDatanodeManager().markAllDatanodesStale(); blocks = namenode.getBlocks( - dataNodes[0], fileLen*2, 0, 0).getBlocks(); + dataNodes[0], fileLen*2, 0, 0, null).getBlocks(); assertEquals(0, blocks.length); } + + @Test + public void testChooseSpecifyStorageType() throws Exception { + final short repFactor = (short) 1; + final int fileLen = BLOCK_SIZE; + final Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .storageTypes(new StorageType[] {StorageType.DISK, StorageType.SSD}). + storagesPerDatanode(2).build()) { + cluster.waitActive(); + + // Get storage info. + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL); + BlockManager bm0 = cluster.getNamesystem(0).getBlockManager(); + DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager() + .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos(); + assert Arrays.stream(storageInfos) + .anyMatch(datanodeStorageInfo -> { + String storageTypeName = datanodeStorageInfo.getStorageType().name(); + return storageTypeName.equals("SSD") || storageTypeName.equals("DISK"); + }) : "No 'SSD' or 'DISK' storage types found."; + + // Create hdfs file. + Path ssdDir = new Path("/testChooseSSD"); + DistributedFileSystem fs = cluster.getFileSystem(); + Path ssdFile = new Path(ssdDir, "file"); + fs.mkdirs(ssdDir); + fs.setStoragePolicy(ssdDir, "ALL_SSD"); + DFSTestUtil.createFile(fs, ssdFile, false, 1024, fileLen, + BLOCK_SIZE, repFactor, 0, true); + DFSTestUtil.waitReplication(fs, ssdFile, repFactor); + BlockLocation[] locations = fs.getClient() + .getBlockLocations(ssdFile.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locations.length); + assertEquals("SSD", locations[0].getStorageTypes()[0].name()); + + Path diskDir = new Path("/testChooseDisk"); + fs = cluster.getFileSystem(); + Path diskFile = new Path(diskDir, "file"); + fs.mkdirs(diskDir); + fs.setStoragePolicy(diskDir, "HOT"); + DFSTestUtil.createFile(fs, diskFile, false, 1024, fileLen, + BLOCK_SIZE, repFactor, 0, true); + DFSTestUtil.waitReplication(fs, diskFile, repFactor); + locations = fs.getClient() + .getBlockLocations(diskFile.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locations.length); + assertEquals("DISK", locations[0].getStorageTypes()[0].name()); + + InetSocketAddress addr = new InetSocketAddress("localhost", + cluster.getNameNodePort()); + NamenodeProtocol namenode = NameNodeProxies.createProxy(conf, + DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy(); + + // Check blocks count equals to blockNum. + // If StorageType is not specified will get all blocks. + BlockWithLocations[] blocks = namenode.getBlocks( + dataNodes[0], fileLen * 2, 0, 0, + null).getBlocks(); + assertEquals(2, blocks.length); + + // Check the count of blocks with a StorageType of DISK. + blocks = namenode.getBlocks( + dataNodes[0], fileLen * 2, 0, 0, + StorageType.DISK).getBlocks(); + assertEquals(1, blocks.length); + assertEquals("DISK", blocks[0].getStorageTypes()[0].name()); + + // Check the count of blocks with a StorageType of SSD. + blocks = namenode.getBlocks( + dataNodes[0], fileLen * 2, 0, 0, + StorageType.SSD).getBlocks(); + assertEquals(1, blocks.length); + assertEquals("SSD", blocks[0].getStorageTypes()[0].name()); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 2a56c25d0d46e..23d1cb441bb8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1891,7 +1891,7 @@ public BlocksWithLocations answer(InvocationOnMock invocation) numGetBlocksCalls.incrementAndGet(); return blk; }}).when(fsnSpy).getBlocks(any(DatanodeID.class), - anyLong(), anyLong(), anyLong()); + anyLong(), anyLong(), anyLong(), any(StorageType.class)); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index d69051c8d7af7..dbd76ee614515 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -277,7 +277,7 @@ private void testBalancerWithObserver(boolean withObserverFailure) int expectedObserverIdx = withObserverFailure ? 3 : 2; int expectedCount = (i == expectedObserverIdx) ? 2 : 0; verify(namesystemSpies.get(i), times(expectedCount)) - .getBlocks(any(), anyLong(), anyLong(), anyLong()); + .getBlocks(any(), anyLong(), anyLong(), anyLong(), any()); } } finally { if (qjmhaCluster != null) {