Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18231. Fixes failing tests & drain stream async. #4386

Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,8 @@ private FSDataInputStream executeOpen(
new S3PrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan)));
createInputStreamCallbacks(auditSpan),
inputStreamStats));
} else {
return new FSDataInputStream(
new S3AInputStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -53,6 +54,7 @@ public class S3CachingInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -61,8 +63,9 @@ public class S3CachingInputStream extends S3InputStream {
public S3CachingInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);

this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
int bufferPoolSize = this.numBlocksToPrefetch + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

package org.apache.hadoop.fs.s3a.read;

import java.io.Closeable;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.common.Io;
import org.apache.hadoop.fs.common.Validate;
Expand All @@ -40,10 +40,13 @@
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;

/**
* Encapsulates low level interactions with S3 object on AWS.
*/
public class S3File implements Closeable {
public class S3File {
private static final Logger LOG = LoggerFactory.getLogger(S3File.class);

// Read-specific operation context.
private final S3AReadOpContext context;
Expand All @@ -64,6 +67,12 @@ public class S3File implements Closeable {
// That allows us to close the object when closing the stream.
private Map<InputStream, S3Object> s3Objects;

// uri of the object being read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you make these all javadocs, so they show up in IDEs.

private final String uri;

// size of a buffer to create when draining the stream.
private static final int DRAIN_BUFFER_SIZE = 16384;

/**
* Initializes a new instance of the {@code S3File} class.
*
Expand Down Expand Up @@ -98,6 +107,7 @@ public S3File(
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while you are there, can you change to a simple <> here

this.uri = "s3a://" + this.s3Attributes.getBucket() + "/" + this.s3Attributes.getKey();
}

/**
Expand Down Expand Up @@ -193,18 +203,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
return stream;
}

/**
* Closes this stream and releases all acquired resources.
*/
@Override
public synchronized void close() {
List<InputStream> streams = new ArrayList<InputStream>(this.s3Objects.keySet());
for (InputStream stream : streams) {
this.close(stream);
}
}

void close(InputStream inputStream) {
void close(InputStream inputStream, int numRemainingBytes) {
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
S3Object obj;
synchronized (this.s3Objects) {
obj = this.s3Objects.get(inputStream);
Expand All @@ -214,7 +213,83 @@ void close(InputStream inputStream) {
this.s3Objects.remove(inputStream);
}

if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
// don't bother with async io.
drain(false, "close() operation", numRemainingBytes, obj, inputStream);
} else {
LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
// schedule an async drain/abort with references to the fields so they
// can be reused
client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
}
}

/**
 * Drain the stream, tracking how long it takes via the stream statistics.
 * Safe to call either inline or from an asynchronous task.
 *
 * @param shouldAbort force an abort; used if explicitly requested.
 * @param reason reason for stream being closed; used in messages
 * @param remaining remaining bytes
 * @param requestObject http request object;
 * @param inputStream stream to close.
 * @return was the stream aborted?
 */
private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
    final S3Object requestObject, final InputStream inputStream) {

  // start a duration tracker before the drain so the elapsed time is recorded
  // whether the drain succeeds or escalates to an abort.
  final DurationTracker tracker = streamStatistics.initiateInnerStreamClose(shouldAbort);
  try {
    return invokeTrackingDuration(tracker,
        () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
  } catch (IOException ignored) {
    // invokeTrackingDuration() declares IOException in its signature, but
    // drainOrAbortHttpStream() swallows exceptions, so this cannot happen.
    return shouldAbort;
  }
}

/**
* Drain or abort the inner stream.
* Exceptions are swallowed.
* If a close() is attempted and fails, the operation escalates to
* an abort.
*
* @param shouldAbort force an abort; used if explicitly requested.
* @param reason reason for stream being closed; used in messages
* @param remaining remaining bytes
* @param requestObject http request object
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit: can you split one to a line, all tagged final. there's enough params to justify the effort

final long remaining, final S3Object requestObject, final InputStream inputStream) {

if (!shouldAbort && remaining > 0) {
try {
long drained = 0;
byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
while (true) {
final int count = inputStream.read(buffer);
if (count < 0) {
// no more data is left
break;
}
drained += count;
}
LOG.debug("Drained stream of {} bytes", drained);
} catch (Exception e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
shouldAbort = true;
}
}
Io.closeIgnoringIoException(inputStream);
Io.closeIgnoringIoException(obj);
Io.closeIgnoringIoException(requestObject);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure which library this is. switch to our IOUtils.cleanupWithLogger(lOG, inputStream, requestObject)

this will log any exceptions at debug, if ever needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a new Io class added as part of the initial prefetching PR. I've removed this class, and updated usages with cleanupWithLogger

streamStatistics.streamClose(shouldAbort, remaining);

LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining);
return shouldAbort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

/**
* Provides an {@code InputStream} that allows reading from an S3 file.
Expand All @@ -48,6 +49,7 @@ public class S3InMemoryInputStream extends S3InputStream {
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -56,8 +58,9 @@ public class S3InMemoryInputStream extends S3InputStream {
public S3InMemoryInputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
super(context, s3Attributes, client);
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {
super(context, s3Attributes, client, streamStatistics);
int fileSize = (int) s3Attributes.getLen();
this.buffer = ByteBuffer.allocate(fileSize);
LOG.debug("Created in-memory input stream for {} (size = {})", this.getName(), fileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public abstract class S3InputStream
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -104,16 +105,18 @@ public abstract class S3InputStream
public S3InputStream(
S3AReadOpContext context,
S3ObjectAttributes s3Attributes,
S3AInputStream.InputStreamCallbacks client) {
S3AInputStream.InputStreamCallbacks client,
S3AInputStreamStatistics streamStatistics) {

Validate.checkNotNull(context, "context");
Validate.checkNotNull(s3Attributes, "s3Attributes");
Validate.checkNotNull(client, "client");
Validate.checkNotNull(streamStatistics, "streamStatistics");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is one of those things we'd inevitably want to clean up. i can see a "steve does a cleanup" PR coming soon....
If you use Objects.requireNonNull() you can add the check on L119 so no need for extra work...its where new code should be going, even if older code still uses the guava checkNotNull


this.context = context;
this.s3Attributes = s3Attributes;
this.client = client;
this.streamStatistics = context.getS3AStatisticsContext().newInputStreamStatistics();
this.streamStatistics = streamStatistics;
this.ioStatistics = streamStatistics.getIOStatistics();
this.name = S3File.getPath(s3Attributes);
this.changeTracker = new ChangeTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class S3PrefetchingInputStream
* @param context read-specific operation context.
* @param s3Attributes attributes of the S3 object being read.
* @param client callbacks used for interacting with the underlying S3 client.
* @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if context is null.
* @throws IllegalArgumentException if s3Attributes is null.
Expand All @@ -66,20 +67,22 @@ public class S3PrefetchingInputStream
public S3PrefetchingInputStream(
    S3AReadOpContext context,
    S3ObjectAttributes s3Attributes,
    S3AInputStream.InputStreamCallbacks client,
    S3AInputStreamStatistics streamStatistics) {

  Validate.checkNotNull(context, "context");
  Validate.checkNotNull(s3Attributes, "s3Attributes");
  Validate.checkNotNullAndNotEmpty(s3Attributes.getBucket(), "s3Attributes.getBucket()");
  Validate.checkNotNullAndNotEmpty(s3Attributes.getKey(), "s3Attributes.getKey()");
  Validate.checkNotNegative(s3Attributes.getLen(), "s3Attributes.getLen()");
  Validate.checkNotNull(client, "client");
  Validate.checkNotNull(streamStatistics, "streamStatistics");

  // small files are read fully into memory; larger ones use the
  // block-caching prefetch implementation.
  long fileSize = s3Attributes.getLen();
  if (fileSize <= context.getPrefetchBlockSize()) {
    this.inputStream = new S3InMemoryInputStream(context, s3Attributes, client, streamStatistics);
  } else {
    this.inputStream = new S3CachingInputStream(context, s3Attributes, client, streamStatistics);
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
this.s3File.getStatistics().readOperationStarted(offset, size);
Invoker invoker = this.s3File.getReadInvoker();

invoker.retry(
int invokerResponse = invoker.retry(
"read", this.s3File.getPath(), true,
() -> {
try {
Expand All @@ -119,7 +119,12 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
int numBytesRead = buffer.position();
buffer.limit(numBytesRead);
this.s3File.getStatistics().readOperationCompleted(size, numBytesRead);
return numBytesRead;

if (invokerResponse < 0) {
return invokerResponse;
} else {
return numBytesRead;
}
}

private void readOneBlock(ByteBuffer buffer, long offset, int size) throws IOException {
Expand Down Expand Up @@ -153,7 +158,7 @@ private void readOneBlock(ByteBuffer buffer, long offset, int size) throws IOExc
}
while (!this.closed && (numRemainingBytes > 0));
} finally {
s3File.close(inputStream);
s3File.close(inputStream, numRemainingBytes);
}
}
}
Loading