Skip to content

Commit

Permalink
HADOOP-18184. temp file creation/test validation
Browse files Browse the repository at this point in the history
* use block id in filename
* log statements include fs path
* tests more resilient
* logging auditor prints GET range and length

Tests are failing with signs of
* too many GETs
* incomplete buffers. race conditions?

Change-Id: Ibdca6292df8cf0149697cecfec24035e2be473d8
  • Loading branch information
steveloughran committed Jan 2, 2024
1 parent 9c4478d commit 1381966
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 327 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

/**
Expand All @@ -30,6 +31,11 @@
@InterfaceAudience.Private
public final class BlockManagerParameters {

/**
* The path of the underlying file; for logging.
*/
private Path path;

/**
* Asynchronous tasks are performed in this pool.
*/
Expand Down Expand Up @@ -224,4 +230,20 @@ public BlockManagerParameters withTrackerFactory(
return this;
}

/**
* @return The path of the underlying file.
*/
public Path getPath() {
return path;
}

/**
* Set the path.
* @param value new value
* @return the builder
*/
public BlockManagerParameters withPath(final Path value) {
path = value;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public enum State {
/**
* Buffer has been acquired but has no data.
*/
BLANK,
EMPTY,

/**
* This block is being prefetched.
Expand Down Expand Up @@ -114,7 +114,7 @@ public BufferData(int blockNumber, ByteBuffer buffer) {

this.blockNumber = blockNumber;
this.buffer = buffer;
this.state = State.BLANK;
this.state = State.EMPTY;
}

/**
Expand Down Expand Up @@ -181,7 +181,7 @@ public synchronized Future<Void> getActionFuture() {
public synchronized void setPrefetch(Future<Void> actionFuture) {
Validate.checkNotNull(actionFuture, "actionFuture");

this.updateState(State.PREFETCHING, State.BLANK);
this.updateState(State.PREFETCHING, State.EMPTY);
this.action = actionFuture;
}

Expand Down Expand Up @@ -289,7 +289,7 @@ public boolean stateEqualsOneOf(State... states) {
public String toString() {

return String.format(
"[%03d] id: %03d, %s: buf: %s, checksum: %d, future: %s",
"[%03d] id: %03d, State: %s: buffer: %s, checksum: %d, future: %s",
this.blockNumber,
System.identityHashCode(this),
this.state,
Expand All @@ -300,7 +300,7 @@ public String toString() {

private String getFutureStr(Future<Void> f) {
if (f == null) {
return "--";
return "(none)";
} else {
return this.action.isDone() ? "done" : "not done";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static java.util.Objects.requireNonNull;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
Expand Down Expand Up @@ -112,6 +112,11 @@ public abstract class CachingBlockManager extends BlockManager {

private final LocalDirAllocator localDirAllocator;

/**
* The path of the underlying file; for logging.
*/
private Path path;

/**
* Constructs an instance of a {@code CachingBlockManager}.
*
Expand All @@ -123,6 +128,7 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar

Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize");

this.path = requireNonNull(blockManagerParameters.getPath());
this.futurePool = requireNonNull(blockManagerParameters.getFuturePool());
this.bufferPoolSize = blockManagerParameters.getBufferPoolSize();
this.numCachingErrors = new AtomicInteger();
Expand All @@ -132,10 +138,10 @@ public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerPar
blockManagerParameters.getPrefetchingStatistics());
this.conf = requireNonNull(blockManagerParameters.getConf());

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
if (getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(),
this.cache = createCache(blockManagerParameters.getMaxBlocksCount(),
blockManagerParameters.getTrackerFactory());
}

Expand Down Expand Up @@ -210,7 +216,7 @@ private boolean getInternal(BufferData data) throws IOException {
return true;
}

data.throwIfStateIncorrect(BufferData.State.BLANK);
data.throwIfStateIncorrect(BufferData.State.EMPTY);
read(data);
return true;
}
Expand Down Expand Up @@ -269,23 +275,23 @@ public void requestPrefetch(int blockNumber) {
}

// We initiate a prefetch only if we can acquire a buffer from the shared pool.
LOG.debug("Requesting prefetch for block {}", blockNumber);
LOG.debug("{}: Requesting prefetch for block {}", path, blockNumber);
BufferData data = bufferPool.tryAcquire(blockNumber);
if (data == null) {
LOG.debug("no buffer acquired for block {}", blockNumber);
LOG.debug("{}: no buffer acquired for block {}", path, blockNumber);
return;
}
LOG.debug("acquired {}", data);
LOG.debug("{}: acquired {}", path, data);

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
if (!data.stateEqualsOneOf(BufferData.State.EMPTY)) {
// The block is ready or being prefetched/cached.
return;
}

synchronized (data) {
// Reconfirm state after locking.
if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
if (!data.stateEqualsOneOf(BufferData.State.EMPTY)) {
// The block is ready or being prefetched/cached.
return;
}
Expand All @@ -306,7 +312,7 @@ public void requestPrefetch(int blockNumber) {
*/
@Override
public void cancelPrefetches(final CancelReason reason) {
LOG.debug("Cancelling prefetches: {}", reason);
LOG.debug("{}: Cancelling prefetches: {}", path, reason);
BlockOperations.Operation op = ops.cancelPrefetches();

if (reason == CancelReason.RandomIO) {
Expand All @@ -328,9 +334,9 @@ public void cancelPrefetches(final CancelReason reason) {
private void read(BufferData data) throws IOException {
synchronized (data) {
try {
readBlock(data, false, BufferData.State.BLANK);
readBlock(data, false, BufferData.State.EMPTY);
} catch (IOException e) {
LOG.debug("error reading block {}", data.getBlockNumber(), e);
LOG.debug("{}: error reading block {}", path, data.getBlockNumber(), e);
throw e;
}
}
Expand Down Expand Up @@ -432,7 +438,8 @@ private boolean isClosed() {
private void disableCaching(final BlockOperations.End endOp) {
if (!cachingDisabled.getAndSet(true)) {
String message = String.format(
"Caching disabled because of slow operation (%.1f sec)", endOp.duration());
"%s: Caching disabled because of slow operation (%.1f sec)",
path, endOp.duration());
LOG_CACHING_DISABLED.info(message);
prefetchingStatistics.setPrefetchDiskCachingState(false);
}
Expand All @@ -445,7 +452,7 @@ private boolean isCachingDisabled() {
/**
* Read task that is submitted to the future pool.
*/
private static class PrefetchTask implements Supplier<Void> {
private class PrefetchTask implements Supplier<Void> {
private final BufferData data;
private final CachingBlockManager blockManager;
private final Instant taskQueuedStartTime;
Expand All @@ -461,8 +468,8 @@ public Void get() {
try {
blockManager.prefetch(data, taskQueuedStartTime);
} catch (Exception e) {
LOG.info("error prefetching block {}. {}", data.getBlockNumber(), e.getMessage());
LOG.debug("error prefetching block {}", data.getBlockNumber(), e);
LOG.info("{}: error prefetching block {}. {}", path, data.getBlockNumber(), e.getMessage());
LOG.debug("{}: error prefetching block {}", path , data.getBlockNumber(), e);
}
return null;
}
Expand Down Expand Up @@ -492,7 +499,7 @@ public void requestCaching(BufferData data) {
Validate.checkNotNull(data, "data");

final int blockNumber = data.getBlockNumber();
LOG.debug("Block {}: request caching of {}", blockNumber, data);
LOG.debug("{}: Block {}: request caching of {}", path , blockNumber, data);

if (isClosed() || isCachingDisabled()) {
data.setDone();
Expand All @@ -501,21 +508,21 @@ public void requestCaching(BufferData data) {

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache: {}",
blockNumber, data.getState());
LOG.debug("{}: Block {}: Block in wrong state to cache: {}",
path, blockNumber, data.getState());
return;
}

synchronized (data) {
// Reconfirm state after locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache: {}",
blockNumber, data.getState());
LOG.debug("{}: Block {}: Block in wrong state to cache: {}",
path, blockNumber, data.getState());
return;
}

if (cache.containsBlock(data.getBlockNumber())) {
LOG.debug("Block {}: Block is already in cache", blockNumber);
LOG.debug("{}: Block {}: Block is already in cache", path, blockNumber);
data.setDone();
return;
}
Expand Down Expand Up @@ -550,33 +557,33 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
}

final int blockNumber = data.getBlockNumber();
LOG.debug("Block {}: Preparing to cache block", blockNumber);
LOG.debug("{}: Block {}: Preparing to cache block", path, blockNumber);

if (isCachingDisabled()) {
LOG.debug("Block {}: Preparing caching disabled, not prefetching", blockNumber);
LOG.debug("{}: Block {}: Preparing caching disabled, not prefetching", path, blockNumber);
data.setDone();
return;
}
LOG.debug("Block {}: awaiting any read to complete", blockNumber);
LOG.debug("{}: Block {}: awaiting any read to complete", path, blockNumber);

try {
// wait for data; state of caching may change during this period.
awaitFuture(blockFuture, TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
// There was an error during prefetch.
LOG.debug("Block {}: prefetch failure", blockNumber);
LOG.debug("{}: Block {}: prefetch failure", path, blockNumber);
return;
}
} catch (IOException | TimeoutException e) {
LOG.info("Error fetching block: {}. {}", data, e.toString());
LOG.debug("Error fetching block: {}", data, e);
LOG.info("{}: Error fetching block: {}. {}", path, data, e.toString());
LOG.debug("{}: Error fetching block: {}", path, data, e);
data.setDone();
return;
}

if (isCachingDisabled()) {
// caching was disabled while waiting fro the read to complete.
LOG.debug("Block {}: caching disabled while reading data", blockNumber);
LOG.debug("{}: Block {}: caching disabled while reading data", path, blockNumber);
data.setDone();
return;
}
Expand All @@ -586,12 +593,12 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
LOG.debug("Block {}: block no longer in use; not adding", blockNumber);
LOG.debug("{}: Block {}: block no longer in use; not adding", path, blockNumber);
return;
}

if (cache.containsBlock(blockNumber)) {
LOG.debug("Block {}: already in cache; not adding", blockNumber);
LOG.debug("{}: Block {}: already in cache; not adding", path, blockNumber);
data.setDone();
return;
}
Expand All @@ -603,8 +610,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
data.setDone();
} catch (Exception e) {
numCachingErrors.incrementAndGet();
LOG.info("error adding block to cache: {}. {}", data, e.getMessage());
LOG.debug("error adding block to cache: {}", data, e);
LOG.info("{}: error adding block to cache: {}. {}", path, data, e.getMessage());
LOG.debug("{}: error adding block to cache: {}", path, data, e);
data.setDone();
}

Expand All @@ -625,7 +632,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
if (isClosed()) {
return;
}
LOG.debug("Block {}: Caching", buffer);
LOG.debug("{}: Block {}: Caching", path, buffer);

cache.put(blockNumber, buffer, conf, localDirAllocator);
}
Expand Down Expand Up @@ -701,18 +708,13 @@ BufferData getData(int blockNumber) {

@Override
public String toString() {
StringBuilder sb = new StringBuilder();

sb.append("cache(");
sb.append(cache.toString());
sb.append("); ");

sb.append("pool: ");
sb.append(bufferPool.toString());

sb.append("; numReadErrors: ").append(numReadErrors.get());
sb.append("; numCachingErrors: ").append(numCachingErrors.get());
String sb = "path: " + path + "; "
+ "; cache(" + cache + "); "
+ "pool: " + bufferPool
+ "; numReadErrors: " + numReadErrors.get()
+ "; numCachingErrors: " + numCachingErrors.get();

return sb.toString();
return sb;
}
}
Loading

0 comments on commit 1381966

Please sign in to comment.