HDFS-17243. Add the parameter storage type for getBlocks method (#6238). Contributed by Haiyang Hu.

Reviewed-by: He Xiaoqiao <[email protected]>
Reviewed-by: Tao Li <[email protected]>
Signed-off-by: Shuyan Zhang <[email protected]>
haiyang1987 authored Nov 6, 2023
1 parent c15fd3b commit 4ef2322
Showing 15 changed files with 142 additions and 36 deletions.
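
This commit threads an optional StorageType through the entire getBlocks call chain (router, protocol translators, balancer dispatcher, block manager, and NameNode RPC server), so a balancer-style caller can restrict the returned replicas to a single storage medium. Below is a minimal sketch of the updated NamenodeProtocol#getBlocks usage; the wrapper class is hypothetical and the proxy/datanode setup is assumed, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

public class GetBlocksByStorageTypeExample {

  // 'namenode' is an already-created NamenodeProtocol proxy; 'datanode'
  // is the node whose block replicas we want to enumerate.
  static BlocksWithLocations diskBlocksOnly(NamenodeProtocol namenode,
      DatanodeInfo datanode) throws IOException {
    return namenode.getBlocks(datanode,
        1024L * 1024 * 1024,  // size: stop after ~1 GB worth of blocks
        10L * 1024 * 1024,    // minBlockSize: skip blocks smaller than 10 MB
        0,                    // hotBlockTimeInterval: no cold-file preference
        StorageType.DISK);    // new parameter: only DISK replicas
  }

  // Passing null for the new parameter preserves the old behavior:
  // blocks are returned from every non-stale storage on the datanode.
  static BlocksWithLocations anyStorage(NamenodeProtocol namenode,
      DatanodeInfo datanode) throws IOException {
    return namenode.getBlocks(datanode, 1024L * 1024 * 1024,
        10L * 1024 * 1024, 0, null);
  }
}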

RouterNamenodeProtocol.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -53,7 +54,7 @@ public RouterNamenodeProtocol(RouterRpcServer server) {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
 
     // Get the namespace where the datanode is located
@@ -79,8 +80,8 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
     if (nsId != null) {
       RemoteMethod method = new RemoteMethod(
           NamenodeProtocol.class, "getBlocks", new Class<?>[]
-          {DatanodeInfo.class, long.class, long.class, long.class},
-          datanode, size, minBlockSize, hotBlockTimeInterval);
+          {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
+          datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
       return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
     }
     return null;

RouterRpcServer.java
@@ -1612,9 +1612,9 @@ public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
 
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     return nnProto.getBlocks(datanode, size, minBlockSize,
-        hotBlockTimeInterval);
+        hotBlockTimeInterval, storageType);
   }
 
   @Override // NamenodeProtocol

TestRouterRpc.java
@@ -1385,9 +1385,11 @@ public void testProxyGetBlocks() throws Exception {
 
     // Verify that checking that datanode works
     BlocksWithLocations routerBlockLocations =
-        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+        null);
     BlocksWithLocations nnBlockLocations =
-        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+        null);
     BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
     BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
     assertEquals(nnBlocks.length, routerBlocks.length);

NamenodeProtocolServerSideTranslatorPB.java
@@ -89,7 +89,9 @@ public GetBlocksResponseProto getBlocks(RpcController unused,
     BlocksWithLocations blocks;
     try {
       blocks = impl.getBlocks(dnInfo, request.getSize(),
-          request.getMinBlockSize(), request.getTimeInterval());
+          request.getMinBlockSize(), request.getTimeInterval(),
+          request.hasStorageType() ?
+          PBHelperClient.convertStorageType(request.getStorageType()): null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }

NamenodeProtocolTranslatorPB.java
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
@@ -101,11 +102,15 @@ public Object getUnderlyingProxyObject() {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
-    GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
+    GetBlocksRequestProto.Builder builder = GetBlocksRequestProto.newBuilder()
         .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
-        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
+        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval);
+    if (storageType != null) {
+      builder.setStorageType(PBHelperClient.convertStorageType(storageType));
+    }
+    GetBlocksRequestProto req = builder.build();
     return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
         .getBlocks()));
   }

Dispatcher.java
@@ -839,7 +839,7 @@ private long getBlockList() throws IOException, IllegalArgumentException {
     final long size = Math.min(getBlocksSize, blocksToReceive);
     final BlocksWithLocations newBlksLocs =
         nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
-            hotBlockTimeInterval);
+            hotBlockTimeInterval, storageType);
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
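
Note on the hunk above: storageType is not declared anywhere in the visible context; in the Dispatcher it presumably comes from the source storage group being balanced, so the NameNode now returns only blocks that this datanode stores on the medium the balancer is actually moving.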

NameNodeConnector.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -255,7 +256,7 @@ public URI getNameNodeUri() {
 
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval) throws IOException {
+      minBlockSize, long timeInterval, StorageType storageType) throws IOException {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
@@ -274,7 +275,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       } else {
        nnProxy = namenode;
      }
-      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
+      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
     } finally {
       if (isRequestStandby) {
         LOG.info("Request #getBlocks to Standby NameNode success. " +

BlockManager.java
@@ -1720,8 +1720,8 @@ private boolean isHotBlock(BlockInfo blockInfo, long time) {
 
   /** Get all blocks with location information from a datanode. */
   public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
-      final long size, final long minBlockSize, final long timeInterval) throws
-      UnregisteredNodeException {
+      final long size, final long minBlockSize, final long timeInterval,
+      final StorageType storageType) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
       blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" +
@@ -1735,10 +1735,11 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
 
-    // skip stale storage
+    // skip stale storage, then choose specific storage type.
     DatanodeStorageInfo[] storageInfos = Arrays
         .stream(node.getStorageInfos())
        .filter(s -> !s.areBlockContentsStale())
+        .filter(s -> storageType == null || s.getStorageType().equals(storageType))
        .toArray(DatanodeStorageInfo[]::new);
 
     // starting from a random block

FSNamesystem.java
@@ -1946,10 +1946,13 @@ public boolean isInStandbyState() {
    *
    * @param datanode on which blocks are located
    * @param size total size of blocks
-   * @param minimumBlockSize
+   * @param minimumBlockSize each block should be of this minimum Block Size
+   * @param timeInterval prefer to get blocks which are belong to
+   * the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
-      minimumBlockSize, long timeInterval) throws IOException {
+      minimumBlockSize, long timeInterval, StorageType storageType) throws IOException {
     OperationCategory checkOp =
         isGetBlocksCheckOperationEnabled ? OperationCategory.READ :
         OperationCategory.UNCHECKED;
@@ -1958,7 +1961,7 @@ public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
     try {
       checkOperation(checkOp);
       return getBlockManager().getBlocksWithLocations(datanode, size,
-          minimumBlockSize, timeInterval);
+          minimumBlockSize, timeInterval, storageType);
     } finally {
       readUnlock("getBlocks");
     }

NameNodeRpcServer.java
@@ -649,7 +649,7 @@ private static UserGroupInformation getRemoteUser() throws IOException {
   /////////////////////////////////////////////////////
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
     String operationName = "getBlocks";
     if(size <= 0) {
@@ -663,7 +663,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
-    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
+    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
   }
 
   @Override // NamenodeProtocol

NamenodeProtocol.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -76,14 +77,15 @@ public interface NamenodeProtocol {
   * @param minBlockSize each block should be of this minimum Block Size
   * @param hotBlockTimeInterval prefer to get blocks which are belong to
   * the cold files accessed before the time interval
+  * @param storageType the given storage type {@link StorageType}
   * @return BlocksWithLocations a list of blocks &amp; their locations
   * @throws IOException if size is less than or equal to 0 or
       datanode does not exist
   */
  @Idempotent
  @ReadOnly
  BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long hotBlockTimeInterval) throws IOException;
+      minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException;
 
  /**
   * Get the current block keys

NamenodeProtocol.proto
@@ -48,6 +48,7 @@ message GetBlocksRequestProto {
   // For more info refer HDFS-13356
   optional uint64 minBlockSize = 3 [default = 10485760];
   optional uint64 timeInterval = 4 [default = 0];
+  optional StorageTypeProto storageType = 5;
 }
 
 
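Wire compatibility note: because storageType is an optional field with no default, a request from an older client simply leaves it unset; the server-side translator shown earlier maps the absent field to null, and BlockManager#getBlocksWithLocations treats a null storage type as matching every storage. Conversely, the client-side translator only sets the field when a non-null StorageType is passed.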