Skip to content

Commit

Permalink
HADOOP-18184. Still working on prefetch logic and that test
Browse files Browse the repository at this point in the history
* Lots of logging
* prefetch operation appears to be blocking for much longer than the
  read will take, and the read doesn't take place then anyway.
  Sync problem?
* Test didn't expect any prefetching

Change-Id: Ie9da9a464ddccd481865eec2bc97fce5c50ed306
  • Loading branch information
steveloughran committed Nov 18, 2024
1 parent 5b85014 commit dfc9a6c
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.fs.impl.prefetch;

import java.util.Arrays;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkPositiveInteger;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkWithinRange;
Expand Down Expand Up @@ -92,6 +94,16 @@ public BlockData(long fileSize, int blockSize) {
markBlocksAsNotReady();
}

/**
 * Describe this instance for logs and debugging: the per-block state
 * array followed by the file size, block size and block count.
 * @return a string of the form {@code BlockData{state=[...], fileSize=..., ...}}
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("BlockData{");
  text.append("state=").append(Arrays.toString(state));
  text.append(", fileSize=").append(fileSize);
  text.append(", blockSize=").append(blockSize);
  text.append(", numBlocks=").append(numBlocks);
  text.append('}');
  return text.toString();
}

/**
* Mark all the blocks as not ready.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;

Expand All @@ -34,6 +37,9 @@
*/
public abstract class BlockManager implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(
BlockManager.class);

/**
* Information about each block of the underlying file.
*/
Expand Down Expand Up @@ -80,6 +86,7 @@ public BufferData get(int blockNumber) throws IOException {
int size = blockData.getSize(blockNumber);
ByteBuffer buffer = ByteBuffer.allocate(size);
long startOffset = blockData.getStartOffset(blockNumber);
LOG.debug("Get block {} range [{} - {}]", blockNumber, startOffset, startOffset + size - 1);
read(buffer, startOffset, size);
buffer.flip();
return new BufferData(blockNumber, buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.fs.impl.prefetch;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
Expand Down Expand Up @@ -76,6 +78,10 @@ public final class BlockManagerParameters {
*/
private DurationTrackerFactory trackerFactory;


/**
 * Maximum time to keep retrying; set through {@code withMaxRetry()}.
 * No initial value is assigned at this declaration.
 */
private Duration maxRetry;

/**
 * Interval between updates; set through {@code withUpdateInterval()}.
 * No initial value is assigned at this declaration.
 */
private Duration updateInterval;
/**
* @return The Executor future pool to perform async prefetch tasks.
*/
Expand Down Expand Up @@ -246,4 +252,32 @@ public BlockManagerParameters withPath(final Path value) {
path = value;
return this;
}

/**
 * Set the maximum retry duration.
 * The value is stored as-is: this method performs no validation, so a
 * null argument is not rejected here.
 * @param value new value
 * @return the builder
 */
public BlockManagerParameters withMaxRetry(final Duration value) {
maxRetry = value;
return this;
}

/**
 * @return The maximum retry duration, as last passed to
 * {@code withMaxRetry()}. NOTE(review): presumably null if the builder
 * method was never called - confirm against the constructor.
 */
public Duration getMaxRetry() {
return maxRetry;
}

/**
 * Set the update interval.
 * The value is stored as-is: this method performs no validation, so a
 * null argument is not rejected here.
 * @param value new value
 * @return the builder
 */
public BlockManagerParameters withUpdateInterval(final Duration value) {
updateInterval = value;
return this;
}

/**
 * @return The update interval, as last passed to
 * {@code withUpdateInterval()}. NOTE(review): presumably null if the
 * builder method was never called - confirm against the constructor.
 */
public Duration getUpdateInterval() {
return updateInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
Expand Down Expand Up @@ -56,6 +57,10 @@ public class BufferPool implements Closeable {
*/
private final int bufferSize;

private final Duration maxRetry;

private final Duration updateInterval;

/*
Invariants for internal state.
-- a buffer is either in this.pool or in this.allocated
Expand Down Expand Up @@ -84,18 +89,22 @@ public class BufferPool implements Closeable {
* @param size number of buffer in this pool.
* @param bufferSize size in bytes of each buffer.
* @param prefetchingStatistics statistics for this stream.
* @param maxRetry max time to retry
* @param updateInterval interval for updates
* @throws IllegalArgumentException if size is zero or negative.
* @throws IllegalArgumentException if bufferSize is zero or negative.
*/
public BufferPool(int size,
int bufferSize,
PrefetchingStatistics prefetchingStatistics) {
PrefetchingStatistics prefetchingStatistics,
Duration maxRetry,
Duration updateInterval) {
Validate.checkPositiveInteger(size, "size");
Validate.checkPositiveInteger(bufferSize, "bufferSize");

this.size = size;
this.bufferSize = bufferSize;
this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
this.allocated = new IdentityHashMap<>();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.pool = new BoundedResourcePool<ByteBuffer>(size) {
@Override
Expand All @@ -105,6 +114,8 @@ public ByteBuffer createNew() {
return buffer;
}
};
this.maxRetry = maxRetry;
this.updateInterval = updateInterval;
}

/**
Expand All @@ -124,8 +135,8 @@ public List<BufferData> getAll() {
*/
public synchronized BufferData acquire(int blockNumber) {
BufferData data;
final int maxRetryDelayMs = 600 * 1000;
final int statusUpdateDelayMs = 120 * 1000;
final int maxRetryDelayMs = (int) maxRetry.toMillis();
final int statusUpdateDelayMs = (int) updateInterval.toMillis();
Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs);

do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,21 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(
blockManagerParameters.getPrefetchingStatistics(), "prefetching statistics");
this.conf = requireNonNull(blockManagerParameters.getConf(), "configuratin");
this.conf = requireNonNull(blockManagerParameters.getConf(), "configuration");

if (getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.prefetchingStatistics,
blockManagerParameters.getMaxRetry(),
blockManagerParameters.getUpdateInterval());
this.cache = createCache(blockManagerParameters.getMaxBlocksCount(),
blockManagerParameters.getTrackerFactory());
}

this.ops = new BlockOperations();
this.ops.setDebug(LOG.isDebugEnabled());
this.localDirAllocator = blockManagerParameters.getLocalDirAllocator();
prefetchingStatistics.setPrefetchDiskCachingState(true);
this.prefetchingStatistics.setPrefetchDiskCachingState(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.util.concurrent.HadoopExecutors;

Expand All @@ -42,6 +43,8 @@
*
*/
public class ExecutorServiceFuturePool {
private static final Logger LOG = LoggerFactory.getLogger(
ExecutorServiceFuturePool.class);

private final ExecutorService executor;

Expand All @@ -56,6 +59,8 @@ public ExecutorServiceFuturePool(ExecutorService executor) {
* @throws NullPointerException if f param is null
*/
public Future<Void> executeFunction(final Supplier<Void> f) {

LOG.debug("Executing function {}", f);
// The bound method reference f::get evaluates f right here, so a null f
// raises NullPointerException in this call rather than later inside the
// executor. A lambda such as () -> f.get() would defer that failure.
return executor.submit(f::get);
}

Expand All @@ -67,6 +72,8 @@ public Future<Void> executeFunction(final Supplier<Void> f) {
*/
@SuppressWarnings("unchecked")
public Future<Void> executeRunnable(final Runnable r) {
LOG.debug("Executing runnable {}", r);

// r::run has a void result, so this is the submit(Runnable) overload,
// which returns Future<?>; the cast to Future<Void> is the unchecked
// operation suppressed by the annotation on this method.
// As a bound method reference, r::run evaluates r here, so a null r
// fails in this call with NullPointerException.
return (Future<Void>) executor.submit(r::run);
}

Expand All @@ -79,6 +86,8 @@ public Future<Void> executeRunnable(final Runnable r) {
* @param unit the time unit of the timeout argument
*/
public void shutdown(Logger logger, long timeout, TimeUnit unit) {
// The debug trace goes to this class's own LOG; the caller-supplied
// logger is only handed on to HadoopExecutors.shutdown().
LOG.debug("Shutdown");

HadoopExecutors.shutdown(executor, logger, timeout, unit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,8 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
} else {
try {
if (Files.deleteIfExists(elementToPurge.path)) {
LOG.debug("Removing and evicting Cache file {}: {}",
elementToPurge.path, elementToPurge);
entryListSize--;
prefetchingStatistics.blockRemovedFromFileCache();
blocks.remove(elementToPurge.blockNumber);
Expand Down Expand Up @@ -641,7 +643,7 @@ private String getStats() {
StringBuilder sb = new StringBuilder();
sb.append(String.format(
"#entries = %d, #gets = %d",
blocks.size(), numGets));
blocks.size(), numGets.get()));
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.fs.impl.prefetch;

import java.time.Duration;

import org.junit.Test;

import org.apache.hadoop.test.AbstractHadoopTestBase;
Expand All @@ -35,34 +37,39 @@ public class TestBufferPool extends AbstractHadoopTestBase {

private static final int BUFFER_SIZE = 10;

/** Maximum retry duration (10 seconds) passed to the BufferPool constructor in these tests. */
public static final Duration MAX_RETRY = Duration.ofSeconds(10);

/** Update interval (1 second) passed to the BufferPool constructor in these tests. */
public static final Duration UPDATE_INTERVAL = Duration.ofSeconds(1);

private final PrefetchingStatistics statistics =
EmptyPrefetchingStatistics.getInstance();

@Test
public void testArgChecks() throws Exception {
// Should not throw.
BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics);
BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics, MAX_RETRY,
UPDATE_INTERVAL);

// Verify it throws correctly.

intercept(IllegalArgumentException.class,
"'size' must be a positive integer",
() -> new BufferPool(0, 10, statistics));
() -> new BufferPool(0, 10, statistics, MAX_RETRY, UPDATE_INTERVAL));

intercept(IllegalArgumentException.class,
"'size' must be a positive integer",
() -> new BufferPool(-1, 10, statistics));
() -> new BufferPool(-1, 10, statistics, MAX_RETRY, UPDATE_INTERVAL));

intercept(IllegalArgumentException.class,
"'bufferSize' must be a positive integer",
() -> new BufferPool(10, 0, statistics));
() -> new BufferPool(10, 0, statistics, MAX_RETRY, UPDATE_INTERVAL));

intercept(IllegalArgumentException.class,
"'bufferSize' must be a positive integer",
() -> new BufferPool(1, -10, statistics));
() -> new BufferPool(1, -10, statistics, MAX_RETRY, UPDATE_INTERVAL));

intercept(NullPointerException.class,
() -> new BufferPool(1, 10, null));
() -> new BufferPool(1, 10, null, MAX_RETRY, UPDATE_INTERVAL));

intercept(IllegalArgumentException.class,
"'blockNumber' must not be negative",
Expand All @@ -79,7 +86,8 @@ public void testArgChecks() throws Exception {

@Test
public void testGetAndRelease() {
BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics);
BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics, MAX_RETRY,
UPDATE_INTERVAL);
assertInitialState(pool, POOL_SIZE);

int count = 0;
Expand Down Expand Up @@ -127,7 +135,8 @@ private void testReleaseHelper(BufferData.State stateBeforeRelease,
boolean expectThrow)
throws Exception {

BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics);
BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, statistics, MAX_RETRY,
UPDATE_INTERVAL);
assertInitialState(pool, POOL_SIZE);

BufferData data = this.acquire(pool, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public int getPrefetchBlockSize() {
* @return the size of prefetch queue (in number of blocks).
*/
public int getPrefetchBlockCount() {
return this.prefetchBlockCount;
return prefetchBlockCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ protected S3ARemoteObjectReader getReader() {
@Override
public int read(ByteBuffer buffer, long startOffset, int size)
throws IOException {
return this.reader.read(buffer, startOffset, size);
return reader.read(buffer, startOffset, size);
}

@Override
public synchronized void close() {
this.reader.close();
reader.close();

super.close();
}
Expand Down
Loading

0 comments on commit dfc9a6c

Please sign in to comment.