Skip to content

Commit

Permalink
HDFS-17091. Blocks on DECOMMISSIONING DNs should be sorted properly i…
Browse files Browse the repository at this point in the history
…n LocatedBlocks.
  • Loading branch information
wangyuanben committed Jul 17, 2023
1 parent c44823d commit 789f773
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,20 @@ public static SecureRandom getSecureRandom() {
public static class ServiceComparator implements Comparator<DatanodeInfo> {
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
// Decommissioned nodes will still be moved to the end of the list
// Decommissioned nodes will be moved to the end of the list.
if (a.isDecommissioned()) {
return b.isDecommissioned() ? 0 : 1;
} else if (b.isDecommissioned()) {
return -1;
}

// Decommissioning nodes will be placed before decommissioned nodes.
if (a.isDecommissionInProgress()) {
return b.isDecommissionInProgress() ? 0 : 1;
} else if (b.isDecommissionInProgress()) {
return -1;
}

// ENTERING_MAINTENANCE nodes should be after live nodes.
if (a.isEnteringMaintenance()) {
return b.isEnteringMaintenance() ? 0 : 1;
Expand All @@ -159,9 +166,9 @@ public int compare(DatanodeInfo a, DatanodeInfo b) {

/**
* Comparator for sorting DataNodeInfo[] based on
* slow, stale, entering_maintenance and decommissioned states.
* slow, stale, entering_maintenance, decommissioning and decommissioned states.
* Order: live {@literal ->} slow {@literal ->} stale {@literal ->}
* entering_maintenance {@literal ->} decommissioned
* entering_maintenance {@literal ->} decommissioning {@literal ->} decommissioned
*/
@InterfaceAudience.Private
public static class StaleAndSlowComparator extends ServiceComparator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ public DatanodeStatistics getDatanodeStatistics() {
}

private boolean isInactive(DatanodeInfo datanode) {
return datanode.isDecommissioned() || datanode.isEnteringMaintenance() ||
return datanode.isDecommissioned() ||
datanode.isDecommissionInProgress() ||
datanode.isEnteringMaintenance() ||
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
}

Expand Down Expand Up @@ -540,7 +542,7 @@ public int getMaxSlowpeerCollectNodes() {
/**
* Sort the non-striped located blocks by the distance to the target host.
*
* For striped blocks, it will only move decommissioned/stale/slow
* For striped blocks, it will only move decommissioned/decommissioning/stale/slow
* nodes to the bottom. For example, assume we have storage list:
* d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
* mapping to block indices:
Expand Down Expand Up @@ -570,7 +572,7 @@ public void sortLocatedBlocks(final String targetHost,
}

/**
* Move decommissioned/entering_maintenance/stale/slow
* Move decommissioned/decommissioning/entering_maintenance/stale/slow
* datanodes to the bottom. After sorting it will
* update block indices and block tokens respectively.
*
Expand All @@ -588,7 +590,8 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
locToIndex.put(di[i], lsb.getBlockIndices()[i]);
locToToken.put(di[i], lsb.getBlockTokens()[i]);
}
// Move decommissioned/stale datanodes to the bottom
// Arrange the order of datanodes as follows:
// live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
Arrays.sort(di, comparator);

// must update cache since we modified locations array
Expand All @@ -602,7 +605,7 @@ private void sortLocatedStripedBlock(final LocatedBlock lb,
}

/**
* Move decommissioned/entering_maintenance/stale/slow
* Move decommissioned/decommissioning/entering_maintenance/stale/slow
* datanodes to the bottom. Also, sort nodes by network
* distance.
*
Expand Down Expand Up @@ -634,8 +637,8 @@ private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
}

DatanodeInfoWithStorage[] di = lb.getLocations();
// Move decommissioned/entering_maintenance/stale/slow
// datanodes to the bottom
// Arrange the order of datanodes as follows:
// live(in-service) -> stale -> entering_maintenance -> decommissioning -> decommissioned
Arrays.sort(di, comparator);

// Sort nodes by network distance only for located blocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TestSortLocatedBlock {
*/
@Test(timeout = 30000)
public void testWithStaleDatanodes() throws IOException {
long blockID = Long.MIN_VALUE;
long blockID = Long.MAX_VALUE;
int totalDns = 5;
DatanodeInfo[] locs = new DatanodeInfo[totalDns];

Expand Down Expand Up @@ -125,10 +125,10 @@ public void testWithStaleDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* live -> slow -> stale -> staleAndSlow ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=true
* d5 -> d4 -> d3 -> d2 -> d1 -> d0
* d6 -> d5 -> d4 -> d3 -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodStaleAndSlowDatanodes() throws IOException {
Expand All @@ -137,7 +137,7 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {

ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));

// sort located blocks
Expand All @@ -148,19 +148,21 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();

// assert location order:
// live -> stale -> entering_maintenance -> decommissioned
// live -> stale -> entering_maintenance -> decommissioning -> decommissioned
// live
assertEquals(locs[5].getIpAddr(), locations[0].getIpAddr());
assertEquals(locs[6].getIpAddr(), locations[0].getIpAddr());
// slow
assertEquals(locs[4].getIpAddr(), locations[1].getIpAddr());
assertEquals(locs[5].getIpAddr(), locations[1].getIpAddr());
// stale
assertEquals(locs[3].getIpAddr(), locations[2].getIpAddr());
assertEquals(locs[4].getIpAddr(), locations[2].getIpAddr());
// stale and slow
assertEquals(locs[2].getIpAddr(), locations[3].getIpAddr());
assertEquals(locs[3].getIpAddr(), locations[3].getIpAddr());
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}

/**
Expand All @@ -169,10 +171,10 @@ public void testAviodStaleAndSlowDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* (live <-> slow) -> (stale <-> staleAndSlow) ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=true && avoidSlowDataNodesForRead=false
* (d5 <-> d4) -> (d3 <-> d2) -> d1 -> d0
* (d6 <-> d5) -> (d4 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodStaleDatanodes() throws IOException {
Expand All @@ -181,7 +183,7 @@ public void testAviodStaleDatanodes() throws IOException {

ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));

// sort located blocks
Expand All @@ -192,21 +194,23 @@ public void testAviodStaleDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();

// assert location order:
// live -> stale -> entering_maintenance -> decommissioned
// live -> stale -> entering_maintenance -> decommissioning -> decommissioned.
// live
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
locs[4].getIpAddr() == locations[1].getIpAddr()) ||
locs[6].getIpAddr() == locations[1].getIpAddr()) ||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
locs[4].getIpAddr() == locations[0].getIpAddr()));
locs[6].getIpAddr() == locations[0].getIpAddr()));
// stale
assertTrue((locs[3].getIpAddr() == locations[2].getIpAddr() &&
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
(locs[3].getIpAddr() == locations[3].getIpAddr() &&
locs[2].getIpAddr() == locations[2].getIpAddr()));
assertTrue((locs[4].getIpAddr() == locations[3].getIpAddr() &&
locs[3].getIpAddr() == locations[2].getIpAddr()) ||
(locs[4].getIpAddr() == locations[2].getIpAddr() &&
locs[3].getIpAddr() == locations[3].getIpAddr()));
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}

/**
Expand All @@ -215,10 +219,10 @@ public void testAviodStaleDatanodes() throws IOException {
*
* After sorting the expected datanodes list will be:
* (live <-> stale) -> (slow <-> staleAndSlow) ->
* entering_maintenance -> decommissioned.
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=true
* (d5 -> d3) -> (d4 <-> d2) -> d1 -> d0
* (d6 -> d4) -> (d5 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testAviodSlowDatanodes() throws IOException {
Expand All @@ -227,7 +231,7 @@ public void testAviodSlowDatanodes() throws IOException {

ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MIN_VALUE,
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));

// sort located blocks
Expand All @@ -238,34 +242,87 @@ public void testAviodSlowDatanodes() throws IOException {
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();

// assert location order:
// live -> slow -> entering_maintenance -> decommissioned
// live -> slow -> entering_maintenance -> decommissioning -> decommissioned.
// live
assertTrue((locs[5].getIpAddr() == locations[0].getIpAddr() &&
locs[3].getIpAddr() == locations[1].getIpAddr()) ||
(locs[5].getIpAddr() == locations[1].getIpAddr() &&
locs[3].getIpAddr() == locations[0].getIpAddr()));
assertTrue((locs[6].getIpAddr() == locations[0].getIpAddr() &&
locs[4].getIpAddr() == locations[1].getIpAddr()) ||
(locs[6].getIpAddr() == locations[1].getIpAddr() &&
locs[4].getIpAddr() == locations[0].getIpAddr()));
// slow
assertTrue((locs[4].getIpAddr() == locations[2].getIpAddr() &&
locs[2].getIpAddr() == locations[3].getIpAddr()) ||
(locs[4].getIpAddr() == locations[3].getIpAddr() &&
locs[2].getIpAddr() == locations[2].getIpAddr()));
assertTrue((locs[5].getIpAddr() == locations[2].getIpAddr() &&
locs[3].getIpAddr() == locations[3].getIpAddr()) ||
(locs[5].getIpAddr() == locations[3].getIpAddr() &&
locs[3].getIpAddr() == locations[2].getIpAddr()));
// entering_maintenance
assertEquals(locs[1].getIpAddr(), locations[4].getIpAddr());
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[5].getIpAddr());
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}

/**
* Test to verify sorting with multiple state
* datanodes exists in storage lists.
*
* After sorting the expected datanodes list will be:
* (live <-> stale <-> slow <-> staleAndSlow) ->
* entering_maintenance -> decommissioning -> decommissioned.
*
* avoidStaleDataNodesForRead=false && avoidSlowDataNodesForRead=false
* (d6 <-> d5 <-> d4 <-> d3) -> d2 -> d1 -> d0
*/
@Test(timeout = 30000)
public void testWithServiceComparator() throws IOException {
DatanodeManager dm = mockDatanodeManager(false, false);
DatanodeInfo[] locs = mockDatanodes(dm);

// mark live/slow/stale datanodes
ArrayList<DatanodeInfo> list = new ArrayList<>();
for (DatanodeInfo loc : locs) {
list.add(loc);
}

// generate blocks
ArrayList<LocatedBlock> locatedBlocks = new ArrayList<>();
locatedBlocks.add(new LocatedBlock(
new ExtendedBlock("pool", Long.MAX_VALUE,
1024L, new Date().getTime()), locs));

// sort located blocks
dm.sortLocatedBlocks(null, locatedBlocks);

// get locations after sorting
LocatedBlock locatedBlock = locatedBlocks.get(0);
DatanodeInfoWithStorage[] locations = locatedBlock.getLocations();

// assert location order:
// live/slow/stale -> entering_maintenance -> decommissioning -> decommissioned.
// live/slow/stale
assertTrue(list.contains(locations[0]) &&
list.contains(locations[1]) &&
list.contains(locations[2]) &&
list.contains(locations[3]));
// entering_maintenance
assertEquals(locs[2].getIpAddr(), locations[4].getIpAddr());
// decommissioning
assertEquals(locs[1].getIpAddr(), locations[5].getIpAddr());
// decommissioned
assertEquals(locs[0].getIpAddr(), locations[6].getIpAddr());
}

/**
* We mock the following list of datanodes, and create LocatedBlock.
* d0 - decommissioned
* d1 - entering_maintenance
* d2 - stale and slow
* d3 - stale
* d4 - slow
* d5 - live(in-service)
* d1 - decommissioning
* d2 - entering_maintenance
* d3 - stale and slow
* d4 - stale
* d5 - slow
* d6 - live(in-service)
*/
private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) {
int totalDns = 6;
int totalDns = 7;
DatanodeInfo[] locs = new DatanodeInfo[totalDns];

// create datanodes
Expand All @@ -276,17 +333,19 @@ private static DatanodeInfo[] mockDatanodes(DatanodeManager dm) {
}
// set decommissioned state
locs[0].setDecommissioned();
// set decommissioning state
locs[1].startDecommission();
// set entering_maintenance state
locs[1].startMaintenance();
locs[2].startMaintenance();
// set stale and slow state
locs[2].setLastUpdateMonotonic(Time.monotonicNow() -
locs[3].setLastUpdateMonotonic(Time.monotonicNow() -
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
dm.addSlowPeers(locs[2].getDatanodeUuid());
dm.addSlowPeers(locs[3].getDatanodeUuid());
// set stale state
locs[3].setLastUpdateMonotonic(Time.monotonicNow() -
locs[4].setLastUpdateMonotonic(Time.monotonicNow() -
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT * 1000 - 1);
// set slow state
dm.addSlowPeers(locs[4].getDatanodeUuid());
dm.addSlowPeers(locs[5].getDatanodeUuid());

return locs;
}
Expand Down

0 comments on commit 789f773

Please sign in to comment.