HBASE-27646 Should not use pread when prefetching in HFilePreadReader #5063

Merged 1 commit on Mar 20, 2023
FSDataInputStreamWrapper.java
@@ -97,6 +97,8 @@ private static class ReadStatistics {
private Boolean instanceOfCanUnbuffer = null;
private CanUnbuffer unbuffer = null;

protected Path readerPath;

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, path, false, -1L);
}
@@ -127,6 +129,9 @@ private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolean
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
: path;
setStreamOptions(stream);
}

@@ -342,4 +347,8 @@ public void unbuffer() {
}
}
}

public Path getReaderPath() {
return readerPath;
}
}
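
The wrapper now records the path its stream was actually opened against: the link's currently resolved file when opening through a FileLink, otherwise the path handed to the constructor. A minimal sketch of how a caller can use it to open a second, independent stream over the same physical file (names like fs and path are assumed setup, not part of the patch):

// Open a wrapper; 'path' may point at an HFileLink.
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fs, path);
// For a link this is the resolved member file, not the link itself.
Path resolved = wrapper.getReaderPath();
// A fresh wrapper over the same file, with its own underlying stream.
FSDataInputStreamWrapper second = new FSDataInputStreamWrapper(fs, resolved);

This is the trick HFilePreadReader uses below to give the prefetch thread its own stream.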
FileLink.java
@@ -83,7 +83,7 @@ public class FileLink {
* FileLink InputStream that handles the switch between the original path and the alternative
* locations, when the file is moved.
*/
-  private static class FileLinkInputStream extends InputStream
+  protected static class FileLinkInputStream extends InputStream
implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
private FSDataInputStream in = null;
private Path currentPath = null;
@@ -286,6 +290,10 @@ public void setReadahead(Long readahead) throws IOException, UnsupportedOperationException {
public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
in.setDropBehind(dropCache);
}

public Path getCurrentPath() {
return currentPath;
}
}

private Path[] locations = null;
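With FileLinkInputStream widened from private to protected, same-package callers such as FSDataInputStreamWrapper can see through the wrapped stream to the link's currently resolved location. A sketch of that pattern, assuming a FileLink 'link' and FileSystem 'fs' in scope:

FSDataInputStream in = link.open(fs);
if (in.getWrappedStream() instanceof FileLink.FileLinkInputStream) {
  // The location can change later if the underlying file is moved or archived.
  Path current = ((FileLink.FileLinkInputStream) in.getWrappedStream()).getCurrentPath();
}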
HFilePreadReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +42,15 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
public void run() {
long offset = 0;
long end = 0;
HFile.Reader prefetchStreamReader = null;
try {
ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
.withReaderType(ReaderContext.ReaderType.STREAM)
.withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
context.getInputStreamWrapper().getReaderPath()))
.build();
prefetchStreamReader =
new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
end = getTrailer().getLoadOnOpenDataOffset();
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
@@ -56,8 +65,8 @@ public void run() {
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
- HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
-   /* pread= */true, false, false, null, null, true);
+ HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
+   /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
@@ -77,6 +86,13 @@ public void run() {
// Other exceptions are interesting
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
} finally {
if (prefetchStreamReader != null) {
try {
prefetchStreamReader.close(false);
} catch (IOException e) {
LOG.warn("Close prefetch stream reader failed, path: " + path, e);
}
}
PrefetchExecutor.complete(path);
}
}
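The heart of the fix is the readBlock call above: prefetch now runs on its own STREAM-type reader with pread=false, so the file is scanned sequentially on a dedicated stream instead of issuing positional reads against the stream shared with regular reads. For context, the two modes boil down to the following (a simplified sketch against a plain FSDataInputStream with assumed offset/buf/len variables, not HBase code):

// pread=true: positional read; stateless and safe on a shared stream,
// but each call may open a fresh block reader on HDFS.
in.read(offset, buf, 0, len);

// pread=false: stateful seek + read; reuses the current block reader,
// which is the cheap option when walking the file front to back.
in.seek(offset);
in.readFully(buf, 0, len);

The finally block then closes the extra reader with close(false), i.e. without evicting the blocks the prefetch just cached, before marking the prefetch complete.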
ReaderContextBuilder.java
@@ -43,6 +43,18 @@ public class ReaderContextBuilder {
public ReaderContextBuilder() {
}

public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
return new ReaderContextBuilder(readerContext);
}

private ReaderContextBuilder(ReaderContext readerContext) {
this.filePath = readerContext.getFilePath();
this.fsdis = readerContext.getInputStreamWrapper();
this.fileSize = readerContext.getFileSize();
this.hfs = readerContext.getFileSystem();
this.type = readerContext.getReaderType();
}

public ReaderContextBuilder withFilePath(Path filePath) {
this.filePath = filePath;
return this;
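The new copy constructor lets callers clone an existing context and override individual fields, which is exactly how HFilePreadReader builds its stream context above. A trimmed example (assuming an existing ReaderContext 'context'):

// Derive a STREAM-type context from an existing one. Note the PR also
// supplies a fresh FSDataInputStreamWrapper so the new reader does not
// share the original stream.
ReaderContext streamCtx = ReaderContextBuilder.newBuilder(context)
  .withReaderType(ReaderContext.ReaderType.STREAM)
  .build();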
TestPrefetch.java
@@ -56,16 +56,20 @@
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Before;
import org.junit.ClassRule;
@@ -252,6 +256,14 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
});
}

@Test
public void testPrefetchDoesntSkipHFileLink() throws Exception {
testPrefetchWhenHFileLink(c -> {
boolean isCached = c != null;
assertTrue(isCached);
});
}

private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
throws Exception {
cacheConf = new CacheConfig(conf, blockCache);
@@ -287,6 +299,52 @@ private void testPrefetchWhenRefs(boolean compactionEnabled, Consumer<Cacheable> test)
}
}

private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
cacheConf = new CacheConfig(conf, blockCache);
HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
final RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
// force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
Configuration testConf = new Configuration(this.conf);
CommonFSUtils.setRootDir(testConf, testDir);
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName()).withFileContext(context).build();
TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
Bytes.toBytes("testPrefetchWhenHFileLink"));

Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
Path linkFilePath =
new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

// Try to open store file from link
StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
assertTrue(storeFileInfo.isLink());

hsf.initReader();
HFile.Reader reader = hsf.getReader().getHFileReader();
while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
if (block.getBlockType() == BlockType.DATA) {
test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
}
offset += block.getOnDiskSizeWithHeader();
}
}

private Path writeStoreFile(String fname) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
return writeStoreFile(fname, meta);