-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDFS-17093. Fix block report lease issue to avoid missing some storages report. #5855
Changes from 10 commits
a4b76d3
db14c0a
9a24b42
ecd493d
7222fe5
fb8e4d3
3b72336
57ecaed
f4a7c1c
3f614b8
5af06d9
ff1a312
adc77ee
2a542ff
bc401c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2909,7 +2909,7 @@ public boolean processReport(final DatanodeID nodeID, | |
+ "discarded non-initial block report from {}" | ||
+ " because namenode still in startup phase", | ||
strBlockReportId, fullBrLeaseId, nodeID); | ||
blockReportLeaseManager.removeLease(node); | ||
removeDNLeaseIfNeeded(node); | ||
return !node.hasStaleStorages(); | ||
} | ||
|
||
|
@@ -2957,6 +2957,22 @@ public boolean processReport(final DatanodeID nodeID, | |
return !node.hasStaleStorages(); | ||
} | ||
|
||
/** | ||
* Remove the DN lease only when we have received block reports | ||
* for all storages for a particular DN. | ||
*/ | ||
void removeDNLeaseIfNeeded(DatanodeDescriptor node) { | ||
boolean needRemoveLease = true; | ||
for (DatanodeStorageInfo sInfo : node.getStorageInfos()) { | ||
if (sInfo.getBlockReportCount() == 0) { | ||
needRemoveLease = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should fast break here if meet There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I do. Let me add it |
||
} | ||
} | ||
if (needRemoveLease) { | ||
blockReportLeaseManager.removeLease(node); | ||
} | ||
} | ||
|
||
public void removeBRLeaseIfNeeded(final DatanodeID nodeID, | ||
final BlockReportContext context) throws IOException { | ||
namesystem.writeLock(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,8 @@ | |
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; | ||
import org.apache.hadoop.hdfs.server.datanode.DataNode; | ||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; | ||
import org.apache.hadoop.hdfs.server.namenode.NameNode; | ||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; | ||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; | ||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; | ||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; | ||
|
@@ -269,4 +271,85 @@ private StorageBlockReport[] createReports(DatanodeStorage[] dnStorages, | |
} | ||
return storageBlockReports; | ||
} | ||
|
||
@Test(timeout = 360000) | ||
public void testFirstIncompleteBlockReport() throws Exception { | ||
HdfsConfiguration conf = new HdfsConfiguration(); | ||
Random rand = new Random(); | ||
|
||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) | ||
.numDataNodes(1).build()) { | ||
cluster.waitActive(); | ||
|
||
FSNamesystem fsn = cluster.getNamesystem(); | ||
|
||
NameNode nameNode = cluster.getNameNode(); | ||
// pretend to be in safemode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first letter need to be uppercase and end with period at the end of sentence. |
||
NameNodeAdapter.enterSafeMode(nameNode, false); | ||
|
||
BlockManager blockManager = fsn.getBlockManager(); | ||
BlockManager spyBlockManager = spy(blockManager); | ||
fsn.setBlockManagerForTesting(spyBlockManager); | ||
String poolId = cluster.getNamesystem().getBlockPoolId(); | ||
|
||
NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); | ||
|
||
// Test based on one DataNode report to Namenode | ||
DataNode dn = cluster.getDataNodes().get(0); | ||
DatanodeDescriptor datanodeDescriptor = spyBlockManager | ||
.getDatanodeManager().getDatanode(dn.getDatanodeId()); | ||
yuyanlei-8130 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId); | ||
StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId); | ||
|
||
// Send heartbeat and request full block report lease | ||
HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( | ||
dnRegistration, storages, 0, 0, 0, 0, 0, null, true, | ||
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); | ||
|
||
DelayAnswer delayer = new DelayAnswer(BlockManager.LOG); | ||
doAnswer(delayer).when(spyBlockManager).processReport( | ||
any(DatanodeStorageInfo.class), | ||
any(BlockListAsLongs.class)); | ||
|
||
ExecutorService pool = Executors.newFixedThreadPool(1); | ||
|
||
// Trigger sendBlockReport | ||
BlockReportContext brContext = new BlockReportContext(1, 0, | ||
rand.nextLong(), hbResponse.getFullBlockReportLeaseId()); | ||
// Build every storage with 100 blocks for sending report | ||
DatanodeStorage[] datanodeStorages | ||
= new DatanodeStorage[storages.length]; | ||
for (int i = 0; i < storages.length; i++) { | ||
datanodeStorages[i] = storages[i].getStorage(); | ||
StorageBlockReport[] reports = createReports(datanodeStorages, 100); | ||
|
||
// The first multiple send once, simulating the failure of the first report, | ||
// only send successfully once | ||
if(i == 0){ | ||
rpcServer.blockReport(dnRegistration, poolId, reports, brContext); | ||
} | ||
|
||
// Send blockReport | ||
DatanodeCommand datanodeCommand = rpcServer.blockReport(dnRegistration, poolId, reports, | ||
brContext); | ||
|
||
// Wait until BlockManager calls processReport | ||
delayer.waitForCall(); | ||
|
||
// Allow blockreport to proceed | ||
delayer.proceed(); | ||
|
||
// Get result, it will not null if process successfully | ||
assertTrue(datanodeCommand instanceof FinalizeCommand); | ||
assertEquals(poolId, ((FinalizeCommand)datanodeCommand) | ||
.getBlockPoolId()); | ||
if(i == 0){ | ||
assertEquals(2, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); | ||
}else{ | ||
assertEquals(1, datanodeDescriptor.getStorageInfos()[i].getBlockReportCount()); | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix checkstyle.