diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java index 581d53016df744..0ffc631959f3e5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; /** @@ -30,6 +31,11 @@ @InterfaceAudience.Private public final class BlockManagerParameters { + /** + * The path of the underlying file; for logging. + */ + private Path path; + /** * Asynchronous tasks are performed in this pool. */ @@ -224,4 +230,20 @@ public BlockManagerParameters withTrackerFactory( return this; } + /** + * @return The path of the underlying file. + */ + public Path getPath() { + return path; + } + + /** + * Set the path. + * @param value new value + * @return the builder + */ + public BlockManagerParameters withPath(final Path value) { + path = value; + return this; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BufferData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BufferData.java index de68269ab700c4..d8a0c216106f03 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BufferData.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BufferData.java @@ -51,7 +51,7 @@ public enum State { /** * Buffer has been acquired but has no data. */ - BLANK, + EMPTY, /** * This block is being prefetched. @@ -114,7 +114,7 @@ public BufferData(int blockNumber, ByteBuffer buffer) { this.blockNumber = blockNumber; this.buffer = buffer; - this.state = State.BLANK; + this.state = State.EMPTY; } /** @@ -181,7 +181,7 @@ public synchronized Future getActionFuture() { public synchronized void setPrefetch(Future actionFuture) { Validate.checkNotNull(actionFuture, "actionFuture"); - this.updateState(State.PREFETCHING, State.BLANK); + this.updateState(State.PREFETCHING, State.EMPTY); this.action = actionFuture; } @@ -289,7 +289,7 @@ public boolean stateEqualsOneOf(State... states) { public String toString() { return String.format( - "[%03d] id: %03d, %s: buf: %s, checksum: %d, future: %s", + "[%03d] id: %03d, State: %s: buffer: %s, checksum: %d, future: %s", this.blockNumber, System.identityHashCode(this), this.state, @@ -300,7 +300,7 @@ public String toString() { private String getFutureStr(Future f) { if (f == null) { - return "--"; + return "(none)"; } else { return this.action.isDone() ? 
"done" : "not done"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java index 8675a2d08e768c..e0ba474ebefb04 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java @@ -38,12 +38,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.store.LogExactlyOnce; import static java.util.Objects.requireNonNull; - import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -112,6 +112,11 @@ public abstract class CachingBlockManager extends BlockManager { private final LocalDirAllocator localDirAllocator; + /** + * The path of the underlying file; for logging. + */ + private Path path; + /** * Constructs an instance of a {@code CachingBlockManager}. * @@ -123,6 +128,7 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize"); + this.path = requireNonNull(blockManagerParameters.getPath()); this.futurePool = requireNonNull(blockManagerParameters.getFuturePool()); this.bufferPoolSize = blockManagerParameters.getBufferPoolSize(); this.numCachingErrors = new AtomicInteger(); @@ -132,10 +138,10 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar blockManagerParameters.getPrefetchingStatistics()); this.conf = requireNonNull(blockManagerParameters.getConf()); - if (this.getBlockData().getFileSize() > 0) { - this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), + if (getBlockData().getFileSize() > 0) { + this.bufferPool = new BufferPool(bufferPoolSize, getBlockData().getBlockSize(), this.prefetchingStatistics); - this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(), + this.cache = createCache(blockManagerParameters.getMaxBlocksCount(), blockManagerParameters.getTrackerFactory()); } @@ -210,7 +216,7 @@ private boolean getInternal(BufferData data) throws IOException { return true; } - data.throwIfStateIncorrect(BufferData.State.BLANK); + data.throwIfStateIncorrect(BufferData.State.EMPTY); read(data); return true; } @@ -269,23 +275,23 @@ public void requestPrefetch(int blockNumber) { } // We initiate a prefetch only if we can acquire a buffer from the shared pool. - LOG.debug("Requesting prefetch for block {}", blockNumber); + LOG.debug("{}: Requesting prefetch for block {}", path, blockNumber); BufferData data = bufferPool.tryAcquire(blockNumber); if (data == null) { - LOG.debug("no buffer acquired for block {}", blockNumber); + LOG.debug("{}: no buffer acquired for block {}", path, blockNumber); return; } - LOG.debug("acquired {}", data); + LOG.debug("{}: acquired {}", path, data); // Opportunistic check without locking. - if (!data.stateEqualsOneOf(BufferData.State.BLANK)) { + if (!data.stateEqualsOneOf(BufferData.State.EMPTY)) { // The block is ready or being prefetched/cached. 
return; } synchronized (data) { // Reconfirm state after locking. - if (!data.stateEqualsOneOf(BufferData.State.BLANK)) { + if (!data.stateEqualsOneOf(BufferData.State.EMPTY)) { // The block is ready or being prefetched/cached. return; } @@ -306,7 +312,7 @@ public void requestPrefetch(int blockNumber) { */ @Override public void cancelPrefetches(final CancelReason reason) { - LOG.debug("Cancelling prefetches: {}", reason); + LOG.debug("{}: Cancelling prefetches: {}", path, reason); BlockOperations.Operation op = ops.cancelPrefetches(); if (reason == CancelReason.RandomIO) { @@ -328,9 +334,9 @@ public void cancelPrefetches(final CancelReason reason) { private void read(BufferData data) throws IOException { synchronized (data) { try { - readBlock(data, false, BufferData.State.BLANK); + readBlock(data, false, BufferData.State.EMPTY); } catch (IOException e) { - LOG.debug("error reading block {}", data.getBlockNumber(), e); + LOG.debug("{}: error reading block {}", path, data.getBlockNumber(), e); throw e; } } @@ -432,7 +438,7 @@ private boolean isClosed() { private void disableCaching(final BlockOperations.End endOp) { if (!cachingDisabled.getAndSet(true)) { String message = String.format( - "Caching disabled because of slow operation (%.1f sec)", endOp.duration()); + "%s: Caching disabled because of slow operation (%.1f sec)", + path, endOp.duration()); LOG_CACHING_DISABLED.info(message); prefetchingStatistics.setPrefetchDiskCachingState(false); } @@ -445,7 +452,7 @@ private boolean isCachingDisabled() { /** * Read task that is submitted to the future pool. */ - private static class PrefetchTask implements Supplier<Void> { + private class PrefetchTask implements Supplier<Void> { private final BufferData data; private final CachingBlockManager blockManager; private final Instant taskQueuedStartTime; @@ -461,8 +468,8 @@ public Void get() { try { blockManager.prefetch(data, taskQueuedStartTime); } catch (Exception e) { - LOG.info("error prefetching block {}. {}", data.getBlockNumber(), e.getMessage()); - LOG.debug("error prefetching block {}", data.getBlockNumber(), e); + LOG.info("{}: error prefetching block {}. {}", path, data.getBlockNumber(), e.getMessage()); + LOG.debug("{}: error prefetching block {}", path, data.getBlockNumber(), e); } return null; } @@ -492,7 +499,7 @@ public void requestCaching(BufferData data) { Validate.checkNotNull(data, "data"); final int blockNumber = data.getBlockNumber(); - LOG.debug("Block {}: request caching of {}", blockNumber, data); + LOG.debug("{}: Block {}: request caching of {}", path, blockNumber, data); if (isClosed() || isCachingDisabled()) { data.setDone(); @@ -501,21 +508,21 @@ public void requestCaching(BufferData data) { // Opportunistic check without locking. if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) { - LOG.debug("Block {}: Block in wrong state to cache: {}", - blockNumber, data.getState()); + LOG.debug("{}: Block {}: Block in wrong state to cache: {}", + path, blockNumber, data.getState()); return; } synchronized (data) { // Reconfirm state after locking. 
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) { - LOG.debug("Block {}: Block in wrong state to cache: {}", - blockNumber, data.getState()); + LOG.debug("{}: Block {}: Block in wrong state to cache: {}", + path, blockNumber, data.getState()); return; } if (cache.containsBlock(data.getBlockNumber())) { - LOG.debug("Block {}: Block is already in cache", blockNumber); + LOG.debug("{}: Block {}: Block is already in cache", path, blockNumber); data.setDone(); return; } @@ -550,33 +557,33 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture, } final int blockNumber = data.getBlockNumber(); - LOG.debug("Block {}: Preparing to cache block", blockNumber); + LOG.debug("{}: Block {}: Preparing to cache block", path, blockNumber); if (isCachingDisabled()) { - LOG.debug("Block {}: Preparing caching disabled, not prefetching", blockNumber); + LOG.debug("{}: Block {}: Preparing caching disabled, not prefetching", path, blockNumber); data.setDone(); return; } - LOG.debug("Block {}: awaiting any read to complete", blockNumber); + LOG.debug("{}: Block {}: awaiting any read to complete", path, blockNumber); try { // wait for data; state of caching may change during this period. awaitFuture(blockFuture, TIMEOUT_MINUTES, TimeUnit.MINUTES); if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. - LOG.debug("Block {}: prefetch failure", blockNumber); + LOG.debug("{}: Block {}: prefetch failure", path, blockNumber); return; } } catch (IOException | TimeoutException e) { - LOG.info("Error fetching block: {}. {}", data, e.toString()); - LOG.debug("Error fetching block: {}", data, e); + LOG.info("{}: Error fetching block: {}. {}", path, data, e.toString()); + LOG.debug("{}: Error fetching block: {}", path, data, e); data.setDone(); return; } if (isCachingDisabled()) { // caching was disabled while waiting for the read to complete. - LOG.debug("Block {}: caching disabled while reading data", blockNumber); + LOG.debug("{}: Block {}: caching disabled while reading data", path, blockNumber); data.setDone(); return; } @@ -586,12 +593,12 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture, synchronized (data) { try { if (data.stateEqualsOneOf(BufferData.State.DONE)) { - LOG.debug("Block {}: block no longer in use; not adding", blockNumber); + LOG.debug("{}: Block {}: block no longer in use; not adding", path, blockNumber); return; } if (cache.containsBlock(blockNumber)) { - LOG.debug("Block {}: already in cache; not adding", blockNumber); + LOG.debug("{}: Block {}: already in cache; not adding", path, blockNumber); data.setDone(); return; } @@ -603,8 +610,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture, data.setDone(); } catch (Exception e) { numCachingErrors.incrementAndGet(); - LOG.info("error adding block to cache: {}. {}", data, e.getMessage()); - LOG.debug("error adding block to cache: {}", data, e); + LOG.info("{}: error adding block to cache: {}. 
{}", path, data, e.getMessage()); + LOG.debug("{}: error adding block to cache: {}", path, data, e); data.setDone(); } @@ -625,7 +632,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { if (isClosed()) { return; } - LOG.debug("Block {}: Caching", buffer); + LOG.debug("{}: Block {}: Caching", path, buffer); cache.put(blockNumber, buffer, conf, localDirAllocator); } @@ -701,18 +708,13 @@ BufferData getData(int blockNumber) { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - - sb.append("cache("); - sb.append(cache.toString()); - sb.append("); "); - - sb.append("pool: "); - sb.append(bufferPool.toString()); - sb.append("; numReadErrors: ").append(numReadErrors.get()); - sb.append("; numCachingErrors: ").append(numCachingErrors.get()); + String sb = "path: " + path + "; " + + "; cache(" + cache + "); " + + "pool: " + bufferPool + + "; numReadErrors: " + numReadErrors.get() + + "; numCachingErrors: " + numCachingErrors.get(); - return sb.toString(); + return sb; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index 435782ec543905..1ff2d2bbfcd9aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -392,7 +392,8 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()"); - Path blockFilePath = getCacheFilePath(conf, localDirAllocator); + String blockInfo = String.format("-block-%04d", blockNumber); + Path blockFilePath = getCacheFilePath(conf, localDirAllocator, blockInfo, buffer.limit()); long size = Files.size(blockFilePath); if (size != 0) { String message = @@ -490,17 +491,20 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException { /** * Return temporary file created based on the file path retrieved from local dir allocator. - * * @param conf The configuration object. * @param localDirAllocator Local dir allocator instance. + * @param blockInfo info about block to use in filename + * @param fileSize size or -1 if unknown * @return Path of the temporary file created. * @throws IOException if IO error occurs while local dir allocator tries to retrieve path * from local FS or file creation fails or permission set fails. */ protected Path getCacheFilePath(final Configuration conf, - final LocalDirAllocator localDirAllocator) + final LocalDirAllocator localDirAllocator, + final String blockInfo, + final long fileSize) throws IOException { - return getTempFilePath(conf, localDirAllocator); + return getTempFilePath(conf, localDirAllocator, blockInfo, fileSize); } @Override @@ -516,6 +520,7 @@ public void close() throws IOException { */ private void deleteCacheFiles() { int numFilesDeleted = 0; + LOG.debug("Prefetch cache close: Deleting {} cache files", blocks.size()); for (Entry entry : blocks.values()) { boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT, @@ -617,29 +622,6 @@ private String getStats() { @VisibleForTesting public static final String CACHE_FILE_PREFIX = "fs-cache-"; - /** - * Determine if the cache space is available on the local FS. 
- * - * @param fileSize The size of the file. - * @param conf The configuration. - * @param localDirAllocator Local dir allocator instance. - * @return True if the given file size is less than the available free space on local FS, - * False otherwise. - */ - public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf, - LocalDirAllocator localDirAllocator) { - try { - Path cacheFilePath = getTempFilePath(conf, localDirAllocator); - long freeSpace = new File(cacheFilePath.toString()).getUsableSpace(); - LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace); - Files.deleteIfExists(cacheFilePath); - return fileSize < freeSpace; - } catch (IOException e) { - LOG.error("isCacheSpaceAvailable", e); - return false; - } - } - // The suffix (file extension) of each serialized index file. private static final String BINARY_FILE_SUFFIX = ".bin"; @@ -647,20 +629,23 @@ public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf, * Create temporary file based on the file path retrieved from local dir allocator * instance. The file is created with .bin suffix. The created file has been granted * posix file permissions available in TEMP_FILE_ATTRS. - * * @param conf the configuration. * @param localDirAllocator the local dir allocator instance. + * @param blockInfo info about block to use in filename + * @param fileSize size or -1 if unknown * @return path of the file created. * @throws IOException if IO error occurs while local dir allocator tries to retrieve path * from local FS or file creation fails or permission set fails. */ private static Path getTempFilePath(final Configuration conf, - final LocalDirAllocator localDirAllocator) throws IOException { + final LocalDirAllocator localDirAllocator, + final String blockInfo, + final long fileSize) throws IOException { org.apache.hadoop.fs.Path path = - localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf); + localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, fileSize, conf); File dir = new File(path.getParent().toUri().getPath()); String prefix = path.getName(); - File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir); + File tmpFile = File.createTempFile(prefix, blockInfo + BINARY_FILE_SUFFIX, dir); Path tmpFilePath = Paths.get(tmpFile.toURI()); return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferData.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferData.java index ee5f95ca6bbb67..1102aa17730fbb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferData.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferData.java @@ -75,7 +75,7 @@ public void testValidStateUpdates() { ByteBuffer buffer = ByteBuffer.allocate(1); BufferData data = new BufferData(1, buffer); - assertEquals(BufferData.State.BLANK, data.getState()); + assertEquals(BufferData.State.EMPTY, data.getState()); CompletableFuture actionFuture = new CompletableFuture<>(); actionFuture.complete(null); @@ -92,7 +92,7 @@ public void testValidStateUpdates() { assertNotSame(actionFuture, actionFuture2); List states = Arrays.asList( - BufferData.State.BLANK, + BufferData.State.EMPTY, BufferData.State.PREFETCHING, BufferData.State.CACHING, BufferData.State.READY @@ -116,7 +116,7 @@ public void testInvalidStateUpdates() throws Exception { 
actionFuture.complete(null); testInvalidStateUpdatesHelper( (d) -> d.setPrefetch(actionFuture), - BufferData.State.BLANK, + BufferData.State.EMPTY, BufferData.State.READY); testInvalidStateUpdatesHelper( @@ -137,7 +137,7 @@ public void testSetReady() throws Exception { assertNotEquals(BufferData.State.READY, data.getState()); assertEquals(0, data.getChecksum()); - data.setReady(BufferData.State.BLANK); + data.setReady(BufferData.State.EMPTY); assertEquals(BufferData.State.READY, data.getState()); assertNotEquals(0, data.getChecksum()); @@ -151,7 +151,7 @@ public void testSetReady() throws Exception { ExceptionAsserts.assertThrows( IllegalStateException.class, "Checksum cannot be changed once set", - () -> data.setReady(BufferData.State.BLANK)); + () -> data.setReady(BufferData.State.EMPTY)); // Verify that we detect post READY buffer modification. buffer.array()[2] = (byte) 42; @@ -197,7 +197,7 @@ private void testInvalidStateUpdatesHelper( ByteBuffer buffer = ByteBuffer.allocate(1); BufferData data = new BufferData(1, buffer); - data.updateState(validFromState[0], BufferData.State.BLANK); + data.updateState(validFromState[0], BufferData.State.EMPTY); List states = this.getStatesExcept(validFromState); BufferData.State prevState = validFromState[0]; String expectedMessage = @@ -217,7 +217,7 @@ private void testInvalidStateUpdatesHelper( static final List ALL_STATES = Arrays.asList( BufferData.State.UNKNOWN, - BufferData.State.BLANK, + BufferData.State.EMPTY, BufferData.State.PREFETCHING, BufferData.State.CACHING, BufferData.State.READY diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferPool.java index b8375fe66dcb1a..76da234805a1b9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBufferPool.java @@ -102,13 +102,13 @@ public void testGetAndRelease() { assertEquals(2, pool.numCreated()); assertEquals(0, pool.numAvailable()); - data1.updateState(BufferData.State.READY, BufferData.State.BLANK); + data1.updateState(BufferData.State.READY, BufferData.State.EMPTY); pool.release(data1); assertEquals(2, pool.numCreated()); assertEquals(1, pool.numAvailable()); - data2.updateState(BufferData.State.READY, BufferData.State.BLANK); + data2.updateState(BufferData.State.READY, BufferData.State.EMPTY); pool.release(data2); assertEquals(2, pool.numCreated()); @@ -117,7 +117,7 @@ public void testGetAndRelease() { @Test public void testRelease() throws Exception { - testReleaseHelper(BufferData.State.BLANK, true); + testReleaseHelper(BufferData.State.EMPTY, true); testReleaseHelper(BufferData.State.PREFETCHING, true); testReleaseHelper(BufferData.State.CACHING, true); testReleaseHelper(BufferData.State.READY, false); @@ -131,7 +131,7 @@ private void testReleaseHelper(BufferData.State stateBeforeRelease, assertInitialState(pool, POOL_SIZE); BufferData data = this.acquire(pool, 1); - data.updateState(stateBeforeRelease, BufferData.State.BLANK); + data.updateState(stateBeforeRelease, BufferData.State.EMPTY); if (expectThrow) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index dafc37da03332c..3a770292403235 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -345,6 +345,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private static final Logger PROGRESS = LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"); + + /** + * Directory allocator, bonded to the directory list set in + * {@link Constants#BUFFER_DIR} and used for storing temporary files + * including: upload file blocks, staging committer files and cached read blocks. + */ private LocalDirAllocator directoryAllocator; private String cannedACL; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index 3cb8d975324480..d39a3c8f71fde3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -302,7 +302,7 @@ private static Number sizeFromRangeHeader(String rangeHeader) { if (values.length == 2) { try { long start = Long.parseUnsignedLong(values[0]); - long end = Long.parseUnsignedLong(values[0]); + long end = Long.parseUnsignedLong(values[1]); return end - start; } catch(NumberFormatException e) { } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index 3a2d9d7f823ee3..fdd8b9d54c4ece 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -264,8 +264,9 @@ private class LoggingAuditSpan extends AbstractAuditSpanImpl { * Attach Range of data for GetObject Request. 
* @param request the sdk request to be modified * @param executionAttributes execution attributes for this request + * @return the extracted range (bytes), or null if not a GetObject request or no range was set */ - private void attachRangeFromRequest(SdkHttpRequest request, + private String attachRangeFromRequest(SdkHttpRequest request, ExecutionAttributes executionAttributes) { String operationName = executionAttributes.getAttribute(AwsExecutionAttribute.OPERATION_NAME); @@ -275,10 +276,13 @@ private void attachRangeFromRequest(SdkHttpRequest request, String[] rangeHeader = request.headers().get("Range").get(0).split("="); // only set header if range unit is bytes if (rangeHeader[0].equals("bytes")) { - referrer.set(AuditConstants.PARAM_RANGE, rangeHeader[1]); + final String range = rangeHeader[1]; + referrer.set(AuditConstants.PARAM_RANGE, range); + return range; } } } + return null; } private final String description; @@ -376,7 +380,7 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, SdkRequest sdkRequest = context.request(); // attach range for GetObject requests - attachRangeFromRequest(httpRequest, executionAttributes); + final String range = attachRangeFromRequest(httpRequest, executionAttributes); // for delete op, attach the number of files to delete attachDeleteKeySizeAttribute(sdkRequest); @@ -392,11 +396,13 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, .build(); } if (LOG.isDebugEnabled()) { - LOG.debug("[{}] {} Executing {} with {}; {}", + String extra = range == null ? "" : (" range=" + range + ";"); + LOG.debug("[{}] {} Executing {} with {};{} {}", currentThreadID(), getSpanId(), getOperationName(), analyzer.analyze(context.request()), + extra, header); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java index 0ec92bb39fb0b4..83c9de2d7a497f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java @@ -92,8 +92,6 @@ public S3ACachingInputStream( this.numBlocksToPrefetch = getContext().getPrefetchBlockCount(); demandCreateBlockManager(); - this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount(); - int fileSize = (int) s3Attributes.getLen(); LOG.debug("Created caching input stream for {} (size = {})", this.getName(), fileSize); @@ -108,20 +106,21 @@ public S3ACachingInputStream( */ private synchronized void demandCreateBlockManager() { if (blockManager == null) { LOG.debug("{}: creating block manager", getName()); int bufferPoolSize = this.numBlocksToPrefetch + BLOCKS_TO_PREFETCH_AFTER_SEEK; - BlockManagerParameters blockManagerParamsBuilder = - new BlockManagerParameters() - .withFuturePool(this.getContext().getFuturePool()) - .withBlockData(this.getBlockData()) - .withBufferPoolSize(bufferPoolSize) - .withConf(conf) - .withLocalDirAllocator(localDirAllocator) - .withMaxBlocksCount( - conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)) - .withPrefetchingStatistics(getS3AStreamStatistics()) - .withTrackerFactory(getS3AStreamStatistics()); - this.blockManager = this.createBlockManager(blockManagerParamsBuilder, - this.getReader()); + final S3AReadOpContext readOpContext = this.getContext(); + BlockManagerParameters blockManagerParamsBuilder = + new BlockManagerParameters() + .withPath(readOpContext.getPath()) + .withFuturePool(readOpContext.getFuturePool()) + .withBlockData(getBlockData()) + .withBufferPoolSize(bufferPoolSize) + .withConf(conf) + .withLocalDirAllocator(localDirAllocator) + .withMaxBlocksCount( + conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)) + .withPrefetchingStatistics(getS3AStreamStatistics()) + .withTrackerFactory(getS3AStreamStatistics()); + blockManager = createBlockManager(blockManagerParamsBuilder, getReader()); } } @@ -234,8 +234,7 @@ protected boolean ensureCurrentBuffer() throws IOException { } BufferData data = invokeTrackingDuration( - getS3AStreamStatistics() - .trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ), + getS3AStreamStatistics().trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ), () -> blockManager.get(toBlockNumber)); filePosition.setData(data, startOffset, readPos); @@ -259,12 +258,7 @@ public String toString() { /** * Construct an instance of a {@code S3ACachingBlockManager}. * - * @param futurePool asynchronous tasks are performed in this pool. - * @param reader reader that reads from S3 file. - * @param blockData information about each block of the S3 file. - * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. - * @param configuration the configuration. - * @param dirAllocator the local dir allocator instance. + * @param blockManagerParameters block manager parameters + * @return the block manager * @throws IllegalArgumentException if reader is null. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java deleted file mode 100644 index a2f4376b7a7c21..00000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs.s3a.prefetch; - -import java.io.File; -import java.io.IOException; - -import org.assertj.core.api.Assertions; -import org.assertj.core.api.Assumptions; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; -import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsContext; - -import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; -import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; -import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPrefetchOption; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticGauge; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; -import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; -import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; -import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE; -import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED; -import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; -import static org.apache.hadoop.test.Sizes.S_1K; - -/** - * Test the cache file behaviour with prefetching input stream. - * TODO: remove this once the test in ITestS3APrefetchingLargeFiles is good. - */ -public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest { - - private static final Logger LOG = - LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class); - private static final int BLOCK_SIZE = S_1K * 10; - - private Path testFile; - private S3AFileSystem testFileSystem; - private int prefetchBlockSize; - private Configuration conf; - - /** - * Thread level IOStatistics; reset in setup(). 
- */ - private IOStatisticsContext ioStatisticsContext; - - private File tmpFileDir; - - - @Before - public void setUp() throws Exception { - super.setup(); - conf = createConfiguration(); - tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", ""); - tmpFileDir.delete(); - tmpFileDir.mkdirs(); - - conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath()); - String testFileUri = S3ATestUtils.getCSVTestFile(conf); - - testFile = new Path(testFileUri); - testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf); - - prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE); - final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile); - Assumptions.assumeThat(testFileStatus.getLen()) - .describedAs("Test file %s is smaller than required size %d", - testFileStatus, prefetchBlockSize * 4) - .isGreaterThan(prefetchBlockSize); - - ioStatisticsContext = getCurrentIOStatisticsContext(); - ioStatisticsContext.reset(); - nameThread(); - } - - @Override - public Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - setPrefetchOption(conf, true); - disableFilesystemCaching(conf); - S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY); - conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); - return conf; - } - - @Override - public synchronized void teardown() throws Exception { - super.teardown(); - tmpFileDir.delete(); - File[] tmpFiles = tmpFileDir.listFiles(); - if (tmpFiles != null) { - for (File filePath : tmpFiles) { - String path = filePath.getPath(); - filePath.delete(); - } - } - cleanupWithLogger(LOG, testFileSystem); - } - - /** - * Test to verify the existence of the cache file. - * Tries to perform inputStream read and seek ops to make the prefetching take place and - * asserts whether file with .bin suffix is present. It also verifies certain file stats. 
- */ - @Test - public void testCacheFileExistence() throws Throwable { - describe("Verify that FS cache files exist on local FS"); - skipIfClientSideEncryption(); - - try (FSDataInputStream in = testFileSystem.open(testFile)) { - byte[] buffer = new byte[prefetchBlockSize]; - // caching is enabled - final IOStatistics ioStats = in.getIOStatistics(); - assertThatStatisticGauge(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED) - .isEqualTo(1); - - // read less than the block size - LOG.info("reading first data block"); - in.readFully(buffer, 0, prefetchBlockSize - 10240); - LOG.info("stream stats after a read {}", ioStatisticsToPrettyString(ioStats)); - - // there's been one fetch so far - verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); - - // a file is cached - verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 1); - - assertCacheFileExists(); - - in.readFully(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize); - - assertCacheFileExists(); - } - } - - private void assertCacheFileExists() throws IOException { - final Configuration conf = testFileSystem.getConf(); - Assertions.assertThat(tmpFileDir.isDirectory()) - .describedAs("The dir to keep cache files must exist %s", tmpFileDir); - File[] tmpFiles = tmpFileDir.listFiles(); - Assertions.assertThat(tmpFiles) - .describedAs("No cache files found under " + tmpFileDir) - .isNotNull() - .hasSizeGreaterThanOrEqualTo( 1); - - for (File tmpFile : tmpFiles) { - Path path = new Path(tmpFile.getAbsolutePath()); - try (FileSystem localFs = FileSystem.getLocal(conf)) { - FileStatus stat = localFs.getFileStatus(path); - ContractTestUtils.assertIsFile(path, stat); - assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize, - stat.getLen()); - assertEquals("User permissions should be RW", FsAction.READ_WRITE, - stat.getPermission().getUserAction()); - assertEquals("Group permissions should be NONE", FsAction.NONE, - stat.getPermission().getGroupAction()); - assertEquals("Other permissions should be NONE", FsAction.NONE, - stat.getPermission().getOtherAction()); - } - } - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java index 12c17149c17bcc..73ed0c7b635dd4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -106,17 +107,18 @@ public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest { public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); removeBaseAndBucketOverrides(conf, - PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY); + PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY, BUFFER_DIR); conf.setBoolean(PREFETCH_ENABLED_KEY, true); conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath()); return conf; } @Override public void setup() throws 
Exception { + tmpFileDir = tempFolder.newFolder("cache"); super.setup(); - tmpFileDir = tempFolder.newFolder("cache"); ioStatisticsContext = getCurrentIOStatisticsContext(); ioStatisticsContext.reset(); @@ -298,6 +300,7 @@ public void testCacheFileExistence() throws Throwable { int prefetchBlockSize = BLOCK_SIZE; final Path path = createLargeFile(); + LOG.info("Opening file {}", path); try (FSDataInputStream in = getFileSystem().open(path)) { byte[] buffer = new byte[prefetchBlockSize]; @@ -311,17 +314,24 @@ public void testCacheFileExistence() throws Throwable { in.readFully(buffer, 0, prefetchBlockSize - S_1K); LOG.info("stream stats after a read {}", ioStatisticsToPrettyString(ioStats)); - // there's been at least one fetch so far + // there's been at least one fetch so far; others may be in progress. assertThatStatisticCounter(ioStats, ACTION_HTTP_GET_REQUEST) .isGreaterThanOrEqualTo(1); - // a file is cached - // verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 1); - assertCacheFileExists(); + //assertCacheFileExists(); + + LOG.info("reading rest of file"); in.readFully(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize); + LOG.info("stream stats after reading whole file {}", ioStatisticsToPrettyString(ioStats)); + + // and at least one block is in cache + assertThatStatisticGauge(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE) + .isGreaterThanOrEqualTo(1); + + // a file which must be under the temp dir assertCacheFileExists(); } } @@ -344,8 +354,10 @@ private void assertCacheFileExists() throws IOException { try (FileSystem localFs = FileSystem.getLocal(conf)) { FileStatus stat = localFs.getFileStatus(path); ContractTestUtils.assertIsFile(path, stat); - assertEquals("File length not matching with prefetchBlockSize", BLOCK_SIZE, - stat.getLen()); + Assertions.assertThat(stat.getLen()) + .describedAs("%s: File length not matching with prefetchBlockSize", path) + .isEqualTo(BLOCK_SIZE); + assertEquals("User permissions should be RW", FsAction.READ_WRITE, stat.getPermission().getUserAction()); assertEquals("Group permissions should be NONE", FsAction.NONE, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java index faa773924bc9f0..b4eed922b05457 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java @@ -340,7 +340,9 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException { @Override protected Path getCacheFilePath(final Configuration conf, - final LocalDirAllocator localDirAllocator) + final LocalDirAllocator localDirAllocator, + final String blockInfo, + final long fileSize) throws IOException { fileCount++; return Paths.get(Long.toString(fileCount));
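
As a worked illustration of the per-block cache file naming this patch introduces: the cache file name is built from CACHE_FILE_PREFIX, a "-block-%04d" infix carrying the block number, and the ".bin" suffix, with the file restricted to owner read/write. The standalone sketch below is illustrative only, not the committed implementation; the prefix, infix format and suffix come from the diff above, while TEMP_FILE_PERMS is an assumption (rw-------) inferred from the permission assertions in the integration tests.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

/**
 * Sketch of the per-block cache file naming:
 * fs-cache-&lt;random&gt;-block-NNNN.bin, owner read/write only.
 */
public class CacheFileNamingSketch {

  private static final String CACHE_FILE_PREFIX = "fs-cache-";
  private static final String BINARY_FILE_SUFFIX = ".bin";

  // Assumed equivalent of TEMP_FILE_ATTRS: owner read/write only,
  // matching the user RW / group NONE / other NONE test assertions.
  private static final Set<PosixFilePermission> TEMP_FILE_PERMS =
      PosixFilePermissions.fromString("rw-------");

  static Path createBlockCacheFile(File dir, int blockNumber) throws IOException {
    // The blockInfo infix makes each cached block identifiable in a
    // directory listing, which the reworked cache-file tests rely on.
    String blockInfo = String.format("-block-%04d", blockNumber);
    File tmpFile = File.createTempFile(CACHE_FILE_PREFIX, blockInfo + BINARY_FILE_SUFFIX, dir);
    Path tmpFilePath = Paths.get(tmpFile.toURI());
    return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_PERMS);
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("buffer-dir").toFile();
    // prints something like .../fs-cache-123456789-block-0007.bin
    System.out.println(createBlockCacheFile(dir, 7));
  }
}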