
Commit b8763c3, committed by singer-bin on Mar 14, 2022 (1 parent: a32cfc2)
Showing 7 changed files with 61 additions and 3 deletions.
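In short: this commit makes fs.getspaceused.classname (FS_GETSPACEUSED_CLASSNAME) reconfigurable at runtime on the DataNode. The key is registered in the DataNode's reconfigurable-property set and handled in reconfigurePropertyImpl(); a new reconfSpaceUsedKlass() walks every volume and every block pool slice, asking each slice to rebuild its GetSpaceUsed estimator from the new configuration. Supporting plumbing: FsDatasetSpi gains getVolumeList(), FsVolumeImpl exposes getBlockPoolSlices(), and BlockPoolSlice's dfsUsage field becomes volatile alongside a new refreshSpaceUsedKlass(Configuration) method. The commit only wires up the server side; presumably the change is driven through the standard reconfiguration workflow (update the property on the node, then hdfs dfsadmin -reconfig datanode <host:ipc_port> start).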
File 1/7: DataNode.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;


+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_CLASSNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
@@ -149,6 +150,8 @@
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.util.*;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -341,7 +344,7 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
-          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
+          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,FS_GETSPACEUSED_CLASSNAME));
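With FS_GETSPACEUSED_CLASSNAME added to RECONFIGURABLE_PROPERTIES, the DataNode now advertises the key as reconfigurable; assuming the standard dfsadmin workflow, hdfs dfsadmin -reconfig datanode <host:ipc_port> properties should list fs.getspaceused.classname from here on.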

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

@@ -673,13 +676,30 @@ public String reconfigurePropertyImpl(String property, String newVal)
case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
return reconfSlowDiskParameters(property, newVal);
+    case FS_GETSPACEUSED_CLASSNAME:
+      reconfSpaceUsedKlass();
+      return newVal;
default:
break;
}
throw new ReconfigurationException(
property, newVal, getConf().get(property));
}

+  private void reconfSpaceUsedKlass() {
+    // Rebuild the GetSpaceUsed estimator of every block pool slice on every
+    // volume so the new fs.getspaceused.classname takes effect immediately.
+    List<FsVolumeImpl> volumeList = data.getVolumeList();
+    for (FsVolumeImpl fsVolume : volumeList) {
+      Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+      for (Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+        try {
+          entry.getValue().refreshSpaceUsedKlass(getNewConf());
+        } catch (IOException e) {
+          LOG.warn("Failed to refresh space used class for block pool {}",
+              entry.getKey(), e);
+        }
+      }
+    }
+  }

private String reconfDataXceiverParameters(String property, String newVal)
throws ReconfigurationException {
String result;
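A hedged sketch of how this new path would be exercised end to end; none of this is in the diff, and it assumes a running DataNode dn whose updated configuration points fs.getspaceused.classname at one of the stock implementations such as org.apache.hadoop.fs.DU:

  // After updating fs.getspaceused.classname in the DataNode's config files,
  // ReconfigurableBase's background task reloads the configuration (via
  // getNewConf()) and replays each changed key through reconfigurePropertyImpl().
  dn.startReconfigurationTask();
  ReconfigurationTaskStatus status = dn.getReconfigurationTaskStatus();

  // Equivalent operator flow from the shell:
  //   hdfs dfsadmin -reconfig datanode <host:ipc_port> start
  //   hdfs dfsadmin -reconfig datanode <host:ipc_port> status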
File 2/7: FsDatasetSpi.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
* @throws IOException
*/
MountVolumeMap getMountVolumeMap() throws IOException;

+  /**
+   * Get the volume list.
+   */
+  List<FsVolumeImpl> getVolumeList();
}
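Note that the new accessor returns the concrete FsVolumeImpl type rather than the FsVolumeSpi abstraction, and the two test implementations at the end of this diff stub it to return null.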
File 3/7: BlockPoolSlice.java
@@ -84,7 +84,7 @@
*
* This class is synchronized by {@link FsVolumeImpl}.
*/
-class BlockPoolSlice {
+public class BlockPoolSlice {
static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);

private final String bpid;
@@ -115,6 +115,7 @@ class BlockPoolSlice {
private final Timer timer;
private final int maxDataLength;
private final FileIoProvider fileIoProvider;
+  private final File bpDir;

private static ForkJoinPool addReplicaThreadPool = null;
private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
@@ -128,7 +129,7 @@ public int compare(File f1, File f2) {
};

// TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final GetSpaceUsed dfsUsage;
+  private volatile GetSpaceUsed dfsUsage; // volatile: hot-swapped by refreshSpaceUsedKlass()

/**
* Create a block pool slice
@@ -141,6 +142,7 @@ public int compare(File f1, File f2) {
*/
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
Configuration conf, Timer timer) throws IOException {
+    this.bpDir = bpDir;
this.bpid = bpid;
this.volume = volume;
this.fileIoProvider = volume.getFileIoProvider();
@@ -232,6 +234,15 @@ public void run() {
SHUTDOWN_HOOK_PRIORITY);
}

+  public void refreshSpaceUsedKlass(Configuration conf) throws IOException {
+    // Rebuild dfsUsage from the new configuration; the builder reads
+    // fs.getspaceused.classname from conf, and seeding with loadDfsUsed()
+    // lets the new estimator start from the last persisted usage value.
+    this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
+        .setVolume(volume)
+        .setPath(bpDir)
+        .setConf(conf)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+  }

private synchronized static void initializeAddReplicaPool(Configuration conf,
FsDatasetImpl dataset) {
if (addReplicaThreadPool == null) {
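refreshSpaceUsedKlass() reuses the existing builder machinery: GetSpaceUsed.Builder reads fs.getspaceused.classname from the supplied Configuration and instantiates that class reflectively (DU-style behavior when unset), and FSCachingGetSpaceUsed.Builder layers the bpid/volume wiring on top. Since dfsUsage is volatile, threads consulting the estimator see the swap without extra locking. One caveat: the previous instance is simply dropped, and CachingGetSpaceUsed implementations own a background refresh thread and implement Closeable, so a hardened version would presumably close the old estimator. A minimal standalone sketch of the builder behavior (hypothetical directory path, not from this diff):

  import java.io.File;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.GetSpaceUsed;

  Configuration conf = new Configuration();
  conf.set("fs.getspaceused.classname", "org.apache.hadoop.fs.DU");
  // The Builder resolves the class from conf and constructs it reflectively.
  GetSpaceUsed estimator = new GetSpaceUsed.Builder()
      .setPath(new File("/data/dn1/current"))  // hypothetical block pool dir
      .setConf(conf)
      .build();
  long usedBytes = estimator.getUsed();        // served by the configured class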
File 4/7: FsDatasetImpl.java
@@ -3575,6 +3575,11 @@ public MountVolumeMap getMountVolumeMap() {
return volumes.getMountVolumeMap();
}

+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return volumes.getVolumes();
+  }

@Override
public boolean isDeletingBlock(String bpid, long blockId) {
synchronized(deletingBlock) {
File 5/7: FsVolumeImpl.java
@@ -552,6 +552,10 @@ long getReserved(){
return reserved != null ? reserved.getReserved() : 0;
}

+  public Map<String, BlockPoolSlice> getBlockPoolSlices() {
+    return bpSlices;
+  }

@VisibleForTesting
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
BlockPoolSlice bp = bpSlices.get(bpid);
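getBlockPoolSlices() hands out the volume's live bpSlices map rather than a copy, which is what lets reconfSpaceUsedKlass() reach every BlockPoolSlice on every volume.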
File 6/7: test dataset stub (apparently SimulatedFSDataset.java)
@@ -42,6 +42,7 @@

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
import org.apache.commons.lang3.ArrayUtils;
@@ -1605,5 +1606,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
public MountVolumeMap getMountVolumeMap() {
return null;
}

+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
}

File 7/7: test dataset stub (apparently ExternalDatasetImpl.java)
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -467,4 +468,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
public MountVolumeMap getMountVolumeMap() {
return null;
}

+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
}
