diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
index 46a428cdfb48fd..078bbe05c9c6a3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManager.java
@@ -150,7 +150,7 @@ public void close() {
   public enum CancelReason {
     /** Stream has switched to random IO. */
     RandomIO,
-    /** Stream closed completely */
+    /** Stream closed completely. */
     Close,
     /** Stream unbuffered. */
     Unbuffer
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
index 215278c584806a..1d34c01c3ff4cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -42,10 +43,11 @@
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

 /**
  * Provides read access to the underlying file one block at a time.
- * Improve read performance by prefetching and locall caching blocks.
+ * Improve read performance by prefetching and locally caching blocks.
  */
 public abstract class CachingBlockManager extends BlockManager {
   private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
@@ -432,7 +434,7 @@ private boolean isClosed() {

   /**
    * Disable caching; updates stream statistics and logs exactly once
-   * at info
+   * at info.
    * @param endOp operation which measured the duration of the write.
    */
   private void disableCaching(final BlockOperations.End endOp) {
@@ -566,21 +568,23 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
     LOG.debug("Block {}: awaiting any read to complete", blockNumber);

     try {
-      blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
+      // wait for data; state of caching may change during this period.
+      awaitFuture(blockFuture, TIMEOUT_MINUTES, TimeUnit.MINUTES);
       if (data.stateEqualsOneOf(BufferData.State.DONE)) {
         // There was an error during prefetch.
         LOG.debug("Block {}: prefetch failure", blockNumber);
         return;
       }
-    } catch (Exception e) {
-      LOG.info("error waiting on blockFuture: {}. {}", data, e.getMessage());
-      LOG.debug("error waiting on blockFuture: {}", data, e);
+    } catch (IOException | TimeoutException e) {
+      LOG.info("Error fetching block: {}. {}", data, e.toString());
+      LOG.debug("Error fetching block: {}", data, e);
       data.setDone();
       return;
     }

     if (isCachingDisabled()) {
-      LOG.debug("Block {}: Preparing caching disabled while reading data", blockNumber);
+      // caching was disabled while waiting for the read to complete.
+      LOG.debug("Block {}: caching disabled while reading data", blockNumber);
       data.setDone();
       return;
     }
@@ -590,8 +594,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
     synchronized (data) {
       try {
         if (data.stateEqualsOneOf(BufferData.State.DONE)) {
-          LOG.debug("Block {}: Block already in cache; not adding", blockNumber);
-
+          LOG.debug("Block {}: block no longer in use; not adding", blockNumber);
           return;
         }

@@ -606,10 +609,10 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
         buffer.rewind();
         cachePut(blockNumber, buffer);
         data.setDone();
-      } catch (Exception e) {
+      } catch (IOException e) {
         numCachingErrors.incrementAndGet();
-        LOG.info("error adding block to cache after wait: {}. {}", data, e.getMessage());
-        LOG.debug("error adding block to cache after wait: {}", data, e);
+        LOG.info("error adding block to cache: {}. {}", data, e.getMessage());
+        LOG.debug("error adding block to cache: {}", data, e);
         data.setDone();
       }

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/Sizes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/Sizes.java
index 7ea3531c1d7c9c..faaaba9ae8c191 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/Sizes.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/Sizes.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.test;

 /**
- * Sizes of data in KiB/MiB
+ * Sizes of data in KiB/MiB.
  */
-public class Sizes {
+public final class Sizes {

   public static final int S_256 = 256;
   public static final int S_512 = 512;
@@ -48,4 +48,6 @@ public class Sizes {
   public static final int S_64M = 64 * S_1M;

   public static final double NANOSEC = 1.0e9;
+  private Sizes() {
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
index ca083b776ec670..1ac12c7de9fac4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java
@@ -221,6 +221,18 @@ public String toString() {
     return sb.toString();
   }

+  /**
+   * Construct an instance of an {@code S3ACachingBlockManager}.
+   *
+   * @param futurePool asynchronous tasks are performed in this pool.
+   * @param reader reader that reads from the S3 file.
+   * @param blockData information about each block of the S3 file.
+   * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+   * @param streamStatistics statistics for this stream.
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
+   * @throws IllegalArgumentException if reader is null.
+   */
   protected BlockManager createBlockManager(
       ExecutorServiceFuturePool futurePool,
       S3ARemoteObjectReader reader,