HADOOP-18184. yetus and code reviews.
 ITestS3APrefetchingCacheFiles.testCacheFileExistence:
 135->assertCacheFileExists:151
  [No cache files found under
   /var/folders/4n/w4cjr_d95kg9bxkl6sz3n3ym0000gr/
    T/ITestS3APrefetchingCacheFiles2189128656118567478]

Change-Id: I31477ded465d5e48f33876f793b7f50421698c11
steveloughran committed Aug 1, 2023
1 parent 2a52912 commit c0e4f1c
Showing 10 changed files with 79 additions and 59 deletions.
@@ -298,6 +298,7 @@ public int getPrefetchBlockCount() {

/**
* Where does the read start from, if known.
* @return split start.
*/
public Optional<Long> getSplitStart() {
return splitStart;
@@ -325,6 +326,7 @@ public S3AReadOpContext withSplitEnd(final Optional<Long> value) {

/**
* What is the split end, if known?
* @return split end.
*/
public Optional<Long> getSplitEnd() {
return splitEnd;
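For context: these getters surface values that a caller passes through the openFile() builder. A minimal sketch, assuming the standard FS_OPTION_OPENFILE_SPLIT_START/FS_OPTION_OPENFILE_SPLIT_END option names from org.apache.hadoop.fs.Options.OpenFileOptions; the helper name and variables below are illustrative and not part of this commit:

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    /** Illustrative helper: open one split of a file, passing the split range. */
    static FSDataInputStream openForSplit(FileSystem fs, Path path,
        long splitStart, long splitEnd) throws IOException {
      return FutureIO.awaitFuture(
          fs.openFile(path)
              .opt(FS_OPTION_OPENFILE_SPLIT_START, Long.toString(splitStart))
              .opt(FS_OPTION_OPENFILE_SPLIT_END, Long.toString(splitEnd))
              .build());
    }

The read context then exposes that range through getSplitStart()/getSplitEnd().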
@@ -252,7 +252,7 @@ public OpenFileInformation prepareToOpenFile(
}
FSBuilderSupport builderSupport = new FSBuilderSupport(options);
// determine start and end of file.
Optional<Long> splitStart = getOptionalLong(options,FS_OPTION_OPENFILE_SPLIT_START);
Optional<Long> splitStart = getOptionalLong(options, FS_OPTION_OPENFILE_SPLIT_START);

// split end
Optional<Long> splitEnd = getOptionalLong(options, FS_OPTION_OPENFILE_SPLIT_END);
@@ -466,13 +466,15 @@ public int getBufferSize() {

/**
* Where does the read start from, if known.
* @return split start.
*/
public Optional<Long> getSplitStart() {
return splitStart;
}

/**
* What is the split end, if known?
* @return split end.
*/
public Optional<Long> getSplitEnd() {
return splitEnd;
@@ -226,14 +226,14 @@ protected BlockManager createBlockManager(
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
Configuration conf,
LocalDirAllocator localDirAllocator) {
Configuration configuration,
LocalDirAllocator dirAllocator) {
return new S3ACachingBlockManager(futurePool,
reader,
blockData,
bufferPoolSize,
getS3AStreamStatistics(),
conf,
localDirAllocator);
configuration,
dirAllocator);
}
}
@@ -79,6 +79,8 @@ public S3AInMemoryInputStream(
checkArgument(len < Integer.MAX_VALUE && len >= 0,
"Unsupported file size: %s", len);
fileSize = (int) len;
LOG.debug("Created in memory input stream for {} (size = {})", this.getName(),
fileSize);
}

/**
@@ -368,7 +368,7 @@ public int read(byte[] buffer, int offset, int len) throws IOException {
}

/**
* Forward to superclass after updating the read fully IOStatistics
* Forward to superclass after updating the {@code readFully()} IOStatistics.
* {@inheritDoc}
*/
@Override
@@ -39,7 +39,6 @@
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration;

@@ -80,6 +80,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -634,11 +635,10 @@ public static Configuration enablePrefetch(final Configuration conf, boolean pre
* The prefetch parameters to expose in a parameterized
* test to turn prefetching on/off.
*/
public static Collection<Object[]> PREFETCH_OPTIONS =
Arrays.asList(new Object[][] {
{true},
{false},
});
public static final Collection<Object[]> PREFETCH_OPTIONS =
Collections.unmodifiableList(Arrays.asList(new Object[][] {
{true}, {false}
}));

/**
* build dir.
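Making PREFETCH_OPTIONS final and wrapping it in Collections.unmodifiableList keeps the shared parameter list constant across the test classes that reuse it. A hedged sketch of a consumer, assuming the usual JUnit 4 Parameterized runner; the class name ITestExamplePrefetch and its layout are illustrative only, not part of this change:

    import java.util.Collection;

    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    import org.apache.hadoop.conf.Configuration;

    @RunWith(Parameterized.class)
    public class ITestExamplePrefetch extends AbstractS3ATestBase {

      /** Run every test case twice: prefetching enabled and disabled. */
      @Parameterized.Parameters(name = "prefetch-{0}")
      public static Collection<Object[]> params() {
        return S3ATestUtils.PREFETCH_OPTIONS;
      }

      private final boolean prefetchEnabled;

      public ITestExamplePrefetch(boolean prefetchEnabled) {
        this.prefetchEnabled = prefetchEnabled;
      }

      @Override
      public Configuration createConfiguration() {
        // enablePrefetch() is the S3ATestUtils helper shown in the hunk above.
        return S3ATestUtils.enablePrefetch(super.createConfiguration(), prefetchEnabled);
      }
    }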
@@ -98,20 +98,23 @@ public void testRandomReadSmallFile() throws Throwable {
Path smallFile = path("randomReadSmallFile");
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

int readBytes = 0;
int expectedReadBytes = 0;
try (FSDataInputStream in = getFileSystem().open(smallFile)) {
IOStatistics ioStats = in.getIOStatistics();

byte[] buffer = new byte[SMALL_FILE_SIZE];

in.read(buffer, 0, S_4K);
expectedReadBytes += S_4K;
verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
readBytes = readBytes + S_4K);
expectedReadBytes);

in.seek(S_1K * 12);
in.read(buffer, 0, S_4K);
expectedReadBytes += S_4K;

verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
readBytes = readBytes + S_4K);
expectedReadBytes);
printStreamStatistics(in);

verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
@@ -130,8 +133,10 @@ public void testRandomReadSmallFile() throws Throwable {
// now read offset 0 again and again, expect no new costs
in.readFully(0, buffer);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
expectedReadBytes += buffer.length;

verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
readBytes = readBytes + buffer.length);
expectedReadBytes);
// unbuffer
in.unbuffer();
LOG.info("unbuffered {}", in);
@@ -142,8 +147,10 @@ public void testRandomReadSmallFile() throws Throwable {
in.readFully(0, buffer);
verifyStatisticCounterValue(ioStats, STREAM_READ_FULLY_OPERATIONS, 2);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 2);
expectedReadBytes += buffer.length;

verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
readBytes = readBytes + buffer.length);
expectedReadBytes);
verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, SMALL_FILE_SIZE);

}
@@ -153,7 +160,7 @@ public void testRandomReadSmallFile() throws Throwable {
verifyStatisticCounterValue(threadIOStats,
ACTION_HTTP_GET_REQUEST, 2);
verifyStatisticGaugeValue(threadIOStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
verifyStatisticCounterValue(threadIOStats, STREAM_READ_BYTES, readBytes);
verifyStatisticCounterValue(threadIOStats, STREAM_READ_BYTES, expectedReadBytes);
}

@Test
@@ -165,49 +172,52 @@ public void testStatusProbesAfterClosingStream() throws Throwable {
Path smallFile = methodPath();
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

FSDataInputStream in = getFileSystem().open(smallFile);

byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_4K);
in.seek(S_1K * 12);
in.read(buffer, 0, S_4K);
try (FSDataInputStream in = getFileSystem().open(smallFile)) {
byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_4K);
in.seek(S_1K * 12);
in.read(buffer, 0, S_4K);

long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
S3AInputStreamStatistics inputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
S3AInputStreamStatistics inputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
assertNotNull("Prefetching input IO stats should not be null", ioStats);
assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
pos);

assertNotNull("Prefetching input IO stats should not be null", ioStats);
assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
pos);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);

verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);

// close the stream and still use it.
in.close();

in.close();
// status probes after closing the input stream
long newPos = in.getPos();
IOStatistics newIoStats = in.getIOStatistics();
S3AInputStreamStatistics newInputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

// status probes after closing the input stream
long newPos = in.getPos();
IOStatistics newIoStats = in.getIOStatistics();
S3AInputStreamStatistics newInputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
assertNotNull("Prefetching input IO stats should not be null", newIoStats);
assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
newPos);

assertNotNull("Prefetching input IO stats should not be null", newIoStats);
assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
newPos);
// compare status probes after closing of the stream with status probes done before
// closing the stream
assertEquals("Position retrieved through stream before and after closing should match", pos,
newPos);
assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
newIoStats);
assertEquals("Stream stats retrieved through stream before and after closing should match",
inputStreamStatistics, newInputStreamStatistics);

// compare status probes after closing of the stream with status probes done before
// closing the stream
assertEquals("Position retrieved through stream before and after closing should match", pos,
newPos);
assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
newIoStats);
assertEquals("Stream stats retrieved through stream before and after closing should match",
inputStreamStatistics, newInputStreamStatistics);
assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));

assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
}

}

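The final checks above assert on threadIOStats, the per-thread aggregation of stream statistics. A rough sketch of how such a reference is typically obtained, assuming IOStatisticsContext exposes its statistics via the IOStatisticsSource getIOStatistics() method; the exact wiring in this test is not shown in the hunks above:

    // Fetch the statistics aggregated for the current thread, then assert on
    // them exactly as the per-stream checks above do.
    IOStatisticsContext context = IOStatisticsContext.getCurrentIOStatisticsContext();
    IOStatistics threadIOStats = context.getIOStatistics();
    verifyStatisticCounterValue(threadIOStats, STREAM_READ_BYTES, expectedReadBytes);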
@@ -35,13 +35,15 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;

import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
@@ -58,7 +60,7 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
private static final int BLOCK_SIZE = S_1K * 10;

private Path testFile;
private FileSystem testFileSystem;
private S3AFileSystem testFileSystem;
private int prefetchBlockSize;
private Configuration conf;

@@ -77,11 +79,13 @@ public void setUp() throws Exception {
tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
tmpFileDir.delete();
tmpFileDir.mkdirs();

conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
String testFileUri = S3ATestUtils.getCSVTestFile(conf);

testFile = new Path(testFileUri);
testFileSystem = FileSystem.get(testFile.toUri(), conf);
testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);

prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
Assumptions.assumeThat(testFileStatus.getLen())
@@ -98,6 +102,7 @@ public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
enablePrefetch(conf, true);
disableFilesystemCaching(conf);
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}
@@ -126,7 +131,7 @@ public void testCacheFileExistence() throws Throwable {
describe("Verify that FS cache files exist on local FS");
skipIfClientSideEncryption();

try (FSDataInputStream in = fs.open(testFile)) {
try (FSDataInputStream in = testFileSystem.open(testFile)) {
byte[] buffer = new byte[prefetchBlockSize];

in.read(buffer, 0, prefetchBlockSize - 10240);
@@ -143,10 +148,10 @@ private void assertCacheFileExists() throws IOException {
Assertions.assertThat(tmpFileDir.isDirectory())
.describedAs("The dir to keep cache files must exist %s", tmpFileDir);
File[] tmpFiles = tmpFileDir.listFiles();
boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0;
Assertions.assertThat(isCacheFileForBlockFound)
Assertions.assertThat(tmpFiles)
.describedAs("No cache files found under " + tmpFileDir)
.isTrue();
.isNotNull()
.hasSizeGreaterThanOrEqualTo(1);

for (File tmpFile : tmpFiles) {
Path path = new Path(tmpFile.getAbsolutePath());
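assertCacheFileExists() can only find block files if the prefetch cache writes them under the directory that BUFFER_DIR points at, which is why setUp() above redirects BUFFER_DIR to tmpFileDir. A minimal sketch of that allocation path, assuming the cache obtains its temporary files from a LocalDirAllocator keyed on BUFFER_DIR; the file-name prefix below is illustrative, not the stream's actual one:

    // Allocate a temporary block-cache file under fs.s3a.buffer.dir.
    LocalDirAllocator allocator = new LocalDirAllocator(BUFFER_DIR);
    File cacheFile = allocator.createTmpFileForWrite("prefetch-block-", prefetchBlockSize, conf);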
@@ -421,10 +421,10 @@ protected S3ACachingBlockManager createBlockManager(
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
Configuration conf,
Configuration configuration,
LocalDirAllocator localDirAllocator) {
return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
bufferPoolSize, conf, localDirAllocator);
bufferPoolSize, configuration, localDirAllocator);
}
}
}
