Skip to content

Commit

Permalink
Code Changes to enable footer optimization with new buffer size
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuj Modi committed Nov 13, 2023
1 parent 077263d commit 5af02c5
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@IntegerConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
private int footerReadBufferSize;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
Expand Down Expand Up @@ -643,6 +648,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return this.footerReadBufferSize;
}

public int getReadBufferSize() {
return this.readBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withFooterReadBufferSize(abfsConfiguration.getFooterReadBufferSize())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,20 @@ public final class ConfigurationKeys {
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
/**
* When parquet files are read, first few read are metadata reads before reading the actual data.
* First the read is done of last 8 bytes of parquet file to get the postion of metadta and next read
* is done for reading that metadata. With this optimizations these two reads can be combined into 1.
* Value: {@value}
*/
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
/**
* In case of footer reads it was not required to read full buffer size.
* Most of the metadata information required was within 256KB and it will be more performant to read lesser.
* This config is used to define how much footer length the user wants to read.
* Value: {@value}
*/
public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size";

/**
* Read ahead range parameter which can be set by user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String path;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when reading footer
private final int readAheadQueueDepth; // initialized in constructor
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
Expand Down Expand Up @@ -138,6 +139,7 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down Expand Up @@ -370,10 +372,10 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
long lastBlockStart = max(0, contentLength - bufferSize);
long lastBlockStart = max(0, contentLength - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(bufferSize, contentLength);
long actualLenToRead = min(footerReadSize, contentLength);
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}

Expand Down Expand Up @@ -812,6 +814,11 @@ public int getBufferSize() {
return bufferSize;
}

@VisibleForTesting
public int getFooterReadBufferSize() {
return footerReadSize;
}

@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean optimizeFooterRead;

private int footerReadBufferSize;

private boolean bufferedPreadDisabled;

/** A BackReference to the FS instance that created this OutputStream. */
Expand Down Expand Up @@ -109,6 +111,11 @@ public AbfsInputStreamContext withOptimizeFooterRead(
return this;
}

public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}

public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
Expand Down Expand Up @@ -180,6 +187,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return footerReadBufferSize;
}

public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception {

private void testSeekAndReadWithConf(boolean optimizeFooterRead,
SeekTo seekTo) throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
// Running the test for file sizes ranging from 256 KB to 8 MB/
// This will cover files less than footer read buffer size,
// Files between footer read buffer and read buffer size
// Files bigger than read buffer size
for (int i = 0; i <= 5; i++) {
int fileSize = (int)Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
Expand Down Expand Up @@ -190,7 +194,8 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
try (FSDataInputStream iStream = fs.open(testFilePath)) {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
long bufferSize = abfsInputStream.getBufferSize();
long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize();
long readBufferSize = abfsInputStream.getBufferSize();
seek(iStream, seekPos);
byte[] buffer = new byte[length];
long bytesRead = iStream.read(buffer, 0, length);
Expand All @@ -209,22 +214,22 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
long expectedBCurson;
long expectedFCursor;
if (optimizationOn) {
if (actualContentLength <= bufferSize) {
if (actualContentLength <= footerReadBufferSize) {
expectedLimit = actualContentLength;
expectedBCurson = seekPos + actualLength;
} else {
expectedLimit = bufferSize;
long lastBlockStart = max(0, actualContentLength - bufferSize);
expectedLimit = footerReadBufferSize;
long lastBlockStart = max(0, actualContentLength - footerReadBufferSize);
expectedBCurson = seekPos - lastBlockStart + actualLength;
}
expectedFCursor = actualContentLength;
} else {
if (seekPos + bufferSize < actualContentLength) {
expectedLimit = bufferSize;
expectedFCursor = bufferSize;
if (seekPos + readBufferSize < actualContentLength) {
expectedLimit = readBufferSize;
expectedFCursor = readBufferSize;
} else {
expectedLimit = actualContentLength - seekPos;
expectedFCursor = min(seekPos + bufferSize, actualContentLength);
expectedFCursor = min(seekPos + readBufferSize, actualContentLength);
}
expectedBCurson = actualLength;
}
Expand All @@ -239,7 +244,7 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
// Verify data read to AbfsInputStream buffer
int from = seekPos;
if (optimizationOn) {
from = (int) max(0, actualContentLength - bufferSize);
from = (int) max(0, actualContentLength - footerReadBufferSize);
}
assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
abfsInputStream.getBuffer(), testFilePath);
Expand All @@ -249,8 +254,8 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
@Test
public void testPartialReadWithNoData()
throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
for (int i = 0; i <= 5; i++) {
int fileSize = (int)Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Expand Down Expand Up @@ -292,8 +297,8 @@ private void testPartialReadWithNoData(final FileSystem fs,
@Test
public void testPartialReadWithSomeDat()
throws Exception {
for (int i = 3; i <= 6; i++) {
int fileSize = i * ONE_MB;
for (int i = 0; i <= 5; i++) {
int fileSize = (int)Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Expand All @@ -317,7 +322,7 @@ private void testPartialReadWithSomeDat(final FileSystem fs,
// second readRemote returns data till the last 2 bytes
int someDataLength = 2;
int secondReturnSize =
min(fileContent.length, abfsInputStream.getBufferSize()) - 10
min(fileContent.length, abfsInputStream.getFooterReadBufferSize()) - 10
- someDataLength;
doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
.when(abfsInputStream)
Expand Down

0 comments on commit 5af02c5

Please sign in to comment.