From 9b942c71ecede4a410af646fa07aaa8d35f21bf0 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 12 Jul 2024 19:30:44 +0100 Subject: [PATCH] HADOOP-19221. Test wiring up * failing byte buffer test good * added byte array test * a bit of complexity wiring up the file datablock to avoid creating a mock s3afs. Change-Id: Ife675015c245b63e5fe027e162c016f3f739d9a9 --- .../apache/hadoop/fs/s3a/S3ADataBlocks.java | 29 ++++++++++++++++--- .../apache/hadoop/fs/s3a/TestDataBlocks.java | 29 ++++++++++++------- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 49f6b7ba4b8fc..15f879378c281 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.util.Preconditions; @@ -37,6 +38,8 @@ import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.util.DirectBufferPool; +import org.apache.hadoop.util.functional.BiFunctionRaisingIOE; +import org.apache.hadoop.util.functional.CallableRaisingIOE; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*; @@ -659,8 +662,28 @@ public String toString() { */ static class DiskBlockFactory extends BlockFactory { + /** + * Function to create a temp file. + */ + private final BiFunctionRaisingIOE tempFileFn; + DiskBlockFactory(S3AFileSystem owner) { super(owner); + tempFileFn = (index, limit) -> + owner.createTmpFileForWrite( + String.format("s3ablock-%04d-", index), + limit, + getOwner().getConf()); + } + + /** + * Constructor for testing. + * @param tempFileFn function to create a temp file + */ + @VisibleForTesting + DiskBlockFactory(BiFunctionRaisingIOE tempFileFn) { + super(null); + this.tempFileFn = requireNonNull(tempFileFn); } /** @@ -679,9 +702,7 @@ DataBlock create(long index, throws IOException { Preconditions.checkArgument(limit != 0, "Invalid block size: %d", limit); - File destFile = getOwner() - .createTmpFileForWrite(String.format("s3ablock-%04d-", index), - limit, getOwner().getConf()); + File destFile = tempFileFn.apply(index, limit); return new DiskBlock(destFile, limit, index, statistics); } } @@ -705,7 +726,7 @@ static class DiskBlock extends DataBlock { throws FileNotFoundException { super(index, statistics); this.limit = limit; - this.bufferFile = bufferFile; + this.bufferFile = requireNonNull(bufferFile); blockAllocated(); out = new BufferedOutputStream(new FileOutputStream(bufferFile)); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java index cdd3ec324d92a..b75758ab963f3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java @@ -24,7 +24,9 @@ import java.util.Optional; import org.assertj.core.api.Assertions; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -54,6 +56,9 @@ public static Collection params() { }); } + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + /** * Buffer type. */ @@ -69,8 +74,10 @@ public TestDataBlocks(final String bufferType) { */ private S3ADataBlocks.BlockFactory createFactory() { switch (bufferType) { + // this one passed in a file allocation function case FAST_UPLOAD_BUFFER_DISK: - return new S3ADataBlocks.DiskBlockFactory(null); + return new S3ADataBlocks.DiskBlockFactory((i, l) -> + tempDir.newFile("file" + i)); case FAST_UPLOAD_BUFFER_ARRAY: return new S3ADataBlocks.ArrayBlockFactory(null); case FAST_UPLOAD_BYTEBUFFER: @@ -174,6 +181,14 @@ public void testBlockFactoryIO() throws Throwable { // now ask the content provider for another content stream. final InputStream stream2 = cp.newStream(); + assertStreamCreationCount(cp, 2); + + // this must close the old stream + bbStream.ifPresent(bb -> { + Assertions.assertThat(bb.isOpen()) + .describedAs("stream %s is open", bb) + .isFalse(); + }); // do a read(byte[]) of everything byte[] readBuffer = new byte[bufferLen]; @@ -184,15 +199,6 @@ public void testBlockFactoryIO() throws Throwable { .describedAs("data read into buffer") .isEqualTo(buffer); - // this must close the old stream - bbStream.ifPresent(bb -> { - Assertions.assertThat(bb.isOpen()) - .describedAs("stream %s is open", bb) - .isFalse(); - }); - - assertStreamCreationCount(cp, 2); - // when the block is closed, the buffer must be returned // to the pool. block.close(); @@ -203,7 +209,8 @@ public void testBlockFactoryIO() throws Throwable { } - private static void assertStreamCreationCount(final UploadContentProviders.BaseContentProvider cp, + private static void assertStreamCreationCount( + final UploadContentProviders.BaseContentProvider cp, final int count) { Assertions.assertThat(cp.getStreamCreationCount()) .describedAs("stream creation count of %s", cp)