Skip to content

Commit

Permalink
HADOOP-18184 unbuffer; getting tests to work
Browse files Browse the repository at this point in the history
Change-Id: I3a9513f39595c8fa8d7aa282ef647b0fcc8b7ef9
  • Loading branch information
steveloughran committed Jul 12, 2023
1 parent 65fd9ba commit 7d6ae13
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,13 @@ public void requestPrefetch(int blockNumber) {
}

// We initiate a prefetch only if we can acquire a buffer from the shared pool.
LOG.debug("Requesting prefetch for block {}", blockNumber);
BufferData data = bufferPool.tryAcquire(blockNumber);
if (data == null) {
LOG.debug("no buffer acquired for block {}", blockNumber);
return;
}
LOG.debug("acquired {}", data);

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(BufferData.State.BLANK)) {
Expand Down Expand Up @@ -373,6 +376,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
tracker = prefetchingStatistics.prefetchOperationStarted();
op = ops.prefetch(data.getBlockNumber());
} else {
tracker = prefetchingStatistics.blockFetchOperationStarted();
op = ops.getRead(data.getBlockNumber());
}

Expand All @@ -398,9 +402,10 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...

if (isPrefetch) {
prefetchingStatistics.prefetchOperationCompleted();
if (tracker != null) {
tracker.close();
}
}
if (tracker != null) {
tracker.close();
LOG.debug("fetch completed: {}", tracker);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

public interface PrefetchingStatistics extends IOStatisticsSource {
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;

public interface PrefetchingStatistics extends IOStatisticsSource {

/**
* A prefetch operation has started.
* @return duration tracker
*/
DurationTracker prefetchOperationStarted();

/**
* A block fetch operation has started.
* @return duration tracker
*/
default DurationTracker blockFetchOperationStarted() {
return stubDurationTracker();
}

/**
* A block has been saved to the file cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import java.util.function.Function;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

import org.apache.hadoop.fs.StorageStatistics;
Expand All @@ -51,6 +54,12 @@
* Support for implementing IOStatistics interfaces.
*/
public final class IOStatisticsBinding {
/**
* Log changes at debug.
* Noisy, but occasionally useful.
*/
private static final Logger LOG =
LoggerFactory.getLogger(IOStatisticsBinding.class);

/** Pattern used for each entry. */
public static final String ENTRY_PATTERN = "(%s=%s)";
Expand Down Expand Up @@ -548,13 +557,15 @@ public static <B> B invokeTrackingDuration(
} catch (IOException | RuntimeException e) {
// input function failed: note it
tracker.failed();
LOG.debug("Operation failure with duration {}", tracker);
// and rethrow
throw e;
} finally {
// update the tracker.
// this is called after the catch() call would have
// set the failed flag.
tracker.close();
LOG.debug("Operation success with duration {}", tracker);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public S3ACachingInputStream(
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
fileSize);
demandCreateBlockManager();
}

/**
Expand All @@ -108,11 +109,7 @@ public void close() throws IOException {
// Close the BlockManager first, cancelling active prefetches,
// deleting cached files and freeing memory used by buffer pool.
if (!isClosed()) {

if (blockManager != null) {
blockManager.close();
}

closeBlockManager();
super.close();
LOG.info("closed: {}", getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,11 @@ public InputStream openForRead(long offset, int size) throws IOException {
changeTracker.maybeApplyConstraint(request);

String operation = String.format(
"%s %s at %d", S3AInputStream.OPERATION_OPEN, uri, offset);
"%s %s at %d size %d", S3AInputStream.OPERATION_OPEN, uri, offset, size);
DurationTracker tracker = streamStatistics.initiateGetRequest();
S3Object object = null;

LOG.debug("{}", operation);
try {
object = Invoker.once(operation, uri, () -> client.getObject(request));
} catch (IOException e) {
Expand All @@ -200,6 +201,7 @@ public InputStream openForRead(long offset, int size) throws IOException {
} finally {
tracker.close();
}
LOG.debug("{} result duration {}", operation, tracker);

changeTracker.processResponse(object, operation, offset);
InputStream stream = object.getObjectContent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
this.streamStatistics.readOperationStarted(offset, size);
Invoker invoker = this.remoteObject.getReadInvoker();

final String path = this.remoteObject.getPath();
int invokerResponse =
invoker.retry("read", this.remoteObject.getPath(), true,
invoker.retry(String.format("read %s [%d-%d]", path, offset, size),
path, true,
trackDurationOfOperation(streamStatistics,
STREAM_READ_REMOTE_BLOCK_READ, () -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MockS3ARemoteObject extends S3ARemoteObject {

MockS3ARemoteObject(int size, boolean throwExceptionOnOpen) {
super(
S3APrefetchFakes.createReadContext(null, KEY, size, 1, 1),
S3APrefetchFakes.createReadContext(null, "s3a://" + BUCKET + "/" + KEY, size, 1, 1),
S3APrefetchFakes.createObjectAttributes(BUCKET, KEY, size),
S3APrefetchFakes.createInputStreamCallbacks(BUCKET, KEY),
EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ private void waitForCaching(
numTrys++;
if (numTrys > 600) {
String message = String.format(
"waitForCaching: expected: %d, actual: %d, read errors: %d, caching errors: %d",
"waitForCaching: expected: %d, actual: %d, read errors: %d, caching errors: %d in %s",
expectedCount, count, blockManager.numReadErrors(),
blockManager.numCachingErrors());
blockManager.numCachingErrors(),
blockManager);
throw new IllegalStateException(message);
}
}
Expand Down

0 comments on commit 7d6ae13

Please sign in to comment.