Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18971: [ABFS] Enable Footer Read Optimizations with Appropriate Footer Read Buffer Size #6270

Merged
merged 9 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@IntegerConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
private int footerReadBufferSize;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
Expand Down Expand Up @@ -643,6 +648,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return this.footerReadBufferSize;
}

public int getReadBufferSize() {
return this.readBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withFooterReadBufferSize(abfsConfiguration.getFooterReadBufferSize())
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,20 @@ public final class ConfigurationKeys {
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
/**
* When parquet files are read, first few read are metadata reads before reading the actual data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is roughly the same for ORC, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the procedure is same for ORC as well...
This optimization will impact workloads running on both parquet and ORC files

* First the read is done of last 8 bytes of parquet file to get the postion of metadta and next read
* is done for reading that metadata. With this optimizations these two reads can be combined into 1.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit "optimization"

* Value: {@value}
*/
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
/**
* In case of footer reads it was not required to read full buffer size.
* Most of the metadata information required was within 256KB and it will be more performant to read lesser.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"read less"

* This config is used to define how much footer length the user wants to read.
* Value: {@value}
*/
public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size";

/**
* Read ahead range parameter which can be set by user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true;
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is 512k; docs in file above say 265K.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc.
So 256KB is enough data but to be on safer side we want to read more. It was found that 512 KB is a sweet spot between caching as much data as possible without impacting performance as much as possible.

public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String path;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when reading footer
private final int readAheadQueueDepth; // initialized in constructor
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
Expand Down Expand Up @@ -138,6 +139,7 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down Expand Up @@ -358,6 +360,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
return optimisedRead(b, off, len, 0, contentLength);
}

// To do footer read of files when enabled
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: . at end for javadoc

private int readLastBlock(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
Expand All @@ -370,10 +373,10 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
long lastBlockStart = max(0, contentLength - bufferSize);
long lastBlockStart = max(0, contentLength - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(bufferSize, contentLength);
long actualLenToRead = min(footerReadSize, contentLength);
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}

Expand Down Expand Up @@ -812,6 +815,11 @@ public int getBufferSize() {
return bufferSize;
}

@VisibleForTesting
protected int getFooterReadBufferSize() {
return footerReadSize;
}

@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean optimizeFooterRead;

private int footerReadBufferSize;

private boolean bufferedPreadDisabled;

/** A BackReference to the FS instance that created this OutputStream. */
Expand Down Expand Up @@ -109,6 +111,11 @@ public AbfsInputStreamContext withOptimizeFooterRead(
return this;
}

public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}

public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
Expand Down Expand Up @@ -180,6 +187,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return footerReadBufferSize;
}

public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
abfsConfiguration.setWriteBufferSize(bufferSize);
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
abfsConfiguration.setOptimizeFooterRead(false);

final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFilesCompletely);
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(false);
return fs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;

public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {

Expand All @@ -64,8 +63,8 @@ public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()

private void testNumBackendCalls(boolean optimizeFooterRead)
throws Exception {
for (int i = 1; i <= 4; i++) {
int fileSize = i * ONE_MB;
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
Expand Down Expand Up @@ -153,8 +152,12 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception {

private void testSeekAndReadWithConf(boolean optimizeFooterRead,
SeekTo seekTo) throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
// Running the test for file sizes ranging from 256 KB to 8 MB/
// This will cover files less than footer read buffer size,
// Files between footer read buffer and read buffer size
// Files bigger than read buffer size
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize);
String fileName = methodName.getMethodName() + i;
Expand Down Expand Up @@ -190,7 +193,8 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
try (FSDataInputStream iStream = fs.open(testFilePath)) {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
long bufferSize = abfsInputStream.getBufferSize();
long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on diff sizes of file. Should we have parameterized values for getFooterReadBufferSize. Right now, it depends on what test-config developer has.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not able to get this...
Can you please elaborate...

The footer buffer size here will be the default one unless user sets it in configs explicitly.
Are you recommending this to be hardcoded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of this config is 256KB. Now, developer can have any other config also. Right now, test is very much inline of using 256 KB. What I am proposing is, that in the test, we set the config and don't depend on the dev given config. Plus, I am proposing we run this test for different values of footerBufferSize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose something else, will comment below

long readBufferSize = abfsInputStream.getBufferSize();
seek(iStream, seekPos);
byte[] buffer = new byte[length];
long bytesRead = iStream.read(buffer, 0, length);
Expand All @@ -206,40 +210,40 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
actualLength = length - delta;
}
long expectedLimit;
long expectedBCurson;
long expectedBCursor;
long expectedFCursor;
if (optimizationOn) {
if (actualContentLength <= bufferSize) {
if (actualContentLength <= footerReadBufferSize) {
expectedLimit = actualContentLength;
expectedBCurson = seekPos + actualLength;
expectedBCursor = seekPos + actualLength;
} else {
expectedLimit = bufferSize;
long lastBlockStart = max(0, actualContentLength - bufferSize);
expectedBCurson = seekPos - lastBlockStart + actualLength;
expectedLimit = footerReadBufferSize;
long lastBlockStart = max(0, actualContentLength - footerReadBufferSize);
expectedBCursor = seekPos - lastBlockStart + actualLength;
}
expectedFCursor = actualContentLength;
} else {
if (seekPos + bufferSize < actualContentLength) {
expectedLimit = bufferSize;
expectedFCursor = bufferSize;
if (seekPos + readBufferSize < actualContentLength) {
expectedLimit = readBufferSize;
expectedFCursor = readBufferSize;
} else {
expectedLimit = actualContentLength - seekPos;
expectedFCursor = min(seekPos + bufferSize, actualContentLength);
expectedFCursor = min(seekPos + readBufferSize, actualContentLength);
}
expectedBCurson = actualLength;
expectedBCursor = actualLength;
}

assertEquals(expectedFCursor, abfsInputStream.getFCursor());
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
assertEquals(expectedLimit, abfsInputStream.getLimit());
assertEquals(expectedBCurson, abfsInputStream.getBCursor());
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
assertEquals(actualLength, bytesRead);
// Verify user-content read
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer, testFilePath);
// Verify data read to AbfsInputStream buffer
int from = seekPos;
if (optimizationOn) {
from = (int) max(0, actualContentLength - bufferSize);
from = (int) max(0, actualContentLength - footerReadBufferSize);
}
assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
abfsInputStream.getBuffer(), testFilePath);
Expand All @@ -249,8 +253,8 @@ private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
@Test
public void testPartialReadWithNoData()
throws Exception {
for (int i = 2; i <= 6; i++) {
int fileSize = i * ONE_MB;
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Expand Down Expand Up @@ -290,21 +294,21 @@ private void testPartialReadWithNoData(final FileSystem fs,
}

@Test
public void testPartialReadWithSomeDat()
public void testPartialReadWithSomeData()
throws Exception {
for (int i = 3; i <= 6; i++) {
int fileSize = i * ONE_MB;
for (int i = 0; i <= 5; i++) {
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithSomeDat(fs, testFilePath,
testPartialReadWithSomeData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
}
}

private void testPartialReadWithSomeDat(final FileSystem fs,
private void testPartialReadWithSomeData(final FileSystem fs,
final Path testFilePath, final int seekPos, final int length,
final byte[] fileContent)
throws IOException, NoSuchFieldException, IllegalAccessException {
Expand All @@ -317,7 +321,7 @@ private void testPartialReadWithSomeDat(final FileSystem fs,
// second readRemote returns data till the last 2 bytes
int someDataLength = 2;
int secondReturnSize =
min(fileContent.length, abfsInputStream.getBufferSize()) - 10
min(fileContent.length, abfsInputStream.getFooterReadBufferSize()) - 10
- someDataLength;
doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
.when(abfsInputStream)
Expand Down