Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16896 clear ignoredNodes list when we clear deadnode list on ref… #5322

Merged
merged 10 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ boolean deadNodesContain(DatanodeInfo nodeInfo) {
}

/**
* Grab the open-file info from namenode
* Grab the open-file info from namenode.
mccormickt12 marked this conversation as resolved.
Show resolved Hide resolved
* @param refreshLocatedBlocks whether to re-fetch locatedblocks
*/
void openInfo(boolean refreshLocatedBlocks) throws IOException {
Expand Down Expand Up @@ -940,7 +940,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
* @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
* false.
*/
private DNAddrPair chooseDataNode(LocatedBlock block,
@VisibleForTesting
DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
throws IOException {
while (true) {
Expand All @@ -955,6 +956,14 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
}
}

/**
* RefetchLocations should only be called when there are no active requests
* to datanodes. In the hedged read case this means futures should be empty.
* @param block The locatedBlock to get new datanode locations for.
* @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared.
* @return the locatedBlock with updated datanode locations.
* @throws IOException
*/
mccormickt12 marked this conversation as resolved.
Show resolved Hide resolved
private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
Expand Down Expand Up @@ -999,13 +1008,24 @@ private LocatedBlock refetchLocations(LocatedBlock block,
throw new InterruptedIOException(
"Interrupted while choosing DataNode for read.");
}
clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
clearCachedNodeState(ignoredNodes);
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
return block;
}

/**
* Clear both the dead nodes and the ignored nodes
* @param ignoredNodes is cleared
*/
private void clearCachedNodeState(Collection<DatanodeInfo> ignoredNodes) {
clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
if (ignoredNodes != null) {
ignoredNodes.clear();
}
}

/**
* Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order.
Expand Down Expand Up @@ -1337,8 +1357,12 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
} catch (InterruptedException ie) {
// Ignore and retry
}
if (refetch) {
refetchLocations(block, ignored);
// If refetch is true, then all nodes are in deadNodes or ignoredNodes.
// We should loop through all futures and remove them, so we do not
// have concurrent requests to the same node.
// Once all futures are cleared, we can clear the ignoredNodes and retry.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignored and dead nodes are cleared, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the thing i am trying to emphasize is the && futures.isEmpty() check which is specific to how ignored nodes is cleared

if (refetch && futures.isEmpty()) {
block = refetchLocations(block, ignored);
}
// We got here if exception. Ignore this node on next go around IFF
// we found a chosenNode to hedge read against.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -35,11 +36,14 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -200,6 +204,25 @@ public void testDeferredRegistrationGetAllBlocks() throws IOException {
testWithRegistrationMethod(DFSInputStream::getAllBlocks);
}

/**
* If the ignoreList contains all datanodes, the ignoredList should be cleared to take advantage
* of retries built into chooseDataNode. This is needed for hedged reads
* @throws IOException
*/
@Test
public void testClearIgnoreListChooseDataNode() throws IOException {
final String fileName = "/test_cache_locations";
filePath = createFile(fileName);

try (DFSInputStream fin = dfsClient.open(fileName)) {
LocatedBlocks existing = fin.locatedBlocks;
LocatedBlock block = existing.getLastLocatedBlock();
ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
Assert.assertEquals(0, ignoreList.size());
}
}

@FunctionalInterface
interface ThrowingConsumer {
void accept(DFSInputStream fin) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
input.read(0, buffer, 0, 1024);
Assert.fail("Reading the block should have thrown BlockMissingException");
} catch (BlockMissingException e) {
assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
// The result of 9 is due to 2 blocks by 4 iterations plus one because
// hedgedReadOpsLoopNumForTesting is incremented at start of the loop.
assertEquals(9, input.getHedgedReadOpsLoopNumForTesting());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are tripling the IO per hedged request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are actually 4x'ing, the comment was meant to help clarify.
If you recall the issue was we only previously tried each block once, the change is to make hedged reads follow the same number of retires as non hedged reads which has 3 retry loops.
This example has 2 blocks, and the last loop is when it exists.
Previously 2+1 and now 8+1

assertTrue(metrics.getHedgedReadOps() == 0);
} finally {
Mockito.reset(injector);
Expand Down