Skip to content

Commit

Permalink
HADOOP-19221. Test wiring up
Browse files Browse the repository at this point in the history
* failing byte buffer test good
* added byte array test
* a bit of complexity wiring up the file datablock to avoid
  creating a mock s3afs.

Change-Id: Ife675015c245b63e5fe027e162c016f3f739d9a9
  • Loading branch information
steveloughran committed Jul 12, 2024
1 parent cd82520 commit 9b942c7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.util.Preconditions;
Expand All @@ -37,6 +38,8 @@

import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.functional.BiFunctionRaisingIOE;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
Expand Down Expand Up @@ -659,8 +662,28 @@ public String toString() {
*/
static class DiskBlockFactory extends BlockFactory {

/**
* Function to create a temp file.
*/
private final BiFunctionRaisingIOE<Long, Long, File> tempFileFn;

DiskBlockFactory(S3AFileSystem owner) {
super(owner);
tempFileFn = (index, limit) ->
owner.createTmpFileForWrite(
String.format("s3ablock-%04d-", index),
limit,
getOwner().getConf());
}

/**
* Constructor for testing.
* @param tempFileFn function to create a temp file
*/
@VisibleForTesting
DiskBlockFactory(BiFunctionRaisingIOE<Long, Long, File> tempFileFn) {
super(null);
this.tempFileFn = requireNonNull(tempFileFn);
}

/**
Expand All @@ -679,9 +702,7 @@ DataBlock create(long index,
throws IOException {
Preconditions.checkArgument(limit != 0,
"Invalid block size: %d", limit);
File destFile = getOwner()
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
limit, getOwner().getConf());
File destFile = tempFileFn.apply(index, limit);
return new DiskBlock(destFile, limit, index, statistics);
}
}
Expand All @@ -705,7 +726,7 @@ static class DiskBlock extends DataBlock {
throws FileNotFoundException {
super(index, statistics);
this.limit = limit;
this.bufferFile = bufferFile;
this.bufferFile = requireNonNull(bufferFile);
blockAllocated();
out = new BufferedOutputStream(new FileOutputStream(bufferFile));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Optional;

import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

Expand Down Expand Up @@ -54,6 +56,9 @@ public static Collection<Object[]> params() {
});
}

@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();

/**
* Buffer type.
*/
Expand All @@ -69,8 +74,10 @@ public TestDataBlocks(final String bufferType) {
*/
private S3ADataBlocks.BlockFactory createFactory() {
switch (bufferType) {
// this one passed in a file allocation function
case FAST_UPLOAD_BUFFER_DISK:
return new S3ADataBlocks.DiskBlockFactory(null);
return new S3ADataBlocks.DiskBlockFactory((i, l) ->
tempDir.newFile("file" + i));
case FAST_UPLOAD_BUFFER_ARRAY:
return new S3ADataBlocks.ArrayBlockFactory(null);
case FAST_UPLOAD_BYTEBUFFER:
Expand Down Expand Up @@ -174,6 +181,14 @@ public void testBlockFactoryIO() throws Throwable {

// now ask the content provider for another content stream.
final InputStream stream2 = cp.newStream();
assertStreamCreationCount(cp, 2);

// this must close the old stream
bbStream.ifPresent(bb -> {
Assertions.assertThat(bb.isOpen())
.describedAs("stream %s is open", bb)
.isFalse();
});

// do a read(byte[]) of everything
byte[] readBuffer = new byte[bufferLen];
Expand All @@ -184,15 +199,6 @@ public void testBlockFactoryIO() throws Throwable {
.describedAs("data read into buffer")
.isEqualTo(buffer);

// this must close the old stream
bbStream.ifPresent(bb -> {
Assertions.assertThat(bb.isOpen())
.describedAs("stream %s is open", bb)
.isFalse();
});

assertStreamCreationCount(cp, 2);

// when the block is closed, the buffer must be returned
// to the pool.
block.close();
Expand All @@ -203,7 +209,8 @@ public void testBlockFactoryIO() throws Throwable {

}

private static void assertStreamCreationCount(final UploadContentProviders.BaseContentProvider<?> cp,
private static void assertStreamCreationCount(
final UploadContentProviders.BaseContentProvider<?> cp,
final int count) {
Assertions.assertThat(cp.getStreamCreationCount())
.describedAs("stream creation count of %s", cp)
Expand Down

0 comments on commit 9b942c7

Please sign in to comment.