Skip to content

Commit

Permalink
HADOOP-18184. unbuffer.
Browse files Browse the repository at this point in the history
rebase and tweak input stream slightly, including docs.

Change-Id: I134486e21f928be2985c37949f4491e32314ed44
  • Loading branch information
steveloughran committed Jul 20, 2023
1 parent 2e6ab81 commit d8694da
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public S3ACachingInputStream(
/**
* Demand create the block manager.
*/
private void demandCreateBlockManager() {
private synchronized void demandCreateBlockManager() {
if (blockManager == null) {
LOG.debug("{}: creating block manager", getName());
blockManager = createBlockManager(
Expand All @@ -120,6 +120,11 @@ public void close() throws IOException {
}
}

/**
* Close the stream and the block manager.
* @param unbuffer is this an unbuffer operation?
* @return true if the stream was closed.
*/
@Override
protected boolean closeStream(final boolean unbuffer) {
final boolean b = super.closeStream(unbuffer);
Expand All @@ -131,7 +136,7 @@ protected boolean closeStream(final boolean unbuffer) {
* Close the block manager and set it to null, if
* it is not already in this state.
*/
private void closeBlockManager() {
private synchronized void closeBlockManager() {
if (blockManager != null) {
blockManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;

/**
* Encapsulates low level interactions with S3 object on AWS.
*/
Expand Down Expand Up @@ -193,21 +195,12 @@ public InputStream openForRead(long offset, int size) throws IOException {

String operation = String.format(
"%s %s at %d size %d", OPERATION_GET, uri, offset, size);
DurationTracker tracker = streamStatistics.initiateGetRequest();
S3Object object = null;

// initiate the GET. This completes once the request returns the response headers;
// the data is read later.
LOG.debug("{}", operation);
try {
object = Invoker.once(operation, uri, () -> client.getObject(request));
} catch (IOException e) {
tracker.failed();
throw e;
} finally {
tracker.close();
}
LOG.debug("{} result duration {}", operation, tracker);
object = onceTrackingDuration(operation, uri, streamStatistics.initiateGetRequest(),
() -> client.getObject(request));

changeTracker.processResponse(object, operation, offset);
InputStream stream = object.getObjectContent();
Expand All @@ -218,6 +211,14 @@ public InputStream openForRead(long offset, int size) throws IOException {
return stream;
}

/**
* Close the input stream, draining it first.
* If the number of bytes is above a configured threshold,
* the stream is drained asynchronously.
* @param inputStream stream to close
* @param numRemainingBytes number of bytes left in the stream.
* @throws IllegalArgumentException if the stream is unknown.
*/
void close(InputStream inputStream, int numRemainingBytes) {
S3Object obj;
synchronized (s3Objects) {
Expand Down

0 comments on commit d8694da

Please sign in to comment.