-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDFS-17093. Fix block report lease issue to avoid missing some storages report. #5855
Changes from 5 commits
a4b76d3
db14c0a
9a24b42
ecd493d
7222fe5
fb8e4d3
3b72336
57ecaed
f4a7c1c
3f614b8
5af06d9
ff1a312
adc77ee
2a542ff
bc401c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2866,14 +2866,18 @@ public boolean checkBlockReportLease(BlockReportContext context, | |
* The given storage is reporting all its blocks. | ||
* Update the (storage{@literal -->}block list) and | ||
* (block{@literal -->}storage list) maps. | ||
* totalReportNum -> totalStorageReportsNum | ||
* currentReportNum -> currentStorageReportIndex | ||
* | ||
* @return true if all known storages of the given DN have finished reporting. | ||
* @throws IOException | ||
*/ | ||
public boolean processReport(final DatanodeID nodeID, | ||
final DatanodeStorage storage, | ||
final BlockListAsLongs newReport, | ||
BlockReportContext context) throws IOException { | ||
BlockReportContext context, | ||
int totalReportNum, | ||
int currentReportNum) throws IOException { | ||
namesystem.writeLock(); | ||
final long startTime = Time.monotonicNow(); //after acquiring write lock | ||
final long endTime; | ||
|
@@ -2904,7 +2908,8 @@ public boolean processReport(final DatanodeID nodeID, | |
} | ||
if (namesystem.isInStartupSafeMode() | ||
&& !StorageType.PROVIDED.equals(storageInfo.getStorageType()) | ||
&& storageInfo.getBlockReportCount() > 0) { | ||
&& storageInfo.getBlockReportCount() > 0 | ||
&& totalReportNum == currentReportNum) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a datanode report twice during namenode safemode, the second report will be almost completely processed, which may extend startup time. How about modify code like this? This can also avoid changes in the method signature.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zhangshuyan0 Thank you for your retrial,This change can achieve the same effect, but I think node.hasStaleStorages() is also a Datanode-level operation that should also be called on the last disk, but logically, functionally, it's not that different。Listen to other people's opinions ,@Hexiaoqiao What do you think about that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like @zhangshuyan0's proposal better. The following section of code can also be separated to be a function on its own.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if this will be a good solution with condition There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two of the cases will come into this logic
|
||
blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: " | ||
+ "discarded non-initial block report from {}" | ||
+ " because namenode still in startup phase", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1650,7 +1650,7 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg, | |
final int index = r; | ||
noStaleStorages = bm.runBlockOp(() -> | ||
bm.processReport(nodeReg, reports[index].getStorage(), | ||
blocks, context)); | ||
blocks, context, reports.length, index + 1)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for reminding me of the shortcomings of my plan. I will try to improve it. However, the solution in this PR may not work. If datanode send one storage report per RPC, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great catch. +1, We have to solve this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for reminding me. I think we'll have to think of something else |
||
} | ||
} else { | ||
throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; | ||
import org.apache.hadoop.hdfs.server.datanode.DataNode; | ||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; | ||
import org.apache.hadoop.hdfs.server.namenode.NameNode; | ||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; | ||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; | ||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; | ||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; | ||
|
@@ -53,6 +55,7 @@ | |
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.spy; | ||
import static org.mockito.Mockito.doReturn; | ||
|
||
/** | ||
* Tests that BlockReportLease in BlockManager. | ||
|
@@ -269,4 +272,88 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages, | |
} | ||
return storageBlockReports; | ||
} | ||
|
||
@Test | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need add a timeout here. |
||
public void testFirstIncompleteBlockReport() throws Exception { | ||
HdfsConfiguration conf = new HdfsConfiguration(); | ||
Random rand = new Random(); | ||
|
||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) | ||
.numDataNodes(1).build()) { | ||
cluster.waitActive(); | ||
|
||
FSNamesystem fsn = cluster.getNamesystem(); | ||
|
||
NameNode nameNode = cluster.getNameNode(); | ||
// pretend to be in safemode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first letter need to be uppercase and end with period at the end of sentence. |
||
NameNodeAdapter.enterSafeMode(nameNode, false); | ||
|
||
BlockManager blockManager = fsn.getBlockManager(); | ||
BlockManager spyBlockManager = spy(blockManager); | ||
fsn.setBlockManagerForTesting(spyBlockManager); | ||
String poolId = cluster.getNamesystem().getBlockPoolId(); | ||
|
||
NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); | ||
|
||
// Test based on one DataNode report to Namenode | ||
DataNode dn = cluster.getDataNodes().get(0); | ||
DatanodeDescriptor datanodeDescriptor = spyBlockManager | ||
.getDatanodeManager().getDatanode(dn.getDatanodeId()); | ||
yuyanlei-8130 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId); | ||
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId); | ||
|
||
// Send heartbeat and request full block report lease | ||
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( | ||
dnRegistration, storages, 0, 0, 0, 0, 0, null, true, | ||
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); | ||
|
||
DelayAnswer delayer = new DelayAnswer(BlockManager.LOG); | ||
doAnswer(delayer).when(spyBlockManager).processReport( | ||
any(DatanodeStorageInfo.class), | ||
any(BlockListAsLongs.class)); | ||
|
||
ExecutorService pool = Executors.newFixedThreadPool(1); | ||
|
||
// Trigger sendBlockReport | ||
BlockReportContext brContext = new BlockReportContext(1, 0, | ||
rand.nextLong(), hbResponse.getFullBlockReportLeaseId()); | ||
// Build every storage with 100 blocks for sending report | ||
DatanodeStorage[] datanodeStorages | ||
= new DatanodeStorage[storages.length]; | ||
for (int i = 0; i < storages.length; i++) { | ||
datanodeStorages[i] = storages[i].getStorage(); | ||
StorageBlockReport[] reports = createReports(datanodeStorages, 100); | ||
|
||
// The first multiple send once, simulating the failure of the first report, only send successfully once | ||
if(i == 0){ | ||
rpcServer.blockReport(dnRegistration, poolId, reports, brContext); | ||
} | ||
|
||
// Send blockReport | ||
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports, | ||
brContext); | ||
|
||
// Wait until BlockManager calls processReport | ||
delayer.waitForCall(); | ||
|
||
// Remove full block report lease about dn | ||
spyBlockManager.getBlockReportLeaseManager() | ||
.removeLease(datanodeDescriptor); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem in this UT is the same as before, you still actively call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This removeLease operation should be in the processReport method, so let me modify that,This is misleading |
||
|
||
// Allow blockreport to proceed | ||
delayer.proceed(); | ||
|
||
// Get result, it will not null if process successfully | ||
assertTrue(datanodeCommand instanceof FinalizeCommand); | ||
assertEquals(poolId, ((FinalizeCommand)datanodeCommand) | ||
.getBlockPoolId()); | ||
if(i == 0){ | ||
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); | ||
}else{ | ||
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a. Please add some Javadoc about added parameter.
b. Will this name be more readable?
totalReportNum -> totalStorageReportsNum,
currentReportNum -> storageReportIndex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok