HADOOP-18184. S3A Prefetching unbuffer.
Making this a prerequisite for vectored IO as
* it helps me learn my way around the code
* I propose that the caching block stream stops its
  prefetching once a vectored IO request has come in,
  maybe even freeing up all existing blocks.

Change-Id: I6189a2a6afb23d5f57d663728d3c5747acd4cc73

HADOOP-18184 unbuffer; getting tests to work

Change-Id: I3a9513f39595c8fa8d7aa282ef647b0fcc8b7ef9

HADOOP-18184. S3A prefetch unbuffer

* In-memory unbuffer working, with tests
* prefetching worse than it was.

Change-Id: I33178b7f691a75f2f043c0d585e7d9fb08f53a4a

HADOOP-18184. S3A prefetch unbuffer

Adding gauges of prefetch block size and the number of blocks to prefetch.

Change-Id: I35dcbccf0fc7b29d4b1294c49d7dfa97c573befb
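
The new gauges surface through the stream's IOStatistics. As a rough illustration (not part of the commit), a caller could dump them as below; the helper, its name, and the 1 KB read are illustrative assumptions, while the IOStatisticsLogging call is the standard hadoop-common API:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;

// Hypothetical helper: open a file, read a little, then print the stream's
// IOStatistics, which is where the new prefetch gauges are reported.
static void dumpStreamStatistics(FileSystem fs, Path path) throws IOException {
  try (FSDataInputStream in = fs.open(path)) {
    in.read(new byte[1024]);
    System.out.println(ioStatisticsSourceToString(in));
  }
}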
steveloughran committed Jul 20, 2023
1 parent 646854e commit 13b8091
Showing 27 changed files with 723 additions and 143 deletions.
@@ -89,6 +89,13 @@ public BlockData(long fileSize, int blockSize) {
? 1
: 0);
this.state = new State[this.numBlocks];
markBlocksAsNotReady();
}

/**
* Mark all the blocks as not ready.
*/
public void markBlocksAsNotReady() {
for (int b = 0; b < this.numBlocks; b++) {
setState(b, State.NOT_READY);
}
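
Pulling this reset out of the constructor and making it public suggests a stream can reuse an existing BlockData after unbuffer rather than rebuilding it; a minimal sketch (the sizes are arbitrary, and the reuse pattern is an assumption about intent):

// Hypothetical reuse of BlockData across an unbuffer() call:
BlockData blockData = new BlockData(1_000_000L, 128 * 1024);
// ... blocks are read and their states advance over time ...
blockData.markBlocksAsNotReady();   // every block returns to NOT_READY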
@@ -125,8 +125,9 @@ public void requestPrefetch(int blockNumber) {

/**
* Requests cancellation of any previously issued prefetch requests.
* @param reason why the prefetches are being cancelled.
*/
public void cancelPrefetches() {
public void cancelPrefetches(final CancelReason reason) {
// Do nothing because we do not support prefetches.
}

@@ -142,4 +143,16 @@ public void requestCaching(BufferData data) {
@Override
public void close() {
}

/**
* Reason for cancelling prefetches.
*/
public enum CancelReason {
/** Stream has switched to random IO. */
RandomIO,
/** Stream closed completely. */
Close,
/** Stream unbuffered. */
Unbuffer
}
}
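
A sketch of how the reason-aware cancellation might be called from stream code; the blockManager field and both method bodies are hypothetical illustrations, not the stream changes in this commit:

// Hypothetical call sites for the new API (field name assumed):
public synchronized void unbuffer() {
  // give up buffers but keep the stream open for later reads
  blockManager.cancelPrefetches(BlockManager.CancelReason.Unbuffer);
}

@Override
public synchronized void close() {
  blockManager.cancelPrefetches(BlockManager.CancelReason.Close);
  blockManager.close();
}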
@@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static java.util.Objects.requireNonNull;

@@ -48,6 +49,8 @@
*/
public abstract class CachingBlockManager extends BlockManager {
private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);

private static final LogExactlyOnce LOG_CACHING_DISABLED = new LogExactlyOnce(LOG);
private static final int TIMEOUT_MINUTES = 60;

/**
@@ -82,7 +85,10 @@ public abstract class CachingBlockManager extends BlockManager {
*/
private final BlockOperations ops;

private boolean closed;
/**
* True if the manager has been closed.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* If a single caching operation takes more than this time (in seconds),
@@ -140,7 +146,8 @@ public CachingBlockManager(
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.ops.setDebug(LOG.isDebugEnabled());
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}

@@ -160,7 +167,7 @@ public BufferData get(int blockNumber) throws IOException {
boolean done;

do {
if (closed) {
if (isClosed()) {
throw new IOException("this stream is already closed");
}

@@ -222,7 +229,7 @@ private boolean getInternal(BufferData data) throws IOException {
*/
@Override
public void release(BufferData data) {
if (closed) {
if (isClosed()) {
return;
}

@@ -235,16 +242,14 @@ public void release(BufferData data) {

@Override
public synchronized void close() {
if (closed) {
if (closed.getAndSet(true)) {
return;
}

closed = true;

final BlockOperations.Operation op = ops.close();

// Cancel any prefetches in progress.
cancelPrefetches();
cancelPrefetches(CancelReason.Close);

cleanupWithLogger(LOG, cache);
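
Replacing the plain closed flag with an AtomicBoolean makes close() idempotent and safe against concurrent callers without extra locking; the pattern in isolation, as a generic sketch rather than the stream's actual code:

import java.util.concurrent.atomic.AtomicBoolean;

// Generic sketch of the getAndSet close pattern used above.
class CloseOnce implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void close() {
    if (closed.getAndSet(true)) {
      return;              // second and later calls are no-ops
    }
    // release resources exactly once here
  }
}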

@@ -265,15 +270,18 @@ public synchronized void close() {
public void requestPrefetch(int blockNumber) {
checkNotNegative(blockNumber, "blockNumber");

if (closed) {
if (isClosed()) {
return;
}

// We initiate a prefetch only if we can acquire a buffer from the shared pool.
LOG.debug("Requesting prefetch for block {}", blockNumber);
BufferData data = bufferPool.tryAcquire(blockNumber);
if (data == null) {
LOG.debug("no buffer acquired for block {}", blockNumber);
return;
}
LOG.debug("acquired {}", data);

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
@@ -298,16 +306,26 @@ public void requestPrefetch(int blockNumber) {

/**
* Requests cancellation of any previously issued prefetch requests.
* If the reason was switching to random IO, any active prefetched blocks
* are still cached.
* @param reason why the prefetches are being cancelled.
*/
@Override
public void cancelPrefetches() {
public void cancelPrefetches(final CancelReason reason) {
LOG.debug("Cancelling prefetches {}", reason);
BlockOperations.Operation op = ops.cancelPrefetches();

for (BufferData data : bufferPool.getAll()) {
// We add blocks being prefetched to the local cache so that the prefetch is not wasted.
if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
requestCaching(data);
if (reason == CancelReason.RandomIO) {
for (BufferData data : bufferPool.getAll()) {
// We add blocks being prefetched to the local cache so that the prefetch is not wasted.
// This is only done when switching to random IO, not on close/unbuffer.
if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
requestCaching(data);
}
}
} else {
// free the buffers
bufferPool.getAll().forEach(BufferData::setDone);
}

ops.end(op);
@@ -318,7 +336,7 @@ private void read(BufferData data) throws IOException {
try {
readBlock(data, false, BufferData.State.BLANK);
} catch (IOException e) {
LOG.error("error reading block {}", data.getBlockNumber(), e);
LOG.debug("error reading block {}", data.getBlockNumber(), e);
throw e;
}
}
@@ -339,7 +357,7 @@ private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException
private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState)
throws IOException {

if (closed) {
if (isClosed()) {
return;
}

@@ -369,6 +387,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
tracker = prefetchingStatistics.prefetchOperationStarted();
op = ops.prefetch(data.getBlockNumber());
} else {
tracker = prefetchingStatistics.blockFetchOperationStarted();
op = ops.getRead(data.getBlockNumber());
}

@@ -394,14 +413,40 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...

if (isPrefetch) {
prefetchingStatistics.prefetchOperationCompleted();
if (tracker != null) {
tracker.close();
}
}
if (tracker != null) {
tracker.close();
LOG.debug("fetch completed: {}", tracker);
}
}
}
}

/**
* True if the manager has been closed.
*/
private boolean isClosed() {
return closed.get();
}

/**
* Disable caching: update stream statistics and log exactly once
* at info level.
* @param endOp operation which measured the duration of the write.
*/
private void disableCaching(final BlockOperations.End endOp) {
if (!cachingDisabled.getAndSet(true)) {
String message = String.format(
"Caching disabled because of slow operation (%.1f sec)", endOp.duration());
LOG_CACHING_DISABLED.info(message);
prefetchingStatistics.setPrefetchCachingState(false);
}
}
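
LogExactlyOnce (from org.apache.hadoop.fs.store, imported above) suppresses repeats, so a stream that keeps hitting slow cache writes reports the problem only once; a minimal sketch of the pattern, with an arbitrary class name:

import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// However often reportSlow() is called, the message is logged a single time.
class SlowOperationReporter {
  private static final Logger LOG =
      LoggerFactory.getLogger(SlowOperationReporter.class);
  private static final LogExactlyOnce SLOW_ONCE = new LogExactlyOnce(LOG);

  void reportSlow(String message) {
    SLOW_ONCE.info(message);
  }
}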

private boolean isCachingDisabled() {
return cachingDisabled.get();
}

/**
* Read task that is submitted to the future pool.
*/
@@ -428,10 +473,12 @@ public Void get() {
}
}

private static final BufferData.State[] EXPECTED_STATE_AT_CACHING =
new BufferData.State[] {
BufferData.State.PREFETCHING, BufferData.State.READY
};
/**
* Required state of a block for it to be cacheable.
*/
private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = {
BufferData.State.PREFETCHING, BufferData.State.READY
};

/**
* Requests that the given block should be copied to the local cache.
@@ -442,11 +489,11 @@ public Void get() {
*/
@Override
public void requestCaching(BufferData data) {
if (closed) {
return;
}

if (cachingDisabled.get()) {
final int blockNumber = data.getBlockNumber();
LOG.debug("Block {}: request caching of {}", blockNumber, data);

if (isClosed() || isCachingDisabled()) {
data.setDone();
return;
}
@@ -455,16 +502,21 @@ public void requestCaching(BufferData data) {

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache:",
blockNumber, data.getState());
return;
}

synchronized (data) {
// Reconfirm state after locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache:",
blockNumber, data.getState());
return;
}

if (cache.containsBlock(data.getBlockNumber())) {
LOG.debug("Block {}: Block is already in cache", blockNumber);
data.setDone();
return;
}
@@ -494,19 +546,25 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
prefetchingStatistics.executorAcquired(
Duration.between(taskQueuedStartTime, Instant.now()));

if (closed) {
if (isClosed()) {
return;
}

if (cachingDisabled.get()) {
final int blockNumber = data.getBlockNumber();
LOG.debug("Block {}: Preparing to cache block", blockNumber);

if (isCachingDisabled()) {
LOG.debug("Block {}: Preparing caching disabled, not prefetching", blockNumber);
data.setDone();
return;
}
LOG.debug("Block {}: awaiting any read to complete", blockNumber);

try {
blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
// There was an error during prefetch.
LOG.debug("Block {}: prefetch failure", blockNumber);
return;
}
} catch (Exception e) {
@@ -516,7 +574,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
return;
}

if (cachingDisabled.get()) {
if (isCachingDisabled()) {
LOG.debug("Block {}: Preparing caching disabled while reading data", blockNumber);
data.setDone();
return;
}
@@ -526,18 +585,21 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
LOG.debug("Block {}: Block already in cache; not adding", blockNumber);

return;
}

if (cache.containsBlock(data.getBlockNumber())) {
if (cache.containsBlock(blockNumber)) {
LOG.debug("Block {}: already in cache; not adding", blockNumber);
data.setDone();
return;
}

op = ops.addToCache(data.getBlockNumber());
op = ops.addToCache(blockNumber);
ByteBuffer buffer = data.getBuffer().duplicate();
buffer.rewind();
cachePut(data.getBlockNumber(), buffer);
cachePut(blockNumber, buffer);
data.setDone();
} catch (Exception e) {
numCachingErrors.incrementAndGet();
@@ -549,11 +611,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
if (op != null) {
BlockOperations.End endOp = (BlockOperations.End) ops.end(op);
if (endOp.duration() > SLOW_CACHING_THRESHOLD) {
if (!cachingDisabled.getAndSet(true)) {
String message = String.format(
"Caching disabled because of slow operation (%.1f sec)", endOp.duration());
LOG.warn(message);
}
disableCaching(endOp);
}
}
}
@@ -564,9 +622,10 @@ protected BlockCache createCache(int maxBlocksCount) {
}

protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
if (closed) {
if (isClosed()) {
return;
}
LOG.debug("Block {}: Caching", buffer);

cache.put(blockNumber, buffer, conf, localDirAllocator);
}
@@ -76,5 +76,7 @@ public void memoryAllocated(int size) {
public void memoryFreed(int size) {

}


}
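
The block manager changes above call two statistics operations whose declarations are not in this excerpt (blockFetchOperationStarted, setPrefetchCachingState). A sketch of what they would look like as an interface; the names come from the call sites, while the exact signatures and javadoc are assumptions:

import org.apache.hadoop.fs.statistics.DurationTracker;

/** Assumed shape of the new statistics operations used by CachingBlockManager. */
interface PrefetchingStatisticsAdditions {

  /** A block is being fetched directly, not as a prefetch. */
  DurationTracker blockFetchOperationStarted();

  /** Record whether prefetch-to-cache is currently enabled. */
  void setPrefetchCachingState(boolean enabled);
}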
