Skip to content

Commit

Permalink
HADOOP-18971. [ABFS] Read and cache file footer with fs.azure.footer.…
Browse files Browse the repository at this point in the history
…read.request.size (apache#6270)


The option fs.azure.footer.read.request.size sets the size of the footer to
read and cache; the default value of 524288 has been measured to
be good for most workloads running on parquet, ORC and similar file formats.

Contributed by Anuj Modi
  • Loading branch information
anujmodi2021 authored and jiajunmao committed Feb 6, 2024
1 parent ded5602 commit fd81581
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;

@IntegerConfigurationValidatorAnnotation(
ConfigurationKey = AZURE_FOOTER_READ_BUFFER_SIZE,
DefaultValue = DEFAULT_FOOTER_READ_BUFFER_SIZE)
private int footerReadBufferSize;

@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
Expand Down Expand Up @@ -648,6 +653,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return this.footerReadBufferSize;
}

public int getReadBufferSize() {
return this.readBufferSize;
}
Expand Down Expand Up @@ -1182,6 +1191,11 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
}

@VisibleForTesting
public void setFooterReadBufferSize(int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
}

@VisibleForTesting
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
this.enableAbfsListIterator = enableAbfsListIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
Expand Down Expand Up @@ -895,13 +896,17 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, abfsConfiguration.getFooterReadBufferSize()))
.orElse(abfsConfiguration.getFooterReadBufferSize());
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,22 @@ public final class ConfigurationKeys {
public static final String AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = "fs.azure.write.enableappendwithflush";
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
/**
* When parquet files are read, first few read are metadata reads before
* reading the actual data. First the read is done of last 8 bytes of parquet
* file to get the postion of metadta and next read is done for reading that
* metadata. With this optimization these two reads can be combined into 1.
* Value: {@value}
*/
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
/**
* In case of footer reads it was not required to read full buffer size.
* Most of the metadata information required was within 256 KB and it will be
* more performant to read less. 512 KB is a sweet spot.
* This config is used to define how much footer length the user wants to read.
* Value: {@value}
*/
public static final String AZURE_FOOTER_READ_BUFFER_SIZE = "fs.azure.footer.read.request.size";

/**
* Read ahead range parameter which can be set by user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = false;
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = true;
public static final int DEFAULT_FOOTER_READ_BUFFER_SIZE = 512 * ONE_KB;
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
public static final int DEFAULT_READ_AHEAD_RANGE = 64 * ONE_KB; // 64 KB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String path;
private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when reading footer
private final int readAheadQueueDepth; // initialized in constructor
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
Expand Down Expand Up @@ -140,6 +141,7 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.footerReadSize = abfsInputStreamContext.getFooterReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
Expand Down Expand Up @@ -361,6 +363,7 @@ private int readFileCompletely(final byte[] b, final int off, final int len)
return optimisedRead(b, off, len, 0, contentLength);
}

// To do footer read of files when enabled.
private int readLastBlock(final byte[] b, final int off, final int len)
throws IOException {
if (len == 0) {
Expand All @@ -373,10 +376,10 @@ private int readLastBlock(final byte[] b, final int off, final int len)
// data need to be copied to user buffer from index bCursor,
// AbfsInutStream buffer is going to contain data from last block start. In
// that case bCursor will be set to fCursor - lastBlockStart
long lastBlockStart = max(0, contentLength - bufferSize);
long lastBlockStart = max(0, contentLength - footerReadSize);
bCursor = (int) (fCursor - lastBlockStart);
// 0 if contentlength is < buffersize
long actualLenToRead = min(bufferSize, contentLength);
long actualLenToRead = min(footerReadSize, contentLength);
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
}

Expand Down Expand Up @@ -819,6 +822,11 @@ public int getBufferSize() {
return bufferSize;
}

@VisibleForTesting
protected int getFooterReadBufferSize() {
return footerReadSize;
}

@VisibleForTesting
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean optimizeFooterRead;

private int footerReadBufferSize;

private boolean bufferedPreadDisabled;

/** A BackReference to the FS instance that created this OutputStream. */
Expand Down Expand Up @@ -113,6 +115,11 @@ public AbfsInputStreamContext withOptimizeFooterRead(
return this;
}

public AbfsInputStreamContext withFooterReadBufferSize(final int footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}

public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
Expand Down Expand Up @@ -190,6 +197,10 @@ public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}

public int getFooterReadBufferSize() {
return footerReadBufferSize;
}

public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private void testReadWriteAndSeek(int bufferSize) throws Exception {
abfsConfiguration.setWriteBufferSize(bufferSize);
abfsConfiguration.setReadBufferSize(bufferSize);
abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
abfsConfiguration.setOptimizeFooterRead(false);

final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
final AzureBlobFileSystem fs = getFileSystem();
getAbfsStore(fs).getAbfsConfiguration()
.setReadSmallFilesCompletely(readSmallFilesCompletely);
getAbfsStore(fs).getAbfsConfiguration()
.setOptimizeFooterRead(false);
return fs;
}

Expand Down
Loading

0 comments on commit fd81581

Please sign in to comment.