Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16423. Balancer should not get blocks on stale storages #3883

Merged
merged 1 commit into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1659,9 +1659,16 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}

// skip stale storage
DatanodeStorageInfo[] storageInfos = Arrays
.stream(node.getStorageInfos())
.filter(s -> !s.areBlockContentsStale())
.toArray(DatanodeStorageInfo[]::new);

// starting from a random block
int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
Iterator<BlockInfo> iter = node.getBlockIterator(startBlock, storageInfos);
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
List<BlockInfo> pending = new ArrayList<BlockInfo>();
long totalSize = 0;
Expand All @@ -1680,8 +1687,8 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
}
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
iter = node.getBlockIterator(0, storageInfos); // start from the beginning
for(int i = 0; i < startBlock && totalSize < size && iter.hasNext(); i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < minBlockSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,17 @@ Iterator<BlockInfo> getBlockIterator(final int startBlock) {
return new BlockIterator(startBlock, getStorageInfos());
}

/**
* Get iterator, which starts iterating from the specified block and storages.
*
* @param startBlock on which blocks are start iterating
* @param storageInfos specified storages
*/
Iterator<BlockInfo> getBlockIterator(
final int startBlock, final DatanodeStorageInfo[] storageInfos) {
return new BlockIterator(startBlock, storageInfos);
}

@VisibleForTesting
public void incrementPendingReplicationWithoutTargets() {
pendingReplicationWithoutTargets++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public boolean areBlockContentsStale() {
return blockContentsStale;
}

@VisibleForTesting
public void setBlockContentsStale(boolean value) {
blockContentsStale = value;
}

void markStaleAfterFailover() {
heartbeatedSinceFailover = false;
blockContentsStale = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
Expand Down Expand Up @@ -468,4 +469,68 @@ public void testGetBlocksWithHotBlockTimeInterval() throws Exception {
cluster.shutdown();
}
}

@Test
public void testReadSkipStaleStorage() throws Exception {
final short repFactor = (short) 1;
final int blockNum = 64;
final int storageNum = 2;
final int fileLen = BLOCK_SIZE * blockNum;
final Path path = new Path("testReadSkipStaleStorage");
final Configuration conf = new HdfsConfiguration();

conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.storagesPerDatanode(storageNum)
.build();
cluster.waitActive();

FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, path, false, 1024, fileLen,
BLOCK_SIZE, repFactor, 0, true);

// get datanode info
ClientProtocol client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);

// get storage info
BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
.getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();

InetSocketAddress addr = new InetSocketAddress("localhost",
cluster.getNameNodePort());
NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();

// check blocks count equals to blockNum
BlockWithLocations[] blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0, 0).getBlocks();
assertEquals(blockNum, blocks.length);

// calculate the block count on storage[0]
int count = 0;
for (BlockWithLocations b : blocks) {
for (String s : b.getStorageIDs()) {
if (s.equals(storageInfos[0].getStorageID())) {
count++;
}
}
}

// set storage[0] stale
storageInfos[0].setBlockContentsStale(true);
blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0, 0).getBlocks();
assertEquals(blockNum - count, blocks.length);

// set all storage stale
bm0.getDatanodeManager().markAllDatanodesStale();
blocks = namenode.getBlocks(
dataNodes[0], fileLen*2, 0, 0).getBlocks();
assertEquals(0, blocks.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void testBalancerServiceBalanceTwice() throws Exception {
TestBalancer.initConf(conf);
try {
setupCluster(conf);
TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
long totalCapacity = addOneDataNode(conf); // make cluster imbalanced

Thread balancerThread =
Expand Down Expand Up @@ -193,6 +194,7 @@ public void testBalancerServiceOnError() throws Exception {
cluster.transitionToActive(0);
cluster.waitActive();

TestBalancerWithHANameNodes.waitStoragesNoStale(cluster, client, 0);
long totalCapacity = addOneDataNode(conf);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, BalancerParameters.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Test;
import org.slf4j.LoggerFactory;
Expand All @@ -75,6 +80,26 @@ public class TestBalancerWithHANameNodes {
TestBalancer.initTestSetup();
}

public static void waitStoragesNoStale(MiniDFSCluster cluster,
ClientProtocol client, int nnIndex) throws Exception {
// trigger a full block report and wait all storages out of stale
cluster.triggerBlockReports();
DatanodeInfo[] dataNodes = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
GenericTestUtils.waitFor(() -> {
BlockManager bm = cluster.getNamesystem(nnIndex).getBlockManager();
for (DatanodeInfo dn : dataNodes) {
DatanodeStorageInfo[] storageInfos = bm.getDatanodeManager()
.getDatanode(dn.getDatanodeUuid()).getStorageInfos();
for (DatanodeStorageInfo s : storageInfos) {
if (s.areBlockContentsStale()) {
return false;
}
}
}
return true;
}, 300, 60000);
}

/**
* Test a cluster with even distribution, then a new empty node is added to
* the cluster. Test start a cluster with specified number of nodes, and fills
Expand Down Expand Up @@ -103,13 +128,17 @@ public void testBalancerWithHANameNodes() throws Exception {
client = NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
ClientProtocol.class).getProxy();

doTest(conf);
doTest(conf, true);
} finally {
cluster.shutdown();
}
}

void doTest(Configuration conf) throws Exception {
doTest(conf, false);
}

void doTest(Configuration conf, boolean withHA) throws Exception {
int numOfDatanodes = TEST_CAPACITIES.length;
long totalCapacity = TestBalancer.sum(TEST_CAPACITIES);
// fill up the cluster to be 30% full
Expand All @@ -123,6 +152,12 @@ void doTest(Configuration conf) throws Exception {
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
}

// all storages are stale after HA
if (withHA) {
waitStoragesNoStale(cluster, client, 0);
}

// start up an empty node with the same capacity and on the same rack
long newNodeCapacity = TestBalancer.CAPACITY; // new node's capacity
String newNodeRack = TestBalancer.RACK2; // new node's rack
Expand Down