HADOOP-18184. S3A prefetch unbuffer
* Lots of statistics collection, with use in tests.
* All the S3A prefetch tests are moved to the prefetch package,
* and split into caching-stream and large-file tests.
* The large-file and LRU tests are scale tests,
* and testRandomReadLargeFile uses a small block size to reduce read overhead.
* New hadoop-common class org.apache.hadoop.test.Sizes with predefined
  sizes (taken from azure; existing code has not yet been moved to it).

Overall, prefetch reads of the large files are slow; while it's critical
to test multi-block files, we don't need to work on the landsat CSV file.

Better: one of the huge-file tests uses it, with a small block size of 1 MB
to force lots of work, as sketched below.
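As an illustrative sketch of that tuning (these are the standard S3A prefetch
option names; the test's actual wiring may differ):

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.test.Sizes.S_1M;

    Configuration conf = new Configuration();
    // 1 MiB prefetch blocks: even a modest file spans many blocks,
    // exercising prefetching, caching and eviction together.
    conf.setBoolean("fs.s3a.prefetch.enabled", true);
    conf.setInt("fs.s3a.prefetch.block.size", S_1M);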

Change-Id: Idff93810f32f6b42fe07733d2de60d180ea978e8
steveloughran committed Jul 14, 2023
1 parent 9274168 commit f24f1fd
Showing 16 changed files with 504 additions and 167 deletions.
@@ -147,8 +147,8 @@ public CachingBlockManager(

this.ops = new BlockOperations();
this.ops.setDebug(LOG.isDebugEnabled());
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
prefetchingStatistics.setPrefetchCachingState(true);
}

/**
@@ -364,6 +364,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
BlockOperations.Operation op = null;
DurationTracker tracker = null;

int bytesFetched = 0;
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) {
@@ -398,6 +399,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
read(buffer, offset, size);
buffer.flip();
data.setReady(expectedState);
bytesFetched = size;
} catch (Exception e) {
if (isPrefetch && tracker != null) {
tracker.failed();
@@ -411,9 +413,8 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
ops.end(op);
}

if (isPrefetch) {
prefetchingStatistics.prefetchOperationCompleted();
}
// update the statistics
prefetchingStatistics.fetchOperationCompleted(isPrefetch, bytesFetched);
if (tracker != null) {
tracker.close();
LOG.debug("fetch completed: {}", tracker);
@@ -489,6 +490,7 @@ public Void get() {
*/
@Override
public void requestCaching(BufferData data) {
Validate.checkNotNull(data, "data");

final int blockNumber = data.getBlockNumber();
LOG.debug("Block {}: request caching of {}", blockNumber, data);
@@ -498,19 +500,17 @@ public void requestCaching(BufferData data) {
return;
}

Validate.checkNotNull(data, "data");

// Opportunistic check without locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache:",
LOG.debug("Block {}: Block in wrong state to cache: {}",
blockNumber, data.getState());
return;
}

synchronized (data) {
// Reconfirm state after locking.
if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) {
LOG.debug("Block {}: Block in wrong state to cache:",
LOG.debug("Block {}: Block in wrong state to cache: {}",
blockNumber, data.getState());
return;
}
@@ -58,7 +58,7 @@ public void blockRemovedFromFileCache() {
}

@Override
public void prefetchOperationCompleted() {
public void fetchOperationCompleted(final boolean prefetch, final long bytesFetched) {

}

@@ -53,9 +53,12 @@ default DurationTracker blockFetchOperationStarted() {
void blockRemovedFromFileCache();

/**
* A prefetch operation has completed.
* A fetch/prefetch operation has completed.
*
* @param prefetch true if this was a prefetch
* @param bytesFetched number of bytes fetched
*/
void prefetchOperationCompleted();
void fetchOperationCompleted(boolean prefetch, long bytesFetched);

/**
* An executor has been acquired, either for prefetching or caching.
@@ -92,4 +95,11 @@ default void setPrefetchCachingState(boolean cacheEnabled) {

}

/**
* Bytes read from buffer (rather than via any direct HTTP request).
* @param bytes number of bytes read.
*/
default void bytesReadFromBuffer(long bytes) {

}
}
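For context, a minimal sketch of how a block manager's read path drives the
widened callback; the class and helper names here are illustrative, not the
actual S3A code:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;

    final class BlockFetcher {
      private final PrefetchingStatistics statistics;

      BlockFetcher(PrefetchingStatistics statistics) {
        this.statistics = statistics;
      }

      void fetchBlock(ByteBuffer buffer, long offset, int size, boolean isPrefetch)
          throws IOException {
        int bytesFetched = 0;
        try {
          readFromStore(buffer, offset, size);  // hypothetical remote GET
          bytesFetched = size;
        } finally {
          // reported for demand fetches as well as prefetches; 0 on failure
          statistics.fetchOperationCompleted(isPrefetch, bytesFetched);
        }
      }

      private void readFromStore(ByteBuffer buffer, long offset, int size)
          throws IOException {
        // elided: fill the buffer from the remote object
      }
    }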
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.test;

/**
* Sizes of data in KiB/MiB.
*/
public class Sizes {

public static final int S_256 = 256;
public static final int S_512 = 512;
public static final int S_1K = 1024;
public static final int S_2K = 2 * S_1K;
public static final int S_4K = 4 * S_1K;
public static final int S_8K = 8 * S_1K;
public static final int S_10K = 10 * S_1K;
public static final int S_16K = 16 * S_1K;
public static final int S_32K = 32 * S_1K;
public static final int S_64K = 64 * S_1K;
public static final int S_128K = 128 * S_1K;
public static final int S_256K = 256 * S_1K;
public static final int S_512K = 512 * S_1K;
public static final int S_1M = S_1K * S_1K;
public static final int S_2M = 2 * S_1M;
public static final int S_4M = 4 * S_1M;
public static final int S_5M = 5 * S_1M;
public static final int S_8M = 8 * S_1M;
public static final int S_10M = 10 * S_1M;
public static final int S_16M = 16 * S_1M;
public static final int S_32M = 32 * S_1M;
public static final int S_64M = 64 * S_1M;
public static final double NANOSEC = 1.0e9;

}
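A usage sketch of the new constants (an illustrative fragment, not code from
this change); they read naturally at call sites via static import:

    import static org.apache.hadoop.test.Sizes.S_1M;
    import static org.apache.hadoop.test.Sizes.S_32M;

    // a 32 MiB file read through 1 MiB prefetch blocks spans 32 blocks
    int fileSize = S_32M;
    int blockSize = S_1M;
    int blockCount = (fileSize + blockSize - 1) / blockSize;  // = 32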
@@ -1412,10 +1412,25 @@ public void blockRemovedFromFileCache() {
}

@Override
public void prefetchOperationCompleted() {
incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
public void fetchOperationCompleted(final boolean prefetch, final long bytesFetched) {
if (prefetch) {
incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
}
if (bytesFetched > 0) {
totalBytesRead.addAndGet(bytesFetched);
}
}

/**
* {@inheritDoc}.
* If the byte counter is positive, increment bytesRead.
*/
@Override
public void bytesReadFromBuffer(long bytes) {
if (bytes > 0) {
bytesRead.addAndGet(bytes);
}
}

@Override
public void memoryAllocated(int size) {
incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, size);
@@ -398,6 +398,10 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_BLOCK_FETCH_OPERATIONS,
"Tracker/Gauge of active block fetches",
TYPE_DURATION),
STREAM_READ_BLOCK_ACQUIRE_AND_READ(
StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ,
"Time spent blocked waiting for blocks prefetch/read to complete",
TYPE_DURATION),

/* Stream read block prefetch/cache gauges */
/* As with all gauges; aggregation is of limited value */
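A duration statistic of this kind is normally populated by wrapping the
blocking call with a tracker; a sketch assuming hadoop-common's
IOStatisticsBinding, with acquireAndRead() and the other variables standing
in for the real call site:

    import java.nio.ByteBuffer;

    import org.apache.hadoop.fs.statistics.StreamStatisticNames;

    import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;

    // durationTrackerFactory: the stream's statistics store
    ByteBuffer block = trackDuration(durationTrackerFactory,
        StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ,
        () -> acquireAndRead(blockNumber));  // hypothetical helper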
@@ -88,7 +88,6 @@ public S3ACachingInputStream(
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
fileSize);
demandCreateBlockManager();
streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,
numBlocksToPrefetch,
context.getPrefetchBlockSize());
@@ -307,4 +307,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
public boolean markSupported() {
return false;
}

@Override
public void readFully(final long position,
final byte[] buffer,
final int offset,
final int length) throws IOException {
ensureStreamActive();
inputStream.readFully(position, buffer, offset, length);
}
}
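This delegation keeps the positioned-read contract working through the
prefetching stream; a usage sketch, with fs and path assumed to exist:

    import org.apache.hadoop.fs.FSDataInputStream;

    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[8192];
      // reads exactly 8 KiB starting at offset 1 KiB, or throws EOFException
      in.readFully(1024L, buf, 0, buf.length);
    }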
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.FilePosition;
@@ -44,6 +45,7 @@
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static java.util.Objects.requireNonNull;
@@ -52,7 +54,7 @@
* Provides an {@link InputStream} that allows reading from an S3 file.
*/
public abstract class S3ARemoteInputStream
extends InputStream
extends FSInputStream
implements CanSetReadahead, StreamCapabilities, IOStatisticsSource, CanUnbuffer {

private static final Logger LOG = LoggerFactory.getLogger(
@@ -117,6 +119,9 @@ public abstract class S3ARemoteInputStream

private final IOStatistics ioStatistics;

/** Aggregator used to aggregate per thread IOStatistics. */
private final IOStatisticsAggregator threadIOStatistics;

/**
* Initializes a new instance of the {@code S3ARemoteInputStream} class.
*
@@ -146,6 +151,7 @@ public S3ARemoteInputStream(
context.getChangeDetectionPolicy(),
this.streamStatistics.getChangeTrackerStatistics(),
s3Attributes);
this.threadIOStatistics = requireNonNull(context.getIOStatisticsAggregator());

setInputPolicy(context.getInputPolicy());
setReadahead(context.getReadahead());
@@ -163,10 +169,10 @@ protected void initializeUnderlyingResources() {
underlyingResourcesClosed.set(false);
long fileSize = s3Attributes.getLen();
int bufferSize = context.getPrefetchBlockSize();
this.remoteObject = getS3File();
this.reader = new S3ARemoteObjectReader(remoteObject);
this.blockData = new BlockData(fileSize, bufferSize);
this.fpos = new FilePosition(fileSize, bufferSize);
remoteObject = getS3File();
reader = new S3ARemoteObjectReader(remoteObject);
blockData = new BlockData(fileSize, bufferSize);
fpos = new FilePosition(fileSize, bufferSize);
}

/**
@@ -361,10 +367,24 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
numBytesRemaining -= bytesToRead;
numBytesRead += bytesToRead;
}

return numBytesRead;
}

/**
* Forward to the superclass after updating the read-fully IOStatistics.
* {@inheritDoc}
*/
@Override
public void readFully(final long position,
final byte[] buffer,
final int offset,
final int length) throws IOException {
throwIfClosed();
validatePositionedReadArgs(position, buffer, offset, length);
streamStatistics.readFullyOperationStarted(position, length);
super.readFully(position, buffer, offset, length);
}

protected final S3ARemoteObject getRemoteObject() {
return remoteObject;
}
@@ -407,7 +427,7 @@ protected final S3AReadOpContext getContext() {

private void incrementBytesRead(int bytesRead) {
if (bytesRead > 0) {
streamStatistics.bytesRead(bytesRead);
streamStatistics.bytesReadFromBuffer(bytesRead);
if (getContext().getStats() != null) {
getContext().getStats().incrementBytesRead(bytesRead);
}
@@ -459,6 +479,7 @@ protected boolean closeStream(final boolean unbuffer) {
}
reader.close();
reader = null;
// allow the object to be garbage collected.
remoteObject = null;
fpos.invalidate();
return true;
@@ -476,12 +497,12 @@ public void close() throws IOException {
}
closeStream(false);

// TODO remoteObject.close();
remoteObject = null;
try {
client.close();
} finally {
streamStatistics.close();
// Collect ThreadLevel IOStats
threadIOStatistics.aggregate(streamStatistics.getIOStatistics());
}
client = null;
}
@@ -531,7 +552,7 @@ public void reset() {
}

@Override
public long skip(long n) {
throw new UnsupportedOperationException("skip not supported");
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}
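The close-time aggregation above is what surfaces per-thread statistics to
callers; a consumption sketch, assuming the hadoop-common IOStatisticsContext
API:

    import org.apache.hadoop.fs.statistics.IOStatisticsContext;

    import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

    IOStatisticsContext ctx = IOStatisticsContext.getCurrentIOStatisticsContext();
    // ... read from S3A streams on this thread, close them, then:
    System.out.println(ioStatisticsToPrettyString(ctx.getIOStatistics()));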
@@ -227,7 +227,7 @@ public DurationTracker prefetchOperationStarted() {
}

@Override
public void prefetchOperationCompleted() {
public void fetchOperationCompleted(final boolean prefetch, final long bytesFetched) {

}

@@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -35,6 +36,7 @@
import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
@@ -50,13 +52,20 @@
* {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests,
these tests leverage the fact that isObjectStreamOpen exposes whether the
underlying stream has been closed.
Disables prefetching, as the behaviors are so different that the stats
cannot be asserted on.
*/
public class ITestS3AUnbuffer extends AbstractS3ATestBase {

public static final int FILE_LENGTH = 16;

private Path dest;

@Override
protected Configuration createConfiguration() {
return enablePrefetch(super.createConfiguration(), false);
}

@Override
public void setup() throws Exception {
super.setup();
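A plausible shape for the enablePrefetch() helper used above (hypothetical;
the real method lives in S3ATestUtils):

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
    import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

    public static Configuration enablePrefetch(Configuration conf, boolean prefetch) {
      // clear any per-bucket override, then set the base option
      removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
      conf.setBoolean(PREFETCH_ENABLED_KEY, prefetch);
      return conf;
    }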
