Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17093. Fix block report lease issue to avoid missing some storages report. #5855

Merged
merged 15 commits into from
Aug 28, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ public boolean processReport(final DatanodeID nodeID,
+ "discarded non-initial block report from {}"
+ " because namenode still in startup phase",
strBlockReportId, fullBrLeaseId, nodeID);
blockReportLeaseManager.removeLease(node);
removeDNLeaseIfNeeded(node);
return !node.hasStaleStorages();
}

Expand Down Expand Up @@ -2957,6 +2957,22 @@ public boolean processReport(final DatanodeID nodeID,
return !node.hasStaleStorages();
}

/**
* Remove the DN lease only when we have received block reports,
* for all storages for a particular DN.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix checkstyle.

*/
void removeDNLeaseIfNeeded(DatanodeDescriptor node) {
boolean needRemoveLease = true;
for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
if (sInfo.getBlockReportCount() == 0) {
needRemoveLease = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fast break here if meet sInfo.getBlockReportCount() == 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I do. Let me add it

}
}
if (needRemoveLease) {
blockReportLeaseManager.removeLease(node);
}
}

public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
final BlockReportContext context) throws IOException {
namesystem.writeLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
Expand Down Expand Up @@ -269,4 +271,83 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
}
return storageBlockReports;
}

@Test(timeout = 360000)
public void testFirstIncompleteBlockReport() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
Random rand = new Random();

try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();

FSNamesystem fsn = cluster.getNamesystem();

NameNode nameNode = cluster.getNameNode();
// Pretend to be in safemode.
NameNodeAdapter.enterSafeMode(nameNode, false);

BlockManager blockManager = fsn.getBlockManager();
BlockManager spyBlockManager = spy(blockManager);
fsn.setBlockManagerForTesting(spyBlockManager);
String poolId = cluster.getNamesystem().getBlockPoolId();

NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

// Test based on one DataNode report to Namenode.
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor datanodeDescriptor = spyBlockManager
.getDatanodeManager().getDatanode(dn.getDatanodeId());

DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

// Send heartbeat and request full block report lease.
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
doAnswer(delayer).when(spyBlockManager).processReport(
any(DatanodeStorageInfo.class),
any(BlockListAsLongs.class));

// Trigger sendBlockReport.
BlockReportContext brContext = new BlockReportContext(1, 0,
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
// Build every storage with 100 blocks for sending report.
DatanodeStorage[] datanodeStorages
= new DatanodeStorage[storages.length];
for (int i = 0; i < storages.length; i++) {
datanodeStorages[i] = storages[i].getStorage();
StorageBlockReport[] reports = createReports(datanodeStorages, 100);

// The first multiple send once, simulating the failure of the first report,
// only send successfully once.
if(i == 0){
rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
}

// Send blockReport.
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
brContext);

// Wait until BlockManager calls processReport.
delayer.waitForCall();

// Allow blockreport to proceed.
delayer.proceed();

// Get result, it will not null if process successfully.
assertTrue(datanodeCommand instanceof FinalizeCommand);
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
.getBlockPoolId());
if(i == 0){
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}else{
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}
}
}
}
}