HADOOP-18184. Fix NPEs in BlockManager unit tests by adding withPath()
Change-Id: Ie3d1c266b1231fa85c01092dd79f2dcf961fe498
steveloughran committed May 31, 2024
1 parent d39deb8 commit b2d1d92
Showing 6 changed files with 98 additions and 55 deletions.
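In the hunks below, removed lines are prefixed with "-" and added lines with "+". The production change gives the requireNonNull() checks in CachingBlockManager messages that name the missing parameter, and the test fix supplies the now-mandatory path through the BlockManagerParameters builder. As a minimal sketch of the corrected construction pattern, assuming the prefetch classes live where these imports say and using illustrative sizes (WithPathSketch itself is a hypothetical class, not part of the commit):

import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;

public final class WithPathSketch {

  static BlockManagerParameters newTestParameters() {
    // Illustrative sizes; the real tests use their own FILE_SIZE / BLOCK_SIZE.
    BlockData blockData = new BlockData(1024, 256);
    S3AInputStreamStatistics statistics =
        new EmptyS3AStatisticsContext().newInputStreamStatistics();
    ExecutorServiceFuturePool futurePool =
        new ExecutorServiceFuturePool(Executors.newFixedThreadPool(4));
    return new BlockManagerParameters()
        .withBlockData(blockData)
        .withBufferPoolSize(4)
        .withConf(new Configuration())
        .withFuturePool(futurePool)
        // The fix: the block manager now rejects a null path, which it
        // uses only for logging, so tests pass a placeholder.
        .withPath(new Path("/"))
        .withPrefetchingStatistics(statistics);
  }
}

Passing the path through the builder, rather than defaulting it inside the block manager, keeps the fail-fast check in CachingBlockManager while leaving the value purely informational.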
File: CachingBlockManager.java
@@ -128,15 +128,15 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar

Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize");

- this.path = requireNonNull(blockManagerParameters.getPath());
- this.futurePool = requireNonNull(blockManagerParameters.getFuturePool());
+ this.path = requireNonNull(blockManagerParameters.getPath(), "block manager path");
+ this.futurePool = requireNonNull(blockManagerParameters.getFuturePool(), "future pool");
this.bufferPoolSize = blockManagerParameters.getBufferPoolSize();
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(
- blockManagerParameters.getPrefetchingStatistics());
- this.conf = requireNonNull(blockManagerParameters.getConf());
+ blockManagerParameters.getPrefetchingStatistics(), "prefetching statistics");
+ this.conf = requireNonNull(blockManagerParameters.getConf(), "configuration");

if (getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, getBlockData().getBlockSize(),
@@ -365,6 +365,9 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
DurationTracker tracker = null;

int bytesFetched = 0;
+ // to be filled in later.
+ long offset = 0;
+ int size = 0;
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
@@ -375,6 +378,10 @@

data.throwIfStateIncorrect(expectedState);
int blockNumber = data.getBlockNumber();
+ final BlockData blockData = getBlockData();

+ offset = blockData.getStartOffset(data.getBlockNumber());
+ size = blockData.getSize(data.getBlockNumber());

// Prefer reading from cache over reading from network.
if (cache.containsBlock(blockNumber)) {
@@ -392,15 +399,23 @@
op = ops.getRead(data.getBlockNumber());
}

- long offset = getBlockData().getStartOffset(data.getBlockNumber());
- int size = getBlockData().getSize(data.getBlockNumber());

ByteBuffer buffer = data.getBuffer();
buffer.clear();
read(buffer, offset, size);
buffer.flip();
data.setReady(expectedState);
bytesFetched = size;
LOG.debug("Completed {} of block {} [{}-{}]",
isPrefetch ? "prefetch" : "read",
data.getBlockNumber(),
offset, offset + size);
} catch (Exception e) {
LOG.debug("Failure in {} of block {} [{}-{}]",
isPrefetch ? "prefetch" : "read",
data.getBlockNumber(),
offset, offset + size,
e);
if (isPrefetch && tracker != null) {
tracker.failed();
}
File: SingleFilePerBlockCache.java
@@ -146,7 +146,7 @@ private enum LockType {
@Override
public String toString() {
return String.format(
"([%03d] %s: size = %d, checksum = %d)",
"([%03d] %s: size = %,d, checksum = %d)",
blockNumber, path, size, checksum);
}

@@ -316,37 +316,39 @@ private Entry getEntry(int blockNumber) {
private void addToLinkedListHead(Entry entry) {
blocksLock.writeLock().lock();
try {
- addToHeadOfLinkedList(entry);
+ maybePushToHeadOfBlockList(entry);
} finally {
blocksLock.writeLock().unlock();
}
}

/**
- * Add the given entry to the head of the linked list.
- *
+ * Maybe add the given entry to the head of the block list.
+ * No-op if the block is already in the list.
* @param entry Block entry to add.
+ * @return true if the block was added.
*/
- private void addToHeadOfLinkedList(Entry entry) {
+ private boolean maybePushToHeadOfBlockList(Entry entry) {
if (head == null) {
head = entry;
tail = entry;
}
LOG.debug(
"Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
entry.blockNumber, head.blockNumber, tail.blockNumber);
"Block {} to be added to the head. Current head block {} and tail block {}; {}",
entry.blockNumber, head.blockNumber, tail.blockNumber, entry);
if (entry != head) {
Entry prev = entry.getPrevious();
- Entry nxt = entry.getNext();
- // no-op if the block is already evicted
+ Entry next = entry.getNext();
+ // no-op if the block has already been evicted from the block list
if (!blocks.containsKey(entry.blockNumber)) {
- return;
+ LOG.debug("Block {} is no longer in the block list", entry.blockNumber);
+ return false;
}
if (prev != null) {
- prev.setNext(nxt);
+ prev.setNext(next);
}
- if (nxt != null) {
- nxt.setPrevious(prev);
+ if (next != null) {
+ next.setPrevious(prev);
}
entry.setPrevious(null);
entry.setNext(head);
@@ -356,6 +358,7 @@ private void addToHeadOfLinkedList(Entry entry) {
tail = prev;
}
}
+ return true;
}

/**
@@ -424,8 +427,9 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
private void addToLinkedListAndEvictIfRequired(Entry entry) {
blocksLock.writeLock().lock();
try {
- addToHeadOfLinkedList(entry);
- entryListSize++;
+ if (maybePushToHeadOfBlockList(entry)) {
+   entryListSize++;
+ }
if (entryListSize > maxBlocksCount && !closed.get()) {
Entry elementToPurge = tail;
tail = tail.getPrevious();
@@ -447,12 +451,13 @@ private void addToLinkedListAndEvictIfRequired(Entry entry) {
* @param elementToPurge Block entry to evict.
*/
private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
LOG.debug("Evicting block {} from cache: {}", elementToPurge.blockNumber, elementToPurge);
try (DurationTracker ignored = trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
if (!lockAcquired) {
LOG.error("Cache file {} deletion would not be attempted as write lock could not"
LOG.warn("Cache file {} deletion would not be attempted as write lock could not"
+ " be acquired within {} {}", elementToPurge.path,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
@@ -463,9 +468,11 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
prefetchingStatistics.blockRemovedFromFileCache();
blocks.remove(elementToPurge.blockNumber);
prefetchingStatistics.blockEvictedFromFileCache();
+ } else {
+   LOG.debug("Cache file {} not found for deletion: {}", elementToPurge.path, elementToPurge);
}
} catch (IOException e) {
LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
LOG.warn("Failed to delete cache file {} for {}", elementToPurge.path, elementToPurge, e);
} finally {
elementToPurge.releaseLock(Entry.LockType.WRITE);
}
@@ -481,7 +488,7 @@ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
buffer.rewind();
try (WritableByteChannel writeChannel = Files.newByteChannel(path, CREATE_OPTIONS);
DurationInfo d = new DurationInfo(LOG, "Writing %d bytes to %s",
DurationInfo d = new DurationInfo(LOG, "save %d bytes to %s",
buffer.remaining(), path)) {
while (buffer.hasRemaining()) {
writeChannel.write(buffer);
File: S3APrefetchingInputStream.java
@@ -115,7 +115,7 @@ public S3APrefetchingInputStream(
client,
streamStatistics);
} else {
LOG.debug("Creating in caching input stream for {}", context.getPath());
LOG.debug("Creating caching input stream for {}", context.getPath());
this.inputStream = new S3ACachingInputStream(
context,
s3Attributes,
File: S3ARemoteObjectReader.java
@@ -129,7 +129,7 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)

final String path = this.remoteObject.getPath();
EOFException invokerResponse =
invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
invoker.retry(String.format("read %s [%d-%d]", path, offset, offset + size),
path, true,
trackDurationOfOperation(streamStatistics,
STREAM_READ_REMOTE_BLOCK_READ, () -> {
File: ITestS3APrefetchingCacheFiles.java
@@ -38,11 +38,10 @@
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;

import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
- import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_COUNT_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
- import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
@@ -58,6 +57,11 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {

public static final int PREFETCH_OFFSET = 10240;

+ /**
+  * Block prefetch count: {@value}.
+  */
+ public static final int BLOCK_COUNT = 8;

private Path testFile;

/** The FS with the external file. */
@@ -86,16 +90,18 @@ public void setUp() throws Exception {
@Override
public Configuration createConfiguration() {
Configuration configuration = super.createConfiguration();
- if (isUsingDefaultExternalDataFile(configuration)) {
- S3ATestUtils.removeBaseAndBucketOverrides(configuration,
- PREFETCH_ENABLED_KEY,
- ENDPOINT);
- }
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+ PREFETCH_ENABLED_KEY,
+ PREFETCH_BLOCK_COUNT_KEY,
+ PREFETCH_BLOCK_SIZE_KEY);
configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
+ // use a small block size unless explicitly set in the test config.
configuration.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ configuration.setInt(PREFETCH_BLOCK_COUNT_KEY, BLOCK_COUNT);

- final String bufferDirBase = configuration.get(BUFFER_DIR);
+ // patch buffer dir with a unique path for test isolation.
+ final String bufferDirBase = "target/prefetch";
bufferDir = bufferDirBase + "/" + UUID.randomUUID();
configuration.set(BUFFER_DIR, bufferDir);
return configuration;
@@ -131,7 +137,7 @@ public void testCacheFileExistence() throws Throwable {
in.read(prefetchBlockSize * 2, buffer, 0, prefetchBlockSize);


- File tmpFileDir = new File(conf.get(BUFFER_DIR));
+ File tmpFileDir = new File(bufferDir);
final LocalFileSystem localFs = FileSystem.getLocal(conf);
Path bufferDirPath = new Path(tmpFileDir.toURI());
ContractTestUtils.assertIsDirectory(localFs, bufferDirPath);
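The buffer-directory changes above amount to a small test-isolation pattern: each run writes its prefetch cache files under a fresh target/prefetch/<UUID> directory, so the existence checks and cleanup only ever see that run's files. A minimal standalone sketch of the idea, detached from the S3A test base classes (patchBufferDir and UniqueBufferDirSketch are hypothetical names; BUFFER_DIR is the S3A constant the test imports):

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;

public final class UniqueBufferDirSketch {

  /**
   * Point the buffer dir at a per-run directory and return it so
   * assertions and cleanup look in exactly the right place.
   */
  static String patchBufferDir(Configuration conf) {
    String bufferDir = "target/prefetch/" + UUID.randomUUID();
    conf.set(BUFFER_DIR, bufferDir);
    return bufferDir;
  }
}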
File: TestS3ACachingBlockManager.java
@@ -33,6 +33,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
+ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
import org.apache.hadoop.fs.impl.prefetch.BlockManager;
@@ -67,6 +68,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {

private ExecutorServiceFuturePool futurePool;

+ /**
+  * Only used for logging.
+  */
+ private Path testPath;

private final S3AInputStreamStatistics streamStatistics =
new EmptyS3AStatisticsContext().newInputStreamStatistics();

@@ -76,6 +82,7 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
public void setup() {
threadPool = Executors.newFixedThreadPool(4);
futurePool = new ExecutorServiceFuturePool(threadPool);
+ testPath = new Path("/");
}

@After
@@ -92,13 +99,14 @@ public void teardown() {
public void testFuturePoolNull() throws Exception {
MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
Configuration conf = new Configuration();
+ BlockManagerParameters blockManagerParams =
+     new BlockManagerParameters()
+         .withBlockData(blockData)
+         .withBufferPoolSize(POOL_SIZE)
+         .withConf(conf)
+         .withPath(testPath)
+         .withPrefetchingStatistics(streamStatistics);
try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) {
- BlockManagerParameters blockManagerParams =
-     new BlockManagerParameters()
-         .withBlockData(blockData)
-         .withBufferPoolSize(POOL_SIZE)
-         .withPrefetchingStatistics(streamStatistics)
-         .withConf(conf);

intercept(NullPointerException.class,
() -> new S3ACachingBlockManager(blockManagerParams, reader));
@@ -110,13 +118,15 @@ public void testNullReader() throws Exception {
Configuration conf = new Configuration();
BlockManagerParameters blockManagerParams =
new BlockManagerParameters()
+ .withFuturePool(futurePool)
.withBlockData(blockData)
.withBufferPoolSize(POOL_SIZE)
- .withPrefetchingStatistics(streamStatistics)
.withConf(conf)
- .withFuturePool(futurePool)
.withMaxBlocksCount(
-     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
+ .withPath(testPath)
+ .withPrefetchingStatistics(streamStatistics);


intercept(IllegalArgumentException.class, "'reader' must not be null",
() -> new S3ACachingBlockManager(blockManagerParams, null));
@@ -194,13 +204,15 @@ public void testArgChecks() throws Exception {
Configuration conf = new Configuration();
BlockManagerParameters blockManagerParams =
new BlockManagerParameters()
+ .withFuturePool(futurePool)
.withBlockData(blockData)
.withBufferPoolSize(POOL_SIZE)
- .withPrefetchingStatistics(streamStatistics)
.withConf(conf)
- .withFuturePool(futurePool)
.withMaxBlocksCount(
-     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
+ .withPath(testPath)
+ .withPrefetchingStatistics(streamStatistics);


// Should not throw.
S3ACachingBlockManager blockManager =
@@ -370,14 +382,15 @@ private void testPrefetchHelper(boolean forcePrefetchFailure)

private BlockManagerParameters getBlockManagerParameters() {
return new BlockManagerParameters()
+ .withFuturePool(futurePool)
.withBlockData(blockData)
.withBufferPoolSize(POOL_SIZE)
- .withPrefetchingStatistics(streamStatistics)
- .withLocalDirAllocator(new LocalDirAllocator(HADOOP_TMP_DIR))
.withConf(CONF)
- .withFuturePool(futurePool)
+ .withLocalDirAllocator(new LocalDirAllocator(HADOOP_TMP_DIR))
.withMaxBlocksCount(
-     CONF.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+     CONF.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
+ .withPath(testPath)
+ .withPrefetchingStatistics(streamStatistics);
}

// @Ignore
@@ -389,15 +402,17 @@ public void testCachingOfPrefetched()
Configuration conf = new Configuration();
BlockManagerParameters blockManagerParamsBuilder =
new BlockManagerParameters()
+ .withFuturePool(futurePool)
.withBlockData(blockData)
.withBufferPoolSize(POOL_SIZE)
- .withPrefetchingStatistics(streamStatistics)
- .withConf(conf)
- .withFuturePool(futurePool)
.withLocalDirAllocator(
    new LocalDirAllocator(conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR))
+ .withConf(conf)
.withMaxBlocksCount(
-     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+     conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
+ .withPath(testPath)
+ .withPrefetchingStatistics(streamStatistics);

S3ACachingBlockManager blockManager =
new S3ACachingBlockManager(blockManagerParamsBuilder, reader);
assertInitialState(blockManager);
