From c0c131932b9a06b4ccbaeb19a8e23da87f383065 Mon Sep 17 00:00:00 2001 From: Anuj Modi Date: Tue, 21 Nov 2023 02:36:58 -0800 Subject: [PATCH] Addressing Comments --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 5 + .../azurebfs/constants/ConfigurationKeys.java | 10 +- .../fs/azurebfs/services/AbfsInputStream.java | 2 +- .../ITestAbfsInputStreamReadFooter.java | 162 ++++++++++-------- 4 files changed, 105 insertions(+), 74 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 5bd935e516b54..8989f99db8da3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -1147,6 +1147,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) { this.optimizeFooterRead = optimizeFooterRead; } + @VisibleForTesting + public void setFooterReadBufferSize(int footerReadBufferSize) { + this.footerReadBufferSize = footerReadBufferSize; + } + @VisibleForTesting public void setEnableAbfsListIterator(boolean enableAbfsListIterator) { this.enableAbfsListIterator = enableAbfsListIterator; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index bd66d7266a30d..2d0738dc7e023 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -105,15 +105,17 @@ public final class ConfigurationKeys { public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size"; public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely"; /** - * When parquet files are read, first few read are metadata reads before reading the actual data. - * First the read is done of last 8 bytes of parquet file to get the postion of metadta and next read - * is done for reading that metadata. With this optimizations these two reads can be combined into 1. + * When parquet files are read, first few read are metadata reads before + * reading the actual data. First the read is done of last 8 bytes of parquet + * file to get the postion of metadta and next read is done for reading that + * metadata. With this optimization these two reads can be combined into 1. * Value: {@value} */ public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread"; /** * In case of footer reads it was not required to read full buffer size. - * Most of the metadata information required was within 256KB and it will be more performant to read lesser. + * Most of the metadata information required was within 256 KB and it will be + * more performant to read less. 512 KB is a sweet spot. * This config is used to define how much footer length the user wants to read. * Value: {@value} */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index dcdf8d7ea17d6..589dba9db563e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -360,7 +360,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len) return optimisedRead(b, off, len, 0, contentLength); } - // To do footer read of files when enabled + // To do footer read of files when enabled. private int readLastBlock(final byte[] b, final int off, final int len) throws IOException { if (len == 0) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java index fbf469bbcf7d9..923eb9e2acec8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.util.Map; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; @@ -33,6 +36,7 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -63,38 +67,46 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse() private void testNumBackendCalls(boolean optimizeFooterRead) throws Exception { - for (int i = 0; i <= 5; i++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - int length = AbfsInputStream.FOOTER_SIZE; - try (FSDataInputStream iStream = fs.open(testFilePath)) { - byte[] buffer = new byte[length]; - - Map metricMap = getInstrumentationMap(fs); - long requestsMadeBeforeTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); - - iStream.seek(fileSize - 8); - iStream.read(buffer, 0, length); - - iStream.seek(fileSize - (TEN * ONE_KB)); - iStream.read(buffer, 0, length); - - iStream.seek(fileSize - (TWENTY * ONE_KB)); - iStream.read(buffer, 0, length); - - metricMap = getInstrumentationMap(fs); - long requestsMadeAfterTest = metricMap - .get(CONNECTIONS_MADE.getStatName()); - - if (optimizeFooterRead) { - assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest); - } else { - assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest); + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, + fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + int length = AbfsInputStream.FOOTER_SIZE; + FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); + builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, + footerReadBufferSize); + try (FSDataInputStream iStream = builder.build().get()) { + byte[] buffer = new byte[length]; + + Map metricMap = getInstrumentationMap(fs); + long requestsMadeBeforeTest = metricMap + .get(CONNECTIONS_MADE.getStatName()); + + iStream.seek(fileSize - 8); + iStream.read(buffer, 0, length); + + iStream.seek(fileSize - (TEN * ONE_KB)); + iStream.read(buffer, 0, length); + + iStream.seek(fileSize - (TWENTY * ONE_KB)); + iStream.read(buffer, 0, length); + + metricMap = getInstrumentationMap(fs); + long requestsMadeAfterTest = metricMap + .get(CONNECTIONS_MADE.getStatName()); + + if (optimizeFooterRead) { + assertEquals(1, + requestsMadeAfterTest - requestsMadeBeforeTest); + } else { + assertEquals(3, + requestsMadeAfterTest - requestsMadeBeforeTest); + } } } } @@ -152,19 +164,22 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception { private void testSeekAndReadWithConf(boolean optimizeFooterRead, SeekTo seekTo) throws Exception { - // Running the test for file sizes ranging from 256 KB to 8 MB/ + // Running the test for file sizes ranging from 256 KB to 8 MB // This will cover files less than footer read buffer size, // Files between footer read buffer and read buffer size // Files bigger than read buffer size - for (int i = 0; i <= 5; i++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, - fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, - fileContent); + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead, + fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED, + fileContent); + } } } @@ -187,10 +202,12 @@ private int seekPos(SeekTo seekTo, int fileSize) { private void seekReadAndTest(final FileSystem fs, final Path testFilePath, final int seekPos, final int length, final byte[] fileContent) - throws IOException, NoSuchFieldException, IllegalAccessException { + throws Exception { AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration(); long actualContentLength = fileContent.length; - try (FSDataInputStream iStream = fs.open(testFilePath)) { + FutureDataInputStreamBuilder builder = fs.openFile(testFilePath); + builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, DEFAULT_FOOTER_READ_BUFFER_SIZE); + try (FSDataInputStream iStream = builder.build().get()) { AbfsInputStream abfsInputStream = (AbfsInputStream) iStream .getWrappedStream(); long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize(); @@ -253,15 +270,19 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath, @Test public void testPartialReadWithNoData() throws Exception { - for (int i = 0; i <= 5; i++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem(true, fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithNoData(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem(true, + fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + testPartialReadWithNoData(fs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, + fileContent); + } } } @@ -296,15 +317,19 @@ private void testPartialReadWithNoData(final FileSystem fs, @Test public void testPartialReadWithSomeData() throws Exception { - for (int i = 0; i <= 5; i++) { - int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; - final AzureBlobFileSystem fs = getFileSystem(true, fileSize); - String fileName = methodName.getMethodName() + i; - byte[] fileContent = getRandomBytesArray(fileSize); - Path testFilePath = createFileWithContent(fs, fileName, fileContent); - testPartialReadWithSomeData(fs, testFilePath, - fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, - fileContent); + for (int i = 0; i <= 4; i++) { + for (int j = 0; j <= 2; j++) { + int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB; + int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB; + final AzureBlobFileSystem fs = getFileSystem(true, + fileSize, footerReadBufferSize); + String fileName = methodName.getMethodName() + i; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + testPartialReadWithSomeData(fs, testFilePath, + fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE, + fileContent); + } } } @@ -347,14 +372,13 @@ private void testPartialReadWithSomeData(final FileSystem fs, } private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead, - int fileSize) throws IOException { + int fileSize, int footerReadBufferSize) throws IOException { final AzureBlobFileSystem fs = getFileSystem(); - getAbfsStore(fs).getAbfsConfiguration() - .setOptimizeFooterRead(optimizeFooterRead); - if (fileSize <= getAbfsStore(fs).getAbfsConfiguration() - .getReadBufferSize()) { - getAbfsStore(fs).getAbfsConfiguration() - .setReadSmallFilesCompletely(false); + AzureBlobFileSystemStore store = getAbfsStore(fs); + store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead); + store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize); + if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) { + store.getAbfsConfiguration().setReadSmallFilesCompletely(false); } return fs; }