From 6bb6bd769809cac2da2fa3e04cdead9f9782e69c Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 21 Nov 2023 23:11:07 -0800 Subject: [PATCH] Enhanced Test Coverage For Footer Related Configs --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 7 ++- .../ITestAbfsInputStreamReadFooter.java | 49 ++++++++++++++----- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index b998d4f0b8163..a03877668ff73 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -147,8 +147,10 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE; /** * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage. 
@@ -811,6 +813,9 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( boolean bufferedPreadDisabled = options .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false)) .orElse(false); + int footerReadBufferSize = options.map(c -> c.getInt( + AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize())) + .orElse(abfsConfiguration.getFooterReadBufferSize()); return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) @@ -818,7 +823,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled()) .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely()) .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead()) - .withFooterReadBufferSize(abfsConfiguration.getFooterReadBufferSize()) + .withFooterReadBufferSize(footerReadBufferSize) .withReadAheadRange(abfsConfiguration.getReadAheadRange()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways( diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index 923eb9e2acec8..29ebcaa1629a6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; + +import org.assertj.core.api.Assertions; import org.junit.Test; import
org.apache.hadoop.fs.FSDataInputStream; @@ -72,7 +74,7 @@ private void testNumBackendCalls(boolean optimizeFooterRead) int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize, footerReadBufferSize); + fileSize); String fileName = methodName.getMethodName() + i; byte[] fileContent = getRandomBytesArray(fileSize); Path testFilePath = createFileWithContent(fs, fileName, fileContent); @@ -81,6 +83,10 @@ private void testNumBackendCalls(boolean optimizeFooterRead) builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, footerReadBufferSize); try (FSDataInputStream iStream = builder.build().get()) { + AbfsInputStream abfsInputStream = (AbfsInputStream)iStream.getWrappedStream(); + Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) + .describedAs("Footer Read Buffer Size Should be same as what set in builder") + .isEqualTo(footerReadBufferSize); byte[] buffer = new byte[length]; Map metricMap = getInstrumentationMap(fs); @@ -164,7 +170,8 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception { private void testSeekAndReadWithConf(boolean optimizeFooterRead, SeekTo seekTo) throws Exception { - // Running the test for file sizes ranging from 256 KB to 8 MB + // Running the test for file sizes ranging from 256 KB to 4 MB with + // Footer Read Buffer size ranging from 256 KB to 1 MB // This will cover files less than footer read buffer size, // Files between footer read buffer and read buffer size // Files bigger than read buffer size @@ -173,12 +180,12 @@ private void testSeekAndReadWithConf(boolean optimizeFooterRead, int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize, footerReadBufferSize); + fileSize); String fileName = methodName.getMethodName() + i; byte[] 
fileContent = getRandomBytesArray(fileSize); Path testFilePath = createFileWithContent(fs, fileName, fileContent); seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, - fileContent); + fileContent, footerReadBufferSize); } } } @@ -201,16 +208,18 @@ private int seekPos(SeekTo seekTo, int fileSize) { } private void seekReadAndTest(final FileSystem fs, final Path testFilePath, - final int seekPos, final int length, final byte[] fileContent) + final int seekPos, final int length, final byte[] fileContent, int footerReadBufferSize) throws Exception { AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration(); long actualContentLength = fileContent.length; FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); - builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, DEFAULT_FOOTER_READ_BUFFER_SIZE); + builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, footerReadBufferSize); try (FSDataInputStream iStream = builder.build().get()) { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); - long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize(); + Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) + .describedAs("Footer Read Buffer Size Should be same as what set in builder") + .isEqualTo(footerReadBufferSize); long readBufferSize = abfsInputStream.getBufferSize(); seek(iStream, seekPos); byte[] buffer = new byte[length]; @@ -281,19 +290,22 @@ public void testPartialReadWithNoData() Path testFilePath = createFileWithContent(fs, fileName, fileContent); testPartialReadWithNoData(fs, testFilePath, fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + fileContent, footerReadBufferSize); } } } private void testPartialReadWithNoData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent) + final byte[] fileContent, int footerReadBufferSize) throws IOException, NoSuchFieldException, 
IllegalAccessException { FSDataInputStream iStream = fs.open(testFilePath); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); + Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) + .describedAs("Footer Read Buffer Size Should be same as what set in builder") + .isEqualTo(footerReadBufferSize); abfsInputStream = spy(abfsInputStream); doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream) .readRemote(anyLong(), any(), anyInt(), anyInt(), @@ -328,19 +340,23 @@ public void testPartialReadWithSomeData() Path testFilePath = createFileWithContent(fs, fileName, fileContent); testPartialReadWithSomeData(fs, testFilePath, fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + fileContent, footerReadBufferSize); } } } private void testPartialReadWithSomeData(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, - final byte[] fileContent) + final byte[] fileContent, final int footerReadBufferSize) throws IOException, NoSuchFieldException, IllegalAccessException { FSDataInputStream iStream = fs.open(testFilePath); try { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); + Assertions.assertThat(abfsInputStream.getFooterReadBufferSize()) + .describedAs("Footer Read Buffer Size Should be same as what set in builder") + .isEqualTo(footerReadBufferSize); + abfsInputStream = spy(abfsInputStream); // first readRemote, will return first 10 bytes // second readRemote returns data till the last 2 bytes @@ -371,6 +387,17 @@ private void testPartialReadWithSomeData(final FileSystem fs, } } + private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, + int fileSize) throws IOException { + final AzureBlobFileSystem fs = getFileSystem(); + AzureBlobFileSystemStore store = getAbfsStore(fs); + store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); + if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) 
{ + store.getAbfsConfiguration().setReadSmallFilesCompletely(false); + } + return fs; + } + private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, int fileSize, int footerReadBufferSize) throws IOException { final AzureBlobFileSystem fs = getFileSystem();