Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-18971: [ABFS] Enable Footer Read Optimizations with Appropriate Footer Read Buffer Size #6270

Merged
merged 9 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@IntegerConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
private int footerReadBufferSize;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
Expand Down Expand Up @@ -648,6 +653,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return this.footerReadBufferSize;
}

public int getReadBufferSize() {
return this.readBufferSize;
}
Expand Down Expand Up @@ -1182,6 +1191,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}

@VisibleForTesting
public void setFooterReadBufferSize(int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
}

@VisibleForTesting
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
Expand Down Expand Up @@ -895,13 +896,17 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize()))
.orElse(abfsConfiguration.getFooterReadBufferSize());
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,22 @@ public final class ConfigurationKeys {
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
/**
* When parquet files are read, first few read are metadata reads before
* reading the actual data. First the read is done of last 8 bytes of parquet
* file to get the postion of metadta and next read is done for reading that
* metadata. With this optimization these two reads can be combined into 1.
* Value: {@value}
*/
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
steveloughran marked this conversation as resolved.
Show resolved Hide resolved
/**
* In case of footer reads it was not required to read full buffer size.
* Most of the metadata information required was within 256 KB and it will be
* more performant to read less. 512 KB is a sweet spot.
* This config is used to define how much footer length the user wants to read.
* Value: {@value}
*/
public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size";

/**
* Read ahead range parameter which can be set by user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true;
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is 512k; docs in file above say 265K.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc.
So 256KB is enough data but to be on safer side we want to read more. It was found that 512 KB is a sweet spot between caching as much data as possible without impacting performance as much as possible.

public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String path;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when reading footer
private final int readAheadQueueDepth; // initialized in constructor
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
Expand Down Expand Up @@ -140,6 +141,7 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down Expand Up @@ -361,6 +363,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
return optimisedRead(b, off, len, 0, contentLength);
}

// To do footer read of files when enabled.
private int readLastBlock(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
Expand All @@ -373,10 +376,10 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
long lastBlockStart = max(0, contentLength - bufferSize);
long lastBlockStart = max(0, contentLength - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(bufferSize, contentLength);
long actualLenToRead = min(footerReadSize, contentLength);
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}

Expand Down Expand Up @@ -819,6 +822,11 @@ public int getBufferSize() {
return bufferSize;
}

@VisibleForTesting
protected int getFooterReadBufferSize() {
return footerReadSize;
}

@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean optimizeFooterRead;

private int footerReadBufferSize;

private boolean bufferedPreadDisabled;

/** A BackReference to the FS instance that created this OutputStream. */
Expand Down Expand Up @@ -113,6 +115,11 @@ public AbfsInputStreamContext withOptimizeFooterRead(
return this;
}

public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}

public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
Expand Down Expand Up @@ -190,6 +197,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return footerReadBufferSize;
}

public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
abfsConfiguration.setWriteBufferSize(bufferSize);
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
abfsConfiguration.setOptimizeFooterRead(false);

final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFilesCompletely);
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(false);
return fs;
}

Expand Down
Loading