diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 122bfe2e5f58d..1f046e87d6af3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -2201,76 +2201,4 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
       assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
     }
   }
-
-  @Test
-  public void testTwoFBRSInARow() throws Exception{
-    DatanodeDescriptor node = spy(nodes.get(0));
-    node.setAlive(true);
-
-    DatanodeRegistration nodeReg =
-        new DatanodeRegistration(node, null, null, "");
-
-    // pretend to be in safemode
-    doReturn(true).when(fsn).isInStartupSafeMode();
-
-    // register new node
-    bm.getDatanodeManager().registerDatanode(nodeReg);
-    bm.getDatanodeManager().addDatanode(node);
-    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-
-    DatanodeStorageInfo[] storageInfos = node.getStorageInfos();
-    for (int r = 0; r < storageInfos.length; r++) {
-      DatanodeStorageInfo storageInfo = storageInfos[r];
-      assertEquals(0, storageInfo.getBlockReportCount());
-    }
-
-    reset(node);
-    // 1.Here's the original logic
-    BlockReportLeaseManager brlm = new BlockReportLeaseManager(new Configuration());
-    // This is the first FBR
-    for (int r = 0; r < storageInfos.length; r++) {
-      bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
-          BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
-      //The first FBR times out because the namenode is busy,
-      break;
-    }
-    //and then the second FBR is the same
-    for (int r = 0; r < storageInfos.length; r++) {
-      bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
-          BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
-      //In the original method, sending FBR twice from one disk will call removeLease
-      brlm.removeLease(node);
-      //Starting from the second disk, FBR will fail because the lease is removed
-      break;
-    }
-
-    assertEquals(2, storageInfos[0].getBlockReportCount());
-    assertEquals(0, storageInfos[1].getBlockReportCount());
-    assertEquals(0, storageInfos[2].getBlockReportCount());
-    assertEquals(0, storageInfos[3].getBlockReportCount());
-    assertEquals(0, storageInfos[4].getBlockReportCount());
-    assertEquals(0, storageInfos[5].getBlockReportCount());
-
-
-    reset(node);
-    //2. Now the processReport method calls removeLease only when it determines that it is the last disk
-    // This is the first FBR
-    for (int r = 0; r < storageInfos.length; r++) {
-      bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
-          BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
-      //The first FBR times out because the namenode is busy, and then the second FBR is the same
-      break;
-    }
-    for (int r = 0; r < storageInfos.length; r++) {
-      bm.processReport(node, new DatanodeStorage(storageInfos[r].getStorageID()),
-          BlockListAsLongs.EMPTY, null, storageInfos.length, r + 1);
-    }
-
-    assertEquals(2, storageInfos[0].getBlockReportCount());
-    assertEquals(1, storageInfos[1].getBlockReportCount());
-    assertEquals(1, storageInfos[2].getBlockReportCount());
-    assertEquals(1, storageInfos[3].getBlockReportCount());
-    assertEquals(1, storageInfos[4].getBlockReportCount());
-    assertEquals(1, storageInfos[5].getBlockReportCount());
-  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
index 225f7fc96c458..48ded7558bac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -53,6 +53,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.doReturn;
 
 /**
  * Tests that BlockReportLease in BlockManager.
@@ -269,4 +270,87 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
     }
     return storageBlockReports;
   }
+
+  @Test
+  public void testFirstIncompleteBlockReport() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    Random rand = new Random();
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build()) {
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNamesystem();
+
+      // pretend to be in safemode
+      doReturn(true).when(fsn).isInStartupSafeMode();
+
+      BlockManager blockManager = fsn.getBlockManager();
+      BlockManager spyBlockManager = spy(blockManager);
+      fsn.setBlockManagerForTesting(spyBlockManager);
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+
+      NamenodeProtocols rpcServer = cluster.getNameNodeRpc();
+
+      // The test is based on a single DataNode reporting to the NameNode
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeDescriptor datanodeDescriptor = spyBlockManager
+          .getDatanodeManager().getDatanode(dn.getDatanodeId());
+
+      DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
+      StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);
+
+      // Send heartbeat and request a full block report lease
+      HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
+          dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+
+      DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
+      doAnswer(delayer).when(spyBlockManager).processReport(
+          any(DatanodeStorageInfo.class),
+          any(BlockListAsLongs.class));
+
+      ExecutorService pool = Executors.newFixedThreadPool(1);
+
+      // Prepare the context for sending the block report
+      BlockReportContext brContext = new BlockReportContext(1, 0,
+          rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
+      // Build a report with 100 blocks for every storage
+      DatanodeStorage[] datanodeStorages
+          = new DatanodeStorage[storages.length];
+      for (int i = 0; i < storages.length; i++) {
+        datanodeStorages[i] = storages[i].getStorage();
+        StorageBlockReport[] reports = createReports(datanodeStorages, 100);
+
+        // Send the first storage's report twice to simulate a first FBR attempt that failed after only this storage was reported
+        if (i == 0) {
+          rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
+        }
+
+        // Send the block report
+        DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
+            brContext);
+
+        // Wait until BlockManager calls processReport
+        delayer.waitForCall();
+
+        // Remove the full block report lease for dn
+        spyBlockManager.getBlockReportLeaseManager()
+            .removeLease(datanodeDescriptor);
+
+        // Allow the block report to proceed
+        delayer.proceed();
+
+        // The returned command is non-null only if the report was processed successfully
+        assertTrue(datanodeCommand instanceof FinalizeCommand);
+        assertEquals(poolId, ((FinalizeCommand) datanodeCommand)
+            .getBlockPoolId());
+        if (i == 0) {
+          assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
+        } else {
+          assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
+        }
+      }
+    }
+  }
 }