From af3ad91bcfb245b7e2c28aecd3ff5b8e465e22e0 Mon Sep 17 00:00:00 2001 From: tom lee Date: Fri, 31 Dec 2021 09:14:27 +0800 Subject: [PATCH] HDFS-16400. Reconfig DataXceiver parameters for datanode --- .../hadoop/hdfs/server/datanode/DataNode.java | 29 +++++++++++++- .../server/datanode/DataXceiverServer.java | 14 ++++++- .../datanode/TestDataNodeReconfiguration.java | 40 +++++++++++++++++++ .../hadoop/hdfs/tools/TestDFSAdmin.java | 2 +- 4 files changed, 81 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 8009577c06219..3c2a271395f18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT; @@ -303,7 +305,8 @@ public class DataNode extends ReconfigurableBase Arrays.asList( DFS_DATANODE_DATA_DIR_KEY, DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, - DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)); public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); @@ -647,6 +650,8 @@ public String reconfigurePropertyImpl(String property, String newVal) } break; } + case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY: + return reconfDataXceiverParameters(property, newVal); default: break; } @@ -654,6 +659,28 @@ public String reconfigurePropertyImpl(String property, String newVal) property, newVal, getConf().get(property)); } + private String reconfDataXceiverParameters(String property, String newVal) + throws ReconfigurationException { + String result; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + if (property.equals(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)) { + Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized."); + int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT : + Integer.parseInt(newVal)); + result = Integer.toString(threads); + getXferServer().setMaxXceiverCount(threads); + } else { + throw new IllegalArgumentException("Unexpected property " + property + + " in reconfDataXceiverParameters"); + } + LOG.info("RECONFIGURE* changed {} to {}", property, newVal); + return result; + } catch (IllegalArgumentException e) { + throw new ReconfigurationException(property, newVal, getConf().get(property), e); + } + } + /** * Get a list of the keys of the re-configurable properties in configuration. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 77fa70e3cc54f..3a67b76e43467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -68,8 +68,7 @@ class DataXceiverServer implements Runnable { * Enforcing the limit is required in order to avoid data-node * running out of memory. */ - int maxXceiverCount = - DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT; + volatile int maxXceiverCount; /** * A manager to make sure that cluster balancing does not take too much @@ -514,4 +513,15 @@ public boolean updateBalancerMaxConcurrentMovers(final int movers) { void setMaxReconfigureWaitTime(int max) { this.maxReconfigureWaitTime = max; } + + public void setMaxXceiverCount(int xceiverCount) { + Preconditions.checkArgument(xceiverCount > 0, + "dfs.datanode.max.transfer.threads should be larger than 0"); + maxXceiverCount = xceiverCount; + } + + @VisibleForTesting + public int getMaxXceiverCount() { + return maxXceiverCount; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index 47ac08c3b165f..b94ac70d8d19f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -365,4 +367,42 @@ public void testBlockReportIntervalReconfiguration() .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); } } + + @Test + public void testDataXceiverReconfiguration() + throws ReconfigurationException { + for (int i = 0; i < NUM_DATA_NODE; i++) { + DataNode dn = cluster.getDataNodes().get(i); + + // Try invalid values. + try { + dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "text"); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting NumberFormatException", + expected.getCause() instanceof NumberFormatException); + } + try { + dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(-1)); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting IllegalArgumentException", + expected.getCause() instanceof IllegalArgumentException); + } + + // Change properties and verify change. + dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123)); + assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), + 123, dn.getXferServer().getMaxXceiverCount()); + + // Revert to default. + dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null); + assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), + DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount()); + + assertEquals(String.format("expect %s is not configured", + DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), null, + dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 351d883ab56fb..a478de06a0e21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -338,7 +338,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(4, outs.size()); + assertEquals(5, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); }