Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18231. Fixes failing tests & drain stream async. #4386

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Provides read access to the underlying file one block at a time.
* Improve read performance by prefetching and locall caching blocks.
Expand Down Expand Up @@ -204,7 +206,7 @@ public synchronized void close() {
// Cancel any prefetches in progress.
this.cancelPrefetches();

Io.closeIgnoringIoException(this.cache);
cleanupWithLogger(LOG, this.cache);

this.ops.end(op);
LOG.info(this.ops.getSummary(false));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,8 @@ private FSDataInputStream executeOpen(
new S3PrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan)));
createInputStreamCallbacks(auditSpan),
inputStreamStats));
} else {
return new FSDataInputStream(
new S3AInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -53,6 +54,7 @@ public class S3CachingInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -61,8 +63,9 @@ public class S3CachingInputStream extends S3InputStream {
public S3CachingInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);

this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@

package org.apache.hadoop.fs.s3a.read;

import java.io.Closeable;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.common.Io;
import org.apache.hadoop.fs.common.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AInputStream;
Expand All @@ -40,30 +39,56 @@
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Encapsulates low level interactions with S3 object on AWS.
*/
public class S3File implements Closeable {
public class S3File {
private static final Logger LOG = LoggerFactory.getLogger(S3File.class);

// Read-specific operation context.
/**
* Read-specific operation context.
*/
private final S3AReadOpContext context;

// S3 object attributes.
/**
* S3 object attributes.
*/
private final S3ObjectAttributes s3Attributes;

// Callbacks used for interacting with the underlying S3 client.
/**
* Callbacks used for interacting with the underlying S3 client.
*/
private final S3AInputStream.InputStreamCallbacks client;

// Used for reporting input stream access statistics.
/**
* Used for reporting input stream access statistics.
*/
private final S3AInputStreamStatistics streamStatistics;

// Enforces change tracking related policies.
/**
* Enforces change tracking related policies.
*/
private final ChangeTracker changeTracker;

// Maps a stream returned by openForRead() to the associated S3 object.
// That allows us to close the object when closing the stream.
/**
* Maps a stream returned by openForRead() to the associated S3 object.
* That allows us to close the object when closing the stream.
*/
private Map<InputStream, S3Object> s3Objects;

/**
* uri of the object being read.
*/
private final String uri;

/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;

/**
* Initializes a new instance of the {@code S3File} class.
*
Expand Down Expand Up @@ -97,7 +122,8 @@ public S3File(
this.client = client;
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
this.s3Objects = new IdentityHashMap<>();
this.uri = this.getPath();
}

/**
Expand Down Expand Up @@ -169,7 +195,6 @@ public InputStream openForRead(long offset, int size) throws IOException {
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);

String uri = this.getPath();
String operation = String.format(
"%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
DurationTracker tracker = streamStatistics.initiateGetRequest();
Expand All @@ -193,18 +218,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
return stream;
}

/**
* Closes this stream and releases all acquired resources.
*/
@Override
public synchronized void close() {
List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet());
for (InputStream stream : streams) {
this.close(stream);
}
}

void close(InputStream inputStream) {
void close(InputStream inputStream, int numRemainingBytes) {
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
S3Object obj;
synchronized (this.s3Objects) {
obj = this.s3Objects.get(inputStream);
Expand All @@ -214,7 +228,91 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}

Io.closeIgnoringIoException(inputStream);
Io.closeIgnoringIoException(obj);
if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
// don't bother with async io.
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
} else {
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
// schedule an async drain/abort with references to the fields so they
// can be reused
client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
}
}

/**
* drain the stream. This method is intended to be
* used directly or asynchronously, and measures the
* duration of the operation in the stream statistics.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object;
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {

try {
return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
() -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
} catch (IOException e) {
// this is only here because invokeTrackingDuration() has it in its
// signature
return shouldAbort;
}
}

/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {

if (!shouldAbort && remaining > 0) {
try {
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inputStream.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
shouldAbort = true;
}
}
cleanupWithLogger(LOG, inputStream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, you can pass any number of Closeables in; not worth updating the PR for, but useful to know

cleanupWithLogger(LOG, requestObject);
streamStatistics.streamClose(shouldAbort, remaining);

LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -48,6 +49,7 @@ public class S3InMemoryInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -56,8 +58,9 @@ public class S3InMemoryInputStream extends S3InputStream {
public S3InMemoryInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);
int fileSize = (int) s3Attributes.getLen();
this.buffer = ByteBuffer.allocate(fileSize);
LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static java.util.Objects.requireNonNull;

/**
* Provides an {@link InputStream} that allows reading from an S3 file.
*/
Expand Down Expand Up @@ -96,6 +98,7 @@ public abstract class S3InputStream
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -104,16 +107,13 @@ public abstract class S3InputStream
public S3InputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {

Validate.checkNotNull(context, "context");
Validate.checkNotNull(s3Attributes, "s3Attributes");
Validate.checkNotNull(client, "client");
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {

this.context = context;
this.s3Attributes = s3Attributes;
this.client = client;
this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
this.context = requireNonNull(context);
this.s3Attributes = requireNonNull(s3Attributes);
this.client = requireNonNull(client);
this.streamStatistics = requireNonNull(streamStatistics);
this.ioStatistics = streamStatistics.getIOStatistics();
this.name = S3File.getPath(s3Attributes);
this.changeTracker = new ChangeTracker(
Expand Down
Loading