diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
index 45f0aabe7dcd92..00fa0f3733019a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
@@ -125,8 +125,9 @@ public void requestPrefetch(int blockNumber) {
   /**
    * Requests cancellation of any previously issued prefetch requests.
+   * @param reason the reason for the cancellation.
    */
-  public void cancelPrefetches() {
+  public void cancelPrefetches(final CancelReason reason) {
     // Do nothing because we do not support prefetches.
   }
 
@@ -142,4 +143,10 @@ public void requestCaching(BufferData data) {
   @Override
   public void close() {
   }
+
+  public enum CancelReason {
+    RandomIO,
+    Close,
+    Unbuffer
+  }
 }
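The `reason` parameter changes the contract of `cancelPrefetches()`: an implementation may salvage in-flight prefetches when cancellation is driven by an out-of-order read, but should let them go on close/unbuffer, when the buffers are about to be freed. A minimal sketch of an override honouring that contract — `inFlightPrefetches()` and `salvageToCache()` are illustrative names, not part of this patch:

```java
@Override
public void cancelPrefetches(final CancelReason reason) {
  for (BufferData data : inFlightPrefetches()) {   // illustrative accessor
    if (reason == CancelReason.RandomIO) {
      // the block may still be wanted soon: save it to the local cache
      salvageToCache(data);                        // illustrative helper
    } else {
      // Close/Unbuffer: the buffers are about to be freed; just let go
      data.setDone();
    }
  }
}
```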
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
index e43b176d0bfe90..dbc5d8b2577da2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.store.LogExactlyOnce;
 
 import static java.util.Objects.requireNonNull;
@@ -48,6 +49,8 @@
  */
 public abstract class CachingBlockManager extends BlockManager {
   private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
+
+  private static final LogExactlyOnce LOG_CACHING_DISABLED = new LogExactlyOnce(LOG);
   private static final int TIMEOUT_MINUTES = 60;
 
   /**
@@ -82,7 +85,10 @@ public abstract class CachingBlockManager extends BlockManager {
    */
   private final BlockOperations ops;
 
-  private boolean closed;
+  /**
+   * True if the manager has been closed.
+   */
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   /**
    * If a single caching operation takes more than this time (in seconds),
@@ -137,7 +143,7 @@ public CachingBlockManager(
     }
 
     this.ops = new BlockOperations();
-    this.ops.setDebug(false);
+    this.ops.setDebug(LOG.isDebugEnabled());
     this.conf = requireNonNull(conf);
     this.localDirAllocator = localDirAllocator;
   }
@@ -158,7 +164,7 @@ public BufferData get(int blockNumber) throws IOException {
     boolean done;
 
     do {
-      if (closed) {
+      if (isClosed()) {
         throw new IOException("this stream is already closed");
       }
@@ -220,7 +226,7 @@ private boolean getInternal(BufferData data) throws IOException {
    */
   @Override
   public void release(BufferData data) {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
@@ -233,16 +239,14 @@ public void release(BufferData data) {
 
   @Override
   public synchronized void close() {
-    if (closed) {
+    if (closed.getAndSet(true)) {
       return;
     }
 
-    closed = true;
-
     final BlockOperations.Operation op = ops.close();
 
     // Cancel any prefetches in progress.
-    cancelPrefetches();
+    cancelPrefetches(CancelReason.Close);
 
     cleanupWithLogger(LOG, cache);
@@ -263,15 +267,18 @@ public synchronized void close() {
   public void requestPrefetch(int blockNumber) {
     checkNotNegative(blockNumber, "blockNumber");
 
-    if (closed) {
+    if (isClosed()) {
       return;
     }
 
     // We initiate a prefetch only if we can acquire a buffer from the shared pool.
+    LOG.debug("Requesting prefetch for block {}", blockNumber);
     BufferData data = bufferPool.tryAcquire(blockNumber);
     if (data == null) {
+      LOG.debug("no buffer acquired for block {}", blockNumber);
       return;
     }
+    LOG.debug("acquired {}", data);
 
     // Opportunistic check without locking.
     if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
@@ -298,12 +305,15 @@ public void requestPrefetch(int blockNumber) {
    * Requests cancellation of any previously issued prefetch requests.
    */
   @Override
-  public void cancelPrefetches() {
+  public void cancelPrefetches(final CancelReason reason) {
+    LOG.debug("Cancelling prefetches {}", reason);
     BlockOperations.Operation op = ops.cancelPrefetches();
 
     for (BufferData data : bufferPool.getAll()) {
       // We add blocks being prefetched to the local cache so that the prefetch is not wasted.
-      if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
+      // This is only done if the reason is random IO-related, not a close/unbuffer.
+      if (reason == CancelReason.RandomIO &&
+          data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
         requestCaching(data);
       }
     }
@@ -316,7 +326,7 @@ private void read(BufferData data) throws IOException {
     try {
       readBlock(data, false, BufferData.State.BLANK);
     } catch (IOException e) {
-      LOG.error("error reading block {}", data.getBlockNumber(), e);
+      LOG.debug("error reading block {}", data.getBlockNumber(), e);
       throw e;
     }
   }
@@ -337,7 +347,7 @@ private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOExc
   private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState)
       throws IOException {
 
-    if (closed) {
+    if (isClosed()) {
       return;
     }
@@ -367,6 +377,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
         tracker = prefetchingStatistics.prefetchOperationStarted();
         op = ops.prefetch(data.getBlockNumber());
       } else {
+        tracker = prefetchingStatistics.blockFetchOperationStarted();
         op = ops.getRead(data.getBlockNumber());
       }
@@ -392,14 +403,22 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
 
       if (isPrefetch) {
         prefetchingStatistics.prefetchOperationCompleted();
-        if (tracker != null) {
-          tracker.close();
-        }
+      }
+      if (tracker != null) {
+        tracker.close();
+        LOG.debug("fetch completed: {}", tracker);
       }
     }
   }
 
+  /**
+   * True if the manager has been closed.
+   */
+  private boolean isClosed() {
+    return closed.get();
+  }
+
   /**
    * Read task that is submitted to the future pool.
    */
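The reworked `finally` clause above now closes the tracker on both the prefetch and the on-demand paths; this is the same discipline `IOStatisticsBinding.invokeTrackingDuration()` applies, and it matters because a tracker that is never closed leaves any associated gauge incremented. The pattern in isolation:

```java
DurationTracker tracker = prefetchingStatistics.blockFetchOperationStarted();
try {
  fetchTheBlock();       // illustrative: the operation being timed
} catch (IOException e) {
  tracker.failed();      // record the duration as a failure
  throw e;
} finally {
  tracker.close();       // always close: updates timings and any gauge
}
```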
@@ -426,10 +445,13 @@ public Void get() {
     }
   }
 
-  private static final BufferData.State[] EXPECTED_STATE_AT_CACHING =
-      new BufferData.State[] {
-          BufferData.State.PREFETCHING, BufferData.State.READY
-      };
+
+  /**
+   * Required state of a block for it to be cacheable.
+   */
+  private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = {
+      BufferData.State.PREFETCHING, BufferData.State.READY
+  };
 
   /**
    * Requests that the given block should be copied to the local cache.
@@ -440,9 +462,10 @@ public Void get() {
    */
   @Override
   public void requestCaching(BufferData data) {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
+    LOG.debug("Request caching of {}", data);
 
     if (cachingDisabled.get()) {
       data.setDone();
@@ -453,16 +476,19 @@ public void requestCaching(BufferData data) {
 
     // Opportunistic check without locking.
     if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
+      LOG.debug("Block in wrong state to cache");
       return;
     }
 
     synchronized (data) {
       // Reconfirm state after locking.
       if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
+        LOG.debug("Block in wrong state to cache");
         return;
       }
 
       if (cache.containsBlock(data.getBlockNumber())) {
+        LOG.debug("Block is already in cache");
         data.setDone();
         return;
       }
@@ -492,7 +518,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
     prefetchingStatistics.executorAcquired(
         Duration.between(taskQueuedStartTime, Instant.now()));
 
-    if (closed) {
+    if (isClosed()) {
       return;
     }
@@ -550,7 +576,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
       if (!cachingDisabled.getAndSet(true)) {
         String message = String.format(
             "Caching disabled because of slow operation (%.1f sec)", endOp.duration());
-        LOG.warn(message);
+        LOG_CACHING_DISABLED.info(message);
       }
     }
   }
@@ -562,9 +588,10 @@ protected BlockCache createCache() {
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
-    if (closed) {
+    if (isClosed()) {
       return;
     }
+    LOG.debug("Caching {}: {}", blockNumber, buffer);
 
     cache.put(blockNumber, buffer, conf, localDirAllocator);
   }
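Replacing the plain `closed` boolean with an `AtomicBoolean` makes `close()` idempotent under concurrent callers: only the thread that flips the flag runs the cleanup. The pattern, reduced to its core:

```java
private final AtomicBoolean closed = new AtomicBoolean(false);

@Override
public synchronized void close() {
  if (closed.getAndSet(true)) {
    return;   // another caller closed the manager first
  }
  // reached exactly once: cancel prefetches, release buffers, delete cache files
}
```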
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
index 9ce2dec5889f13..587b22f7df8760 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
@@ -24,7 +24,9 @@
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
-public interface PrefetchingStatistics extends IOStatisticsSource {
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
+public interface PrefetchingStatistics extends IOStatisticsSource {
 
   /**
    * A prefetch operation has started.
@@ -32,6 +34,14 @@ public interface PrefetchingStatistics extends IOStatisticsSource {
    */
   DurationTracker prefetchOperationStarted();
 
+  /**
+   * A block fetch operation has started.
+   * @return duration tracker
+   */
+  default DurationTracker blockFetchOperationStarted() {
+    return stubDurationTracker();
+  }
+
   /**
    * A block has been saved to the file cache.
    */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index e043fbd904be8e..bcc643a46299d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -47,6 +47,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.DurationInfo;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -311,11 +312,13 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
   protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
     buffer.rewind();
-    WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
-    while (buffer.hasRemaining()) {
-      writeChannel.write(buffer);
+    try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
+        DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
+            buffer.remaining(), path)) {
+      while (buffer.hasRemaining()) {
+        writeChannel.write(buffer);
+      }
     }
-    writeChannel.close();
   }
 
   /**
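`org.apache.hadoop.util.DurationInfo` is an `AutoCloseable` which logs the elapsed time of its enclosing block when closed, so folding it into the same try-with-resources statement as the channel times the write with no extra bookkeeping. Note that resources close in reverse order of declaration: here the duration is logged before the channel itself is closed. The general pattern:

```java
try (WritableByteChannel channel = Files.newByteChannel(path, CREATE_OPTIONS);
    DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
        buffer.remaining(), path)) {
  while (buffer.hasRemaining()) {
    channel.write(buffer);   // the timed work
  }
}   // d.close() logs the duration; channel.close() follows
```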
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 50bbf45505cece..015330d5396724 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -421,6 +421,12 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_PREFETCH_OPERATIONS
       = "stream_read_prefetch_operations";
 
+  /**
+   * Total number of block fetch operations executed.
+   */
+  public static final String STREAM_READ_BLOCK_FETCH_OPERATIONS
+      = "stream_read_block_fetch_operations";
+
   /**
    * Total number of block in disk cache.
    */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 6a5d01fb3b0746..ad3fb1a0a88c57 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -31,6 +31,9 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.StorageStatistics;
@@ -51,6 +54,12 @@
  * Support for implementing IOStatistics interfaces.
  */
 public final class IOStatisticsBinding {
+  /**
+   * Log changes at debug.
+   * Noisy, but occasionally useful.
+   */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsBinding.class);
 
   /** Pattern used for each entry. */
   public static final String ENTRY_PATTERN = "(%s=%s)";
@@ -548,6 +557,7 @@ public static <B> B invokeTrackingDuration(
     } catch (IOException | RuntimeException e) {
       // input function failed: note it
       tracker.failed();
+      LOG.debug("Operation failure with duration {}", tracker);
       // and rethrow
       throw e;
     } finally {
@@ -555,6 +565,7 @@ public static <B> B invokeTrackingDuration(
       // this is called after the catch() call would have
       // set the failed flag.
       tracker.close();
+      LOG.debug("Operation completed: duration {}", tracker);
     }
   }
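For callers, `invokeTrackingDuration()` packages the failed/close sequence (plus the debug lines added above) behind one call; a typical use against a store with duration tracking — the statistic key and `fetchBlock()` are illustrative:

```java
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

IOStatisticsStore store = iostatisticsStore()
    .withDurationTracking("object_fetch")
    .build();

byte[] block = invokeTrackingDuration(
    store.trackDuration("object_fetch"),  // tracker for this attempt
    () -> fetchBlock());                  // illustrative IO operation
```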
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StatisticDurationTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StatisticDurationTracker.java
index 04d30135f6bd3d..3d2d26ca320ace 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StatisticDurationTracker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StatisticDurationTracker.java
@@ -31,6 +31,10 @@
 * In the constructor, the counter with name of 'key' is
 * incremented -default is by 1, but can be set to other
 * values, including 0.
+ * <p>
+ * If there is also a gauge with the same key,
+ * it is incremented in the constructor and decremented
+ * when the tracker is closed.
 */
 public class StatisticDurationTracker extends OperationDuration
     implements DurationTracker {
@@ -78,6 +82,9 @@ public StatisticDurationTracker(
     if (count > 0) {
       iostats.incrementCounter(key, count);
     }
+    if (iostats.gauges().containsKey(key)) {
+      iostats.incrementGauge(key, 1);
+    }
   }
 
   @Override
@@ -102,6 +109,9 @@ public void close() {
       iostats.incrementCounter(name);
     }
     iostats.addTimedOperation(name, asDuration());
+    if (iostats.gauges().containsKey(key)) {
+      iostats.incrementGauge(key, -1);
+    }
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 48977b510578b1..1848eead6b858c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -220,6 +220,10 @@ public S3AInstrumentation(URI name) {
       storeBuilder.withDurationTracking(stat.getSymbol());
     });
 
+    // plus any gauges which are also counters; the auto-registration
+    // doesn't handle these.
+    storeBuilder.withGauges(StreamStatisticNames.STREAM_READ_BLOCK_FETCH_OPERATIONS);
+
     //todo need a config for the quantiles interval?
     int interval = 1;
     throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE,
@@ -1385,6 +1389,11 @@ public DurationTracker prefetchOperationStarted() {
     return trackDuration(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
   }
 
+  @Override
+  public DurationTracker blockFetchOperationStarted() {
+    return trackDuration(StreamStatisticNames.STREAM_READ_BLOCK_FETCH_OPERATIONS);
+  }
+
   @Override
   public void blockAddedToFileCache() {
     incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 651769ff283bd7..10d06227f015b4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -394,6 +394,10 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
       "Gauge of active memory in use",
       TYPE_GAUGE),
+  STREAM_READ_BLOCK_FETCH_OPERATIONS(
+      StreamStatisticNames.STREAM_READ_BLOCK_FETCH_OPERATIONS,
+      "Count of block fetch operations",
+      TYPE_COUNTER),
 
   /* Stream Write statistics */
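Because `STREAM_READ_BLOCK_FETCH_OPERATIONS` is registered as both a counter and a gauge, `StatisticDurationTracker` now bumps the gauge for the lifetime of each fetch, so the gauge reads as "block fetches currently active". The observable behaviour, sketched against a store built the same way (builder imports as in the previous sketch; the key name is illustrative):

```java
IOStatisticsStore store = iostatisticsStore()
    .withDurationTracking("fetch")   // counter plus .min/.mean/.max
    .withGauges("fetch")             // same key: becomes an active-operation gauge
    .build();

DurationTracker t = store.trackDuration("fetch");
// in flight: counter "fetch" == 1, gauge "fetch" == 1
t.close();
// closed: counter still 1, gauge back to 0, timing statistics updated
```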
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
index fe950486480380..0af2696d735c95 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
@@ -54,11 +54,14 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
    */
   private final int numBlocksToPrefetch;
 
-  private final BlockManager blockManager;
+  private final Configuration conf;
+
+  private final LocalDirAllocator localDirAllocator;
+
+  private BlockManager blockManager;
 
   /**
    * Initializes a new instance of the {@code S3ACachingInputStream} class.
-   *
    * @param context read-specific operation context.
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
@@ -76,29 +79,57 @@ public S3ACachingInputStream(
       S3AInputStreamStatistics streamStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator) {
-    super(context, s3Attributes, client, streamStatistics);
-
-    this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
-    int bufferPoolSize = this.numBlocksToPrefetch + 1;
-    this.blockManager = this.createBlockManager(
-        this.getContext().getFuturePool(),
-        this.getReader(),
-        this.getBlockData(),
-        bufferPoolSize,
-        conf,
-        localDirAllocator);
+    super(context, s3Attributes, client, streamStatistics);
+    this.conf = conf;
+    this.localDirAllocator = localDirAllocator;
+    this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+    demandCreateBlockManager();
     int fileSize = (int) s3Attributes.getLen();
     LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
         fileSize);
   }
+
+  /**
+   * Demand create the block manager, if it is not already live.
+   */
+  private void demandCreateBlockManager() {
+    if (blockManager == null) {
+      blockManager = createBlockManager(
+          getContext().getFuturePool(),
+          getReader(),
+          getBlockData(),
+          numBlocksToPrefetch + 1,
+          conf,
+          localDirAllocator);
+    }
+  }
 
   @Override
   public void close() throws IOException {
     // Close the BlockManager first, cancelling active prefetches,
     // deleting cached files and freeing memory used by buffer pool.
-    blockManager.close();
-    super.close();
-    LOG.info("closed: {}", getName());
+    if (!isClosed()) {
+      closeBlockManager();
+      super.close();
+      LOG.info("closed: {}", getName());
+    }
+  }
+
+  @Override
+  protected boolean closeStream() {
+    final boolean b = super.closeStream();
+    closeBlockManager();
+    return b;
+  }
+
+  /**
+   * Close the block manager and set it to null, if
+   * it is not already in this state.
+   */
+  private void closeBlockManager() {
+    if (blockManager != null) {
+      blockManager.close();
+      blockManager = null;
+    }
+  }
 
   @Override
@@ -106,6 +137,7 @@ protected boolean ensureCurrentBuffer() throws IOException {
     if (isClosed()) {
       return false;
     }
+    demandCreateBlockManager();
 
     long readPos = getNextReadPos();
     if (!getBlockData().isValidOffset(readPos)) {
@@ -139,7 +171,7 @@ protected boolean ensureCurrentBuffer() throws IOException {
     int prefetchCount;
     if (outOfOrderRead) {
       LOG.debug("lazy-seek({})", getOffsetStr(readPos));
-      blockManager.cancelPrefetches();
+      blockManager.cancelPrefetches(BlockManager.CancelReason.RandomIO);
 
       // We prefetch only 1 block immediately after a seek operation.
       prefetchCount = 1;
@@ -175,7 +207,7 @@ public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append(String.format("%s%n", super.toString()));
-    sb.append(blockManager.toString());
+    sb.append(" block manager: ").append(blockManager);
     return sb.toString();
   }
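Taken together, these changes give the caching stream the standard unbuffer lifecycle: `closeStream()` tears down the block manager (cancelling prefetches and deleting cache files), and the next read recreates it on demand. From a client's point of view — `fs`, `path` and `buf` are assumed to be in scope:

```java
try (FSDataInputStream in = fs.open(path)) {   // prefetching stream underneath
  in.read(buf);     // block manager created; prefetching begins
  in.unbuffer();    // prefetches cancelled (Unbuffer), cache files deleted
  // the handle stays open but holds no buffers or disk space
  in.seek(0);
  in.read(buf);     // ensureCurrentBuffer() recreates the block manager
}
```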
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java
index e8bfe946f4abf2..03bd09342d5e98 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3AInMemoryInputStream.java
@@ -32,18 +32,29 @@
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
 /**
  * Provides an {@code InputStream} that allows reading from an S3 file.
  * The entire file is read into memory before reads can begin.
  *
- * Use of this class is recommended only for small files that can fit
- * entirely in memory.
+ * Use of this class is recommended only for small files.
+ * When {@link #unbuffer()} is called, the memory is released.
 */
 public class S3AInMemoryInputStream extends S3ARemoteInputStream {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       S3AInMemoryInputStream.class);
 
+  /**
+   * The file size, narrowed from a long; the constructor verifies
+   * that it fits in an int.
+   */
+  private final int fileSize;
+
+  /**
+   * Buffer containing the read data; allocated on demand.
+   */
   private ByteBuffer buffer;
 
   /**
@@ -64,12 +75,32 @@ public S3AInMemoryInputStream(
       S3AInputStream.InputStreamCallbacks client,
       S3AInputStreamStatistics streamStatistics) {
     super(context, s3Attributes, client, streamStatistics);
-    int fileSize = (int) s3Attributes.getLen();
-    this.buffer = ByteBuffer.allocate(fileSize);
+    final long len = s3Attributes.getLen();
+    checkArgument(len < Integer.MAX_VALUE && len >= 0,
+        "Unsupported file size: %s", len);
+    fileSize = (int) len;
+  }
+
+  /**
+   * Allocates a buffer for the input stream.
+   *
+   * @throws IllegalStateException if the buffer is already allocated.
+   */
+  private void allocateBuffer() {
+    checkState(buffer == null, "buffer for %s already allocated", getName());
+    buffer = ByteBuffer.allocate(fileSize);
     LOG.debug("Created in-memory input stream for {} (size = {})",
         getName(), fileSize);
   }
 
+  /**
+   * Set the buffer to null so that GC will clean it up.
+   * Harmless to call on a released buffer.
+   */
+  private void releaseBuffer() {
+    buffer = null;
+  }
+
   /**
    * Ensures that a non-empty valid buffer is available for immediate reading.
    * It returns true when at least one such buffer is available for reading.
@@ -86,6 +117,9 @@ protected boolean ensureCurrentBuffer() throws IOException {
     if (getBlockData().getFileSize() == 0) {
       return false;
     }
+    if (buffer == null) {
+      allocateBuffer();
+    }
 
     FilePosition filePosition = getFilePosition();
     if (filePosition.isValid()) {
@@ -105,4 +139,12 @@ protected boolean ensureCurrentBuffer() throws IOException {
 
     return filePosition.buffer().hasRemaining();
   }
+
+  @Override
+  protected boolean closeStream() {
+    final boolean b = super.closeStream();
+    releaseBuffer();
+    return b;
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
index 9b9ee12ad75026..d41bfc5059f1b7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -49,7 +50,7 @@
 */
 public class S3APrefetchingInputStream
     extends FSInputStream
-    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource, CanUnbuffer {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       S3APrefetchingInputStream.class);
@@ -154,6 +155,19 @@ public synchronized long getPos() throws IOException {
     return lastReadCurrentPos;
   }
 
+  /**
+   * Ensure the stream is active by initializing the inner input stream if
+   * needed.
+   * @throws IOException failure
+   */
+  protected synchronized void ensureStreamActive() throws IOException {
+    throwIfClosed();
+
+    if (inputStream.underlyingResourcesClosed()) {
+      inputStream.initializeUnderlyingResources();
+    }
+  }
+
   /**
    * Reads and returns one byte from this stream.
    *
@@ -162,7 +176,7 @@ public synchronized long getPos() throws IOException {
    */
   @Override
   public synchronized int read() throws IOException {
-    throwIfClosed();
+    ensureStreamActive();
     return inputStream.read();
   }
 
@@ -180,10 +194,17 @@ public synchronized int read() throws IOException {
   @Override
   public synchronized int read(byte[] buffer, int offset, int len)
       throws IOException {
-    throwIfClosed();
+    ensureStreamActive();
     return inputStream.read(buffer, offset, len);
   }
 
+  @Override
+  public synchronized void unbuffer() {
+    if (inputStream != null) {
+      inputStream.unbuffer();
+    }
+  }
+
   /**
    * Closes this stream and releases all acquired resources.
   *
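With `CanUnbuffer` declared and the `UNBUFFER` capability answered (see the `hasCapability()` change in the next file), callers of an `FSDataInputStream in` can use the standard defensive probe instead of relying on `unbuffer()` not throwing `UnsupportedOperationException`:

```java
if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
  in.unbuffer();   // free buffers and cache; the stream stays open and seekable
}
```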
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
index 38d740bd74f94d..cdaaa4c22b7d29 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
@@ -51,7 +53,7 @@
 */
 public abstract class S3ARemoteInputStream
     extends InputStream
-    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource {
+    implements CanSetReadahead, StreamCapabilities, IOStatisticsSource, CanUnbuffer {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       S3ARemoteInputStream.class);
@@ -74,7 +76,13 @@ public abstract class S3ARemoteInputStream
   /**
    * Indicates whether the stream has been closed.
    */
-  private volatile boolean closed;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /**
+   * Indicates whether the underlying resources have been freed;
+   * set when the stream is closed or unbuffered.
+   */
+  private final AtomicBoolean underlyingResourcesClosed = new AtomicBoolean(true);
 
   /**
    * Internal position within the file. Updated lazily
@@ -132,7 +140,7 @@ public S3ARemoteInputStream(
     this.client = requireNonNull(client);
     this.streamStatistics = requireNonNull(streamStatistics);
     this.ioStatistics = streamStatistics.getIOStatistics();
-    this.name = S3ARemoteObject.getPath(s3Attributes);
+    this.name = context.getPath().toUri().toString();
     this.changeTracker = new ChangeTracker(
         this.name,
         context.getChangeDetectionPolicy(),
@@ -142,15 +150,21 @@ public S3ARemoteInputStream(
     setInputPolicy(context.getInputPolicy());
     setReadahead(context.getReadahead());
 
+    initializeUnderlyingResources();
+
+    this.nextReadPos = 0;
+  }
+
+  /**
+   * Initializes those resources that the stream uses but which are
+   * released during unbuffer.
+   */
+  protected void initializeUnderlyingResources() {
+    underlyingResourcesClosed.set(false);
     long fileSize = s3Attributes.getLen();
     int bufferSize = context.getPrefetchBlockSize();
-
-    this.blockData = new BlockData(fileSize, bufferSize);
-    this.fpos = new FilePosition(fileSize, bufferSize);
     this.remoteObject = getS3File();
     this.reader = new S3ARemoteObjectReader(remoteObject);
-
-    this.nextReadPos = 0;
+    this.blockData = new BlockData(fileSize, bufferSize);
+    this.fpos = new FilePosition(fileSize, bufferSize);
   }
 
   /**
@@ -196,7 +210,8 @@ public synchronized void setReadahead(Long readahead) {
   @Override
   public boolean hasCapability(String capability) {
     return capability.equalsIgnoreCase(StreamCapabilities.IOSTATISTICS)
-        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD);
+        || capability.equalsIgnoreCase(StreamCapabilities.READAHEAD)
+        || capability.equalsIgnoreCase(StreamCapabilities.UNBUFFER);
   }
 
   /**
@@ -216,6 +231,11 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
   public int available() throws IOException {
     throwIfClosed();
 
+    // the stream has been unbuffered: nothing is available.
+    if (underlyingResourcesClosed.get()) {
+      return 0;
+    }
+
     // Update the current position in the current buffer, if possible.
     if (!fpos.setAbsolute(nextReadPos)) {
       return 0;
@@ -343,39 +363,43 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
     return numBytesRead;
   }
 
-  protected S3ARemoteObject getFile() {
+  protected final S3ARemoteObject getRemoteObject() {
     return remoteObject;
   }
 
-  protected S3ARemoteObjectReader getReader() {
+  protected final S3ARemoteObjectReader getReader() {
     return reader;
   }
 
-  protected S3ObjectAttributes getS3ObjectAttributes() {
+  protected final S3ObjectAttributes getS3ObjectAttributes() {
     return s3Attributes;
   }
 
-  protected FilePosition getFilePosition() {
+  protected final FilePosition getFilePosition() {
     return fpos;
   }
 
-  protected String getName() {
+  protected final String getName() {
     return name;
   }
 
-  protected boolean isClosed() {
-    return closed;
+  protected final boolean isClosed() {
+    return closed.get();
   }
 
-  protected long getNextReadPos() {
+  protected final boolean underlyingResourcesClosed() {
+    return underlyingResourcesClosed.get();
+  }
+
+  protected final long getNextReadPos() {
     return nextReadPos;
   }
 
-  protected BlockData getBlockData() {
+  protected final BlockData getBlockData() {
     return blockData;
   }
 
-  protected S3AReadOpContext getContext() {
+  protected final S3AReadOpContext getContext() {
     return context;
   }
@@ -409,6 +433,34 @@ protected String getOffsetStr(long offset) {
     return String.format("%d:%d", blockNumber, offset);
   }
 
+  @Override
+  public synchronized void unbuffer() {
+    LOG.debug("{}: unbuffered", getName());
+    if (closeStream()) {
+      getS3AStreamStatistics().unbuffered();
+    }
+  }
+
+  /**
+   * Close the stream in close() or unbuffer().
+   * @return true if this call released the resources; false means
+   * they were already released.
+   */
+  protected boolean closeStream() {
+    if (underlyingResourcesClosed.getAndSet(true)) {
+      return false;
+    }
+
+    blockData = null;
+    reader.close();
+    reader = null;
+    remoteObject = null;
+    fpos.invalidate();
+    return true;
+  }
+
   /**
    * Closes this stream and releases all acquired resources.
   *
@@ -416,16 +468,13 @@ protected String getOffsetStr(long offset) {
   */
   @Override
   public void close() throws IOException {
-    if (closed) {
+    if (closed.getAndSet(true)) {
       return;
     }
-    closed = true;
+    closeStream();
 
-    blockData = null;
-    reader.close();
-    reader = null;
+    // TODO remoteObject.close();
     remoteObject = null;
-    fpos.invalidate();
     try {
       client.close();
     } finally {
@@ -452,7 +501,7 @@ public String toString() {
   }
 
   protected void throwIfClosed() throws IOException {
-    if (closed) {
+    if (closed.get()) {
       throw new IOException(
           name + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
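The stream now carries two lifecycle flags: `closed` is terminal and set once by `close()`, while `underlyingResourcesClosed` toggles — set by `closeStream()`, cleared by `initializeUnderlyingResources()`. A sketch of the state machine as driven through the wrapper (`in` is an `S3APrefetchingInputStream`):

```java
in.read();       // resources live: underlyingResourcesClosed == false
in.unbuffer();   // closeStream(): reader, buffers and fpos dropped; closed still false
in.read();       // ensureStreamActive() -> initializeUnderlyingResources()
in.close();      // closed == true: terminal; further reads throw
```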
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
index 3ab0022bb082e6..52c0c828252687 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
@@ -90,6 +90,8 @@ public class S3ARemoteObject {
   */
   private static final int DRAIN_BUFFER_SIZE = 16384;
 
+  public static final String OPERATION_GET = "GET";
+
   /**
    * Initializes a new instance of the {@code S3ARemoteObject} class.
   *
@@ -124,7 +126,7 @@ public S3ARemoteObject(
     this.streamStatistics = streamStatistics;
     this.changeTracker = changeTracker;
     this.s3Objects = new IdentityHashMap<>();
-    this.uri = this.getPath();
+    this.uri = context.getPath().toUri().toString();
   }
 
   /**
@@ -151,18 +153,7 @@ public S3AInputStreamStatistics getStatistics() {
    * @return the path of this file.
    */
   public String getPath() {
-    return getPath(s3Attributes);
-  }
-
-  /**
-   * Gets the path corresponding to the given s3Attributes.
-   *
-   * @param s3Attributes attributes of an S3 object.
-   * @return the path corresponding to the given s3Attributes.
-   */
-  public static String getPath(S3ObjectAttributes s3Attributes) {
-    return String.format("s3a://%s/%s", s3Attributes.getBucket(),
-        s3Attributes.getKey());
+    return uri;
   }
 
   /**
@@ -199,10 +190,11 @@ public InputStream openForRead(long offset, int size) throws IOException {
     changeTracker.maybeApplyConstraint(request);
 
     String operation = String.format(
-        "%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
+        "%s %s at %d size %d", OPERATION_GET, uri, offset, size);
     DurationTracker tracker = streamStatistics.initiateGetRequest();
     S3Object object = null;
+    LOG.debug("{}", operation);
 
     try {
       object = Invoker.once(operation, uri, () -> client.getObject(request));
     } catch (IOException e) {
@@ -211,6 +203,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
     } finally {
       tracker.close();
     }
+    LOG.debug("{} result duration {}", operation, tracker);
 
     changeTracker.processResponse(object, operation, offset);
     InputStream stream = object.getObjectContent();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index 89ea77d6d0ebb1..38c3c108e95233 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -107,8 +107,10 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
     this.streamStatistics.readOperationStarted(offset, size);
     Invoker invoker = this.remoteObject.getReadInvoker();
 
+    final String path = this.remoteObject.getPath();
     int invokerResponse =
-        invoker.retry("read", this.remoteObject.getPath(), true,
+        invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
+            path, true,
             trackDurationOfOperation(streamStatistics,
                 STREAM_READ_REMOTE_BLOCK_READ, () -> {
                   try {
@@ -127,6 +129,10 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
 
     int numBytesRead = buffer.position();
     buffer.limit(numBytesRead);
+    if (numBytesRead != size) {
+      // the read did not return the expected byte count; log at debug
+      LOG.debug("Expected to read {} bytes but got {}", size, numBytesRead);
+    }
     this.remoteObject.getStatistics()
         .readOperationCompleted(size, numBytesRead);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
index 82a7a3c63b37f7..7b40e403b8ee11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
@@ -19,8 +19,11 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import java.io.FileNotFoundException;
+import java.util.Collection;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -32,13 +35,32 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.PREFETCH_OPTIONS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
 * S3A contract tests opening files.
 */
+@RunWith(Parameterized.class)
 public class ITestS3AContractOpen extends AbstractContractOpenTest {
 
+  public ITestS3AContractOpen(final boolean prefetch) {
+    this.prefetch = prefetch;
+  }
+
+  /**
+   * Parameterization.
+   */
+  @Parameterized.Parameters(name = "prefetch={0}")
+  public static Collection<Object[]> params() {
+    return PREFETCH_OPTIONS;
+  }
+
+  /**
+   * Prefetch flag.
+   */
+  private final boolean prefetch;
+
+  @Override
+  protected Configuration createConfiguration() {
+    // wire the parameter up to the configuration;
+    // without this the prefetch flag would be ignored.
+    return enablePrefetch(super.createConfiguration(), prefetch);
+  }
+
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
index 2c7149ff5cb155..25f86a70a53259 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java
@@ -18,12 +18,49 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.PREFETCH_OPTIONS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
+
+@RunWith(Parameterized.class)
 public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
 
+  /**
+   * Parameterization.
+   */
+  @Parameterized.Parameters(name = "prefetch={0}")
+  public static Collection<Object[]> params() {
+    return PREFETCH_OPTIONS;
+  }
+
+  /**
+   * Prefetch flag.
+   */
+  private final boolean prefetch;
+
+  public ITestS3AContractUnbuffer(final boolean prefetch) {
+    this.prefetch = prefetch;
+  }
+
+  /**
+   * Create a configuration.
+   * @return a configuration
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration c = prepareTestConfiguration(super.createConfiguration());
+    return enablePrefetch(c, prefetch);
+  }
+
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 2146e17b6adfbb..cc043547a9507d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
@@ -36,12 +35,15 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
@@ -57,6 +59,8 @@
 */
 public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
 
+  private FileStatus fileStatus;
+
   public ITestS3APrefetchingInputStream() {
     super(true);
   }
@@ -68,7 +72,7 @@ public ITestS3APrefetchingInputStream() {
   private static final int S_1M = S_1K * S_1K;
   // Path for file which should have length > block size so S3ACachingInputStream is used
   private Path largeFile;
-  private FileSystem largeFileFS;
+  private S3AFileSystem largeFileFS;
   private int numBlocks;
   private int blockSize;
   private long largeFileSize;
@@ -89,9 +93,12 @@ public Configuration createConfiguration() {
 
   @Override
   public void teardown() throws Exception {
+    if (largeFileFS != null) {
+      FILESYSTEM_IOSTATS.aggregate(largeFileFS.getIOStatistics());
+      cleanupWithLogger(LOG, largeFileFS);
+      largeFileFS = null;
+    }
     super.teardown();
-    cleanupWithLogger(LOG, largeFileFS);
-    largeFileFS = null;
   }
 
   private void openFS() throws Exception {
@@ -102,7 +109,7 @@ private void openFS() throws Exception {
     blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
     largeFileFS = new S3AFileSystem();
     largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
-    FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
+    fileStatus = largeFileFS.getFileStatus(largeFile);
     largeFileSize = fileStatus.getLen();
     numBlocks = calculateNumBlocks(largeFileSize, blockSize);
   }
@@ -121,7 +128,10 @@ public void testReadLargeFileFully() throws Throwable {
     IOStatistics ioStats;
     openFS();
 
-    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+    try (FSDataInputStream in = largeFileFS.openFile(largeFile)
+        .withFileStatus(fileStatus)
+        .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
       ioStats = in.getIOStatistics();
 
       byte[] buffer = new byte[S_1M * 10];
@@ -134,12 +144,18 @@ public void testReadLargeFileFully() throws Throwable {
         verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
       }
 
+      LOG.info("Stream statistics for file {}\n{}", fileStatus,
+          ioStatisticsToPrettyString(in.getIOStatistics()));
+
       // Assert that first block is read synchronously, following blocks are prefetched
       verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
           numBlocks - 1);
       verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
       verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
+
+      in.unbuffer();
+      // Verify that after unbuffer, all memory is freed
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
     }
     // Verify that once stream is closed, all memory is freed
     verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index 3a2d1b1b09a491..2d37fc1885d77b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -75,7 +76,6 @@ public void testUnbuffer() throws IOException {
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       skipIfCannotUnbuffer(inputStream);
-      assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
       int bytesToRead = 8;
       readAndAssertBytesRead(inputStream, bytesToRead);
       assertTrue(isObjectStreamOpen(inputStream));
@@ -178,9 +178,7 @@ public void testUnbufferStreamStatistics() throws IOException {
     totalBytesRead.assertDiffEquals(expectedTotalBytesRead);
 
     // Validate that the input stream stats are correct when the file is closed
-    S3AInputStreamStatistics streamStatistics = ((S3AInputStream) inputStream
-        .getWrappedStream())
-        .getS3AStreamStatistics();
+    S3AInputStreamStatistics streamStatistics = getInputStreamStatistics(inputStream);
     Assertions.assertThat(streamStatistics)
         .describedAs("Stream statistics %s", streamStatistics)
         .hasFieldOrPropertyWithValue("bytesRead",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 469562f9b33b96..16f8e1664a3927 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -77,6 +77,8 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -577,11 +579,33 @@ public static Configuration prepareTestConfiguration(final Configuration conf)
     boolean prefetchEnabled =
         getTestPropertyBool(conf, PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
-    conf.setBoolean(PREFETCH_ENABLED_KEY, prefetchEnabled);
+    enablePrefetch(conf, prefetchEnabled);
 
     return conf;
   }
 
+  /**
+   * Unset base/bucket prefetch options and set to the supplied option instead.
+   * @param conf configuration
+   * @param prefetch prefetch option
+   * @return the modified configuration.
+   */
+  public static Configuration enablePrefetch(final Configuration conf, boolean prefetch) {
+    removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, prefetch);
+    return conf;
+  }
+
+  /**
+   * The prefetch parameters to expose in a parameterized
+   * test to turn prefetching on/off.
+   */
+  public static final Collection<Object[]> PREFETCH_OPTIONS =
+      Arrays.asList(new Object[][] {
+          {true},
+          {false},
+      });
+
   /**
    * build dir.
    * @return the directory for the project's build, as set by maven,
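`PREFETCH_OPTIONS` and `enablePrefetch()` together give any suite a compact way to run with prefetching both on and off; the intended wiring, shown on a hypothetical test class:

```java
@RunWith(Parameterized.class)
public class ITestExample extends AbstractS3ATestBase {   // hypothetical suite

  @Parameterized.Parameters(name = "prefetch={0}")
  public static Collection<Object[]> params() {
    return PREFETCH_OPTIONS;
  }

  private final boolean prefetch;

  public ITestExample(final boolean prefetch) {
    this.prefetch = prefetch;
  }

  @Override
  protected Configuration createConfiguration() {
    return enablePrefetch(super.createConfiguration(), prefetch);
  }
}
```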
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java
index 6e2f547a22ec1a..d0382018306abc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/MockS3ARemoteObject.java
@@ -53,7 +53,7 @@ class MockS3ARemoteObject extends S3ARemoteObject {
 
   MockS3ARemoteObject(int size, boolean throwExceptionOnOpen) {
     super(
-        S3APrefetchFakes.createReadContext(null, KEY, size, 1, 1),
+        S3APrefetchFakes.createReadContext(null, "s3a://" + BUCKET + "/" + KEY, size, 1, 1),
         S3APrefetchFakes.createObjectAttributes(BUCKET, KEY, size),
         S3APrefetchFakes.createInputStreamCallbacks(BUCKET, KEY),
         EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
index cbfa643ee53629..2c6cde349f5aa2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
+import org.apache.hadoop.fs.impl.prefetch.BlockManager;
 import org.apache.hadoop.fs.impl.prefetch.BufferData;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
@@ -260,7 +261,9 @@ private void testPrefetchHelper(boolean forcePrefetchFailure)
 
     assertEquals(0, blockManager.numCached());
 
-    blockManager.cancelPrefetches();
+    // prefetches cancelled for random IO are still added to the cache;
+    // a close/unbuffer cancellation would discard them.
+    blockManager.cancelPrefetches(BlockManager.CancelReason.RandomIO);
     waitForCaching(blockManager, expectedNumSuccesses);
     assertEquals(expectedNumErrors, this.totalErrors(blockManager));
     assertEquals(expectedNumSuccesses, blockManager.numCached());
@@ -354,9 +357,10 @@ private void waitForCaching(
       numTrys++;
       if (numTrys > 600) {
         String message = String.format(
-            "waitForCaching: expected: %d, actual: %d, read errors: %d, caching errors: %d",
+            "waitForCaching: expected: %d, actual: %d, read errors: %d, caching errors: %d in %s",
             expectedCount, count, blockManager.numReadErrors(),
-            blockManager.numCachingErrors());
+            blockManager.numCachingErrors(),
+            blockManager);
         throw new IllegalStateException(message);
       }
     }
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index 306a79a20a2040..798aa0d8affe5c 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -88,3 +88,6 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
 
 # uncomment this to trace where context entries are set
 # log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE
+
+# log the hadoop-common prefetch code at debug
+log4j.logger.org.apache.hadoop.fs.impl.prefetch=DEBUG