HADOOP-18184. S3A Prefetching unbuffer.
Making this a pre-requisite for vectored IO as:
* it helps me learn my way around the code
* I propose that the caching block stream stop its
  prefetching once a vectored IO request has come in,
  and maybe even free up all existing blocks.

Change-Id: I6189a2a6afb23d5f57d663728d3c5747acd4cc73

HADOOP-18184 unbuffer; getting tests to work

Change-Id: I3a9513f39595c8fa8d7aa282ef647b0fcc8b7ef9
steveloughran committed Jul 12, 2023
1 parent 2fb459b commit 8c8d6ed
Showing 23 changed files with 444 additions and 110 deletions.
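
For orientation, a minimal client-side sketch of the unbuffer() path this commit wires into the prefetching stream. FSDataInputStream.unbuffer() is the existing public API; the class and helper names below are illustrative only, and the prefetch-cancelling/buffer-freeing behaviour is what the diff underneath adds.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnbufferExample {
  /** Read some data, then ask the stream to release buffers and stop prefetching. */
  static void readThenUnbuffer(FileSystem fs, Path path) throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[8192];
      in.read(buf, 0, buf.length);  // may start block prefetching in the prefetching stream
      in.unbuffer();                // stream should cancel prefetches and free block buffers
      in.read(buf, 0, buf.length);  // later reads re-acquire resources on demand
    }
  }
}
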
@@ -125,8 +125,9 @@ public void requestPrefetch(int blockNumber) {

/**
* Requests cancellation of any previously issued prefetch requests.
* @param reason the reason for the cancellation.
*/
public void cancelPrefetches() {
public void cancelPrefetches(final CancelReason reason) {
// Do nothing because we do not support prefetches.
}

@@ -142,4 +143,10 @@ public void requestCaching(BufferData data) {
@Override
public void close() {
}

public enum CancelReason {
RandomIO,
Close,
Unbuffer
}
}
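
A hedged sketch of how a stream implementation might pick a CancelReason when telling the block manager to stop prefetching. It assumes the enum is nested in the BlockManager class shown in this hunk and that the package is org.apache.hadoop.fs.impl.prefetch; the wrapper class and method names are illustrative, not the real call sites.

import org.apache.hadoop.fs.impl.prefetch.BlockManager;

final class CancelReasonSketch {

  /** unbuffer(): stop prefetching and let the manager release resources. */
  static void onUnbuffer(BlockManager blockManager) {
    blockManager.cancelPrefetches(BlockManager.CancelReason.Unbuffer);
  }

  /** Random IO detected: stop prefetching, but in-flight blocks may still be cached. */
  static void onRandomIoDetected(BlockManager blockManager) {
    blockManager.cancelPrefetches(BlockManager.CancelReason.RandomIO);
  }
}
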
@@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static java.util.Objects.requireNonNull;

@@ -48,6 +49,8 @@
*/
public abstract class CachingBlockManager extends BlockManager {
private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);

private static final LogExactlyOnce LOG_CACHING_DISABLED = new LogExactlyOnce(LOG);
private static final int TIMEOUT_MINUTES = 60;

/**
@@ -82,7 +85,10 @@ public abstract class CachingBlockManager extends BlockManager {
*/
private final BlockOperations ops;

private boolean closed;
/**
* True if the manager has been closed.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* If a single caching operation takes more than this time (in seconds),
@@ -137,7 +143,7 @@ public CachingBlockManager(
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.ops.setDebug(LOG.isDebugEnabled());
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}
@@ -158,7 +164,7 @@ public BufferData get(int blockNumber) throws IOException {
boolean done;

do {
if (closed) {
if (isClosed()) {
throw new IOException("this stream is already closed");
}

@@ -220,7 +226,7 @@ private boolean getInternal(BufferData data) throws IOException {
*/
@Override
public void release(BufferData data) {
if (closed) {
if (isClosed()) {
return;
}

@@ -233,16 +239,14 @@ public void release(BufferData data) {

@Override
public synchronized void close() {
if (closed) {
if (closed.getAndSet(true)) {
return;
}

closed = true;

final BlockOperations.Operation op = ops.close();

// Cancel any prefetches in progress.
cancelPrefetches();
cancelPrefetches(CancelReason.Close);

cleanupWithLogger(LOG, cache);

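The closed-flag change above is the usual AtomicBoolean.getAndSet idiom for an idempotent close; a stripped-down sketch (names simplified, not the full CachingBlockManager logic):

import java.util.concurrent.atomic.AtomicBoolean;

final class IdempotentCloseSketch {

  // replaces the plain boolean field so the check-and-set is atomic
  private final AtomicBoolean closed = new AtomicBoolean(false);

  public synchronized void close() {
    if (closed.getAndSet(true)) {
      return;              // second and later calls are no-ops
    }
    // release buffers, cancel prefetches, close the cache -- exactly once
  }

  boolean isClosed() {
    return closed.get();   // checked by the worker paths before doing any work
  }
}
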
@@ -263,15 +267,18 @@ public synchronized void close() {
public void requestPrefetch(int blockNumber) {
checkNotNegative(blockNumber, "blockNumber");

if (closed) {
if (isClosed()) {
return;
}

// We initiate a prefetch only if we can acquire a buffer from the shared pool.
LOG.debug("Requesting prefetch for block {}", blockNumber);
BufferData data = bufferPool.tryAcquire(blockNumber);
if (data == null) {
LOG.debug("no buffer acquired for block {}", blockNumber);
return;
}
LOG.debug("acquired {}", data);

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
@@ -298,12 +305,15 @@ public void requestPrefetch(int blockNumber) {
* Requests cancellation of any previously issued prefetch requests.
*/
@Override
public void cancelPrefetches() {
public void cancelPrefetches(final CancelReason reason) {
LOG.debug("Cancelling prefetches {}", reason);
BlockOperations.Operation op = ops.cancelPrefetches();

for (BufferData data : bufferPool.getAll()) {
// We add blocks being prefetched to the local cache so that the prefetch is not wasted.
if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
// this is only done if the reason is random IO-related, not close/unbuffer
if (reason == CancelReason.RandomIO &&
data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
requestCaching(data);
}
}
@@ -316,7 +326,7 @@ private void read(BufferData data) throws IOException {
try {
readBlock(data, false, BufferData.State.BLANK);
} catch (IOException e) {
LOG.error("error reading block {}", data.getBlockNumber(), e);
LOG.debug("error reading block {}", data.getBlockNumber(), e);
throw e;
}
}
@@ -337,7 +347,7 @@ private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOExc
private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState)
throws IOException {

if (closed) {
if (isClosed()) {
return;
}

@@ -367,6 +377,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
tracker = prefetchingStatistics.prefetchOperationStarted();
op = ops.prefetch(data.getBlockNumber());
} else {
tracker = prefetchingStatistics.blockFetchOperationStarted();
op = ops.getRead(data.getBlockNumber());
}

@@ -392,14 +403,22 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...

if (isPrefetch) {
prefetchingStatistics.prefetchOperationCompleted();
if (tracker != null) {
tracker.close();
}
}
if (tracker != null) {
tracker.close();
LOG.debug("fetch completed: {}", tracker);
}
}
}
}

/**
* True if the manager has been closed.
*/
private boolean isClosed() {
return closed.get();
}
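
A condensed view of the tracker contract readBlock() now follows for both prefetch and on-demand fetches. PrefetchingStatistics and DurationTracker are the interfaces touched by this patch; the wrapper class, its assumed package (org.apache.hadoop.fs.impl.prefetch), and the Runnable fetch argument are illustrative.

import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

final class TrackedFetchSketch {

  /** Start the matching tracker, run the fetch, always close the tracker. */
  static void fetchWithTracking(PrefetchingStatistics statistics,
      boolean isPrefetch, Runnable fetch) {
    DurationTracker tracker = isPrefetch
        ? statistics.prefetchOperationStarted()
        : statistics.blockFetchOperationStarted();
    try {
      fetch.run();          // the actual block read
    } catch (RuntimeException e) {
      tracker.failed();     // mark the tracked duration as a failure
      throw e;
    } finally {
      tracker.close();      // record the elapsed time either way
    }
  }
}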

/**
* Read task that is submitted to the future pool.
*/
@@ -426,10 +445,13 @@ public Void get() {
}
}

private static final BufferData.State[] EXPECTED_STATE_AT_CACHING =
new BufferData.State[] {
BufferData.State.PREFETCHING, BufferData.State.READY
};

/**
* Required state of a block for it to be cacheable.
*/
private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = {
BufferData.State.PREFETCHING, BufferData.State.READY
};

/**
* Requests that the given block should be copied to the local cache.
@@ -440,9 +462,10 @@ public Void get() {
*/
@Override
public void requestCaching(BufferData data) {
if (closed) {
if (isClosed()) {
return;
}
LOG.debug("Request caching of {}", data);

if (cachingDisabled.get()) {
data.setDone();
@@ -453,16 +476,19 @@ public void requestCaching(BufferData data) {

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block in wrong state to cache");
return;
}

synchronized (data) {
// Reconfirm state after locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block in wrong state to cache");
return;
}

if (cache.containsBlock(data.getBlockNumber())) {
LOG.debug("Block is already in cache");
data.setDone();
return;
}
@@ -492,7 +518,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
prefetchingStatistics.executorAcquired(
Duration.between(taskQueuedStartTime, Instant.now()));

if (closed) {
if (isClosed()) {
return;
}

@@ -550,7 +576,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
if (!cachingDisabled.getAndSet(true)) {
String message = String.format(
"Caching disabled because of slow operation (%.1f sec)", endOp.duration());
LOG.warn(message);
LOG_CACHING_DISABLED.info(message);
}
}
}
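
A small sketch of the LogExactlyOnce pattern adopted above: the first call emits the message and later calls are suppressed, so a persistently slow cache write cannot flood the logs. The wrapper class here is illustrative.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.store.LogExactlyOnce;

final class SlowCacheWarningSketch {

  private static final Logger LOG = LoggerFactory.getLogger(SlowCacheWarningSketch.class);

  // logs the first time only; subsequent calls are silently dropped
  private static final LogExactlyOnce LOG_CACHING_DISABLED = new LogExactlyOnce(LOG);

  static void cachingDisabled(double seconds) {
    LOG_CACHING_DISABLED.info(
        String.format("Caching disabled because of slow operation (%.1f sec)", seconds));
  }
}
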
@@ -562,9 +588,10 @@ protected BlockCache createCache() {
}

protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
if (closed) {
if (isClosed()) {
return;
}
LOG.debug("Caching {}: {}", blockNumber, buffer);

cache.put(blockNumber, buffer, conf, localDirAllocator);
}
@@ -24,14 +24,24 @@
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

public interface PrefetchingStatistics extends IOStatisticsSource {
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;

public interface PrefetchingStatistics extends IOStatisticsSource {

/**
* A prefetch operation has started.
* @return duration tracker
*/
DurationTracker prefetchOperationStarted();

/**
* A block fetch operation has started.
* @return duration tracker
*/
default DurationTracker blockFetchOperationStarted() {
return stubDurationTracker();
}

/**
* A block has been saved to the file cache.
*/
@@ -47,6 +47,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.DurationInfo;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -311,11 +312,13 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,

protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
buffer.rewind();
WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
while (buffer.hasRemaining()) {
writeChannel.write(buffer);
try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
buffer.remaining(), path)) {
while (buffer.hasRemaining()) {
writeChannel.write(buffer);
}
}
writeChannel.close();
}
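
For reference, a minimal sketch of the DurationInfo pattern used in the new writeFile(): it logs when constructed and again with the elapsed time when closed, and the try-with-resources also guarantees the channel is closed if a write fails, which the previous code did not. The logger and literal arguments below are illustrative.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.util.DurationInfo;

public class DurationInfoSketch {

  private static final Logger LOG = LoggerFactory.getLogger(DurationInfoSketch.class);

  public static void main(String[] args) {
    // logs on construction, logs the measured duration on close()
    try (DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s", 1024, "/tmp/block")) {
      // ... the timed work goes here ...
    }
  }
}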

/**
@@ -421,6 +421,12 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_PREFETCH_OPERATIONS
= "stream_read_prefetch_operations";

/**
* Total number of block fetch operations executed.
*/
public static final String STREAM_READ_BLOCK_FETCH_OPERATIONS
= "stream_read_block_fetch_operations";

/**
* Total number of block in disk cache.
*/
@@ -31,6 +31,9 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

import org.apache.hadoop.fs.StorageStatistics;
@@ -51,6 +54,12 @@
* Support for implementing IOStatistics interfaces.
*/
public final class IOStatisticsBinding {
/**
* Log changes at debug.
* Noisy, but occasionally useful.
*/
private static final Logger LOG =
LoggerFactory.getLogger(IOStatisticsBinding.class);

/** Pattern used for each entry. */
public static final String ENTRY_PATTERN = "(%s=%s)";
@@ -548,13 +557,15 @@ public static <B> B invokeTrackingDuration(
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
LOG.debug("Operation failure with duration {}", tracker);
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
LOG.debug("Operation success with duration {}", tracker);
}
}
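
A hedged usage sketch of invokeTrackingDuration(), which the new debug lines instrument. It assumes the method takes a DurationTracker plus a CallableRaisingIOE, as the body above implies; the example class, the choice of a stub tracker, and doRead() are illustrative.

import java.io.IOException;

import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;

public class TrackedOperationSketch {

  /** Run an IO operation under a tracker: duration and failure state are recorded. */
  static String readTracked() throws IOException {
    DurationTracker tracker = IOStatisticsSupport.stubDurationTracker(); // stand-in tracker
    return IOStatisticsBinding.invokeTrackingDuration(tracker, () -> doRead());
  }

  private static String doRead() throws IOException {
    return "data";   // placeholder for the real IO operation
  }
}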
