HADOOP-18184. prefetching
- The cache was not thread safe: cleanup could run after a caller had just
  verified a block was present but before the read lock was acquired.
  Fix: combine the presence check and the read into one synchronized block;
  use synchronized elsewhere.
- Cut back on assertions in ITestS3APrefetchingLargeFiles which seem
  too brittle given prefetch behaviour/race conditions.
- Minor doc, log and assertion changes.

More work on that test failure is needed.

Change-Id: I288540ec1fb08e1a5684cde8e94e1c7933d1e41d
steveloughran committed May 31, 2024
1 parent b2d1d92 commit bfd3716
Showing 5 changed files with 108 additions and 69 deletions.
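
At the heart of the change is a classic check-then-act race: a caller could verify a block was present, cleanup could evict it, and the subsequent read would fail. A minimal sketch of the repaired pattern, simplified from the actual `SingleFilePerBlockCache` change below (`readBlock` is a hypothetical helper standing in for the real read path):

```java
// Check and read are executed under one lock, so cleanup cannot evict
// the block between the presence test and the read; a miss is reported
// as a boolean rather than surfacing later as an exception.
public synchronized boolean get(int blockNumber, ByteBuffer buffer)
    throws IOException {
  if (!blocks.containsKey(blockNumber)) {
    return false;                              // no block found
  }
  readBlock(blocks.get(blockNumber), buffer);  // hypothetical read helper
  return true;
}
```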
BlockCache.java
@@ -55,12 +55,15 @@ public interface BlockCache extends Closeable {

/**
* Gets the block having the given {@code blockNumber}.
*
* If the block is not present then the method returns
* false and {@code buffer} is unchanged.
* @param blockNumber the id of the desired block.
* @param buffer contents of the desired block are copied to this buffer.
* @return true if the block was found.
* @throws IOException if there is an error reading the given block.
* @throws IllegalArgumentException if buffer is null.
*/
void get(int blockNumber, ByteBuffer buffer) throws IOException;
boolean get(int blockNumber, ByteBuffer buffer) throws IOException;

/**
* Puts the given block in this cache.
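With the `boolean` return, a miss becomes a normal outcome that callers can branch on instead of racing a separate `containsBlock()` probe. A hedged usage sketch (`cache`, `blockSize` and `blockNumber` are assumed to be in scope):

```java
ByteBuffer buffer = ByteBuffer.allocate(blockSize);
if (cache.get(blockNumber, buffer)) {
  // hit: the block contents were copied into the buffer
} else {
  // miss: the buffer is unchanged; fall back to reading from the store
}
```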
SingleFilePerBlockCache.java
@@ -39,8 +39,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.annotation.Nullable;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
@@ -51,12 +54,13 @@
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Preconditions;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkState;

/**
* Provides functionality necessary for caching blocks of data read from FileSystem.
@@ -67,8 +71,10 @@ public class SingleFilePerBlockCache implements BlockCache {

/**
* Blocks stored in this cache.
* A concurrent hash map is used here, but compound cache operations
* must still be made thread safe.
*/
private final Map<Integer, Entry> blocks;
private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();

/**
* Total max blocks count, to be considered as baseline for LRU cache eviction.
@@ -78,7 +84,7 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* The lock to be shared by LRU based linked list updates.
*/
private final ReentrantReadWriteLock blocksLock;
private final ReentrantReadWriteLock blocksLock = new ReentrantReadWriteLock();

/**
* Head of the linked list.
@@ -99,9 +105,9 @@
* Number of times a block was read from this cache.
* Used for determining cache utilization factor.
*/
private int numGets = 0;
private final AtomicInteger numGets = new AtomicInteger();

private final AtomicBoolean closed;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final PrefetchingStatistics prefetchingStatistics;

@@ -224,13 +230,10 @@ private void setNext(Entry next) {
*/
public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics,
int maxBlocksCount,
DurationTrackerFactory trackerFactory) {
@Nullable DurationTrackerFactory trackerFactory) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
this.maxBlocksCount = maxBlocksCount;
Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
blocks = new ConcurrentHashMap<>();
blocksLock = new ReentrantReadWriteLock();
this.trackerFactory = trackerFactory != null
? trackerFactory : stubDurationTrackerFactory();
}
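
With the tracker factory now explicitly `@Nullable`, the cache can be built without duration tracking; a small construction sketch (the statistics object is assumed to exist in the caller's scope):

```java
// maxBlocksCount must be > 0; a null tracker factory falls back to
// stubDurationTrackerFactory() inside the constructor.
BlockCache cache = new SingleFilePerBlockCache(
    prefetchingStatistics,   // a PrefetchingStatistics implementation
    8,                       // retain at most 8 blocks before LRU eviction
    null);                   // no duration tracking
```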
@@ -247,7 +250,7 @@ public boolean containsBlock(int blockNumber) {
* Gets the blocks in this cache.
*/
@Override
public Iterable<Integer> blocks() {
public synchronized Iterable<Integer> blocks() {
return Collections.unmodifiableList(new ArrayList<>(blocks.keySet()));
}

@@ -259,19 +262,20 @@ public int size() {
return blocks.size();
}

/**
* Gets the block having the given {@code blockNumber}.
*
* @throws IllegalArgumentException if buffer is null.
*/
@Override
public void get(int blockNumber, ByteBuffer buffer) throws IOException {
public synchronized boolean get(int blockNumber, ByteBuffer buffer) throws IOException {
if (closed.get()) {
return;
return false;
}

checkNotNull(buffer, "buffer");

if (!blocks.containsKey(blockNumber)) {
// no block found
return false;
}

// block found. read it.
Entry entry = getEntry(blockNumber);
entry.takeLock(Entry.LockType.READ);
try {
@@ -282,8 +286,16 @@ public void get(int blockNumber, ByteBuffer buffer) throws IOException {
} finally {
entry.releaseLock(Entry.LockType.READ);
}
return true;
}

/**
* Read the contents of a file into a bytebuffer.
* @param path local path
* @param buffer destination.
* @return bytes read.
* @throws IOException read failure.
*/
protected int readFile(Path path, ByteBuffer buffer) throws IOException {
int numBytesRead = 0;
int numBytes;
@@ -296,21 +308,26 @@ protected int readFile(Path path, ByteBuffer buffer) throws IOException {
return numBytesRead;
}

private Entry getEntry(int blockNumber) {
/**
* Get an entry in the cache.
* Increases the value of {@link #numGets}.
* @param blockNumber block number
* @return the entry.
*/
private synchronized Entry getEntry(int blockNumber) {
Validate.checkNotNegative(blockNumber, "blockNumber");

Entry entry = blocks.get(blockNumber);
if (entry == null) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
checkState(entry != null, "block %d not found in cache", blockNumber);
numGets.getAndIncrement();
addToLinkedListHead(entry);
return entry;
}

/**
* Helper method to add the given entry to the head of the linked list.
*
* Add the given entry to the head of the linked list if
* it is not already there.
* Locks {@link #blocksLock} first.
* @param entry Block entry to add.
*/
private void addToLinkedListHead(Entry entry) {
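The method body is folded out of this view and delegates to `maybePushToHeadOfBlockList()`; purely as an illustration of the locking pattern (not the actual implementation, and `unlink()` is a hypothetical helper), a head promotion guarded by the write lock might look like:

```java
blocksLock.writeLock().lock();
try {
  if (head != entry) {
    unlink(entry);       // hypothetical: detach entry from its current position
    entry.setNext(head); // relink it as the new head, evicted last under LRU
    head = entry;
  }
} finally {
  blocksLock.writeLock().unlock();
}
```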
@@ -371,9 +388,10 @@ private boolean maybePushToHeadOfBlockList(Entry entry) {
* @throws IOException if either local dir allocator fails to allocate file or if IO error
* occurs while writing the buffer content to the file.
* @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
* @throws IllegalStateException if the cache file exists and is not empty
*/
@Override
public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
public synchronized void put(int blockNumber, ByteBuffer buffer, Configuration conf,
LocalDirAllocator localDirAllocator) throws IOException {
if (closed.get()) {
return;
@@ -382,6 +400,8 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
checkNotNull(buffer, "buffer");

if (blocks.containsKey(blockNumber)) {
// this block already exists.
// verify the checksum matches
Entry entry = blocks.get(blockNumber);
entry.takeLock(Entry.LockType.READ);
try {
@@ -398,12 +418,8 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
String blockInfo = String.format("-block-%04d", blockNumber);
Path blockFilePath = getCacheFilePath(conf, localDirAllocator, blockInfo, buffer.limit());
long size = Files.size(blockFilePath);
if (size != 0) {
String message =
String.format("[%d] temp file already has data. %s (%d)",
blockNumber, blockFilePath, size);
throw new IllegalStateException(message);
}
checkState(size == 0, "[%d] temp file already has data. %s (%d)",
blockNumber, blockFilePath, size);

writeFile(blockFilePath, buffer);
long checksum = BufferData.getChecksum(buffer);
@@ -485,6 +501,12 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);

/**
* Write the contents of the buffer to the path.
* @param path file to create.
* @param buffer source buffer.
* @throws IOException write failure.
*/
protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
buffer.rewind();
try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
@@ -564,21 +586,21 @@ public String toString() {
return sb.toString();
}

/**
* Validate a block entry against a buffer, including checksum comparison.
* @param entry block entry
* @param buffer buffer
* @throws IllegalStateException if invalid.
*/
private void validateEntry(Entry entry, ByteBuffer buffer) {
if (entry.size != buffer.limit()) {
String message = String.format(
"[%d] entry.size(%d) != buffer.limit(%d)",
entry.blockNumber, entry.size, buffer.limit());
throw new IllegalStateException(message);
}
checkState(entry.size == buffer.limit(),
"[%d] entry.size(%d) != buffer.limit(%d)",
entry.blockNumber, entry.size, buffer.limit());

long checksum = BufferData.getChecksum(buffer);
if (entry.checksum != checksum) {
String message = String.format(
"[%d] entry.checksum(%d) != buffer checksum(%d)",
entry.blockNumber, entry.checksum, checksum);
throw new IllegalStateException(message);
}
checkState(entry.checksum == checksum,
"[%d] entry.checksum(%d) != buffer checksum(%d)",
entry.blockNumber, entry.checksum, checksum);
}

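The size/checksum pair recorded in each entry lets `validateEntry()` detect a cache file that was truncated or overwritten between `put()` and `get()`. A self-contained sketch of that style of check, assuming a CRC32-like checksum (the exact algorithm inside `BufferData.getChecksum()` is not shown in this diff):

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

final class BlockValidation {

  /** Checksum of the buffer contents, leaving the buffer's own position alone. */
  static long checksum(ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate(); // shares bytes, independent position
    view.rewind();                        // cover position 0 up to the limit
    CRC32 crc = new CRC32();
    crc.update(view);
    return crc.getValue();
  }

  /** Fail fast if the buffer no longer matches the recorded size and checksum. */
  static void validate(int expectedSize, long expectedChecksum, ByteBuffer buffer) {
    if (buffer.limit() != expectedSize) {
      throw new IllegalStateException("size mismatch");
    }
    if (checksum(buffer) != expectedChecksum) {
      throw new IllegalStateException("checksum mismatch");
    }
  }
}
```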
S3ARemoteInputStream.java
@@ -117,6 +117,9 @@ public abstract class S3ARemoteInputStream

private final ChangeTracker changeTracker;

/**
* IOStatistics.
*/
private final IOStatistics ioStatistics;

/** Aggregator used to aggregate per thread IOStatistics. */
@@ -339,7 +342,7 @@ public int read(byte[] buffer) throws IOException {
@Override
public int read(byte[] buffer, int offset, int len) throws IOException {
throwIfClosed();

validatePositionedReadArgs(nextReadPos, buffer, offset, len);
if (len == 0) {
return 0;
}
prefetching.md
@@ -37,11 +37,11 @@ Multiple blocks may be read in parallel.

### Configuring the stream

|Property |Meaning |Default |
|---|---|---|
|`fs.s3a.prefetch.enabled` |Enable the prefetch input stream |`false` |
|`fs.s3a.prefetch.block.size` |Size of a block |`8M` |
|`fs.s3a.prefetch.block.count` |Number of blocks to prefetch |`8` |
| Property | Meaning | Default |
|-------------------------------|----------------------------------|---------|
| `fs.s3a.prefetch.enabled` | Enable the prefetch input stream | `false` |
| `fs.s3a.prefetch.block.size` | Size of a block | `8M` |
| `fs.s3a.prefetch.block.count` | Number of blocks to prefetch | `8` |

The default size of a block is 8MB, and the minimum allowed block size is 1 byte.
Decreasing block size will increase the number of blocks to be read for a file.
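
For example, prefetching could be enabled and tuned programmatically before the filesystem is created; a sketch with illustrative values, not recommendations:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
conf.setBoolean("fs.s3a.prefetch.enabled", true);  // default: false
conf.set("fs.s3a.prefetch.block.size", "16M");     // default: 8M
conf.setInt("fs.s3a.prefetch.block.count", 4);     // default: 8
FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
```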
@@ -100,7 +100,7 @@ the `S3InMemoryInputStream` will be used.

If the caller makes the following read calls:

```
```java
in.read(buffer, 0, 3MB);
in.read(buffer, 0, 2MB);
```