Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17093. Fix block report lease issue to avoid missing some storages report. #5855

Merged
merged 15 commits into from
Aug 28, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2909,7 +2909,7 @@ public boolean processReport(final DatanodeID nodeID,
+ "discarded non-initial block report from {}"
+ " because namenode still in startup phase",
strBlockReportId, fullBrLeaseId, nodeID);
blockReportLeaseManager.removeLease(node);
removeLease(node);
return !node.hasStaleStorages();
}

Expand Down Expand Up @@ -2957,6 +2957,19 @@ public boolean processReport(final DatanodeID nodeID,
return !node.hasStaleStorages();
}

// Remove the lease when we have received block reports for all storages for a particular DN.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the lease when we have received block reports for all storages for a particular DN.

->

Remove the DN lease only when we have received block reports for all storages for a particular DN.

void removeLease(DatanodeDescriptor node) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: removeLease -> removeDNLeaseIfNeeded()

boolean needRemoveLease = true;
for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
if (sInfo.getBlockReportCount() == 0) {
needRemoveLease = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should break out of the loop early here as soon as we meet a storage with sInfo.getBlockReportCount() == 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree. Let me add it.

}
}
if (needRemoveLease) {
blockReportLeaseManager.removeLease(node);
}
}

public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
final BlockReportContext context) throws IOException {
namesystem.writeLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
Expand Down Expand Up @@ -269,4 +271,84 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
}
return storageBlockReports;
}

@Test(timeout = 360000)
public void testFirstIncompleteBlockReport() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
Random rand = new Random();

try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();

FSNamesystem fsn = cluster.getNamesystem();

NameNode nameNode = cluster.getNameNode();
// pretend to be in safemode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first letter needs to be uppercase, and the sentence should end with a period.

NameNodeAdapter.enterSafeMode(nameNode, false);

BlockManager blockManager = fsn.getBlockManager();
BlockManager spyBlockManager = spy(blockManager);
fsn.setBlockManagerForTesting(spyBlockManager);
String poolId = cluster.getNamesystem().getBlockPoolId();

NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

// Test based on one DataNode report to Namenode
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor datanodeDescriptor = spyBlockManager
.getDatanodeManager().getDatanode(dn.getDatanodeId());
yuyanlei-8130 marked this conversation as resolved.
Show resolved Hide resolved

DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

// Send heartbeat and request full block report lease
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
doAnswer(delayer).when(spyBlockManager).processReport(
any(DatanodeStorageInfo.class),
any(BlockListAsLongs.class));

ExecutorService pool = Executors.newFixedThreadPool(1);

// Trigger sendBlockReport
BlockReportContext brContext = new BlockReportContext(1, 0,
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
// Build every storage with 100 blocks for sending report
DatanodeStorage[] datanodeStorages
= new DatanodeStorage[storages.length];
for (int i = 0; i < storages.length; i++) {
datanodeStorages[i] = storages[i].getStorage();
StorageBlockReport[] reports = createReports(datanodeStorages, 100);

// For the first storage only, send the report one extra time up front, simulating a retry after the first report attempt failed.
if(i == 0){
rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
}

// Send blockReport
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
brContext);

// Wait until BlockManager calls processReport
delayer.waitForCall();

// Allow blockreport to proceed
delayer.proceed();

// Check the result; it will not be null if the report was processed successfully
assertTrue(datanodeCommand instanceof FinalizeCommand);
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
.getBlockPoolId());
if(i == 0){
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}else{
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}
}
}
}
}