Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into HADOOP-18359
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Nov 7, 2022
2 parents 96013c4 + 6605302 commit 87e9cf3
Show file tree
Hide file tree
Showing 42 changed files with 1,072 additions and 238 deletions.
10 changes: 10 additions & 0 deletions hadoop-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,21 @@
<artifactId>hadoop-hdfs-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-native-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-nativetask</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3616,7 +3616,7 @@ private Block addStoredBlock(final BlockInfo block,
if (storedBlock == null || storedBlock.isDeleted()) {
// If this block does not belong to anyfile, then we are done.
blockLog.debug("BLOCK* addStoredBlock: {} on {} size {} but it does not belong to any file",
block, node, block.getNumBytes());
reportedBlock, node, reportedBlock.getNumBytes());
// we could add this block to invalidate set of this datanode.
// it will happen in next block report otherwise.
return block;
Expand All @@ -3631,12 +3631,12 @@ private Block addStoredBlock(final BlockInfo block,
(node.isDecommissioned() || node.isDecommissionInProgress()) ? 0 : 1;
if (logEveryBlock) {
blockLog.info("BLOCK* addStoredBlock: {} is added to {} (size={})",
node, storedBlock, storedBlock.getNumBytes());
node, reportedBlock, reportedBlock.getNumBytes());
}
} else if (result == AddBlockResult.REPLACED) {
curReplicaDelta = 0;
blockLog.warn("BLOCK* addStoredBlock: block {} moved to storageType " +
"{} on node {}", storedBlock, storageInfo.getStorageType(), node);
"{} on node {}", reportedBlock, storageInfo.getStorageType(), node);
} else {
// if the same block is added again and the replica was corrupt
// previously because of a wrong gen stamp, remove it from the
Expand All @@ -3646,8 +3646,8 @@ private Block addStoredBlock(final BlockInfo block,
curReplicaDelta = 0;
if (blockLog.isDebugEnabled()) {
blockLog.debug("BLOCK* addStoredBlock: Redundant addStoredBlock request"
+ " received for {} on node {} size {}", storedBlock, node,
storedBlock.getNumBytes());
+ " received for {} on node {} size {}", reportedBlock, node,
reportedBlock.getNumBytes());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
Expand Down Expand Up @@ -70,10 +71,10 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
outOfServiceNodeBlocks = new HashMap<>();

/**
* The numbe of blocks to process when moving blocks to pendingReplication
* The number of blocks to process when moving blocks to pendingReplication
* before releasing and reclaiming the namenode lock.
*/
private int blocksPerLock;
private volatile int blocksPerLock;

/**
* The number of blocks that have been checked on this tick.
Expand All @@ -82,7 +83,7 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
/**
* The maximum number of blocks to hold in PendingRep at any time.
*/
private int pendingRepLimit;
private volatile int pendingRepLimit;

/**
* The list of blocks which have been placed onto the replication queue
Expand Down Expand Up @@ -801,6 +802,26 @@ private boolean isBlockReplicatedOk(DatanodeDescriptor datanode,
return false;
}

@VisibleForTesting
@Override
public int getPendingRepLimit() {
return pendingRepLimit;
}

public void setPendingRepLimit(int pendingRepLimit) {
this.pendingRepLimit = pendingRepLimit;
}

@VisibleForTesting
@Override
public int getBlocksPerLock() {
return blocksPerLock;
}

public void setBlocksPerLock(int blocksPerLock) {
this.blocksPerLock = blocksPerLock;
}

static class BlockStats {
private LightWeightHashSet<Long> openFiles =
new LightWeightLinkedSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,6 +138,28 @@ public int getNumNodesChecked() {
return numNodesChecked;
}

@VisibleForTesting
@Override
public int getPendingRepLimit() {
return 0;
}

@Override
public void setPendingRepLimit(int pendingRepLimit) {
// nothing.
}

@VisibleForTesting
@Override
public int getBlocksPerLock() {
return 0;
}

@Override
public void setBlocksPerLock(int blocksPerLock) {
// nothing.
}

@Override
public void run() {
LOG.debug("DatanodeAdminMonitor is running.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,4 +419,30 @@ void runMonitorForTest() throws ExecutionException, InterruptedException {
executor.submit(monitor).get();
}

public void refreshPendingRepLimit(int pendingRepLimit, String key) {
ensurePositiveInt(pendingRepLimit, key);
this.monitor.setPendingRepLimit(pendingRepLimit);
}

@VisibleForTesting
public int getPendingRepLimit() {
return this.monitor.getPendingRepLimit();
}

public void refreshBlocksPerLock(int blocksPerLock, String key) {
ensurePositiveInt(blocksPerLock, key);
this.monitor.setBlocksPerLock(blocksPerLock);
}

@VisibleForTesting
public int getBlocksPerLock() {
return this.monitor.getBlocksPerLock();
}

private void ensurePositiveInt(int val, String key) {
Preconditions.checkArgument(
(val > 0),
key + " = '" + val + "' is invalid. " +
"It should be a positive, non-zero integer value.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ public interface DatanodeAdminMonitorInterface extends Runnable {
void setBlockManager(BlockManager bm);
void setDatanodeAdminManager(DatanodeAdminManager dnm);
void setNameSystem(Namesystem ns);

int getPendingRepLimit();

void setPendingRepLimit(int pendingRepLimit);

int getBlocksPerLock();

void setBlocksPerLock(int blocksPerLock);
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT;

import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
Expand Down Expand Up @@ -353,7 +357,9 @@ public enum OperationCategory {
DFS_BLOCK_INVALIDATE_LIMIT_KEY,
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY));
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK));

private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
Expand Down Expand Up @@ -2321,6 +2327,10 @@ protected String reconfigurePropertyImpl(String property, String newVal)
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
} else if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT) ||
(property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK))) {
return reconfigureDecommissionBackoffMonitorParameters(datanodeManager, property,
newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
Expand Down Expand Up @@ -2601,6 +2611,34 @@ private String reconfigureBlockInvalidateLimit(final DatanodeManager datanodeMan
}
}

private String reconfigureDecommissionBackoffMonitorParameters(
final DatanodeManager datanodeManager, final String property, final String newVal)
throws ReconfigurationException {
String newSetting = null;
try {
if (property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT)) {
int pendingRepLimit = (newVal == null ?
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT_DEFAULT :
Integer.parseInt(newVal));
datanodeManager.getDatanodeAdminManager().refreshPendingRepLimit(pendingRepLimit,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT);
newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getPendingRepLimit());
} else if (property.equals(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK)) {
int blocksPerLock = (newVal == null ?
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK_DEFAULT :
Integer.parseInt(newVal));
datanodeManager.getDatanodeAdminManager().refreshBlocksPerLock(blocksPerLock,
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK);
newSetting = String.valueOf(datanodeManager.getDatanodeAdminManager().getBlocksPerLock());
}
LOG.info("RECONFIGURE* changed reconfigureDecommissionBackoffMonitorParameters {} to {}",
property, newSetting);
return newSetting;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

@Override // ReconfigurableBase
protected Configuration getNewConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminBackoffMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
Expand Down Expand Up @@ -62,6 +64,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;

public class TestNameNodeReconfigure {
Expand Down Expand Up @@ -567,6 +571,87 @@ private List<Boolean> validatePeerReport(String jsonReport) {
return containReport;
}

@Test
public void testReconfigureDecommissionBackoffMonitorParameters()
throws ReconfigurationException, IOException {
Configuration conf = new HdfsConfiguration();
conf.setClass(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MONITOR_CLASS,
DatanodeAdminBackoffMonitor.class, DatanodeAdminMonitorInterface.class);
int defaultPendingRepLimit = 1000;
conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, defaultPendingRepLimit);
int defaultBlocksPerLock = 1000;
conf.setInt(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
defaultBlocksPerLock);

try (MiniDFSCluster newCluster = new MiniDFSCluster.Builder(conf).build()) {
newCluster.waitActive();
final NameNode nameNode = newCluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();

// verify defaultPendingRepLimit.
assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(),
defaultPendingRepLimit);

// try invalid pendingRepLimit.
try {
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
defaultPendingRepLimit + "' to 'non-numeric'", e.getMessage());
}

try {
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"-1");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.limit from '" +
defaultPendingRepLimit + "' to '-1'", e.getMessage());
}

// try correct pendingRepLimit.
nameNode.reconfigureProperty(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
"20000");
assertEquals(datanodeManager.getDatanodeAdminManager().getPendingRepLimit(), 20000);

// verify defaultBlocksPerLock.
assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(),
defaultBlocksPerLock);

// try invalid blocksPerLock.
try {
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
"non-numeric");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
defaultBlocksPerLock + "' to 'non-numeric'", e.getMessage());
}

try {
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "-1");
fail("Should not reach here");
} catch (ReconfigurationException e) {
assertEquals("Could not change property " +
"dfs.namenode.decommission.backoff.monitor.pending.blocks.per.lock from '" +
defaultBlocksPerLock + "' to '-1'", e.getMessage());
}

// try correct blocksPerLock.
nameNode.reconfigureProperty(
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, "10000");
assertEquals(datanodeManager.getDatanodeAdminManager().getBlocksPerLock(), 10000);
}
}

@After
public void shutDown() throws IOException {
if (cluster != null) {
Expand Down
Loading

0 comments on commit 87e9cf3

Please sign in to comment.