HDFS-15413. add dfs.client.read.striped.datanode.max.attempts to fix read ecfile timeout #5829

Open. Wants to merge 25 commits into base: trunk. Changes from 21 commits.
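
For context, here is a minimal client-side sketch of how the key added by this PR could be raised above its default. This is not part of the patch; it assumes an HDFS default filesystem is already configured and that /striped/file is an existing erasure-coded file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical snippet: bump the striped-read datanode attempt limit
// introduced by this PR before opening an erasure-coded file.
public class StripedReadRetryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Key added by this PR; the default of 1 means a single attempt per chunk.
    conf.setInt("dfs.client.read.striped.datanode.max.attempts", 3);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/striped/file"))) {
      byte[] buf = new byte[64 * 1024];
      int n = in.read(buf, 0, buf.length);
      System.out.println("read " + n + " bytes");
    }
  }
}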
StripeReader.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
@@ -120,6 +119,7 @@ void skip() {
protected final RawErasureDecoder decoder;
protected final DFSStripedInputStream dfsStripedInputStream;
private long readTo = -1;
private final int readDNMaxAttempts;

protected ECChunk[] decodeInputs;

@@ -138,6 +138,8 @@ void skip() {
this.corruptedBlocks = corruptedBlocks;
this.decoder = decoder;
this.dfsStripedInputStream = dfsStripedInputStream;
this.readDNMaxAttempts = dfsStripedInputStream.getDFSClient()
.getConf().getStripedReadDnMaxAttempts();

service = new ExecutorCompletionService<>(
dfsStripedInputStream.getStripedReadsThreadPool());
@@ -233,41 +235,62 @@ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {

private int readToBuffer(BlockReader blockReader,
DatanodeInfo currentNode, ByteBufferStrategy strategy,
ExtendedBlock currentBlock) throws IOException {
LocatedBlock currentBlock, int chunkIndex) throws IOException {
final int targetLength = strategy.getTargetLength();
int length = 0;
try {
while (length < targetLength) {
int ret = strategy.readFromBlock(blockReader);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
int curAttempts = 0;
while (curAttempts < readDNMaxAttempts) {
curAttempts++;
int length = 0;
try {
while (length < targetLength) {
int ret = strategy.readFromBlock(blockReader);
if (ret < 0) {
throw new IOException("Unexpected EOS from the reader");
}
length += ret;
}
return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock.getBlock(), currentNode);
throw ce;
} catch (IOException e) {
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
if (curAttempts < readDNMaxAttempts) {
Contributor comment: This may have been different in the initial implementation, but on the first pass through the while loop, curAttempts will already be 1, so there will be no retries by default, I think? (A simplified sketch of this loop appears after this file's diff.)

if (readerInfos[chunkIndex].reader != null) {
readerInfos[chunkIndex].reader.close();
}
if (dfsStripedInputStream.createBlockReader(currentBlock,
alignedStripe.getOffsetInBlock(), targetBlocks,
readerInfos, chunkIndex, readTo)) {
blockReader = readerInfos[chunkIndex].reader;
String msg = "Reconnect to " + currentNode.getInfoAddr()
+ " for block " + currentBlock.getBlock();
DFSClient.LOG.warn(msg);
continue;
}
}
length += ret;
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
throw e;
}
return length;
} catch (ChecksumException ce) {
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
throw e;
}
throw new IOException("Read request interrupted. " +
currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode);
}

private Callable<BlockReadStats> readCells(final BlockReader reader,
final DatanodeInfo datanode, final long currentReaderOffset,
final long targetReaderOffset, final ByteBufferStrategy[] strategies,
final ExtendedBlock currentBlock) {
final LocatedBlock currentBlock, final int chunkIndex) {
return () -> {
// reader can be null if getBlockReaderWithRetry failed or
// the reader hit exception before
@@ -284,7 +307,8 @@ private Callable<BlockReadStats> readCells(final BlockReader reader,

int ret = 0;
for (ByteBufferStrategy strategy : strategies) {
int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock);
int bytesReead = readToBuffer(reader, datanode, strategy, currentBlock,
chunkIndex);
ret += bytesReead;
}
return new BlockReadStats(ret, reader.isShortCircuit(),
@@ -318,7 +342,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset,
alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
block.getBlock());
block, chunkIndex);

Future<BlockReadStats> request = service.submit(readCallable);
futures.put(request, chunkIndex);
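
The contributor comment above questions whether the default value allows any retry. To make that concrete, the following standalone sketch (simplified names only, not the actual HDFS classes) models the attempt counting in the new readToBuffer loop; with a limit of 1 the reconnect branch can never run.

import java.io.IOException;

// Simplified model of the retry loop added to readToBuffer (illustrative only).
public final class RetryLoopSketch {

  interface ChunkRead {
    int read() throws IOException;
  }

  static int readWithAttempts(ChunkRead chunk, int maxAttempts)
      throws IOException {
    int curAttempts = 0;
    while (curAttempts < maxAttempts) {
      curAttempts++;                      // first pass: curAttempts == 1
      try {
        return chunk.read();
      } catch (IOException e) {
        if (curAttempts < maxAttempts) {  // false when maxAttempts == 1
          // here the real code closes and recreates the block reader
          continue;
        }
        throw e;                          // attempts exhausted
      }
    }
    throw new IOException("Read request interrupted.");
  }

  public static void main(String[] args) throws IOException {
    // With maxAttempts == 1 the retry branch is unreachable, matching the
    // comment above: the default value performs no reconnect.
    System.out.println(readWithAttempts(() -> 42, 1));
  }
}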
HdfsClientConfigKeys.java
@@ -532,6 +532,13 @@ interface StripedRead {
* span 6 DNs, so this default value accommodates 3 read streams
*/
int THREADPOOL_SIZE_DEFAULT = 18;
/**
* The number of times to reconnect to a DataNode
* during the striped read process; the default is 1.
*/
String DATANODE_MAX_ATTEMPTS = PREFIX +
"datanode.max.attempts";
int DATANODE_MAX_ATTEMPTS_DEFAULT = 1;
}

/** dfs.http.client configuration properties */
DfsClientConf.java
@@ -166,6 +166,7 @@ public class DfsClientConf {

private final boolean deadNodeDetectionEnabled;
private final long leaseHardLimitPeriod;
private final int stripedReadDnMaxAttempts;

public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
@@ -295,6 +296,13 @@ public DfsClientConf(Configuration conf) {
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0.");
stripedReadDnMaxAttempts =
conf.getInt(
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS,
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS_DEFAULT);
Preconditions.checkArgument(stripedReadDnMaxAttempts > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS +
" must be greater than 0.");
replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);

leaseHardLimitPeriod =
@@ -697,6 +705,13 @@ public boolean isDeadNodeDetectionEnabled() {
return deadNodeDetectionEnabled;
}

/**
* @return the stripedReadDnMaxAttempts
*/
public int getStripedReadDnMaxAttempts() {
return stripedReadDnMaxAttempts;
}

/**
* @return the leaseHardLimitPeriod
*/
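
As a quick illustration of the validation added above, the following sketch (hypothetical, assuming DfsClientConf stays publicly constructible as shown in this hunk) sets the new key to 0 and expects the Preconditions check to reject it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

// Hypothetical check of the validation added in this hunk.
public class StripedReadConfValidationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 0);
    try {
      new DfsClientConf(conf);
      System.out.println("unexpected: 0 was accepted");
    } catch (IllegalArgumentException e) {
      // Expected: the checkArgument above fires for values <= 0.
      System.out.println("rejected as expected: " + e.getMessage());
    }
  }
}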
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4547,6 +4547,15 @@
</description>
</property>

<property>
<name>dfs.client.read.striped.datanode.max.attempts</name>
<value>1</value>
<description>
The number of times to reconnect to a DataNode during
the striped read process; the default is 1.
</description>
</property>

Check failure (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus) on line 4558 in hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml: blanks: end of line
<property>
<name>dfs.client.replica.accessor.builder.classes</name>
<value></value>
TestDFSStripedInputStreamWithTimeout.java (new file)
@@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import java.io.IOException;
import java.util.Arrays;

public class TestDFSStripedInputStreamWithTimeout {

public static final Logger LOG =
LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);

private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private ErasureCodingPolicy ecPolicy;
private short dataBlocks;
private short parityBlocks;
private int cellSize;
private final int stripesPerBlock = 2;
private int blockSize;
private int blockGroupSize;

@Rule
public Timeout globalTimeout = new Timeout(300000);

public ErasureCodingPolicy getEcPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
}

@Before
public void setup() throws IOException {
/*
* Initialize erasure coding policy.
*/
ecPolicy = getEcPolicy();
dataBlocks = (short) ecPolicy.getNumDataUnits();
parityBlocks = (short) ecPolicy.getNumParityUnits();
cellSize = ecPolicy.getCellSize();
blockSize = stripesPerBlock * cellSize;
blockGroupSize = dataBlocks * blockSize;
System.out.println("EC policy = " + ecPolicy);

conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000);

if (ErasureCodeNative.isNativeCodeLoaded()) {
conf.set(
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
NativeRSRawErasureCoderFactory.CODER_NAME);
}
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
GenericTestUtils.getRandomizedTempPath());
SimulatedFSDataset.setFactory(conf);
startUp();
}

private void startUp() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
dataBlocks + parityBlocks).build();
cluster.waitActive();
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(getEcPolicy().getName());
fs.mkdirs(dirPath);
fs.getClient()
.setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
}

@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}

@Test
public void testPreadTimeout() throws Exception {
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
stripesPerBlock, false, ecPolicy);
final int fileSize = numBlocks * blockGroupSize;

LocatedBlocks lbs = fs.getClient().namenode.
getBlockLocations(filePath.toString(), 0, fileSize);
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
assert lb instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
for (int i = 0; i < dataBlocks; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
stripesPerBlock * cellSize,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
}
try {
testReadFileWithAttempt(1);
Assert.fail("It Should fail to read striped time out with 1 attempt . ");
} catch (Exception e) {
Assert.assertTrue(
"Expected IOException message to mention 4 missing blocks.",
e.getMessage().contains("4 missing blocks"));
}

try {
testReadFileWithAttempt(3);
} catch (Exception e) {
Assert.fail("It Should successfully read striped file with 3 attempts. ");
}
}

private void testReadFileWithAttempt(int attempt) throws Exception {
// set dfs client config
cluster.getConfiguration(0)
.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS,
attempt);
cluster.getConfiguration(0)
.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
DistributedFileSystem newFs =
(DistributedFileSystem) cluster.getNewFileSystemInstance(0);
try(DFSStripedInputStream in = new DFSStripedInputStream(newFs.getClient(),
filePath.toString(), false, ecPolicy, null)){
int bufLen = 1024 * 100;
byte[] buf = new byte[bufLen];
int readTotal = 0;
in.seek(readTotal);
int nread = in.read(buf, 0, bufLen);
// Simulate a time-consuming processing step, such as a UDF.
Thread.sleep(10000);
in.seek(nread);
// StripeRange 6MB
bufLen = 1024 * 1024 * 6;
buf = new byte[bufLen];
in.read(buf, 0, bufLen);
}
}
}