HDFS-15889. Erasure Coding: Limit the number of concurrent EC Reconstruct blocks in NameNode
huhaiyang committed Mar 13, 2021
1 parent 2b62b12 commit 03b4022
Showing 3 changed files with 193 additions and 0 deletions.
@@ -318,6 +318,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true;

public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED =
"dfs.namenode.reconstruct.ecblock-groups.limit.enable";
public static final boolean DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT = false;

public static final String DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT =
"dfs.namenode.reconstruct.ecblock-groups.limit";
public static final long DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT = 1000;

@Deprecated
public static final String DFS_WEBHDFS_USER_PATTERN_KEY =
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY;
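For reference, a minimal sketch of enabling the new limit programmatically with the two keys defined above (the class name and the value 500 are illustrative assumptions, not part of this change):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class EnableEcReconstructLimit {
  public static void main(String[] args) {
    // Hypothetical example: enable the limit and cap tracking at 500 block groups.
    Configuration conf = new Configuration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED, true);
    conf.setLong(
        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, 500);
    System.out.println("reconstructECBlockGroupsLimit = " + conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, -1));
  }
}

The corresponding hdfs-site.xml property names are dfs.namenode.reconstruct.ecblock-groups.limit.enable and dfs.namenode.reconstruct.ecblock-groups.limit.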
@@ -111,6 +111,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;

@@ -469,6 +470,12 @@ public long getTotalECBlockGroups() {
/** Storages accessible from multiple DNs. */
private final ProvidedStorageMap providedStorageMap;

/** Whether to limit the number of concurrent EC block group reconstructions. */
private volatile boolean reconstructECBlockGroupsLimitEnabled;
private volatile long reconstructECBlockGroupsLimit;
private final LightWeightHashSet<BlockInfo> reconstructECBlockGroups =
new LightWeightHashSet<>();

public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -625,13 +632,22 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

this.reconstructECBlockGroupsLimitEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED_DEFAULT);
this.reconstructECBlockGroupsLimit = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_DEFAULT);

LOG.info("defaultReplication = {}", defaultReplication);
LOG.info("maxReplication = {}", maxReplication);
LOG.info("minReplication = {}", minReplication);
LOG.info("maxReplicationStreams = {}", maxReplicationStreams);
LOG.info("redundancyRecheckInterval = {}ms", redundancyRecheckIntervalMs);
LOG.info("encryptDataTransfer = {}", encryptDataTransfer);
LOG.info("maxNumBlocksToLog = {}", maxNumBlocksToLog);
LOG.info("reconstructECBlockGroupsLimit = {}", reconstructECBlockGroupsLimit);
LOG.info("reconstructECBlockGroupsLimitEnabled = {}", reconstructECBlockGroupsLimitEnabled);
}

private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -1295,6 +1311,8 @@ public LocatedBlock convertLastBlockToUnderConstruction(
new DatanodeStorageInfo[locations.size()];
locations.toArray(removedBlockTargets);
DatanodeStorageInfo.decrementBlocksScheduled(removedBlockTargets);
// Remove the block from the tracked reconstructECBlockGroups set.
removeReconstructECBlockGroups(lastBlock);
}

// remove this block from the list of pending blocks to be deleted.
@@ -2089,12 +2107,21 @@ int computeReconstructionWorkForBlocks(
final DatanodeStorageInfo[] targets = rw.getTargets();
if (targets == null || targets.length == 0) {
rw.resetTargets();
if (removeReconstructECBlockGroups(rw.getBlock())) {
LOG.debug("Removing block {} from reconstructECBlockGroups, " +
"now size {}", rw.getBlock(), getReconstructECBlockGroupCount());
}
continue;
}

synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
} else {
if (removeReconstructECBlockGroups(rw.getBlock())) {
LOG.debug("Removing block {} from reconstructECBlockGroups, " +
"now size {}", rw.getBlock(), getReconstructECBlockGroupCount());
}
}
}
}
@@ -2200,6 +2227,26 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
additionalReplRequired = additionalReplRequired -
numReplicas.decommissioning() -
numReplicas.liveEnteringMaintenanceReplicas();
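// Gate new EC reconstruction work: once the tracked set reaches the
// configured limit, decline to schedule reconstruction for more blocks.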
if (reconstructECBlockGroupsLimitEnabled &&
numReplicas.liveReplicas() < requiredRedundancy) {
LOG.debug("Prepare creating an ErasureCodingWork to {} reconstruct, " +
"currently there are {} EC BlockGroups to reconstruct and " +
"the limit is {}", block, getReconstructECBlockGroupCount(),
reconstructECBlockGroupsLimit);
if (checkReconstructECBlockGroups(block) ||
getReconstructECBlockGroupCount() >= this.reconstructECBlockGroupsLimit) {
LOG.warn("Currently there are {} EC BlockGroups to reconstruct and " +
"the limit is {} block {} cannot create",
getReconstructECBlockGroupCount(),
reconstructECBlockGroupsLimit, block);
return null;
}
addReconstructECBlockGroups(block);
LOG.debug("Complete creating an ErasureCodingWork to {} reconstruct, " +
"currently there are {} EC BlockGroups to reconstruct and " +
"the limit is {}", block, getReconstructECBlockGroupCount(),
reconstructECBlockGroupsLimit);
}
}
final DatanodeDescriptor[] newSrcNodes =
new DatanodeDescriptor[srcNodes.length];
@@ -4220,6 +4267,7 @@ public void addBlock(DatanodeStorageInfo storageInfo, Block block,
if (storedBlock != null &&
block.getGenerationStamp() == storedBlock.getGenerationStamp()) {
if (pendingReconstruction.decrement(storedBlock, storageInfo)) {
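// Reconstruction of this block has completed; stop counting it
// against the EC reconstruction limit.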
removeReconstructECBlockGroups(storedBlock);
NameNode.getNameNodeMetrics().incSuccessfulReReplications();
}
}
@@ -4649,6 +4697,7 @@ public void removeBlock(BlockInfo block) {
}
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
postponedMisreplicatedBlocks.remove(block);
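// A removed block must no longer count against the EC reconstruction limit.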
removeReconstructECBlockGroups(block);
}

public BlockInfo getStoredBlock(Block block) {
@@ -5134,6 +5183,7 @@ public void clearQueues() {
invalidateBlocks.clear();
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
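// Reset the tracked EC reconstruction set along with the other queues.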
clearReconstructECBlockGroups();
};

public static LocatedBlock newLocatedBlock(
@@ -5498,4 +5548,47 @@ public void disableSPS() {
public StoragePolicySatisfyManager getSPSManager() {
return spsManager;
}

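/**
 * Remove the given block from the tracked EC reconstruction set, if the
 * limit feature is enabled and the block is striped.
 * @return true if the block was present and removed.
 */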
boolean removeReconstructECBlockGroups(BlockInfo block) {
if (reconstructECBlockGroupsLimitEnabled && block.isStriped()) {
synchronized (reconstructECBlockGroups) {
if (reconstructECBlockGroups.remove(block)) {
LOG.debug("Removing block {} from reconstructECBlockGroups, " +
"now size {}", block, getReconstructECBlockGroupCount());
return true;
}
}
}
return false;
}

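/**
 * Check whether the given block is already tracked for EC reconstruction;
 * if so, drop the stale entry so the caller can decline duplicate work.
 * @return true if a stale entry was found and removed.
 */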
boolean checkReconstructECBlockGroups(BlockInfo block) {
synchronized (reconstructECBlockGroups) {
if (reconstructECBlockGroups.contains(block) &&
reconstructECBlockGroups.remove(block)) {
LOG.warn("Check Removing block {} from reconstructECBlockGroups, " +
"now size {}", block, getReconstructECBlockGroupCount());
return true;
}
}
return false;
}

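/** Track the given block as scheduled for EC reconstruction. */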
boolean addReconstructECBlockGroups(BlockInfo block) {
synchronized (reconstructECBlockGroups) {
return reconstructECBlockGroups.add(block);
}
}

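/** @return the number of EC block groups currently tracked for reconstruction. */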
public long getReconstructECBlockGroupCount() {
synchronized (reconstructECBlockGroups) {
return reconstructECBlockGroups.size();
}
}

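/** Drop all tracked EC block groups, e.g. when the NameNode queues are cleared. */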
void clearReconstructECBlockGroups() {
synchronized (reconstructECBlockGroups) {
reconstructECBlockGroups.clear();
}
}
}
@@ -43,6 +43,10 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;

import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.Assert;
@@ -430,4 +434,92 @@ public void testReconstructionWork() throws Exception {
dfsCluster.shutdown();
}
}

@Test
public void testRecoveryTasksForBlockGroupsLimit() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1000);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT_ENABLED,
true);
long limit = 2;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCT_EC_BLOCK_GROUPS_LIMIT, limit);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 1).build();
try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
final int numBlocks = 6;
DFSTestUtil.createStripedFile(cluster, filePath,
dirPath, numBlocks, 1, true);
// All blocks are located on the first GROUP_SIZE DNs; the last DN stays
// empty because of the createStripedFile util function.

// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
.getINode4Write(filePath.toString()).asFile();
assertFalse(fileNode.isUnderConstruction());
assertTrue(fileNode.isStriped());
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(numBlocks, blocks.length);

BlockManager bm = cluster.getNamesystem().getBlockManager();

BlockInfo firstBlock = fileNode.getBlocks()[0];
DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);

// Remove one DN so that every block group is missing one internal block.
for (int i = 0; i < 1; i++) {
DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
assertEquals(numBlocks, missedNode.numBlocks());
bm.getDatanodeManager().removeDatanode(missedNode);
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
assertEquals(numBlocks, bm.getLowRedundancyBlocksCount());
// all the reconstruction work will be scheduled on the last DN
DataNode lastDn = cluster.getDataNodes().get(groupSize);
DatanodeDescriptor last = bm.getDatanodeManager()
.getDatanode(lastDn.getDatanodeId());
BlockManagerTestUtil.getComputedDatanodeWork(bm);
int count = 0;
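// Each wave should schedule exactly `limit` EC reconstruction tasks;
// count the waves until the pending work drains.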
while (bm.getPendingReconstructionBlocksCount() > 0) {
count++;
assertEquals("Counting the number of outstanding EC tasks", limit,
last.getNumberOfBlocksToBeErasureCoded());
assertEquals(limit, bm.getPendingReconstructionBlocksCount());
assertEquals(limit, bm.getReconstructECBlockGroupCount());
List<BlockECReconstructionInfo> reconstruction =
last.getErasureCodeCommand((int) limit);

for (BlockECReconstructionInfo info : reconstruction) {
String poolId = cluster.getNamesystem().getBlockPoolId();
// Have the last DN report each reconstructed block as received to the NN.
DatanodeRegistration dnR = lastDn.getDNRegistrationForBP(poolId);
StorageReceivedDeletedBlocks[] report = {
new StorageReceivedDeletedBlocks(
new DatanodeStorage("Fake-storage-ID-Ignored"),
new ReceivedDeletedBlockInfo[]{new ReceivedDeletedBlockInfo(
info.getExtendedBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, "")})
};
cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
}
bm.flushBlockOps();
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
BlockManagerTestUtil.getComputedDatanodeWork(bm);
}
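// With 6 missing block groups and a limit of 2, reconstruction drains in
// numBlocks / limit = 3 waves.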
assertEquals(numBlocks / limit, count);
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
assertEquals(0, last.getNumberOfBlocksToBeErasureCoded());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
} finally {
cluster.shutdown();
}
}
}
