Skip to content

Commit

Permalink
HDFS-16400. Reconfig DataXceiver parameters for datanode
Browse files Browse the repository at this point in the history
  • Loading branch information
tomscut committed Dec 31, 2021
1 parent 7950548 commit af3ad91
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
Expand Down Expand Up @@ -303,7 +305,8 @@ public class DataNode extends ReconfigurableBase
Arrays.asList(
DFS_DATANODE_DATA_DIR_KEY,
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

Expand Down Expand Up @@ -647,13 +650,37 @@ public String reconfigurePropertyImpl(String property, String newVal)
}
break;
}
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
return reconfDataXceiverParameters(property, newVal);
default:
break;
}
throw new ReconfigurationException(
property, newVal, getConf().get(property));
}

private String reconfDataXceiverParameters(String property, String newVal)
throws ReconfigurationException {
String result;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)) {
Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
Integer.parseInt(newVal));
result = Integer.toString(threads);
getXferServer().setMaxXceiverCount(threads);
} else {
throw new IllegalArgumentException("Unexpected property " + property +
" in reconfDataXceiverParameters");
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ class DataXceiverServer implements Runnable {
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
*/
int maxXceiverCount =
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
volatile int maxXceiverCount;

/**
* A manager to make sure that cluster balancing does not take too much
Expand Down Expand Up @@ -514,4 +513,15 @@ public boolean updateBalancerMaxConcurrentMovers(final int movers) {
void setMaxReconfigureWaitTime(int max) {
this.maxReconfigureWaitTime = max;
}

public void setMaxXceiverCount(int xceiverCount) {
Preconditions.checkArgument(xceiverCount > 0,
"dfs.datanode.max.transfer.threads should be larger than 0");
maxXceiverCount = xceiverCount;
}

@VisibleForTesting
public int getMaxXceiverCount() {
return maxXceiverCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -365,4 +367,42 @@ public void testBlockReportIntervalReconfiguration()
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
}
}

@Test
public void testDataXceiverReconfiguration()
throws ReconfigurationException {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Try invalid values.
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}

// Change properties and verify change.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
123, dn.getXferServer().getMaxXceiverCount());

// Revert to default.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount());

assertEquals(String.format("expect %s is not configured",
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), null,
dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(4, outs.size());
assertEquals(5, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down

0 comments on commit af3ad91

Please sign in to comment.