Skip to content

Commit

Permalink
titandb: add blob cache and blob prefetcher (facebook#31)
Browse files Browse the repository at this point in the history
* titandb: fix two bugs

* titandb: add blob cache and blob prefetcher
  • Loading branch information
huachaohuang authored and DorianZheng committed Sep 5, 2018
1 parent 00463fe commit 24a76c3
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 113 deletions.
80 changes: 37 additions & 43 deletions utilities/titandb/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ namespace titandb {

namespace {

void DeleteValue(const Slice&, void* value) {
Slice EncodeFileNumber(const uint64_t* number) {
return Slice(reinterpret_cast<const char*>(number), sizeof(*number));
}

void DeleteFileReader(const Slice&, void* value) {
delete reinterpret_cast<BlobFileReader*>(value);
}

Slice FileNumberSlice(const uint64_t* number) {
return Slice(reinterpret_cast<const char*>(number), sizeof(*number));
void UnrefCacheHandle(void* arg1, void* arg2) {
Cache* cache = reinterpret_cast<Cache*>(arg1);
Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
cache->Release(h);
}

}
Expand All @@ -32,72 +38,60 @@ Status BlobFileCache::Get(const ReadOptions& options,
uint64_t file_size,
const BlobHandle& handle,
BlobRecord* record, std::string* buffer) {
Cache::Handle* ch = nullptr;
Status s = FindFile(file_number, file_size, &ch);
Cache::Handle* cache_handle = nullptr;
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;

auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(ch));
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
s = reader->Get(options, handle, record, buffer);
cache_->Release(ch);
cache_->Release(cache_handle);
return s;
}

Status BlobFileCache::NewReader(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
std::unique_ptr<BlobFileReader>* result) {
Cache::Handle* ch = nullptr;
Status s = FindFile(file_number, file_size, &ch);
if (!s.ok()) return s;

auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(ch));
auto blob_file = reader->GetBlobFile();
cache_->Release(ch);

std::unique_ptr<RandomAccessFileReader> file;
s = NewRandomAccessReader(file_number, options.readahead_size, &file);
Status BlobFileCache::NewPrefetcher(
uint64_t file_number,
uint64_t file_size,
std::unique_ptr<BlobFilePrefetcher>* result) {
Cache::Handle* cache_handle = nullptr;
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;

result->reset(new BlobFileReader(cf_options_, blob_file, std::move(file)));
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
auto prefetcher = new BlobFilePrefetcher(reader);
prefetcher->RegisterCleanup(&UnrefCacheHandle, cache_.get(), cache_handle);
result->reset(prefetcher);
return s;
}

void BlobFileCache::Evict(uint64_t file_number) {
cache_->Erase(FileNumberSlice(&file_number));
cache_->Erase(EncodeFileNumber(&file_number));
}

Status BlobFileCache::FindFile(uint64_t file_number,
uint64_t file_size,
Cache::Handle** handle) {
Status s;
Slice number = FileNumberSlice(&file_number);
*handle = cache_->Lookup(number);
Slice cache_key = EncodeFileNumber(&file_number);
*handle = cache_->Lookup(cache_key);
if (*handle) return s;

std::unique_ptr<RandomAccessFileReader> file;
s = NewRandomAccessReader(file_number, 0, &file);
if (!s.ok()) return s;
{
std::unique_ptr<RandomAccessFile> f;
auto file_name = BlobFileName(db_options_.dirname, file_number);
s = env_->NewRandomAccessFile(file_name, &f, env_options_);
if (!s.ok()) return s;
if (db_options_.advise_random_on_open) {
f->Hint(RandomAccessFile::RANDOM);
}
file.reset(new RandomAccessFileReader(std::move(f), file_name));
}

std::unique_ptr<BlobFileReader> reader;
s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader);
if (!s.ok()) return s;

cache_->Insert(number, reader.release(), 1, &DeleteValue, handle);
return s;
}

Status BlobFileCache::NewRandomAccessReader(
uint64_t file_number, uint64_t readahead_size,
std::unique_ptr<RandomAccessFileReader>* result) {
std::unique_ptr<RandomAccessFile> file;
auto file_name = BlobFileName(db_options_.dirname, file_number);
Status s = env_->NewRandomAccessFile(file_name, &file, env_options_);
if (!s.ok()) return s;

if (readahead_size > 0) {
file = NewReadaheadRandomAccessFile(std::move(file), readahead_size);
}
result->reset(new RandomAccessFileReader(std::move(file), file_name));
cache_->Insert(cache_key, reader.release(), 1, &DeleteFileReader, handle);
return s;
}

Expand Down
15 changes: 4 additions & 11 deletions utilities/titandb/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ class BlobFileCache {
const BlobHandle& handle,
BlobRecord* record, std::string* buffer);

// Creates a new blob file reader for the specified file number. The
// corresponding file size must be exactly "file_size" bytes.
// If successful, sets "*result" to the new blob file reader.
Status NewReader(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
std::unique_ptr<BlobFileReader>* result);
// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number,
uint64_t file_size,
std::unique_ptr<BlobFilePrefetcher>* result);

// Evicts the file cache for the specified file number.
void Evict(uint64_t file_number);
Expand All @@ -44,10 +41,6 @@ class BlobFileCache {
uint64_t file_size,
Cache::Handle** handle);

Status NewRandomAccessReader(uint64_t file_number,
uint64_t readahead_size,
std::unique_ptr<RandomAccessFileReader>* result);

Env* env_;
EnvOptions env_options_;
TitanDBOptions db_options_;
Expand Down
90 changes: 83 additions & 7 deletions utilities/titandb/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,32 @@
namespace rocksdb {
namespace titandb {

const uint64_t kMinReadaheadSize = 4 << 10;
const uint64_t kMaxReadaheadSize = 256 << 10;

namespace {

void GenerateCachePrefix(std::string* dst, Cache* cc, RandomAccessFile* file) {
char buffer[kMaxVarint64Length * 3 + 1];
auto size = file->GetUniqueId(buffer, sizeof(buffer));
if (size == 0) {
auto end = EncodeVarint64(buffer, cc->NewId());
size = end - buffer;
}
dst->assign(buffer, size);
}

void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) {
dst->assign(prefix.data(), prefix.size());
PutVarint64(dst, offset);
}

void DeleteBlobCache(const Slice&, void* value) {
delete reinterpret_cast<std::string*>(value);
}

}

Status BlobFileReader::Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size,
Expand All @@ -18,25 +44,75 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
&footer_input, footer_space);
if (!s.ok()) return s;

std::shared_ptr<BlobFile> blob_file(new BlobFile);
s = DecodeInto(footer_input, &blob_file->footer_);
BlobFileFooter footer;
s = DecodeInto(footer_input, &footer);
if (!s.ok()) return s;

result->reset(new BlobFileReader(options, blob_file, std::move(file)));
auto reader = new BlobFileReader(options, std::move(file));
reader->footer_ = footer;
result->reset(reader);
return s;
}

BlobFileReader::BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file)
: options_(options),
file_(std::move(file)),
cache_(options.blob_cache) {
if (cache_) {
GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file());
}
}

Status BlobFileReader::Get(const ReadOptions& /*options*/,
const BlobHandle& handle,
BlobRecord* record, std::string* buffer) {
Status s;
std::string cache_key;
if (cache_) {
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
auto cache_handle = cache_->Lookup(cache_key);
if (cache_handle) {
*buffer = *reinterpret_cast<std::string*>(cache_->Value(cache_handle));
s = DecodeInto(*buffer, record);
cache_->Release(cache_handle);
return s;
}
}

Slice blob;
buffer->clear();
buffer->reserve(handle.size);
Status s = file_->Read(handle.offset, handle.size,
&blob, const_cast<char*>(buffer->data()));
buffer->resize(handle.size);
s = file_->Read(handle.offset, handle.size, &blob, &(*buffer)[0]);
if (!s.ok()) return s;

if (cache_) {
auto cache_value = new std::string(*buffer);
cache_->Insert(cache_key, cache_value, cache_value->size(),
&DeleteBlobCache);
}

return DecodeInto(blob, record);
}

Status BlobFilePrefetcher::Get(const ReadOptions& options,
const BlobHandle& handle,
BlobRecord* record, std::string* buffer) {
if (handle.offset == last_offset_) {
last_offset_ = handle.offset + handle.size;
if (handle.offset + handle.size > readahead_limit_) {
readahead_size_ = std::max(handle.size, readahead_size_);
reader_->file_->Prefetch(handle.offset, readahead_size_);
readahead_limit_ = handle.offset + readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
}
} else {
last_offset_ = handle.offset + handle.size;
readahead_size_ = 0;
readahead_limit_ = 0;
}

return reader_->Get(options, handle, record, buffer);
}

} // namespace titandb
} // namespace rocksdb
56 changes: 30 additions & 26 deletions utilities/titandb/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,6 @@
namespace rocksdb {
namespace titandb {

// Represents the information of a blob file read from the file.
class BlobFile {
public:
const BlobFileFooter& footer() const { return footer_; }

private:
friend class BlobFileReader;

BlobFile() = default;

BlobFileFooter footer_;
};

class BlobFileReader {
public:
// Opens a blob file and read the necessary metadata from it.
Expand All @@ -29,29 +16,46 @@ class BlobFileReader {
uint64_t file_size,
std::unique_ptr<BlobFileReader>* result);

// Constructs a reader with the shared blob file. The provided blob
// file must be corresponding to the "file".
BlobFileReader(const TitanCFOptions& options,
std::shared_ptr<BlobFile> blob_file,
std::unique_ptr<RandomAccessFileReader> file)
: options_(options),
blob_file_(blob_file),
file_(std::move(file)) {}

// Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer
// must be valid when the record is used.
Status Get(const ReadOptions& options,
const BlobHandle& handle,
BlobRecord* record, std::string* buffer);

// Returns a shared reference to the blob file.
std::shared_ptr<BlobFile> GetBlobFile() const { return blob_file_; }

private:
friend class BlobFilePrefetcher;

BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file);

TitanCFOptions options_;
std::shared_ptr<BlobFile> blob_file_;
std::unique_ptr<RandomAccessFileReader> file_;

std::shared_ptr<Cache> cache_;
std::string cache_prefix_;

// Information read from the file.
BlobFileFooter footer_;
};

// Performs readahead on continuous reads.
class BlobFilePrefetcher : public Cleanable {
public:
// Constructs a prefetcher with the blob file reader.
// "*reader" must be valid when the prefetcher is used.
BlobFilePrefetcher(BlobFileReader* reader)
: reader_(reader) {}

Status Get(const ReadOptions& options,
const BlobHandle& handle,
BlobRecord* record, std::string* buffer);

private:
BlobFileReader* reader_;
uint64_t last_offset_ {0};
uint64_t readahead_size_ {0};
uint64_t readahead_limit_ {0};
};

} // namespace titandb
Expand Down
13 changes: 10 additions & 3 deletions utilities/titandb/blob_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,25 @@ class BlobFileTest : public testing::Test {
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));

ReadOptions ro;
std::unique_ptr<BlobFileReader> reader;
ASSERT_OK(cache.NewReader(ro, file_number_, file_size, &reader));
std::unique_ptr<BlobFilePrefetcher> prefetcher;
ASSERT_OK(cache.NewPrefetcher(file_number_, file_size, &prefetcher));
for (int i = 0; i < n; i++) {
auto id = std::to_string(i);
BlobRecord expect;
expect.key = id;
expect.value = id;
BlobRecord record;
std::string buffer;
ASSERT_OK(reader->Get(ro, handles[i], &record, &buffer));
ASSERT_OK(cache.Get(ro, file_number_, file_size, handles[i],
&record, &buffer));
ASSERT_EQ(record, expect);
ASSERT_OK(cache.Get(ro, file_number_, file_size, handles[i],
&record, &buffer));
ASSERT_EQ(record, expect);
ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
}
}

Expand All @@ -79,6 +84,8 @@ class BlobFileTest : public testing::Test {
TEST_F(BlobFileTest, Basic) {
TitanOptions options;
TestBlobFile(options);
options.blob_cache = NewLRUCache(1 << 20);
TestBlobFile(options);
}

} // namespace titandb
Expand Down
2 changes: 1 addition & 1 deletion utilities/titandb/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ template<typename T>
Status DecodeInto(const Slice& src, T* target) {
auto tmp = src;
auto s = target->DecodeFrom(&tmp);
if (!s.ok() || !tmp.empty()) {
if (s.ok() && !tmp.empty()) {
s = Status::Corruption(Slice());
}
return s;
Expand Down
Loading

0 comments on commit 24a76c3

Please sign in to comment.