Skip to content

Commit

Permalink
Readjust the repair plan
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyanlei-8130 committed Aug 3, 2023
1 parent 3b72336 commit 57ecaed
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2866,18 +2866,14 @@ public boolean checkBlockReportLease(BlockReportContext context,
* The given storage is reporting all its blocks.
* Update the (storage{@literal -->}block list) and
* (block{@literal -->}storage list) maps.
* totalReportNum -> totalStorageReportsNum
* currentReportNum -> currentStorageReportIndex
*
* @return true if all known storages of the given DN have finished reporting.
* @throws IOException
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
BlockReportContext context,
int totalReportNum,
int currentReportNum) throws IOException {
BlockReportContext context) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
Expand Down Expand Up @@ -2908,13 +2904,12 @@ public boolean processReport(final DatanodeID nodeID,
}
if (namesystem.isInStartupSafeMode()
&& !StorageType.PROVIDED.equals(storageInfo.getStorageType())
&& storageInfo.getBlockReportCount() > 0
&& totalReportNum == currentReportNum) {
&& storageInfo.getBlockReportCount() > 0) {
blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: "
+ "discarded non-initial block report from {}"
+ " because namenode still in startup phase",
strBlockReportId, fullBrLeaseId, nodeID);
blockReportLeaseManager.removeLease(node);
removeLease(node);
return !node.hasStaleStorages();
}

Expand Down Expand Up @@ -2962,6 +2957,19 @@ public boolean processReport(final DatanodeID nodeID,
return !node.hasStaleStorages();
}

// Remove the lease when we have received block reports for all storages for a particular DN.
void removeLease(DatanodeDescriptor node) {
boolean needRemoveLease = true;
for (DatanodeStorageInfo sInfo : node.getStorageInfos()) {
if (sInfo.getBlockReportCount() == 0) {
needRemoveLease = false;
}
}
if (needRemoveLease) {
blockReportLeaseManager.removeLease(node);
}
}

public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
final BlockReportContext context) throws IOException {
namesystem.writeLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
final int index = r;
noStaleStorages = bm.runBlockOp(() ->
bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context, reports.length, index + 1));
blocks, context));
}
} else {
throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,12 +1080,12 @@ public void testSafeModeIBR() throws Exception {
reset(node);

bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, 1, 1);
BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, 1, 1);
BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());

// re-register as if node restarted, should update existing node
Expand All @@ -1096,7 +1096,7 @@ public void testSafeModeIBR() throws Exception {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, 1, 1);
BlockListAsLongs.EMPTY, null);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
Expand Down Expand Up @@ -1125,7 +1125,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null, 1, 1);
BlockListAsLongs.EMPTY, null);
assertEquals(1, ds.getBlockReportCount());
}

Expand Down Expand Up @@ -1198,7 +1198,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(), null, 1, 1);
builder.build(), null);
assertEquals(1, ds.getBlockReportCount());

// verify the storage info is correct
Expand Down Expand Up @@ -1249,12 +1249,12 @@ public void testSafeModeWithProvidedStorageBR() throws Exception {
bmPs.getDatanodeManager().addDatanode(node1);

// process reports of provided storage and disk storage
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null);
bmPs.processReport(node0, new DatanodeStorage(ds0.getStorageID()),
BlockListAsLongs.EMPTY, null, 2, 2);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
BlockListAsLongs.EMPTY, null);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null);
bmPs.processReport(node1, new DatanodeStorage(ds1.getStorageID()),
BlockListAsLongs.EMPTY, null, 2, 2);
BlockListAsLongs.EMPTY, null);

// The provided stoage report should not affect disk storage report
DatanodeStorageInfo dsPs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;

/**
* Tests that BlockReportLease in BlockManager.
Expand Down

0 comments on commit 57ecaed

Please sign in to comment.