Skip to content

Commit

Permalink
HDFS-17094. EC: Fix bug in block recovery when there are stale datano…
Browse files Browse the repository at this point in the history
…des. (#5854)

Reviewed-by: He Xiaoqiao <[email protected]>
Signed-off-by: Tao Li <[email protected]>
  • Loading branch information
zhangshuyan0 authored Jul 20, 2023
1 parent 23ecc32 commit 7ba2bd6
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ public byte[] getBlockIndices() {
return indices;
}

public byte[] getBlockIndicesForSpecifiedStorages(List<Integer> storageIdx) {
byte[] indices = new byte[storageIdx.size()];
for (int i = 0; i < indices.length; i++) {
indices[i] = BlockIdManager.getBlockIndex(replicas[storageIdx.get(i)]);
}
return indices;
}

public int getNumExpectedLocations() {
return replicas.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1720,9 +1720,11 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
// Skip stale nodes during recovery
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<>(storages.length);
for (DatanodeStorageInfo storage : storages) {
if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storage);
final List<Integer> storageIdx = new ArrayList<>(storages.length);
for (int i = 0; i < storages.length; ++i) {
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storages[i]);
storageIdx.add(i);
}
}
// If we are performing a truncate recovery than set recovery fields
Expand Down Expand Up @@ -1755,7 +1757,8 @@ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
if (b.isStriped()) {
rBlock = new RecoveringStripedBlock(rBlock, uc.getBlockIndices(),
rBlock = new RecoveringStripedBlock(rBlock,
uc.getBlockIndicesForSpecifiedStorages(storageIdx),
((BlockInfoStriped) b).getErasureCodingPolicy());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -188,6 +192,62 @@ public void testLeaseRecovery() throws Exception {
}
}

/**
* Test lease recovery for EC policy when one internal block located on
* stale datanode.
*/
@Test
public void testLeaseRecoveryWithStaleDataNode() {
LOG.info("blockLengthsSuite: " +
Arrays.toString(blockLengthsSuite));
long staleInterval = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);

for (int i = 0; i < blockLengthsSuite.length; i++) {
BlockLengths blockLengths = blockLengthsSuite[i];
try {
writePartialBlocks(blockLengths.getBlockLengths());

// Get block info for the last block and mark corresponding datanode
// as stale.
LocatedBlock locatedblock =
TestInterDatanodeProtocol.getLastLocatedBlock(
dfs.dfs.getNamenode(), p.toString());
DatanodeInfo firstDataNode = locatedblock.getLocations()[0];
DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(firstDataNode);
DataNodeTestUtils.setHeartbeatsDisabledForTests(
cluster.getDataNode(dnDes.getIpcPort()), true);
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));

long[] longArray = new long[blockLengths.getBlockLengths().length - 1];
for (int j = 0; j < longArray.length; ++j) {
longArray[j] = blockLengths.getBlockLengths()[j + 1];
}
int safeLength = (int) StripedBlockUtil.getSafeLength(ecPolicy,
longArray);
int checkDataLength = Math.min(testFileLength, safeLength);
recoverLease();
List<Long> oldGS = new ArrayList<>();
oldGS.add(1001L);
StripedFileTestUtil.checkData(dfs, p, checkDataLength,
new ArrayList<>(), oldGS, blockGroupSize);

DataNodeTestUtils.setHeartbeatsDisabledForTests(
cluster.getDataNode(dnDes.getIpcPort()), false);
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);

} catch (Throwable e) {
String msg = "failed testCase at i=" + i + ", blockLengths="
+ blockLengths + "\n"
+ StringUtils.stringifyException(e);
Assert.fail(msg);
}
}
}

@Test
public void testSafeLength() {
checkSafeLength(0, 0); // Length of: 0
Expand Down

0 comments on commit 7ba2bd6

Please sign in to comment.