diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c29d91c421af4..86faad455bf28 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -318,6 +318,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true; + public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED = + "dfs.namenode.reconstruct.ecblock-groups.limit.enable"; + public static final boolean DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT = false; + + public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT = + "dfs.namenode.reconstruct.ecblock-groups.limit"; + public static final long DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT = 1000; + @Deprecated public static final String DFS_WEBHDFS_USER_PATTERN_KEY = HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index fdf300f913dd0..ac0fa3a46b046 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.util.FoldedTreeSet; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.server.namenode.CacheManager; @@ -469,6 +470,12 @@ public long getTotalECBlockGroups() { /** Storages accessible from multiple DNs. */ private final ProvidedStorageMap providedStorageMap; + /** Whether to enable limit EC block reconstruct.*/ + private volatile boolean reconstructECBlockGroupsLimitEnabled; + private volatile long reconstructECBlockGroupsLimit; + private final LightWeightHashSet reconstructECBlockGroups = + new LightWeightHashSet<>(); + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -625,6 +632,13 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED, DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT); + this.reconstructECBlockGroupsLimitEnabled = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED, + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT); + this.reconstructECBlockGroupsLimit = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, + DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT); + LOG.info("defaultReplication = {}", defaultReplication); LOG.info("maxReplication = {}", maxReplication); LOG.info("minReplication = {}", minReplication); @@ -632,6 +646,8 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled, LOG.info("redundancyRecheckInterval = {}ms", redundancyRecheckIntervalMs); LOG.info("encryptDataTransfer = {}", encryptDataTransfer); LOG.info("maxNumBlocksToLog = {}", maxNumBlocksToLog); + LOG.info("reconstructECBlockGroupsLimit = {}", reconstructECBlockGroupsLimit); + LOG.info("reconstructECBlockGroupsLimitEnabled = {}", reconstructECBlockGroupsLimitEnabled); } private static BlockTokenSecretManager createBlockTokenSecretManager( @@ -1295,6 +1311,8 @@ public LocatedBlock convertLastBlockToUnderConstruction( new DatanodeStorageInfo[locations.size()]; locations.toArray(removedBlockTargets); DatanodeStorageInfo.decrementBlocksScheduled(removedBlockTargets); + //Remove block from reconstructECBlockGroups queue. + removeReconstructECBlockGroups(lastBlock); } // remove this block from the list of pending blocks to be deleted. @@ -2089,12 +2107,21 @@ int computeReconstructionWorkForBlocks( final DatanodeStorageInfo[] targets = rw.getTargets(); if (targets == null || targets.length == 0) { rw.resetTargets(); + if (removeReconstructECBlockGroups(rw.getBlock())) { + LOG.debug("Removing block {} from reconstructECBlockGroups, " + + "now size {}", rw.getBlock(), getReconstructECBlockGroupCount()); + } continue; } synchronized (neededReconstruction) { if (validateReconstructionWork(rw)) { scheduledWork++; + } else { + if (removeReconstructECBlockGroups(rw.getBlock())) { + LOG.debug("Removing block {} from reconstructECBlockGroups, " + + "now size {}", rw.getBlock(), getReconstructECBlockGroupCount()); + } } } } @@ -2200,6 +2227,26 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block, additionalReplRequired = additionalReplRequired - numReplicas.decommissioning() - numReplicas.liveEnteringMaintenanceReplicas(); + if (reconstructECBlockGroupsLimitEnabled && + numReplicas.liveReplicas() < requiredRedundancy) { + LOG.debug("Prepare creating an ErasureCodingWork to {} reconstruct, " + + "currently there are {} EC BlockGroups to reconstruct and " + + "the limit is {}", block, getReconstructECBlockGroupCount(), + reconstructECBlockGroupsLimit); + if (checkReconstructECBlockGroups(block) || + getReconstructECBlockGroupCount() >= this.reconstructECBlockGroupsLimit) { + LOG.warn("Currently there are {} EC BlockGroups to reconstruct and " + + "the limit is {} block {} cannot create", + getReconstructECBlockGroupCount(), + reconstructECBlockGroupsLimit, block); + return null; + } + addReconstructECBlockGroups(block); + LOG.debug("Complete creating an ErasureCodingWork to {} reconstruct, " + + "currently there are {} EC BlockGroups to reconstruct and " + + "the limit is {}", block, getReconstructECBlockGroupCount(), + reconstructECBlockGroupsLimit); + } } final DatanodeDescriptor[] newSrcNodes = new DatanodeDescriptor[srcNodes.length]; @@ -4220,6 +4267,7 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block, if (storedBlock != null && block.getGenerationStamp() == storedBlock.getGenerationStamp()) { if (pendingReconstruction.decrement(storedBlock, storageInfo)) { + removeReconstructECBlockGroups(storedBlock); NameNode.getNameNodeMetrics().incSuccessfulReReplications(); } } @@ -4649,6 +4697,7 @@ public void removeBlock(BlockInfo block) { } neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL); postponedMisreplicatedBlocks.remove(block); + removeReconstructECBlockGroups(block); } public BlockInfo getStoredBlock(Block block) { @@ -5134,6 +5183,7 @@ public void clearQueues() { invalidateBlocks.clear(); datanodeManager.clearPendingQueues(); postponedMisreplicatedBlocks.clear(); + clearReconstructECBlockGroups(); }; public static LocatedBlock newLocatedBlock( @@ -5498,4 +5548,47 @@ public void disableSPS() { public StoragePolicySatisfyManager getSPSManager() { return spsManager; } + + boolean removeReconstructECBlockGroups(BlockInfo block) { + if (reconstructECBlockGroupsLimitEnabled && block.isStriped()) { + synchronized (reconstructECBlockGroups) { + if (reconstructECBlockGroups.remove(block)) { + LOG.debug("Removing block {} from reconstructECBlockGroups, " + + "now size {}", block, getReconstructECBlockGroupCount()); + return true; + } + } + } + return false; + } + + boolean checkReconstructECBlockGroups(BlockInfo block) { + synchronized (reconstructECBlockGroups) { + if (reconstructECBlockGroups.contains(block) && + reconstructECBlockGroups.remove(block)) { + LOG.warn("Check Removing block {} from reconstructECBlockGroups, " + + "now size {}", block, getReconstructECBlockGroupCount()); + return true; + } + } + return false; + } + + boolean addReconstructECBlockGroups(BlockInfo block) { + synchronized (reconstructECBlockGroups) { + return reconstructECBlockGroups.add(block); + } + } + + public long getReconstructECBlockGroupCount() { + synchronized (reconstructECBlockGroups) { + return reconstructECBlockGroups.size(); + } + } + + void clearReconstructECBlockGroups(){ + synchronized (reconstructECBlockGroups) { + reconstructECBlockGroups.clear(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java index 94a9ae1a4e0f0..45fcf555b9c01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java @@ -43,6 +43,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.junit.Assert; @@ -430,4 +434,92 @@ public void testReconstructionWork() throws Exception { dfsCluster.shutdown(); } } + + @Test + public void testRecoveryTasksForBlockGroupsLimit() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1000); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED, + true); + long limit = 2; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, limit); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 1).build(); + try { + cluster.waitActive(); + cluster.getFileSystem().enableErasureCodingPolicy( + StripedFileTestUtil.getDefaultECPolicy().getName()); + final int numBlocks = 6; + DFSTestUtil.createStripedFile(cluster, filePath, + dirPath, numBlocks, 1, true); + // all blocks will be located at first GROUP_SIZE DNs, the last DN is + // empty because of the util function createStripedFile + + // make sure the file is complete in NN + final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() + .getINode4Write(filePath.toString()).asFile(); + assertFalse(fileNode.isUnderConstruction()); + assertTrue(fileNode.isStriped()); + BlockInfo[] blocks = fileNode.getBlocks(); + assertEquals(numBlocks, blocks.length); + + BlockManager bm = cluster.getNamesystem().getBlockManager(); + + BlockInfo firstBlock = fileNode.getBlocks()[0]; + DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock); + + // make numOfMissed internal blocks missed + for (int i = 0; i < 1; i++) { + DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor(); + assertEquals(numBlocks, missedNode.numBlocks()); + bm.getDatanodeManager().removeDatanode(missedNode); + } + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + assertEquals( numBlocks, bm.getLowRedundancyBlocksCount()); + // all the reconstruction work will be scheduled on the last DN + DataNode lastDn = cluster.getDataNodes().get(groupSize); + DatanodeDescriptor last = bm.getDatanodeManager().getDatanode + (lastDn.getDatanodeId()); + BlockManagerTestUtil.getComputedDatanodeWork(bm); + int count = 0; + while (bm.getPendingReconstructionBlocksCount() > 0) { + count++; + assertEquals("Counting the number of outstanding EC tasks", limit, + last.getNumberOfBlocksToBeErasureCoded()); + assertEquals(limit, bm.getPendingReconstructionBlocksCount()); + assertEquals(limit, bm.getReconstructECBlockGroupCount()); + List reconstruction = + last.getErasureCodeCommand((int) limit); + + for (BlockECReconstructionInfo info : reconstruction) { + String poolId = cluster.getNamesystem().getBlockPoolId(); + // let two datanodes (other than the one that already has the data) to + // report to NN + DatanodeRegistration dnR = lastDn.getDNRegistrationForBP(poolId); + StorageReceivedDeletedBlocks[] report = { + new StorageReceivedDeletedBlocks( + new DatanodeStorage("Fake-storage-ID-Ignored"), + new ReceivedDeletedBlockInfo[]{new ReceivedDeletedBlockInfo( + info.getExtendedBlock().getLocalBlock(), + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")}) + }; + cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report); + } + bm.flushBlockOps(); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + BlockManagerTestUtil.getComputedDatanodeWork(bm); + } + assertEquals(numBlocks/limit, count); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + assertEquals(0, last.getNumberOfBlocksToBeErasureCoded()); + assertEquals(0, bm.getPendingReconstructionBlocksCount()); + } finally { + cluster.shutdown(); + } + } }