HDFS-17243. Add the parameter storage type for getBlocks method #6238

Merged 1 commit on Nov 6, 2023
HDFS-17243. Add the parameter storage type for getBlocks method
haiyang1987 committed Oct 30, 2023
commit 1e1cd0fc74c24305b58dc7a1fe6a8dcc8ceaf164
RouterNamenodeProtocol.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -53,7 +54,7 @@ public RouterNamenodeProtocol(RouterRpcServer server) {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
 
     // Get the namespace where the datanode is located
@@ -79,8 +80,8 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
     if (nsId != null) {
       RemoteMethod method = new RemoteMethod(
           NamenodeProtocol.class, "getBlocks", new Class<?>[]
-          {DatanodeInfo.class, long.class, long.class, long.class},
-          datanode, size, minBlockSize, hotBlockTimeInterval);
+          {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
+          datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
       return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
     }
     return null;
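
Note that the `Class<?>[]` array passed to `RemoteMethod` must mirror the new five-parameter signature exactly, because the router resolves the target method reflectively at invocation time. A standalone sketch of that lookup (plain `java.lang.reflect`, not the RBF internals) shows why a stale array would fail:

```java
import java.lang.reflect.Method;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

public class GetBlocksLookupSketch {
  public static void main(String[] args) throws NoSuchMethodException {
    // All five parameter types must be listed in order; omitting
    // StorageType.class would throw NoSuchMethodException against
    // the updated interface.
    Method m = NamenodeProtocol.class.getMethod("getBlocks",
        DatanodeInfo.class, long.class, long.class, long.class,
        StorageType.class);
    System.out.println(m);
  }
}
```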
RouterRpcServer.java
@@ -1612,9 +1612,9 @@ public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
 
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     return nnProto.getBlocks(datanode, size, minBlockSize,
-        hotBlockTimeInterval);
+        hotBlockTimeInterval, storageType);
   }
 
   @Override // NamenodeProtocol
TestRouterRpc.java
@@ -1385,9 +1385,11 @@ public void testProxyGetBlocks() throws Exception {
 
     // Verify that checking that datanode works
     BlocksWithLocations routerBlockLocations =
-        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+        null);
     BlocksWithLocations nnBlockLocations =
-        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+        null);
     BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
     BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
     assertEquals(nnBlocks.length, routerBlocks.length);
NamenodeProtocolServerSideTranslatorPB.java
@@ -89,7 +89,9 @@ public GetBlocksResponseProto getBlocks(RpcController unused,
     BlocksWithLocations blocks;
     try {
       blocks = impl.getBlocks(dnInfo, request.getSize(),
-          request.getMinBlockSize(), request.getTimeInterval());
+          request.getMinBlockSize(), request.getTimeInterval(),
+          request.hasStorageType() ?
+              PBHelperClient.convertStorageType(request.getStorageType()): null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
NamenodeProtocolTranslatorPB.java
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
@@ -101,11 +102,15 @@ public Object getUnderlyingProxyObject() {
 
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
-    GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
+    GetBlocksRequestProto.Builder builder = GetBlocksRequestProto.newBuilder()
         .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
-        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
+        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval);
+    if (storageType != null) {
+      builder.setStorageType(PBHelperClient.convertStorageType(storageType));
+    }
+    GetBlocksRequestProto req = builder.build();
     return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
         .getBlocks()));
   }
Dispatcher.java
@@ -839,7 +839,7 @@ private long getBlockList() throws IOException, IllegalArgumentException {
     final long size = Math.min(getBlocksSize, blocksToReceive);
     final BlocksWithLocations newBlksLocs =
         nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
-            hotBlockTimeInterval);
+            hotBlockTimeInterval, storageType);
 
     if (LOG.isTraceEnabled()) {
       LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
NameNodeConnector.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -255,7 +256,7 @@ public URI getNameNodeUri() {
 
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval) throws IOException {
+      minBlockSize, long timeInterval, StorageType storageType) throws IOException {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
@@ -274,7 +275,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       } else {
         nnProxy = namenode;
       }
-      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
+      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
     } finally {
       if (isRequestStandby) {
         LOG.info("Request #getBlocks to Standby NameNode success. " +
BlockManager.java
@@ -1720,8 +1720,8 @@ private boolean isHotBlock(BlockInfo blockInfo, long time) {
 
   /** Get all blocks with location information from a datanode. */
   public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
-      final long size, final long minBlockSize, final long timeInterval) throws
-      UnregisteredNodeException {
+      final long size, final long minBlockSize, final long timeInterval,
+      final StorageType storageType) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
       blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" +
@@ -1735,10 +1735,11 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
 
-    // skip stale storage
+    // skip stale storage, then choose specific storage type.
     DatanodeStorageInfo[] storageInfos = Arrays
         .stream(node.getStorageInfos())
         .filter(s -> !s.areBlockContentsStale())
+        .filter(s -> storageType == null || s.getStorageType().equals(storageType))
         .toArray(DatanodeStorageInfo[]::new);
 
     // starting from a random block
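
The null guard in the new filter is what keeps the parameter optional end to end: `null` selects every non-stale storage, while a concrete type narrows the scan to matching storages only. A self-contained sketch of the same predicate (plain Java stand-ins, not the HDFS classes):

```java
import java.util.Arrays;

public class StorageTypeFilterSketch {
  enum StorageType { DISK, SSD, ARCHIVE }

  record Storage(StorageType type, boolean stale) {}

  // Mirrors the predicate in getBlocksWithLocations: drop stale storages
  // first, then keep only the requested type; null means "any type".
  static Storage[] select(Storage[] all, StorageType wanted) {
    return Arrays.stream(all)
        .filter(s -> !s.stale())
        .filter(s -> wanted == null || s.type() == wanted)
        .toArray(Storage[]::new);
  }

  public static void main(String[] args) {
    Storage[] storages = {
        new Storage(StorageType.DISK, false),
        new Storage(StorageType.SSD, false),
        new Storage(StorageType.DISK, true)  // stale: always filtered out
    };
    System.out.println(select(storages, null).length);             // 2
    System.out.println(select(storages, StorageType.SSD).length);  // 1
  }
}
```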
FSNamesystem.java
@@ -1946,10 +1946,13 @@ public boolean isInStandbyState() {
    *
    * @param datanode on which blocks are located
    * @param size total size of blocks
-   * @param minimumBlockSize
+   * @param minimumBlockSize each block should be of this minimum Block Size
    * @param timeInterval prefer to get blocks which are belong to
    *        the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
-      minimumBlockSize, long timeInterval) throws IOException {
+      minimumBlockSize, long timeInterval, StorageType storageType) throws IOException {
     OperationCategory checkOp =
         isGetBlocksCheckOperationEnabled ? OperationCategory.READ :
         OperationCategory.UNCHECKED;
@@ -1958,7 +1961,7 @@ public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
     try {
       checkOperation(checkOp);
       return getBlockManager().getBlocksWithLocations(datanode, size,
-          minimumBlockSize, timeInterval);
+          minimumBlockSize, timeInterval, storageType);
     } finally {
       readUnlock("getBlocks");
     }
NameNodeRpcServer.java
@@ -649,7 +649,7 @@ private static UserGroupInformation getRemoteUser() throws IOException {
   /////////////////////////////////////////////////////
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
     String operationName = "getBlocks";
     if(size <= 0) {
@@ -663,7 +663,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
-    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
+    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
   }
 
   @Override // NamenodeProtocol
NamenodeProtocol.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -76,14 +77,15 @@ public interface NamenodeProtocol {
    * @param minBlockSize each block should be of this minimum Block Size
    * @param hotBlockTimeInterval prefer to get blocks which are belong to
    *        the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    * @return BlocksWithLocations a list of blocks &amp; their locations
    * @throws IOException if size is less than or equal to 0 or
    *         datanode does not exist
    */
   @Idempotent
   @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long hotBlockTimeInterval) throws IOException;
+      minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException;
 
   /**
    * Get the current block keys
NamenodeProtocol.proto
@@ -48,6 +48,7 @@ message GetBlocksRequestProto {
   // For more info refer HDFS-13356
   optional uint64 minBlockSize = 3 [default = 10485760];
   optional uint64 timeInterval = 4 [default = 0];
+  optional StorageTypeProto storageType = 5;
 }
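
Since field 5 is `optional` with no default, an old client simply never sets it; the server-side translator above maps the absent field to `null`, which disables the storage-type filter. A sketch of that round-trip using the generated protobuf classes (the wrapper method is hypothetical):

```java
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;

public class WireCompatSketch {
  // Hypothetical builder: `dn` identifies the target datanode and
  // `type` may be null, exactly like the translator's storageType.
  static GetBlocksRequestProto buildRequest(DatanodeIDProto dn,
      StorageTypeProto type) {
    GetBlocksRequestProto.Builder b = GetBlocksRequestProto.newBuilder()
        .setDatanode(dn)
        .setSize(2L << 30);       // ask for up to 2 GB worth of blocks
    if (type != null) {
      b.setStorageType(type);     // field 5 is only written when set
    }
    GetBlocksRequestProto req = b.build();
    // Old clients never write field 5, so the server observes
    // hasStorageType() == false and falls back to null (no filtering).
    assert req.hasStorageType() == (type != null);
    return req;
  }
}
```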

