Skip to content

Commit

Permalink
HDFS-17565. EC: dfs.datanode.ec.reconstruction.threads should support…
Browse files Browse the repository at this point in the history
… dynamic reconfigured.
  • Loading branch information
zhengchenyu committed Jul 8, 2024
1 parent 134dcf1 commit 14911e8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
Expand Down Expand Up @@ -374,7 +376,8 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY));
DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFS_DN_EC_RECONSTRUCTION_THREADS_KEY));

public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";

Expand Down Expand Up @@ -740,6 +743,8 @@ public String reconfigurePropertyImpl(String property, String newVal)
return reconfDiskBalancerParameters(property, newVal);
case DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY:
return reconfSlowIoWarningThresholdParameters(property, newVal);
case DFS_DN_EC_RECONSTRUCTION_THREADS_KEY:
return reconfStripedReconstructionParameters(property, newVal);
default:
break;
}
Expand Down Expand Up @@ -1079,6 +1084,22 @@ private String reconfSlowIoWarningThresholdParameters(String property, String ne
}
}

private String reconfStripedReconstructionParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
if (property.equals(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY)) {
int size = (newVal == null ? DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT :
Integer.parseInt(newVal));
result = Long.toString(size);
ecWorker.setStripedReconstructionPoolSize(size);
}
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -37,6 +38,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY;

/**
* ErasureCodingWorker handles the erasure coding reconstruction work commands.
* These commands would be issued from Namenode as part of Datanode's heart beat
Expand Down Expand Up @@ -172,4 +176,19 @@ public void shutDown() {
public float getXmitWeight() {
return xmitWeight;
}

public void setStripedReconstructionPoolSize(int size) {
Preconditions.checkArgument(size > 0,
DFS_DN_EC_RECONSTRUCTION_THREADS_KEY + " should be greater than 0");
this.stripedReconstructionPool.setCorePoolSize(size);
this.stripedReconstructionPool.setMaximumPoolSize(size);
}

@VisibleForTesting
public int getStripedReconstructionPoolSize() {
int poolSize = this.stripedReconstructionPool.getCorePoolSize();
Preconditions.checkArgument(poolSize == this.stripedReconstructionPool.getMaximumPoolSize(),
"The maximum pool size should be equal to core pool size");
return poolSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_THREADS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -948,4 +950,35 @@ public void testSlowIoWarningThresholdReconfiguration() throws Exception {
}
}

@Test
public void testDataNodeECReconstructionThreads() throws Exception {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Verify DFS_DN_EC_RECONSTRUCTION_THREADS_KEY.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.ec.reconstruction.threads from "
+ "'8' to 'text'",
() -> dn.reconfigureProperty(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY, "text"));
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.ec.reconstruction.threads from "
+ "'8' to '-1'",
() -> dn.reconfigureProperty(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY, "-1"));
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.datanode.ec.reconstruction.threads from "
+ "'8' to '0'",
() -> dn.reconfigureProperty(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY, "0"));

// Set value is 10.
dn.reconfigureProperty(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY,
String.valueOf(10));
assertEquals(10, dn.getErasureCodingWorker().getStripedReconstructionPoolSize());

// Set default value.
dn.reconfigureProperty(DFS_DN_EC_RECONSTRUCTION_THREADS_KEY, null);
assertEquals(DFS_DN_EC_RECONSTRUCTION_THREADS_DEFAULT,
dn.getErasureCodingWorker().getStripedReconstructionPoolSize());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(26, outs.size());
assertEquals(27, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down

0 comments on commit 14911e8

Please sign in to comment.