Skip to content

Commit

Permalink
HADOOP-18757. S3A Committer only finalizes the commits in a single th…
Browse files Browse the repository at this point in the history
…read (#5706)

Contributed by Moditha Hewasinghage

<!--
  Thanks for sending a pull request!
    1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/HADOOP/How+To+Contribute
    2. Make sure your PR title starts with JIRA issue id, e.g., 'HADOOP-17799. Your PR title ...'.
-->

### Description of PR

### How was this patch tested?

### For code changes:

- [ ] Does the title or this PR starts with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
- [ ] Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, `NOTICE-binary` files?
  • Loading branch information
yuyanlei-8130 committed Jul 19, 2023
1 parent b6b2590 commit a4b76d3
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2873,7 +2873,9 @@ public boolean checkBlockReportLease(BlockReportContext context,
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
BlockReportContext context) throws IOException {
BlockReportContext context,
int totalReportNum,
int currentReportNum) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
Expand Down Expand Up @@ -2904,7 +2906,8 @@ public boolean processReport(final DatanodeID nodeID,
}
if (namesystem.isInStartupSafeMode()
&& !StorageType.PROVIDED.equals(storageInfo.getStorageType())
&& storageInfo.getBlockReportCount() > 0) {
&& storageInfo.getBlockReportCount() > 0
&& totalReportNum == currentReportNum) {
blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: "
+ "discarded non-initial block report from {}"
+ " because namenode still in startup phase",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
final int index = r;
noStaleStorages = bm.runBlockOp(() ->
bm.processReport(nodeReg, reports[index].getStorage(),
blocks, context));
blocks, context, reports.length, index+1));
}
} else {
throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,12 +1080,12 @@ public void testSafeModeIBR() throws Exception {
reset(node);

bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());

// re-register as if node restarted, should update existing node
Expand All @@ -1096,7 +1096,7 @@ public void testSafeModeIBR() throws Exception {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
Expand Down Expand Up @@ -1125,7 +1125,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 1, 1);
assertEquals(1, ds.getBlockReportCount());
}

Expand Down Expand Up @@ -1198,7 +1198,7 @@ public void testSafeModeIBRBeforeFirstFullBR() throws Exception {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(), null);
builder.build(), null, 1, 1);
assertEquals(1, ds.getBlockReportCount());

// verify the storage info is correct
Expand Down Expand Up @@ -1249,12 +1249,12 @@ public void testSafeModeWithProvidedStorageBR() throws Exception {
bmPs.getDatanodeManager().addDatanode(node1);

// process reports of provided storage and disk storage
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null);
bmPs.processReport(node0, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
bmPs.processReport(node0, new DatanodeStorage(ds0.getStorageID()),
BlockListAsLongs.EMPTY, null);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 2, 2);
bmPs.processReport(node1, providedStorage, BlockListAsLongs.EMPTY, null, 2, 1);
bmPs.processReport(node1, new DatanodeStorage(ds1.getStorageID()),
BlockListAsLongs.EMPTY, null);
BlockListAsLongs.EMPTY, null, 2, 2);

// The provided stoage report should not affect disk storage report
DatanodeStorageInfo dsPs =
Expand Down

0 comments on commit a4b76d3

Please sign in to comment.