Commit

updates as per review comments

ahmarsuhail committed Jun 9, 2022
1 parent 52e6f96 commit 0807cbd
Showing 5 changed files with 81 additions and 142 deletions.
@@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Provides read access to the underlying file one block at a time.
* Improves read performance by prefetching and locally caching blocks.
@@ -204,7 +206,7 @@ public synchronized void close() {
// Cancel any prefetches in progress.
this.cancelPrefetches();

Io.closeIgnoringIoException(this.cache);
cleanupWithLogger(LOG, this.cache);

this.ops.end(op);
LOG.info(this.ops.getSummary(false));
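
Throughout this commit, calls to the removed Io.closeIgnoringIoException helper are replaced with org.apache.hadoop.io.IOUtils.cleanupWithLogger, which closes each Closeable in turn and logs, rather than rethrows, any failure. A minimal sketch of that pattern follows; the wrapper class is hypothetical and only cleanupWithLogger itself is the real Hadoop API.

import java.io.Closeable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/** Hypothetical example, not part of this commit. */
final class QuietCloseExample {
  private static final Logger LOG = LoggerFactory.getLogger(QuietCloseExample.class);

  /** Close every resource; null entries are skipped and IOExceptions are logged, not thrown. */
  static void closeAll(Closeable... resources) {
    cleanupWithLogger(LOG, resources);
  }
}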

This file was deleted.

@@ -30,7 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.common.Io;
import org.apache.hadoop.fs.common.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AInputStream;
@@ -41,36 +40,53 @@
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Encapsulates low level interactions with S3 object on AWS.
*/
public class S3File {
private static final Logger LOG = LoggerFactory.getLogger(S3File.class);

// Read-specific operation context.
/**
* Read-specific operation context.
*/
private final S3AReadOpContext context;

// S3 object attributes.
/**
* S3 object attributes.
*/
private final S3ObjectAttributes s3Attributes;

// Callbacks used for interacting with the underlying S3 client.
/**
* Callbacks used for interacting with the underlying S3 client.
*/
private final S3AInputStream.InputStreamCallbacks client;

// Used for reporting input stream access statistics.
/**
* Used for reporting input stream access statistics.
*/
private final S3AInputStreamStatistics streamStatistics;

// Enforces change tracking related policies.
/**
* Enforces change tracking related policies.
*/
private final ChangeTracker changeTracker;

// Maps a stream returned by openForRead() to the associated S3 object.
// That allows us to close the object when closing the stream.
/**
* Maps a stream returned by openForRead() to the associated S3 object.
* That allows us to close the object when closing the stream.
*/
private Map<InputStream, S3Object> s3Objects;

// uri of the object being read
/**
* uri of the object being read
*/
private final String uri;

// size of a buffer to create when draining the stream.
/**
* size of a buffer to create when draining the stream.
*/
private static final int DRAIN_BUFFER_SIZE = 16384;

/**
@@ -106,7 +122,7 @@ public S3File(
this.client = client;
this.streamStatistics = streamStatistics;
this.changeTracker = changeTracker;
this.s3Objects = new IdentityHashMap<InputStream, S3Object>();
this.s3Objects = new IdentityHashMap<>();
this.uri = "s3a://" + this.s3Attributes.getBucket() + "/" + this.s3Attributes.getKey();
}
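
The stream-to-object map initialised above uses an IdentityHashMap. A short illustrative sketch of why that choice fits (the StreamRegistry class below is hypothetical, not part of S3File): identity keys are compared by reference, so each wrapped InputStream maps back to exactly the S3Object it was opened from, letting the object be closed when its stream is closed.

import java.io.InputStream;
import java.util.IdentityHashMap;
import java.util.Map;

import com.amazonaws.services.s3.model.S3Object;

/** Hypothetical sketch of the stream-to-object bookkeeping, not the production class. */
final class StreamRegistry {
  /** Reference-equality keys: two distinct streams never collide, even if "equal". */
  private final Map<InputStream, S3Object> s3Objects = new IdentityHashMap<>();

  synchronized void track(InputStream stream, S3Object object) {
    s3Objects.put(stream, object);
  }

  /** Returns the owning S3Object so the caller can close it alongside the stream. */
  synchronized S3Object release(InputStream stream) {
    return s3Objects.remove(stream);
  }
}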

@@ -236,8 +252,12 @@ void close(InputStream inputStream, int numRemainingBytes) {
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
final S3Object requestObject, final InputStream inputStream) {
private boolean drain(
final boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {

try {
return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
@@ -262,8 +282,12 @@ private boolean drain(final boolean shouldAbort, final String reason, final long
* @param inputStream stream to close.
* @return was the stream aborted?
*/
private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
final long remaining, final S3Object requestObject, final InputStream inputStream) {
private boolean drainOrAbortHttpStream(
boolean shouldAbort,
final String reason,
final long remaining,
final S3Object requestObject,
final InputStream inputStream) {

if (!shouldAbort && remaining > 0) {
try {
@@ -284,8 +308,8 @@ private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
shouldAbort = true;
}
}
Io.closeIgnoringIoException(inputStream);
Io.closeIgnoringIoException(requestObject);
cleanupWithLogger(LOG, inputStream);
cleanupWithLogger(LOG, requestObject);
streamStatistics.streamClose(shouldAbort, remaining);

LOG.debug("Stream {} {}: {}; remaining={}", uri, (shouldAbort ? "aborted" : "closed"), reason,
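
The drain logic above decides between reading out the rest of the HTTP payload (so the pooled connection can be reused) and aborting it. A standalone sketch of that decision, assuming the same 16 KB drain buffer; the helper class is hypothetical, and the real method additionally closes the stream and the S3Object with cleanupWithLogger and records stream statistics.

import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch of the drain-vs-abort choice, not the production method. */
final class DrainSketch {
  private static final int DRAIN_BUFFER_SIZE = 16384;

  /**
   * Drain the remaining bytes so the pooled HTTP connection can be reused,
   * unless an abort was requested or draining fails.
   *
   * @return true if the stream should be aborted rather than closed normally.
   */
  static boolean drainOrAbort(boolean shouldAbort, long remaining, InputStream inputStream) {
    if (!shouldAbort && remaining > 0) {
      try {
        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
        long drained = 0;
        while (drained < remaining) {
          int read = inputStream.read(buffer);
          if (read < 0) {
            break;              // stream ended early; nothing left to drain
          }
          drained += read;
        }
      } catch (IOException e) {
        shouldAbort = true;     // draining failed; give up on the connection
      }
    }
    return shouldAbort;
  }
}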

This file was deleted.

@@ -18,9 +18,12 @@

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.net.URI;

import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -37,6 +40,7 @@
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
* Test the prefetching input stream, validates that the underlying S3CachingInputStream and
@@ -48,15 +52,19 @@ public ITestS3PrefetchingInputStream() {
super(true);
}

private static final int _1K = 1024;
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3PrefetchingInputStream.class);

private static final int S_1K = 1024;
private static final int S_1M = S_1K * S_1K;
// Path for file which should have length > block size so S3CachingInputStream is used
private Path largeFile;
private FileSystem fs;
private FileSystem largeFileFS;
private int numBlocks;
private int blockSize;
private long largeFileSize;
// Size should be < block size so S3InMemoryInputStream is used
private static final int SMALL_FILE_SIZE = _1K * 16;
private static final int SMALL_FILE_SIZE = S_1K * 16;


@Override
@@ -67,14 +75,21 @@ public Configuration createConfiguration() {
return conf;
}

@After
public void teardown() throws Exception {
super.teardown();
cleanupWithLogger(LOG, largeFileFS);
largeFileFS = null;
}

private void openFS() throws IOException {
private void openFS() throws Exception {
Configuration conf = getConfiguration();

largeFile = new Path(DEFAULT_CSVTEST_FILE);
blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
fs = largeFile.getFileSystem(getConfiguration());
FileStatus fileStatus = fs.getFileStatus(largeFile);
largeFileFS = new S3AFileSystem();
largeFileFS.initialize(new URI(DEFAULT_CSVTEST_FILE), getConfiguration());
FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
largeFileSize = fileStatus.getLen();
numBlocks = calculateNumBlocks(largeFileSize, blockSize);
}
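
openFS() derives the expected number of prefetch blocks from the file length. The calculateNumBlocks helper is defined elsewhere in the test class; a plausible sketch of it, stated as an assumption, is ceiling division over the block size:

// Assumed implementation of the helper used above: a partial trailing block
// still needs its own GET request, hence the ceiling division.
private static int calculateNumBlocks(long fileSize, int blockSize) {
  return (int) ((fileSize + blockSize - 1) / blockSize);
}
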
@@ -92,12 +107,16 @@ public void testReadLargeFileFully() throws Throwable {
describe("read a large file fully, uses S3CachingInputStream");
openFS();

try (FSDataInputStream in = fs.open(largeFile)) {
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
IOStatistics ioStats = in.getIOStatistics();

byte[] buffer = new byte[(int) largeFileSize];
byte[] buffer = new byte[S_1M * 10];
long bytesRead = 0;

in.read(buffer, 0, (int) largeFileSize);
while (bytesRead < largeFileSize) {
in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
bytesRead += buffer.length;
}

verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
@@ -109,16 +128,16 @@ public void testRandomReadLargeFile() throws Throwable {
describe("random read on a large file, uses S3CachingInputStream");
openFS();

try (FSDataInputStream in = fs.open(largeFile)) {
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
IOStatistics ioStats = in.getIOStatistics();

byte[] buffer = new byte[blockSize];

// Don't read the block completely so it gets cached on seek
in.read(buffer, 0, blockSize - _1K * 10);
in.seek(blockSize + _1K * 10);
in.read(buffer, 0, blockSize - S_1K * 10);
in.seek(blockSize + S_1K * 10);
// Backwards seek, will use cached block
in.seek(_1K * 5);
in.seek(S_1K * 5);
in.read();

verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
@@ -139,13 +158,14 @@ public void testRandomReadSmallFile() throws Throwable {

byte[] buffer = new byte[SMALL_FILE_SIZE];

in.read(buffer, 0, _1K * 4);
in.seek(_1K * 12);
in.read(buffer, 0, _1K * 4);
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);

verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
}

}

}
