diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/AltFileInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/AltFileInputStream.java
new file mode 100644
index 0000000000000..d377a0df75190
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/AltFileInputStream.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import org.apache.hadoop.util.Shell;
+
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+
+/**
+ * This class is a substitute for FileInputStream. On Windows we still use
+ * FileInputStream. On non-Windows platforms we use a FileDescriptor and a
+ * FileChannel to construct the stream.
+ */
+public class AltFileInputStream extends InputStream {
+  // For non-Windows
+  private final InputStream inputStream;
+  private final FileDescriptor fd;
+  private final FileChannel fileChannel;
+
+  // For Windows
+  private FileInputStream fileInputStream;
+
+  public AltFileInputStream(File file) throws IOException {
+    if (!Shell.WINDOWS) {
+      RandomAccessFile rf = new RandomAccessFile(file, "r");
+      this.fd = rf.getFD();
+      this.fileChannel = rf.getChannel();
+      this.inputStream = Channels.newInputStream(fileChannel);
+    } else {
+      FileInputStream fis = new FileInputStream(file);
+      this.fileInputStream = fis;
+      this.inputStream = fileInputStream;
+      this.fd = fis.getFD();
+      this.fileChannel = fis.getChannel();
+    }
+  }
+
+  /**
+   * Create a stream from a FileDescriptor and a FileChannel.
+   * @param fd FileDescriptor
+   * @param fileChannel FileChannel
+   */
+  public AltFileInputStream(FileDescriptor fd, FileChannel fileChannel) {
+    this.fd = fd;
+    this.fileChannel = fileChannel;
+    this.inputStream = Channels.newInputStream(fileChannel);
+  }
+
+  /**
+   * Create a stream from a FileInputStream.
+   * @param fis FileInputStream
+   * @throws IOException if an I/O error occurs.
+   */
+  public AltFileInputStream(FileInputStream fis) throws IOException {
+    this.fileInputStream = fis;
+    this.inputStream = fileInputStream;
+    this.fd = fis.getFD();
+    this.fileChannel = fis.getChannel();
+  }
+
+  /**
+   * Returns the FileDescriptor
+   * object that represents the connection to
+   * the actual file in the file system being
+   * used by this stream.
+   *
+   * @return the file descriptor object associated with this stream.
+   * @exception IOException if an I/O error occurs.
+   * @see java.io.FileDescriptor
+   */
+  public final FileDescriptor getFD() throws IOException {
+    if (fd != null) {
+      return fd;
+    }
+    throw new IOException("No FileDescriptor is associated with this stream");
+  }
+
+  // Return the FileChannel associated with this stream.
+  public FileChannel getChannel() {
+    return fileChannel;
+  }
+
+  /**
+   * For Windows, use fileInputStream to read data.
+   * For non-Windows, use inputStream to read data.
+   * @return the next byte of data, or -1 if the end of the file is reached.
+   * @throws IOException if an I/O error occurs.
+   */
+  public int read() throws IOException {
+    if (fileInputStream != null) {
+      return fileInputStream.read();
+    } else {
+      return inputStream.read();
+    }
+  }
+
+  /**
+   * Reads up to len bytes of data from this input stream
+   * into an array of bytes. If len is not zero, the method
+   * blocks until some input is available; otherwise, no
+   * bytes are read and 0 is returned.
+   *
+   * @param b the buffer into which the data is read.
+   * @param off the start offset in the destination array b
+   * @param len the maximum number of bytes read.
+   * @return the total number of bytes read into the buffer, or
+   *     -1 if there is no more data because the end of
+   *     the file has been reached.
+   * @exception NullPointerException If b is null.
+   * @exception IndexOutOfBoundsException If off is negative,
+   *     len is negative, or len is greater than
+   *     b.length - off
+   * @exception IOException if an I/O error occurs.
+   */
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (fileInputStream != null) {
+      return fileInputStream.read(b, off, len);
+    } else {
+      return inputStream.read(b, off, len);
+    }
+  }
+
+  /**
+   * Reads up to b.length bytes of data from this input
+   * stream into an array of bytes. This method blocks until some input
+   * is available.
+   *
+   * @param b the buffer into which the data is read.
+   * @return the total number of bytes read into the buffer, or
+   *     -1 if there is no more data because the end of
+   *     the file has been reached.
+   * @exception IOException if an I/O error occurs.
+   */
+  public int read(byte[] b) throws IOException {
+    if (fileInputStream != null) {
+      return fileInputStream.read(b);
+    } else {
+      return inputStream.read(b);
+    }
+  }
+
+  /**
+   * Closes this file input stream and releases any system resources
+   * associated with the stream.
+   *
+   * <p> If this stream has an associated channel then the channel is closed
+   * as well.
+   *
+   * @exception IOException if an I/O error occurs.
+   */
+  public void close() throws IOException {
+    if (fileInputStream != null) {
+      fileInputStream.close();
+    }
+    fileChannel.close();
+    inputStream.close();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 688b955f6d811..f39cd217de070 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -773,19 +774,19 @@ public static FileInputStream getShareDeleteFileInputStream(File f)
   }
 
   /**
-   * Create a FileInputStream that shares delete permission on the
+   * Create an AltFileInputStream that shares delete permission on the
    * file opened at a given offset, i.e. other process can delete
    * the file the FileInputStream is reading. Only Windows implementation
    * uses the native interface.
    */
-  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+  public static AltFileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
       throws IOException {
     if (!Shell.WINDOWS) {
       RandomAccessFile rf = new RandomAccessFile(f, "r");
       if (seekOffset > 0) {
         rf.seek(seekOffset);
       }
-      return new FileInputStream(rf.getFD());
+      return new AltFileInputStream(rf.getFD(), rf.getChannel());
     } else {
       // Use Windows native interface to create a FileInputStream that
       // shares delete permission on the file opened, and set it to the
@@ -800,7 +801,7 @@ public static FileInputStream getShareDeleteFileInputStream(File f, long seekOff
           NativeIO.Windows.OPEN_EXISTING);
       if (seekOffset > 0)
         NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
-      return new FileInputStream(fd);
+      return new AltFileInputStream(new FileInputStream(fd));
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index c16ffdf2e0e11..a15efc79a6039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.AltFileInputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index 4977fd7069ed5..41d3c28a85f07 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -17,15 +17,14 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; import java.io.File; -import java.io.FileInputStream; +import java.io.DataInputStream; +import java.io.BufferedInputStream; import java.io.IOException; +import java.io.EOFException; import java.io.RandomAccessFile; +import java.io.ByteArrayInputStream; +import java.io.DataOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -35,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -88,7 +88,7 @@ public static DataChecksum readDataChecksum(File metaFile) throws IOException { DataInputStream in = null; try { in = new DataInputStream(new BufferedInputStream( - new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf))); + new AltFileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf))); return readDataChecksum(in, metaFile); } finally { IOUtils.closeStream(in); @@ -153,7 +153,7 @@ public static BlockMetadataHeader readHeader(File file) throws IOException { DataInputStream in = null; try { in = new DataInputStream(new BufferedInputStream( - new FileInputStream(file))); + new AltFileInputStream(file))); return readHeader(in); } finally { IOUtils.closeStream(in); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 79f4dd7aa2eec..0b9894e876d40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileDescriptor; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; @@ -388,8 +388,8 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - if (blockIn instanceof FileInputStream) { - blockInFd = ((FileInputStream)blockIn).getFD(); + if (blockIn instanceof AltFileInputStream) { + blockInFd = ((AltFileInputStream)blockIn).getFD(); } else { blockInFd = null; } @@ -579,7 +579,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, sockOut.write(buf, headerOff, dataOff - headerOff); // no need to flush since we know out 
is not a buffered stream - FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); + FileChannel fileCh = ((AltFileInputStream)blockIn).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); sockOut.transferToFully(fileCh, blockInPosition, dataLen, @@ -742,9 +742,9 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream, int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream - && blockIn instanceof FileInputStream; + && blockIn instanceof AltFileInputStream; if (transferTo) { - FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); + FileChannel fileChannel = ((AltFileInputStream)blockIn).getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index e265dadcfbddc..53e235a546506 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -46,17 +46,16 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; import static org.apache.hadoop.util.ExitUtil.terminate; -import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.File; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; +import java.io.EOFException; import java.io.OutputStream; +import java.io.BufferedOutputStream; +import java.io.InputStream; import java.io.PrintStream; import java.lang.management.ManagementFactory; import java.net.InetSocketAddress; @@ -100,11 +99,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.client.BlockReportOptions; +import org.apache.hadoop.hdfs.HDFSPolicyProvider; + import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; @@ -162,6 +162,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.nativeio.NativeIO; @@ -1594,10 +1596,10 @@ public
ShortCircuitFdsVersionException(String msg) { } } - FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk, - final Token token, int maxVersion) - throws ShortCircuitFdsUnsupportedException, - ShortCircuitFdsVersionException, IOException { + AltFileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk, + final Token token, int maxVersion) + throws ShortCircuitFdsUnsupportedException, + ShortCircuitFdsVersionException, IOException { if (fileDescriptorPassingDisabledReason != null) { throw new ShortCircuitFdsUnsupportedException( fileDescriptorPassingDisabledReason); @@ -1606,15 +1608,15 @@ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk, int blkVersion = CURRENT_BLOCK_FORMAT_VERSION; if (maxVersion < blkVersion) { throw new ShortCircuitFdsVersionException("Your client is too old " + - "to read this block! Its format version is " + - blkVersion + ", but the highest format version you can read is " + - maxVersion); + "to read this block! Its format version is " + + blkVersion + ", but the highest format version you can read is " + + maxVersion); } metrics.incrBlocksGetLocalPathInfo(); - FileInputStream fis[] = new FileInputStream[2]; - + + AltFileInputStream fis[] = new AltFileInputStream[2]; try { - fis[0] = (FileInputStream)data.getBlockInputStream(blk, 0); + fis[0] = (AltFileInputStream)data.getBlockInputStream(blk, 0); fis[1] = DatanodeUtil.getMetaDataInputStream(blk, data); } catch (ClassCastException e) { LOG.debug("requestShortCircuitFdsForRead failed", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 26d669cb5fb00..8949b2de58e0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -27,17 +27,6 @@ import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; import static org.apache.hadoop.util.Time.monotonicNow; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.FileDescriptor; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; @@ -46,11 +35,20 @@ import java.security.MessageDigest; import java.util.Arrays; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.InterruptedIOException; +import java.io.EOFException; +import java.io.FileDescriptor; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -79,6 +77,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.net.NetUtils; @@ -96,6 +95,7 @@ * Thread for processing incoming/outgoing data stream. */ class DataXceiver extends Receiver implements Runnable { + public static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; @@ -302,7 +302,7 @@ public void requestShortCircuitFds(final ExtendedBlock blk, throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); - FileInputStream fis[] = null; + AltFileInputStream fis[] = null; SlotId registeredSlotId = null; boolean success = false; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java index 746c3f6948f27..ed4cf91b894f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -27,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.io.AltFileInputStream; /** Provide utility methods for Datanode. */ @InterfaceAudience.Private @@ -121,18 +121,19 @@ public static File idToBlockDir(File root, long blockId) { } /** - * @return the FileInputStream for the meta data of the given block. + * @return the AltFileInputStream for the meta data of the given block. * @throws FileNotFoundException * if the file not found. * @throws ClassCastException - * if the underlying input stream is not a FileInputStream. + * if the underlying input stream is not an AltFileInputStream.
*/ - public static FileInputStream getMetaDataInputStream( - ExtendedBlock b, FsDatasetSpi data) throws IOException { + public static AltFileInputStream getMetaDataInputStream( + ExtendedBlock b, FsDatasetSpi data) throws IOException + { final LengthInputStream lin = data.getMetaDataInputStream(b); if (lin == null) { throw new FileNotFoundException("Meta file for " + b + " not found."); } - return (FileInputStream)lin.getWrappedStream(); + return (AltFileInputStream)lin.getWrappedStream(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 136d8a93bce2c..0188ffe7b93e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; @@ -31,6 +30,7 @@ import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import com.google.common.annotations.VisibleForTesting; @@ -239,7 +239,7 @@ public long getOriginalBytesReserved() { private void unlinkFile(File file, Block b) throws IOException { File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); try { - FileInputStream in = new FileInputStream(file); + AltFileInputStream in = new AltFileInputStream(file); try { FileOutputStream out = new FileOutputStream(tmpFile); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index d1f7c5f1ef515..e176754599e5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -17,20 +17,18 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import java.io.BufferedInputStream; -import java.io.DataInputStream; +import java.util.Iterator; +import java.util.Scanner; import java.io.File; -import java.io.FileInputStream; +import java.io.IOException; import java.io.FileNotFoundException; +import java.io.Writer; +import java.io.OutputStreamWriter; import java.io.FileOutputStream; -import java.io.IOException; +import java.io.DataInputStream; import java.io.InputStream; -import java.io.OutputStreamWriter; +import java.io.BufferedInputStream; import java.io.RandomAccessFile; -import java.io.Writer; -import java.util.Iterator; -import java.util.Scanner; - import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.io.AltFileInputStream; import 
org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; @@ -633,7 +632,7 @@ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { return 0; } checksumIn = new DataInputStream( - new BufferedInputStream(new FileInputStream(metaFile), + new BufferedInputStream(new AltFileInputStream(metaFile), ioFileBufferSize)); // read and handle the common header here. For now just a version @@ -648,7 +647,7 @@ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { return 0; } IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); - blockIn = new FileInputStream(blockFile); + blockIn = new AltFileInputStream(blockFile); long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; IOUtils.skipFully(blockIn, lastChunkStartPos); int lastChunkSize = (int)Math.min( @@ -719,9 +718,9 @@ private boolean readReplicasFromCache(ReplicaMap volumeMap, } return false; } - FileInputStream inputStream = null; + AltFileInputStream inputStream = null; try { - inputStream = new FileInputStream(replicaFile); + inputStream = new AltFileInputStream(replicaFile); BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream); Iterator iterator = blocksList.iterator(); while (iterator.hasNext()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java index f70d4afe297d5..0e72a1ff5ca0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java @@ -26,7 +26,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -52,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -430,7 +430,8 @@ private class CachingTask implements Runnable { @Override public void run() { boolean success = false; - FileInputStream blockIn = null, metaIn = null; + AltFileInputStream blockIn = null; + AltFileInputStream metaIn = null; MappableBlock mappableBlock = null; ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(), key.getBlockId(), length, genstamp); @@ -446,7 +447,7 @@ public void run() { } reservedBytes = true; try { - blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0); + blockIn = (AltFileInputStream)dataset.getBlockInputStream(extBlk, 0); metaIn = DatanodeUtil.getMetaDataInputStream(extBlk, dataset); } catch (ClassCastException e) { LOG.warn("Failed to cache " + key + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index a2bb2c03221ac..5f9dfaa85b63c 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -57,11 +57,10 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.ExtendedBlockId; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -103,6 +102,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.nativeio.NativeIO; @@ -227,7 +227,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) NativeIO.getShareDeleteFileInputStream(meta), meta.length()); } - return new LengthInputStream(new FileInputStream(meta), meta.length()); + return new LengthInputStream(new AltFileInputStream(meta), meta.length()); } final DataNode datanode; @@ -729,7 +729,7 @@ private File getBlockFileNoExistsCheck(ExtendedBlock b, } @Override // FsDatasetSpi - public InputStream getBlockInputStream(ExtendedBlock b, + public AltFileInputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { File blockFile = getBlockFileNoExistsCheck(b, true); if (isNativeIOAvailable) { @@ -804,7 +804,7 @@ public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, } } - private static FileInputStream openAndSeek(File file, long offset) + private static AltFileInputStream openAndSeek(File file, long offset) throws IOException { RandomAccessFile raf = null; try { @@ -812,7 +812,7 @@ private static FileInputStream openAndSeek(File file, long offset) if (offset > 0) { raf.seek(offset); } - return new FileInputStream(raf.getFD()); + return new AltFileInputStream(raf.getFD(), raf.getChannel()); } catch(IOException ioe) { IOUtils.cleanup(null, raf); throw ioe; @@ -978,7 +978,7 @@ private static void computeChecksum(File srcMeta, File dstMeta, int offset = 0; try (InputStream dataIn = isNativeIOAvailable ? 
NativeIO.getShareDeleteFileInputStream(blockFile) : - new FileInputStream(blockFile)) { + new AltFileInputStream(blockFile)) { for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) { if (n > 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java index 45aa364bf8d9c..7ab4fcc8c55d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java @@ -18,11 +18,10 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.IOException; +import java.io.DataInputStream; import java.io.BufferedInputStream; import java.io.Closeable; -import java.io.DataInputStream; -import java.io.FileInputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -33,11 +32,13 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents an HDFS block that is mmapped by the DataNode. @@ -45,6 +46,9 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class MappableBlock implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(MappableBlock.class); private MappedByteBuffer mmap; private final long length; @@ -73,7 +77,7 @@ public long getLength() { * @return The Mappable block. */ public static MappableBlock load(long length, - FileInputStream blockIn, FileInputStream metaIn, + AltFileInputStream blockIn, AltFileInputStream metaIn, String blockFileName) throws IOException { MappableBlock mappableBlock = null; MappedByteBuffer mmap = null; @@ -102,7 +106,7 @@ public static MappableBlock load(long length, * Verifies the block's checksum. This is an I/O intensive operation. 
*/ private static void verifyChecksum(long length, - FileInputStream metaIn, FileChannel blockChannel, String blockFileName) + AltFileInputStream metaIn, FileChannel blockChannel, String blockFileName) throws IOException, ChecksumException { // Verify the checksum from the block's meta file // Get the DataChecksum from the meta file header diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 7c8a857c89640..51934ad82237d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -21,7 +21,6 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -59,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType; import org.apache.hadoop.hdfs.util.MD5FileUtils; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressorStream; @@ -175,7 +175,7 @@ void load(File file) throws IOException { long start = Time.monotonicNow(); imgDigest = MD5FileUtils.computeMd5ForFile(file); RandomAccessFile raFile = new RandomAccessFile(file, "r"); - FileInputStream fin = new FileInputStream(file); + AltFileInputStream fin = new AltFileInputStream(file); try { loadInternal(raFile, fin); long end = Time.monotonicNow(); @@ -186,7 +186,7 @@ void load(File file) throws IOException { } } - private void loadInternal(RandomAccessFile raFile, FileInputStream fin) + private void loadInternal(RandomAccessFile raFile, AltFileInputStream fin) throws IOException { if (!FSImageUtil.checkFileFormat(raFile)) { throw new IOException("Unrecognized file format"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java index 1390cf3eee16c..0616f6c2012b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Time; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java index d87ffbf315423..406f276c7917d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java @@ -31,6 +31,7 @@ import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.AltFileInputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.util.StringUtils; @@ -125,7 +126,7 @@ public static MD5Hash readStoredMd5ForFile(File dataFile) throws IOException { * Read dataFile and compute its MD5 checksum. */ public static MD5Hash computeMd5ForFile(File dataFile) throws IOException { - InputStream in = new FileInputStream(dataFile); + InputStream in = new AltFileInputStream(dataFile); try { MessageDigest digester = MD5Hash.getDigester(); DigestInputStream dis = new DigestInputStream(in, digester); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestAltFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestAltFileInputStream.java new file mode 100644 index 0000000000000..e45c02799b669 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestAltFileInputStream.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.io.AltFileInputStream;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAltFileInputStream {
+  private static final File TEST_DIR = PathUtils.getTestDir(TestAltFileInputStream.class);
+  private static final File TEST_FILE = new File(TEST_DIR,
+      "testAltFileInputStream.dat");
+
+  private static final int TEST_DATA_LEN = 7 * 1024; // 7 KB test data
+  private static final byte[] TEST_DATA = DFSTestUtil.generateSequentialBytes(0, TEST_DATA_LEN);
+
+  @Before
+  public void setup() throws IOException {
+    FileUtil.fullyDelete(TEST_DIR);
+    assertTrue(TEST_DIR.mkdirs());
+    FileOutputStream fos = new FileOutputStream(TEST_FILE);
+    fos.write(TEST_DATA);
+    fos.close();
+  }
+
+  @Test
+  public void readWithFileName() throws Exception {
+    AltFileInputStream inputStream = new AltFileInputStream(TEST_FILE);
+    assertNotNull(inputStream.getFD());
+    assertNotNull(inputStream.getChannel());
+    readFullyAndVerifyLength(inputStream);
+  }
+
+  @Test
+  public void readWithFdAndChannel() throws Exception {
+    RandomAccessFile raf = new RandomAccessFile(TEST_FILE, "r");
+    AltFileInputStream inputStream = new AltFileInputStream(raf.getFD(), raf.getChannel());
+    assertNotNull(inputStream.getFD());
+    assertNotNull(inputStream.getChannel());
+    readFullyAndVerifyLength(inputStream);
+  }
+
+  @Test
+  public void readWithFileInputStream() throws Exception {
+    AltFileInputStream inputStream = new AltFileInputStream(new FileInputStream(TEST_FILE));
+    assertNotNull(inputStream.getChannel());
+    assertNotNull(inputStream.getFD());
+    readFullyAndVerifyLength(inputStream);
+  }
+
+  private void readFullyAndVerifyLength(InputStream inputStream) throws Exception {
+    long bytesRead = 0;
+    while (inputStream.read() != -1) {
+      bytesRead++;
+    }
+    assertEquals(TEST_DATA_LEN, bytesRead);
+    inputStream.close();
+  }
+}
\ No newline at end of file
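
A minimal usage sketch (illustration only, not part of the patch): it assumes nothing beyond the constructors and accessors defined in AltFileInputStream.java above, and the local file path is hypothetical. It shows the pattern the patched callers follow, keeping the familiar FileInputStream-style reads while still reaching the FileDescriptor and FileChannel that BlockSender and NativeIO need.

// Illustrative sketch only -- not part of this patch.
// Assumes the AltFileInputStream API introduced above; the path below is hypothetical.
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.channels.FileChannel;

import org.apache.hadoop.io.AltFileInputStream;

public class AltFileInputStreamExample {
  public static void main(String[] args) throws IOException {
    File blockFile = new File("/tmp/example-block.dat"); // hypothetical path

    // Same pattern callers previously used with FileInputStream: read through
    // the stream, and obtain the FileDescriptor/FileChannel when needed
    // (e.g. BlockSender uses the channel for transferTo sends and the fd for
    // readahead hints).
    try (AltFileInputStream in = new AltFileInputStream(blockFile)) {
      FileDescriptor fd = in.getFD();
      FileChannel channel = in.getChannel();

      byte[] buf = new byte[4096];
      long total = 0;
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        total += n;
      }
      System.out.println("read " + total + " bytes, fd.valid()=" + fd.valid()
          + ", channel.isOpen()=" + channel.isOpen());
    }
  }
}

On non-Windows platforms the stream is backed by a FileChannel obtained from a RandomAccessFile, so code that previously cast the block input stream to FileInputStream (BlockSender, FsDatasetCache, DataNode.requestShortCircuitFdsForRead) is switched by this patch to cast to AltFileInputStream instead.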