HDFS-15413. Add dfs.client.read.striped.datanode.max.attempts to fix EC file read timeout #5829

Open · wants to merge 25 commits into base: trunk

Changes from all commits
File: StripedReader.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
@@ -120,6 +119,7 @@ void skip() {
protected final RawErasureDecoder decoder;
protected final DFSStripedInputStream dfsStripedInputStream;
private long readTo = -1;
private final int readDNMaxAttempts;

protected ECChunk[] decodeInputs;

@@ -138,6 +138,8 @@ void skip() {
this.corruptedBlocks = corruptedBlocks;
this.decoder = decoder;
this.dfsStripedInputStream = dfsStripedInputStream;
this.readDNMaxAttempts = dfsStripedInputStream.getDFSClient()
.getConf().getStripedReadDnMaxAttempts();

service = new ExecutorCompletionService<>(
dfsStripedInputStream.getStripedReadsThreadPool());
@@ -233,41 +235,57 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {

private int readToBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy strategy,
ExtendedBlock currentBlock) throws IOException {
LocatedBlock currentBlock, int chunkIndex, long offsetInBlock)
throws IOException {
final int targetLength = strategy.getTargetLength();
int length = 0;
try {
while (length < targetLength) {
int ret = strategy.readFromBlock(blockReader);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
int curAttempts = 0;
while (true) {
curAttempts++;
int length = 0;
try {
while (length < targetLength) {
int ret = strategy.readFromBlock(blockReader);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
}
length += ret;
}
return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for {} from {} at {}",
currentBlock, currentNode, ce.getPos());
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock.getBlock(), currentNode);
throw ce;
} catch (IOException e) {
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
if (curAttempts < readDNMaxAttempts) {
Contributor review comment on the line above:
This may have been different in the initial implementation, but on the first pass through the while (true), curAttempts will already be 1, so there will be no retries by default, I think? (See the counting sketch after this file's diff.)
if (readerInfos[chunkIndex].reader != null) {
readerInfos[chunkIndex].reader.close();
}
if (dfsStripedInputStream.createBlockReader(currentBlock,
offsetInBlock, targetBlocks,
readerInfos, chunkIndex, readTo)) {
blockReader = readerInfos[chunkIndex].reader;
DFSClient.LOG.warn("Reconnect to {} for block {}",
currentNode.getInfoAddr(), currentBlock.getBlock());
continue;
}
}
length += ret;
DFSClient.LOG.warn("Exception while reading from {} of {} from {}",
currentBlock, dfsStripedInputStream.getSrc(), currentNode, e);
throw e;
}
return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
throw e;
}
}

private Callable<BlockReadStats> readCells(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
final ExtendedBlock currentBlock) {
final LocatedBlock currentBlock, final int chunkIndex) {
return () -> {
// reader can be null if getBlockReaderWithRetry failed or
// the reader hit exception before
Expand All @@ -284,8 +302,9 @@ private Callable<BlockReadStats> readCells(final BlockReader reader,

int ret = 0;
for (ByteBufferStrategy strategy : strategies) {
int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock);
ret += bytesReead;
int bytesRead = readToBuffer(reader, datanode, strategy, currentBlock,
chunkIndex, alignedStripe.getOffsetInBlock() + ret);
ret += bytesRead;
}
return new BlockReadStats(ret, reader.isShortCircuit(),
reader.getNetworkDistance());
@@ -318,7 +337,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset,
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
block.getBlock());
block, chunkIndex);

Future<BlockReadStats> request = service.submit(readCallable);
futures.put(request, chunkIndex);
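A note on the attempt counting raised in the review comment above: because curAttempts is incremented before the curAttempts < readDNMaxAttempts check, a limit of N allows N reads but only N - 1 reconnects, so the default of 1 never reconnects. The standalone sketch below (illustrative names only, not the PR's code) reproduces that shape; running it prints 1 and then 3.

import java.io.IOException;

// Illustrative only: mirrors the attempt-counting shape of readToBuffer when
// every read times out, to show why max.attempts=1 performs no reconnect.
public class RetryCountSketch {

  public static void main(String[] args) {
    System.out.println("max=1 -> reads attempted: " + attemptsUsed(1)); // prints 1
    System.out.println("max=3 -> reads attempted: " + attemptsUsed(3)); // prints 3
  }

  static int attemptsUsed(int readDNMaxAttempts) {
    int curAttempts = 0;
    while (true) {
      curAttempts++;
      try {
        simulateRead();          // stands in for strategy.readFromBlock(...)
        return curAttempts;      // success: no further attempts needed
      } catch (IOException e) {
        // Matches `if (curAttempts < readDNMaxAttempts)`: with the default of 1
        // this is already false after the first failure, so no retry happens.
        if (curAttempts < readDNMaxAttempts) {
          continue;              // the real code reconnects to the DataNode here
        }
        return curAttempts;      // the real code rethrows the IOException here
      }
    }
  }

  private static void simulateRead() throws IOException {
    throw new IOException("simulated read timeout");
  }
}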
File: HdfsClientConfigKeys.java
@@ -532,6 +532,13 @@ interface StripedRead {
* span 6 DNs, so this default value accommodates 3 read streams
*/
int THREADPOOL_SIZE_DEFAULT = 18;
/**
* The number of times to reconnect to the DataNode
* during the striped read process; the default is 1.
*/
String DATANODE_MAX_ATTEMPTS = PREFIX +
"datanode.max.attempts";
int DATANODE_MAX_ATTEMPTS_DEFAULT = 1;
}

/** dfs.http.client configuration properties */
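For reference, a client application could raise the new limit programmatically before opening an erasure-coded file. This is only a usage sketch: the NameNode URI and path are placeholders, and it relies solely on the key and default introduced in this hunk.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class StripedReadAttemptsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow up to 3 connection attempts per striped read instead of the default 1.
    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);
    // Equivalent to: conf.setInt("dfs.client.read.striped.datanode.max.attempts", 3);

    // "hdfs://namenode:8020" and "/striped/file" are placeholders, not from the PR.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
      // Reads on erasure-coded files through this client will now retry a
      // timed-out DataNode connection up to the configured number of times.
      fs.open(new Path("/striped/file")).close();
    }
  }
}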
File: DfsClientConf.java
@@ -166,6 +166,7 @@ public class DfsClientConf {

private final boolean deadNodeDetectionEnabled;
private final long leaseHardLimitPeriod;
private final int stripedReadDnMaxAttempts;

public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
@@ -295,6 +296,13 @@ public DfsClientConf(Configuration conf) {
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0.");
stripedReadDnMaxAttempts =
conf.getInt(
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS,
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS_DEFAULT);
Preconditions.checkArgument(stripedReadDnMaxAttempts > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS +
" must be greater than 0.");
replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);

leaseHardLimitPeriod =
@@ -697,6 +705,13 @@ public boolean isDeadNodeDetectionEnabled() {
return deadNodeDetectionEnabled;
}

/**
* @return the stripedReadDnMaxAttempts
*/
public int getStripedReadDnMaxAttempts() {
return stripedReadDnMaxAttempts;
}

/**
* @return the leaseHardLimitPeriod
*/
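Because the new value is guarded by Preconditions.checkArgument, a non-positive setting should fail fast when the client configuration is parsed. A hedged sketch of that expectation (class and method names here are illustrative, not part of the PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class StripedReadAttemptsValidationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A non-positive value should be rejected by the Preconditions check above.
    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 0);
    try {
      new DfsClientConf(conf);
      System.err.println("Expected an IllegalArgumentException for max attempts = 0");
    } catch (IllegalArgumentException expected) {
      System.out.println("Rejected as expected: " + expected.getMessage());
    }
  }
}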
File: DataNode.java
@@ -4387,4 +4387,11 @@ boolean isSlownode() {
public BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}

@VisibleForTesting
public void closeDataXceiverServer() {
if (xserver != null) {
xserver.closeAllPeers();
}
}
}
File: hdfs-default.xml
@@ -4547,6 +4547,15 @@
</description>
</property>

<property>
<name>dfs.client.read.striped.datanode.max.attempts</name>
<value>1</value>
<description>
The number of times to reconnect to the DataNode during
the striped read process; the default is 1.
</description>
</property>

<property>
<name>dfs.client.replica.accessor.builder.classes</name>
<value></value>
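For completeness, a deployment that wants the EC read path to ride out a timed-out DataNode connection could override the default in the client-side hdfs-site.xml; the value 3 below is purely illustrative:

<property>
  <name>dfs.client.read.striped.datanode.max.attempts</name>
  <value>3</value>
  <description>Allow the EC read path to reconnect to a DataNode up to 3 times.</description>
</property>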
File: TestDFSStripedInputStreamWithTimeout.java (new file)
@@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.io.IOException;
import java.util.Arrays;

public class TestDFSStripedInputStreamWithTimeout {

public static final Logger LOG =
LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);

private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private ErasureCodingPolicy ecPolicy;
private short dataBlocks;
private short parityBlocks;
private int cellSize;
private final int stripesPerBlock = 2;
private int blockSize;
private int blockGroupSize;

@Rule
public Timeout globalTimeout = new Timeout(300000);

public ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
}

@Before
public void setup() throws IOException {
/*
* Initialize erasure coding policy.
*/
ecPolicy = getEcPolicy();
dataBlocks = (short) ecPolicy.getNumDataUnits();
parityBlocks = (short) ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
blockSize = stripesPerBlock * cellSize;
blockGroupSize = dataBlocks * blockSize;
System.out.println("EC policy = " + ecPolicy);

conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 500);

if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.CODER_NAME);
}
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
GenericTestUtils.getRandomizedTempPath());
SimulatedFSDataset.setFactory(conf);
startUp();
}

private void startUp() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
dataBlocks + parityBlocks).build();
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(getEcPolicy().getName());
fs.mkdirs(dirPath);
fs.getClient()
.setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
}

@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}

@Test
public void testPreadTimeout() throws Exception {
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
stripesPerBlock, false, ecPolicy);
final int fileSize = numBlocks * blockGroupSize;

LocatedBlocks lbs = fs.getClient().namenode.
getBlockLocations(filePath.toString(), 0, fileSize);
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
assert lb instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
for (int i = 0; i < dataBlocks; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
stripesPerBlock * cellSize,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
}
try {
testReadFileWithAttempt(1);
Assert.fail("It Should fail to read striped time out with 1 attempt . ");
} catch (Exception e) {
Assert.assertTrue(
"Throw IOException error message with 4 missing blocks. ",
e.getMessage().contains("4 missing blocks"));
}

try {
testReadFileWithAttempt(3);
} catch (Exception e) {
Assert.fail("It Should successfully read striped file with 3 attempts. ");
}
}

private void testReadFileWithAttempt(int attempt) throws Exception {
// set dfs client config
cluster.getConfiguration(0)
.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS,
attempt);
cluster.getConfiguration(0)
.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
DistributedFileSystem newFs =
(DistributedFileSystem) cluster.getNewFileSystemInstance(0);
try(DFSStripedInputStream in = new DFSStripedInputStream(newFs.getClient(),
filePath.toString(), false, ecPolicy, null)){
int bufLen = 1024 * 100;
byte[] buf = new byte[bufLen];
int readTotal = 0;
in.seek(readTotal);
int nread = in.read(buf, 0, bufLen);
// Simulated time-consuming processing operations, such as UDF.
// And datanodes close connect because of socket timeout.
cluster.dataNodes.forEach(dn -> dn.getDatanode().closeDataXceiverServer());
in.seek(nread);
// StripeRange 6MB
bufLen = 1024 * 1024 * 6;
buf = new byte[bufLen];
in.read(buf, 0, bufLen);
}
}
}