HADOOP-18184. test failure ITestS3APrefetchingCacheFiles
* Improved logging, especially on queue update and block eviction.
* Still not working.

Change-Id: Id3c1b8c0891a9cf095d4d6d8603ab94830d28940
steveloughran committed Apr 24, 2024
1 parent 82493f1 commit e7a73c0
Showing 5 changed files with 59 additions and 25 deletions.
CachingBlockManager.java
@@ -365,6 +365,9 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
DurationTracker tracker = null;

int bytesFetched = 0;
+ // to be filled in later.
+ long offset = 0;
+ int size = 0;
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
@@ -375,6 +378,10 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...

data.throwIfStateIncorrect(expectedState);
int blockNumber = data.getBlockNumber();
+ final BlockData blockData = getBlockData();
+
+ offset = blockData.getStartOffset(data.getBlockNumber());
+ size = blockData.getSize(data.getBlockNumber());

// Prefer reading from cache over reading from network.
if (cache.containsBlock(blockNumber)) {
@@ -392,15 +399,23 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
op = ops.getRead(data.getBlockNumber());
}

- long offset = getBlockData().getStartOffset(data.getBlockNumber());
- int size = getBlockData().getSize(data.getBlockNumber());

ByteBuffer buffer = data.getBuffer();
buffer.clear();
read(buffer, offset, size);
buffer.flip();
data.setReady(expectedState);
bytesFetched = size;
LOG.debug("Completed {} of block {} [{}-{}]",
isPrefetch ? "prefetch" : "read",
data.getBlockNumber(),
offset, offset + size);
} catch (Exception e) {
LOG.debug("Failure in {} of block {} [{}-{}]",
isPrefetch ? "prefetch" : "read",
data.getBlockNumber(),
offset, offset + size,
e);
if (isPrefetch && tracker != null) {
tracker.failed();
}
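The change above hoists offset and size out of the try block, with placeholder zeros, so the catch clause can log the byte range of the failed read. A minimal sketch of that pattern, with hypothetical names (BlockFetcher, fetchBlock, readRange are not the Hadoop code):

    public class BlockFetcher {
      private static final org.slf4j.Logger LOG =
          org.slf4j.LoggerFactory.getLogger(BlockFetcher.class);

      void fetchBlock(int blockNumber) {
        // Declared before the try with safe defaults, so the catch block can
        // still print a well-defined range even if range resolution throws.
        long offset = 0;
        int size = 0;
        try {
          offset = blockNumber * 8192L;   // start of the block (assumed block size)
          size = 8192;                    // block length
          readRange(offset, size);
          LOG.debug("Completed read of block {} [{}-{}]",
              blockNumber, offset, offset + size);
        } catch (Exception e) {
          // offset/size are in scope here, so the failure log shows the range.
          LOG.debug("Failure in read of block {} [{}-{}]",
              blockNumber, offset, offset + size, e);
        }
      }

      private void readRange(long offset, int size) { /* ... */ }
    }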
SingleFilePerBlockCache.java
@@ -146,7 +146,7 @@ private enum LockType {
@Override
public String toString() {
return String.format(
"([%03d] %s: size = %d, checksum = %d)",
"([%03d] %s: size = %,d, checksum = %d)",
blockNumber, path, size, checksum);
}

@@ -316,37 +316,39 @@ private Entry getEntry(int blockNumber) {
private void addToLinkedListHead(Entry entry) {
blocksLock.writeLock().lock();
try {
- addToHeadOfLinkedList(entry);
+ maybePushToHeadOfBlockList(entry);
} finally {
blocksLock.writeLock().unlock();
}
}

/**
- * Add the given entry to the head of the linked list.
- *
+ * Maybe add the given entry to the head of the block list.
+ * Does nothing and returns false if the block has already been evicted.
* @param entry Block entry to add.
+ * @return true if the block was added.
*/
- private void addToHeadOfLinkedList(Entry entry) {
+ private boolean maybePushToHeadOfBlockList(Entry entry) {
if (head == null) {
head = entry;
tail = entry;
}
LOG.debug(
"Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
entry.blockNumber, head.blockNumber, tail.blockNumber);
"Block {} to be added to the head. Current head block {} and tail block {}; {}",
entry.blockNumber, head.blockNumber, tail.blockNumber, entry);
if (entry != head) {
Entry prev = entry.getPrevious();
- Entry nxt = entry.getNext();
- // no-op if the block is already evicted
+ Entry next = entry.getNext();
+ // no-op if the block is no longer in the block map (already evicted)
if (!blocks.containsKey(entry.blockNumber)) {
- return;
+ LOG.debug("Block {} is not in the block list", entry.blockNumber);
+ return false;
}
if (prev != null) {
- prev.setNext(nxt);
+ prev.setNext(next);
}
- if (nxt != null) {
- nxt.setPrevious(prev);
+ if (next != null) {
+ next.setPrevious(prev);
}
entry.setPrevious(null);
entry.setNext(head);
@@ -356,6 +358,7 @@ private void addToHeadOfLinkedList(Entry entry) {
tail = prev;
}
}
+ return true;
}

/**
@@ -424,8 +427,9 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
private void addToLinkedListAndEvictIfRequired(Entry entry) {
blocksLock.writeLock().lock();
try {
- addToHeadOfLinkedList(entry);
- entryListSize++;
+ if (maybePushToHeadOfBlockList(entry)) {
+   entryListSize++;
+ }
if (entryListSize > maxBlocksCount && !closed.get()) {
Entry elementToPurge = tail;
tail = tail.getPrevious();
Expand All @@ -447,12 +451,13 @@ private void addToLinkedListAndEvictIfRequired(Entry entry) {
* @param elementToPurge Block entry to evict.
*/
private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
LOG.debug("Evicting block {} from cache: {}", elementToPurge.blockNumber, elementToPurge);
try (DurationTracker ignored = trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
LOG.warn("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", elementToPurge.path,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
@@ -463,9 +468,11 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
prefetchingStatistics.blockRemovedFromFileCache();
blocks.remove(elementToPurge.blockNumber);
prefetchingStatistics.blockEvictedFromFileCache();
+ } else {
+   LOG.debug("Cache file {} not found for deletion: {}", elementToPurge.path, elementToPurge);
}
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
LOG.warn("Failed to delete cache file {} for {}", elementToPurge.path, elementToPurge, e);
} finally {
elementToPurge.releaseLock(Entry.LockType.WRITE);
}
Expand All @@ -481,7 +488,7 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
buffer.rewind();
try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
- DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
+ DurationInfo d = new DurationInfo(LOG, "save %d bytes to %s",
buffer.remaining(), path)) {
while (buffer.hasRemaining()) {
writeChannel.write(buffer);
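For context, SingleFilePerBlockCache keeps its entries in a doubly linked list ordered by recency: a block access pushes its entry to the head, and eviction removes from the tail. The rename to maybePushToHeadOfBlockList reflects that the push can now be rejected and reports whether it happened, so the caller only bumps the list size on a genuine add. A stripped-down sketch of that logic, using hypothetical LruBlockList/Entry types rather than the Hadoop classes:

    import java.util.HashMap;
    import java.util.Map;

    /** Sketch of an LRU block list; all names here are illustrative. */
    class LruBlockList {

      static final class Entry {
        final int blockNumber;
        Entry prev;
        Entry next;
        Entry(int blockNumber) {
          this.blockNumber = blockNumber;
        }
      }

      /** Registered blocks; callers add an entry here before pushing it. */
      private final Map<Integer, Entry> blocks = new HashMap<>();
      private Entry head;
      private Entry tail;

      /** Push the entry to the head; false if the block was already evicted. */
      boolean maybePushToHead(Entry entry) {
        if (!blocks.containsKey(entry.blockNumber)) {
          // Evicted concurrently: do not relink, and do not count it.
          return false;
        }
        if (head == null) {
          head = entry;
          tail = entry;
          return true;
        }
        if (entry == head) {
          return true; // already the most recently used block
        }
        // Unlink from the current position (no-op for a fresh entry).
        if (entry.prev != null) {
          entry.prev.next = entry.next;
        }
        if (entry.next != null) {
          entry.next.prev = entry.prev;
        }
        if (entry == tail) {
          tail = entry.prev;
        }
        // Relink at the head.
        entry.prev = null;
        entry.next = head;
        head.prev = entry;
        head = entry;
        return true;
      }

      /** Evict the least recently used entry, if any, and unregister it. */
      Entry evictTail() {
        if (tail == null) {
          return null;
        }
        Entry victim = tail;
        tail = victim.prev;
        if (tail != null) {
          tail.next = null;
        } else {
          head = null;
        }
        victim.prev = null;
        blocks.remove(victim.blockNumber);
        return victim;
      }
    }

A false return means the entry was evicted concurrently and must not be counted toward the list size, which mirrors the guarded entryListSize++ in the diff above.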
S3APrefetchingInputStream.java
@@ -115,7 +115,7 @@ public S3APrefetchingInputStream(
client,
streamStatistics);
} else {
LOG.debug("Creating in caching input stream for {}", context.getPath());
LOG.debug("Creating caching input stream for {}", context.getPath());
this.inputStream = new S3ACachingInputStream(
context,
s3Attributes,
S3ARemoteObjectReader.java
@@ -129,7 +129,7 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)

final String path = this.remoteObject.getPath();
EOFException invokerResponse =
- invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
+ invoker.retry(String.format("read %s [%d-%d]", path, offset, offset + size),
path, true,
trackDurationOfOperation(streamStatistics,
STREAM_READ_REMOTE_BLOCK_READ, () -> {
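The fix above changes the retry description from printing the block length as the end bound to printing the real end offset. A small illustration with made-up values:

    public class RangeLogDemo {
      public static void main(String[] args) {
        long offset = 65536;  // block start (made-up value)
        int size = 8192;      // block length (made-up value)
        // Old message: the end bound printed the length, e.g. "[65536-8192]",
        // which reads as a range ending before it starts.
        System.out.println(String.format("read %s [%d-%d]",
            "s3a://bucket/key", offset, size));
        // Fixed message: the end bound is offset + size, e.g. "[65536-73728]".
        System.out.println(String.format("read %s [%d-%d]",
            "s3a://bucket/key", offset, offset + size));
      }
    }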
ITestS3APrefetchingCacheFiles.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;

import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_COUNT_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
@@ -56,6 +57,11 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {

public static final int PREFETCH_OFFSET = 10240;

+ /**
+  * Block prefetch count: {@value}.
+  */
+ public static final int BLOCK_COUNT = 8;
+
private Path testFile;

/** The FS with the external file. */
@@ -84,12 +90,18 @@ public void setUp() throws Exception {
@Override
public Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
- S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+     PREFETCH_ENABLED_KEY,
+     PREFETCH_BLOCK_COUNT_KEY,
+     PREFETCH_BLOCK_SIZE_KEY);
configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
+ // use a small block size unless explicitly set in the test config.
configuration.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ configuration.setInt(PREFETCH_BLOCK_COUNT_KEY, BLOCK_COUNT);

+ // patch buffer dir with a unique path for test isolation.
- final String bufferDirBase = configuration.get(BUFFER_DIR);

+ final String bufferDirBase = "target/prefetch";
bufferDir = bufferDirBase + "/" + UUID.randomUUID();
configuration.set(BUFFER_DIR, bufferDir);
return configuration;
@@ -125,7 +137,7 @@ public void testCacheFileExistence() throws Throwable {
in.read(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize);


- File tmpFileDir = new File(conf.get(BUFFER_DIR));
+ File tmpFileDir = new File(bufferDir);
final LocalFileSystem localFs = FileSystem.getLocal(conf);
Path bufferDirPath = new Path(tmpFileDir.toURI());
ContractTestUtils.assertIsDirectory(localFs, bufferDirPath);
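The test changes follow the usual S3A test pattern: strip any base/bucket overrides for the options under test, set explicit values, and point the buffer directory at a unique per-run path so stray cache files from earlier runs cannot satisfy the assertions. A hedged sketch of that pattern (the option keys match the S3A constants used in the diff; the class and helper are illustrative):

    import java.util.UUID;
    import org.apache.hadoop.conf.Configuration;

    // Sketch of the test-isolation configuration; this class is illustrative.
    class PrefetchTestConfig {
      static final int BLOCK_SIZE = 1024;   // small block for fast tests (assumed value)
      static final int BLOCK_COUNT = 8;

      static Configuration create(Configuration base) {
        Configuration conf = new Configuration(base);
        // Explicit values only win if per-bucket overrides were removed first;
        // the real test does that via S3ATestUtils.removeBaseAndBucketOverrides.
        conf.setBoolean("fs.s3a.prefetch.enabled", true);
        conf.setInt("fs.s3a.prefetch.block.size", BLOCK_SIZE);
        conf.setInt("fs.s3a.prefetch.block.count", BLOCK_COUNT);
        // Unique buffer dir per run so leftover cache files never collide.
        conf.set("fs.s3a.buffer.dir", "target/prefetch/" + UUID.randomUUID());
        return conf;
      }
    }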
