Skip to content

Commit

Permalink
Resubmit the unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyanlei-8130 committed Jul 25, 2023
1 parent 9a24b42 commit ecd493d
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2201,76 +2201,4 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
}
}

@Test
public void testTwoFBRSInARow() throws Exception{
DatanodeDescriptor node = spy(nodes.get(0));
node.setAlive(true);

DatanodeRegistration nodeReg =
new DatanodeRegistration(node, null, null, "");

// pretend to be in safemode
doReturn(true).when(fsn).isInStartupSafeMode();

// register new node
bm.getDatanodeManager().registerDatanode(nodeReg);
bm.getDatanodeManager().addDatanode(node);
assertEquals(node, bm.getDatanodeManager().getDatanode(node));

DatanodeStorageInfo[] storageInfos = node.getStorageInfos();
for (int r = 0; r < storageInfos.length; r++) {
DatanodeStorageInfo storageInfo = storageInfos[r];
assertEquals(0, storageInfo.getBlockReportCount());
}

reset(node);
// 1.Here's the original logic
BlockReportLeaseManager brlm = new BlockReportLeaseManager(new Configuration());
// This is the first FBR
for (int r = 0; r < storageInfos.length; r++) {
bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
//The first FBR times out because the namenode is busy,
break;
}
//and then the second FBR is the same
for (int r = 0; r < storageInfos.length; r++) {
bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
//In the original method, sending FBR twice from one disk will call removeLease
brlm.removeLease(node);
//Starting from the second disk, FBR will fail because the lease is removed
break;
}

assertEquals(2, storageInfos[0].getBlockReportCount());
assertEquals(0, storageInfos[1].getBlockReportCount());
assertEquals(0, storageInfos[2].getBlockReportCount());
assertEquals(0, storageInfos[3].getBlockReportCount());
assertEquals(0, storageInfos[4].getBlockReportCount());
assertEquals(0, storageInfos[5].getBlockReportCount());


reset(node);
//2. Now the processReport method calls removeLease only when it determines that it is the last disk
// This is the first FBR
for (int r = 0; r < storageInfos.length; r++) {
bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
//The first FBR times out because the namenode is busy, and then the second FBR is the same
break;
}
for (int r = 0; r < storageInfos.length; r++) {
bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
}

assertEquals(2, storageInfos[0].getBlockReportCount());
assertEquals(1, storageInfos[1].getBlockReportCount());
assertEquals(1, storageInfos[2].getBlockReportCount());
assertEquals(1, storageInfos[3].getBlockReportCount());
assertEquals(1, storageInfos[4].getBlockReportCount());
assertEquals(1, storageInfos[5].getBlockReportCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;

/**
* Tests that BlockReportLease in BlockManager.
Expand Down Expand Up @@ -269,4 +270,87 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
}
return storageBlockReports;
}

@Test
public void testFirstIncompleteBlockReport() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
Random rand = new Random();

try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();

FSNamesystem fsn = cluster.getNamesystem();

// pretend to be in safemode
doReturn(true).when(fsn).isInStartupSafeMode();

BlockManager blockManager = fsn.getBlockManager();
BlockManager spyBlockManager = spy(blockManager);
fsn.setBlockManagerForTesting(spyBlockManager);
String poolId = cluster.getNamesystem().getBlockPoolId();

NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

// Test based on one DataNode report to Namenode
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor datanodeDescriptor = spyBlockManager
.getDatanodeManager().getDatanode(dn.getDatanodeId());

DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

// Send heartbeat and request full block report lease
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
doAnswer(delayer).when(spyBlockManager).processReport(
any(DatanodeStorageInfo.class),
any(BlockListAsLongs.class));

ExecutorService pool = Executors.newFixedThreadPool(1);

// Trigger sendBlockReport
BlockReportContext brContext = new BlockReportContext(1, 0,
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
// Build every storage with 100 blocks for sending report
DatanodeStorage[] datanodeStorages
= new DatanodeStorage[storages.length];
for (int i = 0; i < storages.length; i++) {
datanodeStorages[i] = storages[i].getStorage();
StorageBlockReport[] reports = createReports(datanodeStorages, 100);

// The first multiple send once, simulating the failure of the first report, only send successfully once
if(i == 0){
rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
}

// Send blockReport
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
brContext);

// Wait until BlockManager calls processReport
delayer.waitForCall();

// Remove full block report lease about dn
spyBlockManager.getBlockReportLeaseManager()
.removeLease(datanodeDescriptor);

// Allow blockreport to proceed
delayer.proceed();

// Get result, it will not null if process successfully
assertTrue(datanodeCommand instanceof FinalizeCommand);
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
.getBlockPoolId());
if(i == 0){
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}else{
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}
}
}
}
}

0 comments on commit ecd493d

Please sign in to comment.