HDFS-17093. Fix block report lease issue to avoid missing some storages report. #5855

Merged · 15 commits · Aug 28, 2023
@@ -2866,14 +2866,18 @@ public boolean checkBlockReportLease(BlockReportContext context,
* The given storage is reporting all its blocks.
* Update the (storage{@literal -->}block list) and
* (block{@literal -->}storage list) maps.
* totalReportNum -> totalStorageReportsNum
* currentReportNum -> currentStorageReportIndex
*
* @return true if all known storages of the given DN have finished reporting.
* @throws IOException
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
BlockReportContext context) throws IOException {
BlockReportContext context,
int totalReportNum,
int currentReportNum) throws IOException {
Contributor

a. Please add some Javadoc about the added parameters.
b. Would these names be more readable?
totalReportNum -> totalStorageReportsNum,
currentReportNum -> storageReportIndex
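For reference, the requested Javadoc might look roughly like this (just a sketch; it documents the parameters as currently named in the PR, and the wording is not taken from the patch):

/**
 * @param totalReportNum   total number of storage reports the datanode sends
 *                         in this full block report.
 * @param currentReportNum 1-based index of the storage report currently being
 *                         processed.
 */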

Contributor Author

I think it's ok

namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2904,7 +2908,8 @@ public boolean processReport(final DatanodeID nodeID,
}
if (namesystem.isInStartupSafeMode()
&& !StorageType.PROVIDED.equals(storageInfo.getStorageType())
&& storageInfo.getBlockReportCount() > 0) {
&& storageInfo.getBlockReportCount() > 0
&& totalReportNum == currentReportNum) {
Contributor

If a datanode reports twice while the namenode is in safe mode, the second report will be almost completely processed, which may extend startup time. How about modifying the code like this? It would also avoid changing the method signature.

if (namesystem.isInStartupSafeMode()
    && !StorageType.PROVIDED.equals(storageInfo.getStorageType())
    && storageInfo.getBlockReportCount() > 0) {
  blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: "
      + "discarded non-initial block report from datanode {} storage {} "
      + " because namenode still in startup phase",
      strBlockReportId, fullBrLeaseId, nodeID, storageInfo.getStorageID());
  boolean needRemoveLease = true;
  for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
    if (sInfo.getBlockReportCount() == 0) {
      needRemoveLease = false;
    }
  }
  if (needRemoveLease) {
    blockReportLeaseManager.removeLease(node);
  }
  return !node.hasStaleStorages();
}

Contributor Author

@zhangshuyan0 Thank you for your review. This change can achieve the same effect, but I think node.hasStaleStorages() is also a datanode-level operation that should only be called for the last storage; logically and functionally, though, it's not that different. Let's hear other opinions. @Hexiaoqiao, what do you think?
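For context, hasStaleStorages() walks all of the datanode's storages and answers whether any of them is still marked stale; a rough approximation of that logic (not the exact DatanodeDescriptor code):

// Approximation: a DN still has stale storages while any of its storages has
// not had its block contents refreshed by a successfully processed report.
boolean hasStaleStorages(DatanodeStorageInfo[] storages) {
  for (DatanodeStorageInfo storage : storages) {
    if (storage.areBlockContentsStale()) {
      return true;
    }
  }
  return false;
}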

Contributor

I like @zhangshuyan0's proposal better.

The following section of code could also be separated into a function of its own.

// Remove the lease when we have received block reports for all storages of a particular DN.
void removeLease(DatanodeDescriptor node) {
  boolean needRemoveLease = true;
  for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
    if (sInfo.getBlockReportCount() == 0) {
      needRemoveLease = false;
    }
  }
  if (needRemoveLease) {
    blockReportLeaseManager.removeLease(node);
  }
}
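For illustration, the extracted helper could then be invoked from the safe-mode branch roughly like this (a sketch only; the helper name is hypothetical and this is not code from the PR):

if (namesystem.isInStartupSafeMode()
    && !StorageType.PROVIDED.equals(storageInfo.getStorageType())
    && storageInfo.getBlockReportCount() > 0) {
  // Only drop the lease once every storage of this DN has reported.
  removeLeaseIfAllStoragesReported(node);  // hypothetical name for the extracted helper
  return !node.hasStaleStorages();
}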

Contributor

I am not sure the blockReportCount == 0 condition makes this a good solution; consider the case where one disk has failed but has not been detected yet. Would that affect the logic here? Thanks.

Contributor Author

Two cases can reach this logic:

  1. The namenode is restarting, is in safe mode, and is receiving FBRs from all datanodes.
  2. The namenode has been running for a long time and enters safe mode for some reason.

In the first case, if the datanode has a failed disk, it will send FBRs for the healthy disks and the namenode will handle them normally.
In the second case, blockReportCount == 0 will always be false unless new disks are added to the datanode.
So I recommend keeping the code as it is and not relying on blockReportCount == 0.

blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: "
+ "discarded non-initial block report from {}"
+ " because namenode still in startup phase",
@@ -1650,7 +1650,7 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
final int index = r;
noStaleStorages = bm.runBlockOp(() ->
bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context));
blocks, context, reports.length, index + 1));
Contributor

Thanks for reminding me of the shortcomings of my proposal; I will try to improve it. However, the solution in this PR may not work either. If the datanode sends one storage report per RPC, reports.length will be 1 here, so totalReportNum == currentReportNum will always be true and the block report lease will be removed as before, making this fix ineffective.
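For context, the datanode chooses between sending all storages in a single RPC and sending one storage per RPC based on dfs.blockreport.split.threshold; a simplified sketch of that behavior (an approximation, not the actual BPServiceActor code):

// If the total block count is below dfs.blockreport.split.threshold, all
// storage reports are packed into one RPC; otherwise each storage goes in its
// own RPC, so reports.length == 1 on the NameNode side and the check
// totalReportNum == currentReportNum is trivially true for every call.
if (totalBlockCount < splitThreshold) {
  namenode.blockReport(registration, poolId, allStorageReports, context);
} else {
  for (StorageBlockReport report : allStorageReports) {
    namenode.blockReport(registration, poolId,
        new StorageBlockReport[] { report }, context);
  }
}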

Contributor

Great catch. +1, we have to handle this case.

Contributor Author

Thanks for reminding me. I think we'll have to come up with something else.

}
} else {
throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId());
@@ -1080,12 +1080,12 @@ public void testSafeModeIBR() throws Exception {
reset(node);

bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());

// re-register as if node restarted, should update existing node
@@ -1096,7 +1096,7 @@
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@@ -1125,7 +1125,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());
}

@@ -1198,7 +1198,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(), null);
builder.build(), null, 1, 1);
assertEquals(1, ds.getBlockReportCount());

// verify the storage info is correct
@@ -1249,12 +1249,12 @@ public void testSafeModeWithProvidedStorageBR() throws Exception {
bmPs.getDatanodeManager().addDatanode(node1);

// process reports of provided storage and disk storage
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null);
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
bmPs.processReport(node0, new DatanodeStorage(ds0.getStorageID()),
BlockListAsLongs.EMPTY, null);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 2, 2);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
bmPs.processReport(node1, new DatanodeStorage(ds1.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 2, 2);

// The provided storage report should not affect disk storage report
DatanodeStorageInfo dsPs =
@@ -23,6 +23,8 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -53,6 +55,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;

/**
* Tests that BlockReportLease in BlockManager.
@@ -269,4 +272,84 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages,
}
return storageBlockReports;
}

@Test
Contributor

We need to add a timeout here.
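For example, a JUnit 4 timeout could be added directly on the annotation (the value below is only illustrative):

@Test(timeout = 360000)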

public void testFirstIncompleteBlockReport() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
Random rand = new Random();

try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();

FSNamesystem fsn = cluster.getNamesystem();

NameNode nameNode = cluster.getNameNode();
// pretend to be in safemode
Contributor

The first letter needs to be uppercase, and the comment should end with a period.

NameNodeAdapter.enterSafeMode(nameNode, false);

BlockManager blockManager = fsn.getBlockManager();
BlockManager spyBlockManager = spy(blockManager);
fsn.setBlockManagerForTesting(spyBlockManager);
String poolId = cluster.getNamesystem().getBlockPoolId();

NamenodeProtocols rpcServer = cluster.getNameNodeRpc();

// Test based on one DataNode report to Namenode
DataNode dn = cluster.getDataNodes().get(0);
DatanodeDescriptor datanodeDescriptor = spyBlockManager
.getDatanodeManager().getDatanode(dn.getDatanodeId());
yuyanlei-8130 marked this conversation as resolved.

DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId);
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId);

// Send heartbeat and request full block report lease
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat(
dnRegistration, storages, 0, 0, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
doAnswer(delayer).when(spyBlockManager).processReport(
any(DatanodeStorageInfo.class),
any(BlockListAsLongs.class));

ExecutorService pool = Executors.newFixedThreadPool(1);

// Trigger sendBlockReport
BlockReportContext brContext = new BlockReportContext(1, 0,
rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
// Build every storage with 100 blocks for sending report
DatanodeStorage[] datanodeStorages
= new DatanodeStorage[storages.length];
for (int i = 0; i < storages.length; i++) {
datanodeStorages[i] = storages[i].getStorage();
StorageBlockReport[] reports = createReports(datanodeStorages, 100);

// Send the first storage's report an extra time, simulating a first report
// attempt in which only this storage's report was delivered successfully.
if (i == 0) {
rpcServer.blockReport(dnRegistration, poolId, reports, brContext);
}

// Send blockReport
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports,
brContext);

// Wait until BlockManager calls processReport
delayer.waitForCall();

// Allow blockreport to proceed
delayer.proceed();

// Get the result; it will not be null if the report was processed successfully
assertTrue(datanodeCommand instanceof FinalizeCommand);
assertEquals(poolId, ((FinalizeCommand)datanodeCommand)
.getBlockPoolId());
if (i == 0) {
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
} else {
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount());
}
}
}
}
}