HDFS-16413. Reconfig dfs usage parameters for datanode (#3863) (#4125)
tomscut authored Mar 31, 2022
1 parent cfca024 commit 0ecb34f
Showing 10 changed files with 211 additions and 5 deletions.
============================================================
@@ -19,6 +19,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -154,10 +155,20 @@ boolean running() {
/**
* How long in between runs of the background refresh.
*/
long getRefreshInterval() {
@VisibleForTesting
public long getRefreshInterval() {
return refreshInterval;
}

/**
* Randomize the refresh interval timing by this amount, the actual interval will be chosen
* uniformly between {@code interval-jitter} and {@code interval+jitter}.
*/
@VisibleForTesting
public long getJitter() {
return jitter;
}

/**
* Reset the current used data amount. This should be called
* when the cached value is re-computed.
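The getJitter() javadoc above fixes the contract: each background refresh waits a delay drawn uniformly from [interval - jitter, interval + jitter]. A minimal sketch of that computation (a hypothetical helper, not code from this class):

    import java.util.concurrent.ThreadLocalRandom;

    class RefreshDelaySketch {
      // Hypothetical helper illustrating the documented contract: draw the next
      // refresh delay uniformly from [intervalMs - jitterMs, intervalMs + jitterMs].
      static long nextDelayMs(long intervalMs, long jitterMs) {
        if (jitterMs > 0) {
          // nextLong's upper bound is exclusive, so add 1 to keep the range inclusive.
          return ThreadLocalRandom.current().nextLong(
              intervalMs - jitterMs, intervalMs + jitterMs + 1);
        }
        return intervalMs;
      }
    }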
============================================================
@@ -20,6 +20,10 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
@@ -144,6 +148,8 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -344,7 +350,9 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

@@ -668,6 +676,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
return reconfSlowDiskParameters(property, newVal);
case FS_DU_INTERVAL_KEY:
case FS_GETSPACEUSED_JITTER_KEY:
return reconfDfsUsageParameters(property, newVal);
default:
break;
}
@@ -849,6 +860,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
}
}

private String reconfDfsUsageParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(FS_DU_INTERVAL_KEY)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
Long.parseLong(newVal));
result = Long.toString(interval);
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(interval, null);
}
}
} else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
Long.parseLong(newVal));
result = Long.toString(jitter);
List<FsVolumeImpl> volumeList = data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (BlockPoolSlice value : blockPoolSlices.values()) {
value.updateDfsUsageConfig(null, jitter);
}
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException | IOException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
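With FS_DU_INTERVAL_KEY and FS_GETSPACEUSED_JITTER_KEY added to the reconfigurable set above, both properties can now be changed on a live datanode (for example via hdfs dfsadmin -reconfig datanode <host:ipc_port> start), and reconfDfsUsageParameters fans the new value out to every BlockPoolSlice of every volume. A minimal in-process sketch of the same path, assuming a running DataNode instance (compare the test near the end of this diff):

    import org.apache.hadoop.conf.ReconfigurationException;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;

    class DfsUsageReconfigSketch {
      // Sketch only: drives the new case arms in reconfigurePropertyImpl.
      // "dn" is assumed to be a live DataNode, e.g. one from a MiniDFSCluster.
      static void apply(DataNode dn) throws ReconfigurationException {
        dn.reconfigureProperty("fs.du.interval", "60000");        // refresh about every 60s
        dn.reconfigureProperty("fs.getspaceused.jitter", "5000"); // spread refreshes by up to 5s
        dn.reconfigureProperty("fs.du.interval", null);           // null reverts to the default
      }
    }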
============================================================
@@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
* @throws IOException
*/
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;

/**
* Get the volume list.
*/
List<FsVolumeImpl> getVolumeList();
}
============================================================
@@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -78,14 +79,17 @@

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;

/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
* cluster represent a single block pool.
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
class BlockPoolSlice {
public class BlockPoolSlice {
static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);

private final String bpid;
@@ -111,6 +115,8 @@ class BlockPoolSlice {
private final Timer timer;
private final int maxDataLength;
private final FileIoProvider fileIoProvider;
private final Configuration config;
private final File bpDir;

private static ForkJoinPool addReplicaThreadPool = null;
private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
@@ -124,7 +130,7 @@ public int compare(File f1, File f2) {
};

// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
private volatile GetSpaceUsed dfsUsage;

/**
* Create a block pool slice
@@ -137,6 +143,8 @@ public int compare(File f1, File f2) {
*/
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
Configuration conf, Timer timer) throws IOException {
this.config = conf;
this.bpDir = bpDir;
this.bpid = bpid;
this.volume = volume;
this.fileIoProvider = volume.getFileIoProvider();
@@ -228,6 +236,35 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}

public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
// Close the old dfsUsage if it is CachingGetSpaceUsed.
if (dfsUsage instanceof CachingGetSpaceUsed) {
((CachingGetSpaceUsed) dfsUsage).close();
}
if (interval != null) {
Preconditions.checkArgument(interval > 0,
FS_DU_INTERVAL_KEY + " should be larger than 0");
config.setLong(FS_DU_INTERVAL_KEY, interval);
}
if (jitter != null) {
Preconditions.checkArgument(jitter >= 0,
FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
}
// Start new dfsUsage.
this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
.setVolume(volume)
.setPath(bpDir)
.setConf(config)
.setInitialUsed(loadDfsUsed())
.build();
}

@VisibleForTesting
public GetSpaceUsed getDfsUsage() {
return dfsUsage;
}

private synchronized static void initializeAddReplicaPool(Configuration conf,
FsDatasetImpl dataset) {
if (addReplicaThreadPool == null) {
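Note the ordering in updateDfsUsageConfig: the old CachingGetSpaceUsed is closed first, which stops its background refresh thread, and only then is a replacement built through FSCachingGetSpaceUsed.Builder, seeded with loadDfsUsed() so the cached usage value carries over. Making the dfsUsage field volatile (see the field change above) is what lets readers on other threads safely pick up the swapped-in instance.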
============================================================
@@ -3431,5 +3431,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
}
}
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return volumes.getVolumes();
}
}

============================================================
@@ -493,6 +493,10 @@ long getRecentReserved() {
return recentReserved;
}

public Map<String, BlockPoolSlice> getBlockPoolSlices() {
return bpSlices;
}

long getReserved(){
return reserved != null ? reserved.getReserved() : 0;
}
============================================================
@@ -40,6 +40,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@@ -1595,5 +1596,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
}
return Collections.unmodifiableSet(replicas);
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return null;
}
}

============================================================
@@ -19,6 +19,10 @@
package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -49,15 +53,21 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Assert;
@@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOException
dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
}
}

@Test
public void testDfsUsageParameters() throws ReconfigurationException {
String[] dfsUsageParameters = {
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY};

for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Try invalid values.
for (String parameter : dfsUsageParameters) {
try {
dn.reconfigureProperty(parameter, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}

try {
dn.reconfigureProperty(parameter, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
}

// Change and verify properties.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, "99");
}
List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(99,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
}
}

// Revert to default and verify.
for (String parameter : dfsUsageParameters) {
dn.reconfigureProperty(parameter, null);
}
for (FsVolumeImpl fsVolume : volumeList) {
Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
if (dfsUsage instanceof CachingGetSpaceUsed) {
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
}
assertEquals(String.format("expect %s is not configured",
FS_DU_INTERVAL_KEY), null,
dn.getConf().get(FS_DU_INTERVAL_KEY));
assertEquals(String.format("expect %s is not configured",
FS_GETSPACEUSED_JITTER_KEY), null,
dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
}
}
}
}
}
============================================================
@@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -465,4 +466,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
return Collections.EMPTY_SET;
}

@Override
public List<FsVolumeImpl> getVolumeList() {
return null;
}
}
(The tenth changed file was not rendered in this capture.)
