Skip to content

Commit

Permalink
HDFS-16303. Improve handling of datanode lost while decommissioning r…
Browse files Browse the repository at this point in the history
…evision 5
  • Loading branch information
Kevin Wikant committed Dec 23, 2021
1 parent 44cd931 commit b57cbef
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ public void run() {
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"There are {} nodes decommissioning but only {} nodes will be tracked at a time. "
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);

// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
final List<DatanodeDescriptor> unhealthyDns = outOfServiceNodeBlocks.keySet().stream()
.filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn))
.collect(Collectors.toList());
identifyUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ private void check() {
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"There are {} nodes decommissioning but only {} nodes will be tracked at a time. "
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);

// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
identifyUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class DatanodeAdminMonitorBase
* Sort by lastUpdate time descending order, such that unhealthy
* nodes are de-prioritized given they cannot be decommissioned.
*/
public static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
(dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());

protected BlockManager blockManager;
Expand Down Expand Up @@ -171,9 +171,9 @@ public Queue<DatanodeDescriptor> getPendingNodes() {
*
* @param unhealthyDns The unhealthy datanodes which may be re-queued
* @param numDecommissioningNodes The total number of nodes being decommissioned
* @return List of unhealthy nodes to be re-queued
* @return Stream of unhealthy nodes to be re-queued
*/
Stream<DatanodeDescriptor> identifyUnhealthyNodesToRequeue(
Stream<DatanodeDescriptor> getUnhealthyNodesToRequeue(
final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {
if (!unhealthyDns.isEmpty()) {
// Compute the number of unhealthy nodes to re-queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,9 @@ public class TestDatanodeAdminMonitorBase {
@Test
public void testPendingNodesQueueOrdering() {
final PriorityQueue<DatanodeDescriptor> pendingNodes =
new PriorityQueue<>(10,
DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR);
new PriorityQueue<>(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR);

for (int i = 0; i < NUM_DATANODE; i++) {
pendingNodes.add(NODES[i]);
}
pendingNodes.addAll(Arrays.asList(NODES));

for (int i = 0; i < NUM_DATANODE; i++) {
final DatanodeDescriptor dn = pendingNodes.poll();
Expand Down

0 comments on commit b57cbef

Please sign in to comment.