diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java index 389dc83c8b504..eb65b3843a9c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminBackoffMonitor.java @@ -198,7 +198,7 @@ public void run() { int numDecommissioningNodes = numTrackedNodes + numQueuedNodes; if (numDecommissioningNodes > maxConcurrentTrackedNodes) { LOG.warn( - "There are {} nodes decommissioning but only {} nodes will be tracked at a time. " + "{} nodes are decommissioning but only {} nodes will be tracked at a time. " + "{} nodes are currently queued waiting to be decommissioned.", numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes); @@ -206,7 +206,7 @@ public void run() { final List unhealthyDns = outOfServiceNodeBlocks.keySet().stream() .filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn)) .collect(Collectors.toList()); - identifyUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { + getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { getPendingNodes().add(dn); outOfServiceNodeBlocks.remove(dn); pendingRep.remove(dn); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java index ba86555ace7a2..bb9e9efb8c4a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminDefaultMonitor.java @@ -287,12 +287,12 @@ private void check() { int numDecommissioningNodes = numTrackedNodes + numQueuedNodes; if (numDecommissioningNodes > maxConcurrentTrackedNodes) { LOG.warn( - "There are {} nodes decommissioning but only {} nodes will be tracked at a time. " + "{} nodes are decommissioning but only {} nodes will be tracked at a time. " + "{} nodes are currently queued waiting to be decommissioned.", numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes); // Re-queue unhealthy nodes to make space for decommissioning healthy nodes - identifyUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { + getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> { getPendingNodes().add(dn); outOfServiceNodeBlocks.remove(dn); }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java index 11a4227625bca..f9761c2dfcb4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminMonitorBase.java @@ -42,7 +42,7 @@ public abstract class DatanodeAdminMonitorBase * Sort by lastUpdate time descending order, such that unhealthy * nodes are de-prioritized given they cannot be decommissioned. */ - public static final Comparator PENDING_NODES_QUEUE_COMPARATOR = + static final Comparator PENDING_NODES_QUEUE_COMPARATOR = (dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate()); protected BlockManager blockManager; @@ -171,9 +171,9 @@ public Queue getPendingNodes() { * * @param unhealthyDns The unhealthy datanodes which may be re-queued * @param numDecommissioningNodes The total number of nodes being decommissioned - * @return List of unhealthy nodes to be re-queued + * @return Stream of unhealthy nodes to be re-queued */ - Stream identifyUnhealthyNodesToRequeue( + Stream getUnhealthyNodesToRequeue( final List unhealthyDns, int numDecommissioningNodes) { if (!unhealthyDns.isEmpty()) { // Compute the number of unhealthy nodes to re-queue diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java index d412c95f0732b..451c63be131ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeAdminMonitorBase.java @@ -61,12 +61,9 @@ public class TestDatanodeAdminMonitorBase { @Test public void testPendingNodesQueueOrdering() { final PriorityQueue pendingNodes = - new PriorityQueue<>(10, - DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR); + new PriorityQueue<>(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR); - for (int i = 0; i < NUM_DATANODE; i++) { - pendingNodes.add(NODES[i]); - } + pendingNodes.addAll(Arrays.asList(NODES)); for (int i = 0; i < NUM_DATANODE; i++) { final DatanodeDescriptor dn = pendingNodes.poll();