Skip to content

Commit

Permalink
Addressing Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuj Modi committed Nov 21, 2023
1 parent e438b94 commit c0c1319
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}

/**
 * Sets the {@code footerReadBufferSize} field of this object.
 * Annotated {@code @VisibleForTesting}, i.e. exposed so that tests can
 * override the value directly.
 *
 * @param footerReadBufferSize the new footer read buffer size
 *     (presumably in bytes -- TODO confirm against the config definition)
 */
@VisibleForTesting
public void setFooterReadBufferSize(int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
}

@VisibleForTesting
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,17 @@ public final class ConfigurationKeys {
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
/**
* When parquet files are read, first few read are metadata reads before reading the actual data.
* First the read is done of last 8 bytes of parquet file to get the postion of metadta and next read
* is done for reading that metadata. With this optimizations these two reads can be combined into 1.
* When parquet files are read, the first few reads are metadata reads before
* reading the actual data. First, the last 8 bytes of the parquet file are
* read to get the position of the metadata, and the next read fetches that
* metadata. With this optimization these two reads can be combined into 1.
* Value: {@value}
*/
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
/**
* In case of footer reads it was not required to read full buffer size.
* Most of the metadata information required was within 256KB and it will be more performant to read lesser.
* Most of the metadata information required was within 256 KB and it will be
* more performant to read less. 512 KB is a sweet spot.
* This config is used to define how much footer length the user wants to read.
* Value: {@value}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
return optimisedRead(b, off, len, 0, contentLength);
}

// To do footer read of files when enabled
// To do footer read of files when enabled.
private int readLastBlock(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.junit.Test;

import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -33,6 +36,7 @@
import static java.lang.Math.max;
import static java.lang.Math.min;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -63,38 +67,46 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()

private void testNumBackendCalls(boolean optimizeFooterRead)
throws Exception {
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = AbfsInputStream.FOOTER_SIZE;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
byte[] buffer = new byte[length];

Map<String, Long> metricMap = getInstrumentationMap(fs);
long requestsMadeBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());

iStream.seek(fileSize - 8);
iStream.read(buffer, 0, length);

iStream.seek(fileSize - (TEN * ONE_KB));
iStream.read(buffer, 0, length);

iStream.seek(fileSize - (TWENTY * ONE_KB));
iStream.read(buffer, 0, length);

metricMap = getInstrumentationMap(fs);
long requestsMadeAfterTest = metricMap
.get(CONNECTIONS_MADE.getStatName());

if (optimizeFooterRead) {
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
} else {
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
for (int i = 0; i <= 4; i++) {
for (int j = 0; j <= 2; j++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize, footerReadBufferSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = AbfsInputStream.FOOTER_SIZE;
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE,
footerReadBufferSize);
try (FSDataInputStream iStream = builder.build().get()) {
byte[] buffer = new byte[length];

Map<String, Long> metricMap = getInstrumentationMap(fs);
long requestsMadeBeforeTest = metricMap
.get(CONNECTIONS_MADE.getStatName());

iStream.seek(fileSize - 8);
iStream.read(buffer, 0, length);

iStream.seek(fileSize - (TEN * ONE_KB));
iStream.read(buffer, 0, length);

iStream.seek(fileSize - (TWENTY * ONE_KB));
iStream.read(buffer, 0, length);

metricMap = getInstrumentationMap(fs);
long requestsMadeAfterTest = metricMap
.get(CONNECTIONS_MADE.getStatName());

if (optimizeFooterRead) {
assertEquals(1,
requestsMadeAfterTest - requestsMadeBeforeTest);
} else {
assertEquals(3,
requestsMadeAfterTest - requestsMadeBeforeTest);
}
}
}
}
Expand Down Expand Up @@ -152,19 +164,22 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception {

private void testSeekAndReadWithConf(boolean optimizeFooterRead,
SeekTo seekTo) throws Exception {
// Running the test for file sizes ranging from 256 KB to 8 MB/
// Running the test for file sizes ranging from 256 KB to 4 MB
// This will cover files less than footer read buffer size,
// Files between footer read buffer and read buffer size
// Files bigger than read buffer size
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
fileContent);
for (int i = 0; i <= 4; i++) {
for (int j = 0; j <= 2; j++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize, footerReadBufferSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
fileContent);
}
}
}

Expand All @@ -187,10 +202,12 @@ private int seekPos(SeekTo seekTo, int fileSize) {

private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
throws Exception {
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
long actualContentLength = fileContent.length;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, DEFAULT_FOOTER_READ_BUFFER_SIZE);
try (FSDataInputStream iStream = builder.build().get()) {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize();
Expand Down Expand Up @@ -253,15 +270,19 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
@Test
public void testPartialReadWithNoData()
throws Exception {
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithNoData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
for (int i = 0; i <= 4; i++) {
for (int j = 0; j <= 2; j++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true,
fileSize, footerReadBufferSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithNoData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
}
}
}

Expand Down Expand Up @@ -296,15 +317,19 @@ private void testPartialReadWithNoData(final FileSystem fs,
@Test
public void testPartialReadWithSomeData()
throws Exception {
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithSomeData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
for (int i = 0; i <= 4; i++) {
for (int j = 0; j <= 2; j++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true,
fileSize, footerReadBufferSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithSomeData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
}
}
}

Expand Down Expand Up @@ -347,14 +372,13 @@ private void testPartialReadWithSomeData(final FileSystem fs,
}

private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
int fileSize) throws IOException {
int fileSize, int footerReadBufferSize) throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(optimizeFooterRead);
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
.getReadBufferSize()) {
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(false);
AzureBlobFileSystemStore store = getAbfsStore(fs);
store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
store.getAbfsConfiguration().setFooterReadBufferSize(footerReadBufferSize);
if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
}
return fs;
}
Expand Down

0 comments on commit c0c1319

Please sign in to comment.