From e7a73c0d9c6dfadcfe286422a535555240e3e7e4 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 24 Apr 2024 19:41:07 +0100
Subject: [PATCH] HADOOP-18184. test failure ITestS3APrefetchingCacheFiles

* improved logging, especially on queue update and block evict
* still not working

Change-Id: Id3c1b8c0891a9cf095d4d6d8603ab94830d28940
---
 .../fs/impl/prefetch/CachingBlockManager.java | 19 +++++++-
 .../prefetch/SingleFilePerBlockCache.java     | 43 +++++++++++--------
 .../prefetch/S3APrefetchingInputStream.java   |  2 +-
 .../s3a/prefetch/S3ARemoteObjectReader.java   |  2 +-
 .../fs/s3a/ITestS3APrefetchingCacheFiles.java | 18 ++++++--
 5 files changed, 59 insertions(+), 25 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
index e0ba474ebefb04..2ff100f37c338e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
@@ -365,6 +365,9 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
 
     DurationTracker tracker = null;
     int bytesFetched = 0;
+    // to be filled in later.
+    long offset = 0;
+    int size = 0;
     synchronized (data) {
       try {
         if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
@@ -375,6 +378,10 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
 
         data.throwIfStateIncorrect(expectedState);
         int blockNumber = data.getBlockNumber();
+        final BlockData blockData = getBlockData();
+
+        offset = blockData.getStartOffset(data.getBlockNumber());
+        size = blockData.getSize(data.getBlockNumber());
 
         // Prefer reading from cache over reading from network.
         if (cache.containsBlock(blockNumber)) {
@@ -392,15 +399,23 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
           op = ops.getRead(data.getBlockNumber());
         }
 
-        long offset = getBlockData().getStartOffset(data.getBlockNumber());
-        int size = getBlockData().getSize(data.getBlockNumber());
+
         ByteBuffer buffer = data.getBuffer();
         buffer.clear();
         read(buffer, offset, size);
         buffer.flip();
         data.setReady(expectedState);
         bytesFetched = size;
+        LOG.debug("Completed {} of block {} [{}-{}]",
+            isPrefetch ? "prefetch" : "read",
+            data.getBlockNumber(),
+            offset, offset + size);
       } catch (Exception e) {
+        LOG.debug("Failure in {} of block {} [{}-{}]",
+            isPrefetch ? "prefetch" : "read",
+            data.getBlockNumber(),
+            offset, offset + size,
+            e);
         if (isPrefetch && tracker != null) {
           tracker.failed();
         }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index 1ff2d2bbfcd9aa..51134384a7d473 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
"prefetch" : "read", + data.getBlockNumber(), + offset, offset + size, + e); if (isPrefetch && tracker != null) { tracker.failed(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java index 1ff2d2bbfcd9aa..51134384a7d473 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java @@ -146,7 +146,7 @@ private enum LockType { @Override public String toString() { return String.format( - "([%03d] %s: size = %d, checksum = %d)", + "([%03d] %s: size = %,d, checksum = %d)", blockNumber, path, size, checksum); } @@ -316,37 +316,39 @@ private Entry getEntry(int blockNumber) { private void addToLinkedListHead(Entry entry) { blocksLock.writeLock().lock(); try { - addToHeadOfLinkedList(entry); + maybePushToHeadOfBlockList(entry); } finally { blocksLock.writeLock().unlock(); } } /** - * Add the given entry to the head of the linked list. - * + * Maybe Add the given entry to the head of the block list. + * No-op if the block is already in the list. * @param entry Block entry to add. + * @return true if the block was added. */ - private void addToHeadOfLinkedList(Entry entry) { + private boolean maybePushToHeadOfBlockList(Entry entry) { if (head == null) { head = entry; tail = entry; } LOG.debug( - "Block num {} to be added to the head. Current head block num: {} and tail block num: {}", - entry.blockNumber, head.blockNumber, tail.blockNumber); + "Block {} to be added to the head. Current head block {} and tail block {}; {}", + entry.blockNumber, head.blockNumber, tail.blockNumber, entry); if (entry != head) { Entry prev = entry.getPrevious(); - Entry nxt = entry.getNext(); - // no-op if the block is already evicted + Entry next = entry.getNext(); + // no-op if the block is already block list if (!blocks.containsKey(entry.blockNumber)) { - return; + LOG.debug("Block {} is already in block list", entry.blockNumber); + return false; } if (prev != null) { - prev.setNext(nxt); + prev.setNext(next); } - if (nxt != null) { - nxt.setPrevious(prev); + if (next != null) { + next.setPrevious(prev); } entry.setPrevious(null); entry.setNext(head); @@ -356,6 +358,7 @@ private void addToHeadOfLinkedList(Entry entry) { tail = prev; } } + return true; } /** @@ -424,8 +427,9 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf, private void addToLinkedListAndEvictIfRequired(Entry entry) { blocksLock.writeLock().lock(); try { - addToHeadOfLinkedList(entry); - entryListSize++; + if (maybePushToHeadOfBlockList(entry)) { + entryListSize++; + } if (entryListSize > maxBlocksCount && !closed.get()) { Entry elementToPurge = tail; tail = tail.getPrevious(); @@ -447,12 +451,13 @@ private void addToLinkedListAndEvictIfRequired(Entry entry) { * @param elementToPurge Block entry to evict. 
    */
   private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+    LOG.debug("Evicting block {} from cache: {}", elementToPurge.blockNumber, elementToPurge);
     try (DurationTracker ignored = trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
       boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
       if (!lockAcquired) {
-        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+        LOG.warn("Cache file {} deletion would not be attempted as write lock could not"
             + " be acquired within {} {}", elementToPurge.path,
             PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
             PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
@@ -463,9 +468,11 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
           prefetchingStatistics.blockRemovedFromFileCache();
           blocks.remove(elementToPurge.blockNumber);
           prefetchingStatistics.blockEvictedFromFileCache();
+        } else {
+          LOG.debug("Cache file {} not found for deletion: {}", elementToPurge.path, elementToPurge);
         }
       } catch (IOException e) {
-        LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+        LOG.warn("Failed to delete cache file {} for {}", elementToPurge.path, elementToPurge, e);
       } finally {
         elementToPurge.releaseLock(Entry.LockType.WRITE);
       }
@@ -481,7 +488,7 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
   protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
     buffer.rewind();
     try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
-        DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
+        DurationInfo d = new DurationInfo(LOG, "save %d bytes to %s",
             buffer.remaining(), path)) {
       while (buffer.hasRemaining()) {
         writeChannel.write(buffer);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
index 58ee9d13e5e2ee..a1326e810d5624 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
@@ -115,7 +115,7 @@ public S3APrefetchingInputStream(
           client,
           streamStatistics);
     } else {
-      LOG.debug("Creating in caching input stream for {}", context.getPath());
+      LOG.debug("Creating caching input stream for {}", context.getPath());
       this.inputStream = new S3ACachingInputStream(
           context,
           s3Attributes,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
index 08ccac0bc2e03f..bdcc02bc5120ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObjectReader.java
@@ -129,7 +129,7 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
     final String path = this.remoteObject.getPath();
 
     EOFException invokerResponse =
-        invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
+        invoker.retry(String.format("read %s [%d-%d]", path, offset, offset + size),
            path, true,
            trackDurationOfOperation(streamStatistics, STREAM_READ_REMOTE_BLOCK_READ, () -> {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
index ce962483d58408..22ec3d4285937b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 
 import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_COUNT_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
@@ -56,6 +57,11 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
 
   public static final int PREFETCH_OFFSET = 10240;
 
+  /**
+   * Block prefetch count: {@value}.
+   */
+  public static final int BLOCK_COUNT = 8;
+
   private Path testFile;
 
   /** The FS with the external file. */
@@ -84,12 +90,18 @@ public void setUp() throws Exception {
   @Override
   public Configuration createConfiguration() {
     Configuration configuration = super.createConfiguration();
-    S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
+    S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+        PREFETCH_ENABLED_KEY,
+        PREFETCH_BLOCK_COUNT_KEY,
+        PREFETCH_BLOCK_SIZE_KEY);
     configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
     // use a small block size unless explicitly set in the test config.
     configuration.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    configuration.setInt(PREFETCH_BLOCK_COUNT_KEY, BLOCK_COUNT);
+    // patch buffer dir with a unique path for test isolation.
-    final String bufferDirBase = configuration.get(BUFFER_DIR);
+
+    final String bufferDirBase = "target/prefetch";
     bufferDir = bufferDirBase + "/" + UUID.randomUUID();
     configuration.set(BUFFER_DIR, bufferDir);
     return configuration;
@@ -125,7 +137,7 @@ public void testCacheFileExistence() throws Throwable {
 
       in.read(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize);
 
-      File tmpFileDir = new File(conf.get(BUFFER_DIR));
+      File tmpFileDir = new File(bufferDir);
       final LocalFileSystem localFs = FileSystem.getLocal(conf);
       Path bufferDirPath = new Path(tmpFileDir.toURI());
       ContractTestUtils.assertIsDirectory(localFs, bufferDirPath);
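
Reviewer note, not part of the patch: all of the new queue-update and
eviction messages are at DEBUG level. To watch them while chasing this
failure, the two prefetch packages touched above can be turned up in the
test log4j.properties (standard log4j 1.2 syntax as used by the Hadoop
test setup; the exact resource file may vary by module):

    log4j.logger.org.apache.hadoop.fs.impl.prefetch=DEBUG
    log4j.logger.org.apache.hadoop.fs.s3a.prefetch=DEBUG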
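
Reviewer note, not part of the patch: the failing test is an integration
test, so it runs under the failsafe plugin. Something like the following,
run from hadoop-tools/hadoop-aws against a configured test bucket, should
rerun just this case (flags are indicative only; see the module's testing
documentation for the authoritative invocation):

    mvn verify -Dit.test=ITestS3APrefetchingCacheFiles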