Skip to content

Commit

Permalink
HADOOP-18184. S3A prefetch unbuffer
Browse files Browse the repository at this point in the history
* In memory unbuffer working, with tests
* prefetching worse than it was.

Change-Id: I33178b7f691a75f2f043c0d585e7d9fb08f53a4a
  • Loading branch information
steveloughran committed Jul 12, 2023
1 parent 8c8d6ed commit ffd3180
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public BlockData(long fileSize, int blockSize) {
? 1
: 0);
this.state = new State[this.numBlocks];
markBlocksAsNotReady();
}

/**
* Mark all the blocks as not ready.
*/
public void markBlocksAsNotReady() {
for (int b = 0; b < this.numBlocks; b++) {
setState(b, State.NOT_READY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public void cancelPrefetches(final CancelReason reason) {
for (BufferData data : bufferPool.getAll()) {
// We add blocks being prefetched to the local cache so that the prefetch is not wasted.
// this is only done if the reason is random IO-related, not due to close/unbuffer
if (reason == CancelReason.RandomIO &&
if ( /* reason == CancelReason.RandomIO && */
data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) {
requestCaching(data);
}
Expand Down Expand Up @@ -445,7 +445,6 @@ public Void get() {
}
}


/**
* Required state of a block for it to be cacheable.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ public void close() throws IOException {
}

@Override
protected boolean closeStream() {
final boolean b = super.closeStream();
protected boolean closeStream(final boolean unbuffer) {
final boolean b = super.closeStream(unbuffer);
closeBlockManager();
return b;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public S3AInMemoryInputStream(
private void allocateBuffer() {
// Guard against double allocation: the buffer field must currently be unset.
// NOTE(review): the message uses "{}" placeholders; confirm that the imported
// checkState() expands that style rather than String.format-style "%s".
checkState(buffer == null, "buffer for {} already allocated", getName());
// Allocate a single buffer of fileSize bytes.
buffer = ByteBuffer.allocate(fileSize);
// Report the allocation to the stream statistics; the counterpart
// memoryFreed(fileSize) call is in releaseBuffer().
getS3AStreamStatistics().memoryAllocated(fileSize);
LOG.debug("Created in-memory input stream for {} (size = {})",
getName(), fileSize);
}
Expand All @@ -98,7 +99,10 @@ private void allocateBuffer() {
* Harmless to call on a released buffer.
*/
private void releaseBuffer() {
  // Only a buffer which is actually allocated is released: the statistics are
  // told about the freed memory exactly once, and a repeated call is a no-op.
  // The field must not be cleared before this check, otherwise the check can
  // never succeed and the freed memory is never reported.
  if (buffer != null) {
    getS3AStreamStatistics().memoryFreed(fileSize);
    buffer = null;
  }
}

/**
Expand Down Expand Up @@ -141,8 +145,8 @@ protected boolean ensureCurrentBuffer() throws IOException {
}

@Override
protected boolean closeStream() {
final boolean b = super.closeStream();
protected boolean closeStream(final boolean unbuffer) {
final boolean b = super.closeStream(unbuffer);
releaseBuffer();
return b;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ public S3ARemoteInputStream(

setInputPolicy(context.getInputPolicy());
setReadahead(context.getReadahead());
long fileSize = s3Attributes.getLen();
int bufferSize = context.getPrefetchBlockSize();

this.blockData = new BlockData(fileSize, bufferSize);
initializeUnderlyingResources();

this.nextReadPos = 0;
}
/**
Expand Down Expand Up @@ -433,27 +435,28 @@ protected String getOffsetStr(long offset) {
return String.format("%d:%d", blockNumber, offset);
}


@Override
public synchronized void unbuffer() {
LOG.debug("{}: unbuffered", getName());
if (closeStream()) {
if (closeStream(true)) {
getS3AStreamStatistics().unbuffered();
}
}

/**
* Close the stream in close() or unbuffer().
* @param unbuffer is this an unbuffer operation?
* @return true if the stream was closed; false means it was already closed.
*/

protected boolean closeStream() {
protected boolean closeStream(final boolean unbuffer) {

if (underlyingResourcesClosed.getAndSet(true)) {
return false;
}

blockData = null;
if (unbuffer) {
blockData.markBlocksAsNotReady();
}
reader.close();
reader = null;
remoteObject = null;
Expand All @@ -471,7 +474,7 @@ public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
closeStream();
closeStream(false);

// TODO remoteObject.close();
remoteObject = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,14 @@ public void testReadLargeFileFully() throws Throwable {
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
LOG.info("Stream statistics for file {}\n{}", fileStatus,
ioStatisticsToPrettyString(in.getIOStatistics()));

printStreamStatistics(in);

// Assert that first block is read synchronously, following blocks are prefetched
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);

in.unbuffer();
// Verify that once the stream is unbuffered, all memory is freed
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
Expand All @@ -163,6 +162,11 @@ public void testReadLargeFileFully() throws Throwable {
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}

/**
 * Log the IO statistics of a stream at INFO level, in the pretty-printed form.
 * @param in stream whose IO statistics are to be logged.
 */
private void printStreamStatistics(final FSDataInputStream in) {
  final String prettyStatistics = ioStatisticsToPrettyString(in.getIOStatistics());
  LOG.info("Stream statistics for file {}\n{}", fileStatus, prettyStatistics);
}

@Test
public void testReadLargeFileFullyLazySeek() throws Throwable {
describe("read a large file using readFully(position,buffer,offset,length),"
Expand All @@ -184,6 +188,7 @@ public void testReadLargeFileFullyLazySeek() throws Throwable {
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
printStreamStatistics(in);

// Assert that first block is read synchronously, following blocks are prefetched
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
Expand Down Expand Up @@ -253,15 +258,32 @@ public void testRandomReadSmallFile() throws Throwable {
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);
printStreamStatistics(in);

verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
// The buffer pool is not used
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, SMALL_FILE_SIZE);
// no prefetch ops, so no action_executor_acquired
assertThatStatisticMaximum(ioStats,
ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);

// now read offset 0 again and again, expect no new costs
in.readFully(0, buffer);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);

// unbuffer
in.unbuffer();
LOG.info("unbuffered {}", in);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);


printStreamStatistics(in);

in.readFully(0, buffer);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 2);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, SMALL_FILE_SIZE);

}
}

Expand Down

0 comments on commit ffd3180

Please sign in to comment.