Skip to content

Commit

Permalink
renames streamClosed variable
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmarsuhail committed May 9, 2022
1 parent e989eec commit 2e455d0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public S3CachingInputStream(
LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize);
}

protected void initialize() {
super.initialize();
@Override
protected void initializeUnderlyingResources() {
super.initializeUnderlyingResources();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
this.blockManager = this.createBlockManager(
this.getContext().getFuturePool(),
Expand All @@ -100,7 +101,7 @@ public void seek(long pos) throws IOException {
// The call to setAbsolute() returns true if the target position is valid and
// within the current block. Therefore, no additional work is needed when we get back true.
if (!this.getFilePosition().setAbsolute(pos)) {
if (!this.isStreamClosed()) {
if (!this.allUnderlyingResourceClosed()) {
LOG.info("seek({})", getOffsetStr(pos));
}
// We could be here in two cases:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class S3InputStream
private volatile boolean closed;

// Indicates whether resources have been freed, useful when the stream is closed via unbuffer.
private volatile boolean streamClosed;
private volatile boolean underlyingResourcesClosed;

// Current position within the file.
private FilePosition fpos;
Expand Down Expand Up @@ -130,16 +130,16 @@ public S3InputStream(
setReadahead(context.getReadahead());

this.s3File = this.getS3File();
this.initialize();
this.initializeUnderlyingResources();

this.seekTargetPos = 0;
}

/**
* Initializes those resources that the stream uses but are released during unbuffer.
*/
protected void initialize() {
this.streamClosed = false;
protected void initializeUnderlyingResources() {
this.underlyingResourcesClosed = false;
long fileSize = s3Attributes.getLen();
int bufferSize = context.getPrefetchBlockSize();
this.reader = new S3Reader(this.s3File);
Expand Down Expand Up @@ -211,7 +211,7 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
public int available() throws IOException {
this.throwIfClosed();

if (streamClosed || !ensureCurrentBuffer()) {
if (underlyingResourcesClosed || !ensureCurrentBuffer()) {
return 0;
}

Expand Down Expand Up @@ -366,8 +366,8 @@ protected boolean isClosed() {
return this.closed;
}

protected boolean isStreamClosed() {
return this.streamClosed;
protected boolean allUnderlyingResourceClosed() {
return this.underlyingResourcesClosed;
}

protected long getSeekTargetPos() {
Expand Down Expand Up @@ -419,11 +419,11 @@ protected String getOffsetStr(long offset) {

protected void closeStream() {

if(this.streamClosed) {
if(this.underlyingResourcesClosed) {
return;
}

this.streamClosed = true;
this.underlyingResourcesClosed = true;
this.blockData = null;
this.reader.close();
this.reader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.StreamCapabilities;
Expand All @@ -46,7 +47,7 @@
*/
public class S3PrefetchingInputStream
extends FSInputStream
implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
implements CanSetReadahead, StreamCapabilities, IOStatisticsSource, CanUnbuffer {
private static final Logger LOG = LoggerFactory.getLogger(S3PrefetchingInputStream.class);

// Underlying input stream used for reading S3 file.
Expand Down Expand Up @@ -99,10 +100,9 @@ public synchronized int available() throws IOException {
* Gets the current position.
*
* @return the current position.
* @throws IOException if there is an IO error during this operation.
*/
@Override
public synchronized long getPos() throws IOException {
public synchronized long getPos() {
return this.isClosed() ? 0 : this.inputStream.getPos();
}

Expand All @@ -115,6 +115,11 @@ public synchronized long getPos() throws IOException {
@Override
public synchronized int read() throws IOException {
this.throwIfClosed();

if(this.inputStream.allUnderlyingResourceClosed()) {
this.inputStream.initializeUnderlyingResources();
}

return this.inputStream.read();
}

Expand All @@ -132,9 +137,21 @@ public synchronized int read() throws IOException {
@Override
public synchronized int read(byte[] buffer, int offset, int len) throws IOException {
this.throwIfClosed();

if(this.inputStream.allUnderlyingResourceClosed()) {
this.inputStream.initializeUnderlyingResources();
}

return this.inputStream.read(buffer, offset, len);
}

@Override
public synchronized void unbuffer() {
if (this.inputStream != null) {
this.inputStream.unbuffer();
}
}

/**
* Closes this stream and releases all acquired resources.
*
Expand Down

0 comments on commit 2e455d0

Please sign in to comment.