Merge pull request apache#1 from hash-X/NewFileInputStream
New file AltFileInputStream.java to replace FileInputStream in apache/hadoop HDFS
zhangminglei committed Aug 18, 2015
2 parents 176131f + 6304777 commit 2740d9b
Showing 17 changed files with 367 additions and 91 deletions.
@@ -0,0 +1,182 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import org.apache.hadoop.util.Shell;

import java.io.InputStream;
import java.io.FileInputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;

import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

/**
 * A substitute for FileInputStream. On Windows we still use
 * FileInputStream; on non-Windows platforms we construct the stream
 * from a FileChannel and FileDescriptor instead.
 */
public class AltFileInputStream extends InputStream {
  // For non-Windows
  private final InputStream inputStream;
  private final FileDescriptor fd;
  private final FileChannel fileChannel;

  // For Windows
  private FileInputStream fileInputStream;

  public AltFileInputStream(File file) throws IOException {
    if (!Shell.WINDOWS) {
      RandomAccessFile rf = new RandomAccessFile(file, "r");
      this.fd = rf.getFD();
      this.fileChannel = rf.getChannel();
      this.inputStream = Channels.newInputStream(fileChannel);
    } else {
      FileInputStream fis = new FileInputStream(file);
      this.fileInputStream = fis;
      this.inputStream = fileInputStream;
      this.fd = fis.getFD();
      this.fileChannel = fis.getChannel();
    }
  }

  /**
   * Create a stream from a FileDescriptor and a FileChannel.
   * @param fd FileDescriptor
   * @param fileChannel FileChannel
   */
  public AltFileInputStream(FileDescriptor fd, FileChannel fileChannel) {
    this.fd = fd;
    this.fileChannel = fileChannel;
    this.inputStream = Channels.newInputStream(fileChannel);
  }

  /**
   * Create a stream from an existing FileInputStream.
   * @param fis FileInputStream
   * @throws IOException if the file descriptor cannot be obtained
   */
  public AltFileInputStream(FileInputStream fis) throws IOException {
    this.fileInputStream = fis;
    this.inputStream = fileInputStream;
    this.fd = fis.getFD();
    this.fileChannel = fis.getChannel();
  }

  /**
   * Returns the <code>FileDescriptor</code> object that represents the
   * connection to the actual file in the file system being used by this
   * stream.
   *
   * @return the file descriptor object associated with this stream.
   * @exception IOException if this stream has no associated file descriptor.
   * @see java.io.FileDescriptor
   */
  public final FileDescriptor getFD() throws IOException {
    if (fd != null) {
      return fd;
    }
    throw new IOException("No FileDescriptor associated with this stream");
  }

  /**
   * Returns the <code>FileChannel</code> associated with this stream.
   */
  public FileChannel getChannel() {
    return fileChannel;
  }

  /**
   * Reads the next byte of data from this input stream.
   * For Windows, use fileInputStream to read data.
   * For non-Windows, use inputStream to read data.
   * @return the next byte of data, or <code>-1</code> if the end of the
   * file is reached.
   * @throws IOException if an I/O error occurs.
   */
  public int read() throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read();
    } else {
      return inputStream.read();
    }
  }

  /**
   * Reads up to <code>len</code> bytes of data from this input stream
   * into an array of bytes. If <code>len</code> is not zero, the method
   * blocks until some input is available; otherwise, no
   * bytes are read and <code>0</code> is returned.
   *
   * @param b the buffer into which the data is read.
   * @param off the start offset in the destination array <code>b</code>
   * @param len the maximum number of bytes read.
   * @return the total number of bytes read into the buffer, or
   * <code>-1</code> if there is no more data because the end of
   * the file has been reached.
   * @exception NullPointerException If <code>b</code> is <code>null</code>.
   * @exception IndexOutOfBoundsException If <code>off</code> is negative,
   * <code>len</code> is negative, or <code>len</code> is greater than
   * <code>b.length - off</code>
   * @exception IOException if an I/O error occurs.
   */
  public int read(byte[] b, int off, int len) throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read(b, off, len);
    } else {
      return inputStream.read(b, off, len);
    }
  }

  /**
   * Reads up to <code>b.length</code> bytes of data from this input
   * stream into an array of bytes. This method blocks until some input
   * is available.
   *
   * @param b the buffer into which the data is read.
   * @return the total number of bytes read into the buffer, or
   * <code>-1</code> if there is no more data because the end of
   * the file has been reached.
   * @exception IOException if an I/O error occurs.
   */
  public int read(byte[] b) throws IOException {
    if (fileInputStream != null) {
      return fileInputStream.read(b);
    } else {
      return inputStream.read(b);
    }
  }

  /**
   * Closes this file input stream and releases any system resources
   * associated with the stream.
   *
   * <p> If this stream has an associated channel then the channel is closed
   * as well.
   *
   * @exception IOException if an I/O error occurs.
   */
  public void close() throws IOException {
    if (fileInputStream != null) {
      fileInputStream.close();
    }
    fileChannel.close();
    inputStream.close();
  }
}
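
A minimal usage sketch of the new class (the file path below is hypothetical, for illustration only):

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.AltFileInputStream;

public class AltFileInputStreamExample {
  public static void main(String[] args) throws IOException {
    File file = new File("/tmp/example-data"); // hypothetical path
    AltFileInputStream in = new AltFileInputStream(file);
    try {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        // On non-Windows the bytes come from the channel-backed stream;
        // on Windows they come from the wrapped FileInputStream.
        System.out.println("read " + n + " bytes");
      }
    } finally {
      in.close(); // also closes the underlying channel
    }
  }
}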
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.io.AltFileInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -773,19 +774,19 @@ public static FileInputStream getShareDeleteFileInputStream(File f)
}

/**
- * Create a FileInputStream that shares delete permission on the
+ * Create an AltFileInputStream that shares delete permission on the
* file opened at a given offset, i.e. another process can delete
* the file the stream is reading. Only the Windows implementation
* uses the native interface.
*/
-public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+public static AltFileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
-return new FileInputStream(rf.getFD());
+return new AltFileInputStream(rf.getFD(), rf.getChannel());
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
@@ -800,7 +801,7 @@ public static FileInputStream getShareDeleteFileInputStream(File f, long seekOff
NativeIO.Windows.OPEN_EXISTING);
if (seekOffset > 0)
NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
-return new FileInputStream(fd);
+return new AltFileInputStream(new FileInputStream(fd));
}
}
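
A hedged call-site sketch for the changed method (assuming, as the surrounding calls suggest, that the helper lives on the NativeIO class; the replica path and offset are hypothetical):

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.AltFileInputStream;
import org.apache.hadoop.io.nativeio.NativeIO;

public class ShareDeleteReadExample {
  public static void main(String[] args) throws IOException {
    File blockFile = new File("/data/dn/current/blk_1001"); // hypothetical path
    long seekOffset = 512L; // hypothetical offset
    // Another process may delete the file while this stream is open.
    AltFileInputStream in =
        NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
    try {
      byte[] header = new byte[8];
      int n = in.read(header); // reads starting at seekOffset
      System.out.println("read " + n + " header bytes");
    } finally {
      in.close();
    }
  }
}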

@@ -45,6 +45,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.AltFileInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
@@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hdfs.server.datanode;

+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.DataInputStream;
-import java.io.BufferedInputStream;
import java.io.IOException;
-import java.io.EOFException;
import java.io.RandomAccessFile;
-import java.io.ByteArrayInputStream;
-import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

@@ -35,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.io.AltFileInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;

@@ -88,7 +88,7 @@ public static DataChecksum readDataChecksum(File metaFile) throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
-new FileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
+new AltFileInputStream(metaFile), DFSUtil.getIoFileBufferSize(conf)));
return readDataChecksum(in, metaFile);
} finally {
IOUtils.closeStream(in);
@@ -153,7 +153,7 @@ public static BlockMetadataHeader readHeader(File file) throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
-new FileInputStream(file)));
+new AltFileInputStream(file)));
return readHeader(in);
} finally {
IOUtils.closeStream(in);
@@ -21,7 +21,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileDescriptor;
-import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -41,6 +40,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.AltFileInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
@@ -388,8 +388,8 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
-if (blockIn instanceof FileInputStream) {
-blockInFd = ((FileInputStream)blockIn).getFD();
+if (blockIn instanceof AltFileInputStream) {
+blockInFd = ((AltFileInputStream)blockIn).getFD();
} else {
blockInFd = null;
}
@@ -579,7 +579,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
sockOut.write(buf, headerOff, dataOff - headerOff);

// no need to flush since we know out is not a buffered stream
-FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+FileChannel fileCh = ((AltFileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
@@ -742,9 +742,9 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
-&& blockIn instanceof FileInputStream;
+&& blockIn instanceof AltFileInputStream;
if (transferTo) {
-FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+FileChannel fileChannel = ((AltFileInputStream)blockIn).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
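
The transferTo fast path that this check enables can be sketched in isolation; a zero-copy illustration with assumed names (blockIn, dataLen), not BlockSender's exact code:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.io.AltFileInputStream;

final class TransferToSketch {
  // Copies dataLen bytes from the stream's channel to a socket-backed
  // channel via transferTo, looping because transferTo may move fewer
  // bytes than requested.
  static void sendFully(AltFileInputStream blockIn, WritableByteChannel out,
      long dataLen) throws IOException {
    FileChannel fileChannel = blockIn.getChannel();
    long position = fileChannel.position();
    long remaining = dataLen;
    while (remaining > 0) {
      long transferred = fileChannel.transferTo(position, remaining, out);
      position += transferred;
      remaining -= transferred;
    }
  }
}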
