Skip to content

Commit

Permalink
Enhanced Test Coverage For Footer Related Configs
Browse files Browse the repository at this point in the history
  • Loading branch information
Anuj Modi committed Nov 22, 2023
1 parent c0c1319 commit 6bb6bd7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FOOTER_READ_BUFFER_SIZE;

/**
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage.
Expand Down Expand Up @@ -811,14 +813,17 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
boolean bufferedPreadDisabled = options
.map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
.orElse(false);
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE, DEFAULT_FOOTER_READ_BUFFER_SIZE))
.orElse(abfsConfiguration.getFooterReadBufferSize());
return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
.withFooterReadBufferSize(abfsConfiguration.getFooterReadBufferSize())
.withFooterReadBufferSize(footerReadBufferSize)
.withReadAheadRange(abfsConfiguration.getReadAheadRange())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -72,7 +74,7 @@ private void testNumBackendCalls(boolean optimizeFooterRead)
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize, footerReadBufferSize);
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
Expand All @@ -81,6 +83,10 @@ private void testNumBackendCalls(boolean optimizeFooterRead)
builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE,
footerReadBufferSize);
try (FSDataInputStream iStream = builder.build().get()) {
AbfsInputStream abfsInputStream = (AbfsInputStream)iStream.getWrappedStream();
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
.isEqualTo(footerReadBufferSize);
byte[] buffer = new byte[length];

Map<String, Long> metricMap = getInstrumentationMap(fs);
Expand Down Expand Up @@ -164,7 +170,8 @@ public void testSeekToEndAndReadWithConfFalse() throws Exception {

private void testSeekAndReadWithConf(boolean optimizeFooterRead,
SeekTo seekTo) throws Exception {
// Running the test for file sizes ranging from 256 KB to 8 MB
// Running the test for file sizes ranging from 256 KB to 4 MB with
// Footer Read Buffer size ranging from 256 KB to 1 MB
// This will cover files less than footer read buffer size,
// Files between footer read buffer and read buffer size
// Files bigger than read buffer size
Expand All @@ -173,12 +180,12 @@ private void testSeekAndReadWithConf(boolean optimizeFooterRead,
int fileSize = (int) Math.pow(2, i) * 256 * ONE_KB;
int footerReadBufferSize = (int) Math.pow(2, j) * 256 * ONE_KB;
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
fileSize, footerReadBufferSize);
fileSize);
String fileName = methodName.getMethodName() + i;
byte[] fileContent = getRandomBytesArray(fileSize);
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
fileContent);
fileContent, footerReadBufferSize);
}
}
}
Expand All @@ -201,16 +208,18 @@ private int seekPos(SeekTo seekTo, int fileSize) {
}

private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
final int seekPos, final int length, final byte[] fileContent)
final int seekPos, final int length, final byte[] fileContent, int footerReadBufferSize)
throws Exception {
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
long actualContentLength = fileContent.length;
FutureDataInputStreamBuilder builder = fs.openFile(testFilePath);
builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, DEFAULT_FOOTER_READ_BUFFER_SIZE);
builder.opt(ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE, footerReadBufferSize);
try (FSDataInputStream iStream = builder.build().get()) {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
long footerReadBufferSize = abfsInputStream.getFooterReadBufferSize();
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
.isEqualTo(footerReadBufferSize);
long readBufferSize = abfsInputStream.getBufferSize();
seek(iStream, seekPos);
byte[] buffer = new byte[length];
Expand Down Expand Up @@ -281,19 +290,22 @@ public void testPartialReadWithNoData()
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithNoData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
fileContent, footerReadBufferSize);
}
}
}

private void testPartialReadWithNoData(final FileSystem fs,
final Path testFilePath, final int seekPos, final int length,
final byte[] fileContent)
final byte[] fileContent, int footerReadBufferSize)
throws IOException, NoSuchFieldException, IllegalAccessException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
.isEqualTo(footerReadBufferSize);
abfsInputStream = spy(abfsInputStream);
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
.readRemote(anyLong(), any(), anyInt(), anyInt(),
Expand Down Expand Up @@ -328,19 +340,23 @@ public void testPartialReadWithSomeData()
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testPartialReadWithSomeData(fs, testFilePath,
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
fileContent);
fileContent, footerReadBufferSize);
}
}
}

private void testPartialReadWithSomeData(final FileSystem fs,
final Path testFilePath, final int seekPos, final int length,
final byte[] fileContent)
final byte[] fileContent, final int footerReadBufferSize)
throws IOException, NoSuchFieldException, IllegalAccessException {
FSDataInputStream iStream = fs.open(testFilePath);
try {
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
.getWrappedStream();
Assertions.assertThat(abfsInputStream.getFooterReadBufferSize())
.describedAs("Footer Read Buffer Size Should be same as what set in builder")
.isEqualTo(footerReadBufferSize);

abfsInputStream = spy(abfsInputStream);
// first readRemote, will return first 10 bytes
// second readRemote returns data till the last 2 bytes
Expand Down Expand Up @@ -371,6 +387,17 @@ private void testPartialReadWithSomeData(final FileSystem fs,
}
}

private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
int fileSize) throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore store = getAbfsStore(fs);
store.getAbfsConfiguration().setOptimizeFooterRead(optimizeFooterRead);
if (fileSize <= store.getAbfsConfiguration().getReadBufferSize()) {
store.getAbfsConfiguration().setReadSmallFilesCompletely(false);
}
return fs;
}

private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
int fileSize, int footerReadBufferSize) throws IOException {
final AzureBlobFileSystem fs = getFileSystem();
Expand Down

0 comments on commit 6bb6bd7

Please sign in to comment.