From 24a76c3a36ebc1bc5d1acedd1d4091b275fb24f4 Mon Sep 17 00:00:00 2001 From: Huachao Huang Date: Thu, 30 Aug 2018 15:08:21 +0800 Subject: [PATCH] titandb: add blob cache and blob prefetcher (#31) * titandb: fix two bugs * titandb: add blob cache and blob prefetcher --- utilities/titandb/blob_file_cache.cc | 80 ++++++++++------------ utilities/titandb/blob_file_cache.h | 15 ++--- utilities/titandb/blob_file_reader.cc | 90 +++++++++++++++++++++++-- utilities/titandb/blob_file_reader.h | 56 ++++++++------- utilities/titandb/blob_file_test.cc | 13 +++- utilities/titandb/blob_format.h | 2 +- utilities/titandb/db_iter.h | 17 ++--- utilities/titandb/options.h | 5 ++ utilities/titandb/table_builder_test.cc | 1 - utilities/titandb/version.cc | 8 +-- utilities/titandb/version.h | 8 +-- 11 files changed, 182 insertions(+), 113 deletions(-) diff --git a/utilities/titandb/blob_file_cache.cc b/utilities/titandb/blob_file_cache.cc index beaead43aac..22cc0baa83b 100644 --- a/utilities/titandb/blob_file_cache.cc +++ b/utilities/titandb/blob_file_cache.cc @@ -8,12 +8,18 @@ namespace titandb { namespace { -void DeleteValue(const Slice&, void* value) { +Slice EncodeFileNumber(const uint64_t* number) { + return Slice(reinterpret_cast(number), sizeof(*number)); +} + +void DeleteFileReader(const Slice&, void* value) { delete reinterpret_cast(value); } -Slice FileNumberSlice(const uint64_t* number) { - return Slice(reinterpret_cast(number), sizeof(*number)); +void UnrefCacheHandle(void* arg1, void* arg2) { + Cache* cache = reinterpret_cast(arg1); + Cache::Handle* h = reinterpret_cast(arg2); + cache->Release(h); } } @@ -32,72 +38,60 @@ Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_size, const BlobHandle& handle, BlobRecord* record, std::string* buffer) { - Cache::Handle* ch = nullptr; - Status s = FindFile(file_number, file_size, &ch); + Cache::Handle* cache_handle = nullptr; + Status s = FindFile(file_number, file_size, &cache_handle); if (!s.ok()) return s; - auto reader = reinterpret_cast(cache_->Value(ch)); + auto reader = reinterpret_cast(cache_->Value(cache_handle)); s = reader->Get(options, handle, record, buffer); - cache_->Release(ch); + cache_->Release(cache_handle); return s; } -Status BlobFileCache::NewReader(const ReadOptions& options, - uint64_t file_number, - uint64_t file_size, - std::unique_ptr* result) { - Cache::Handle* ch = nullptr; - Status s = FindFile(file_number, file_size, &ch); - if (!s.ok()) return s; - - auto reader = reinterpret_cast(cache_->Value(ch)); - auto blob_file = reader->GetBlobFile(); - cache_->Release(ch); - - std::unique_ptr file; - s = NewRandomAccessReader(file_number, options.readahead_size, &file); +Status BlobFileCache::NewPrefetcher( + uint64_t file_number, + uint64_t file_size, + std::unique_ptr* result) { + Cache::Handle* cache_handle = nullptr; + Status s = FindFile(file_number, file_size, &cache_handle); if (!s.ok()) return s; - result->reset(new BlobFileReader(cf_options_, blob_file, std::move(file))); + auto reader = reinterpret_cast(cache_->Value(cache_handle)); + auto prefetcher = new BlobFilePrefetcher(reader); + prefetcher->RegisterCleanup(&UnrefCacheHandle, cache_.get(), cache_handle); + result->reset(prefetcher); return s; } void BlobFileCache::Evict(uint64_t file_number) { - cache_->Erase(FileNumberSlice(&file_number)); + cache_->Erase(EncodeFileNumber(&file_number)); } Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size, Cache::Handle** handle) { Status s; - Slice number = FileNumberSlice(&file_number); - *handle = cache_->Lookup(number); + Slice cache_key = EncodeFileNumber(&file_number); + *handle = cache_->Lookup(cache_key); if (*handle) return s; std::unique_ptr file; - s = NewRandomAccessReader(file_number, 0, &file); - if (!s.ok()) return s; + { + std::unique_ptr f; + auto file_name = BlobFileName(db_options_.dirname, file_number); + s = env_->NewRandomAccessFile(file_name, &f, env_options_); + if (!s.ok()) return s; + if (db_options_.advise_random_on_open) { + f->Hint(RandomAccessFile::RANDOM); + } + file.reset(new RandomAccessFileReader(std::move(f), file_name)); + } std::unique_ptr reader; s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader); if (!s.ok()) return s; - cache_->Insert(number, reader.release(), 1, &DeleteValue, handle); - return s; -} - -Status BlobFileCache::NewRandomAccessReader( - uint64_t file_number, uint64_t readahead_size, - std::unique_ptr* result) { - std::unique_ptr file; - auto file_name = BlobFileName(db_options_.dirname, file_number); - Status s = env_->NewRandomAccessFile(file_name, &file, env_options_); - if (!s.ok()) return s; - - if (readahead_size > 0) { - file = NewReadaheadRandomAccessFile(std::move(file), readahead_size); - } - result->reset(new RandomAccessFileReader(std::move(file), file_name)); + cache_->Insert(cache_key, reader.release(), 1, &DeleteFileReader, handle); return s; } diff --git a/utilities/titandb/blob_file_cache.h b/utilities/titandb/blob_file_cache.h index a574d144fe2..1f8df240b2a 100644 --- a/utilities/titandb/blob_file_cache.h +++ b/utilities/titandb/blob_file_cache.h @@ -25,13 +25,10 @@ class BlobFileCache { const BlobHandle& handle, BlobRecord* record, std::string* buffer); - // Creates a new blob file reader for the specified file number. The - // corresponding file size must be exactly "file_size" bytes. - // If successful, sets "*result" to the new blob file reader. - Status NewReader(const ReadOptions& options, - uint64_t file_number, - uint64_t file_size, - std::unique_ptr* result); + // Creates a prefetcher for the specified file number. + Status NewPrefetcher(uint64_t file_number, + uint64_t file_size, + std::unique_ptr* result); // Evicts the file cache for the specified file number. void Evict(uint64_t file_number); @@ -44,10 +41,6 @@ class BlobFileCache { uint64_t file_size, Cache::Handle** handle); - Status NewRandomAccessReader(uint64_t file_number, - uint64_t readahead_size, - std::unique_ptr* result); - Env* env_; EnvOptions env_options_; TitanDBOptions db_options_; diff --git a/utilities/titandb/blob_file_reader.cc b/utilities/titandb/blob_file_reader.cc index e3b01cf09f2..351f77577bf 100644 --- a/utilities/titandb/blob_file_reader.cc +++ b/utilities/titandb/blob_file_reader.cc @@ -3,6 +3,32 @@ namespace rocksdb { namespace titandb { +const uint64_t kMinReadaheadSize = 4 << 10; +const uint64_t kMaxReadaheadSize = 256 << 10; + +namespace { + +void GenerateCachePrefix(std::string* dst, Cache* cc, RandomAccessFile* file) { + char buffer[kMaxVarint64Length * 3 + 1]; + auto size = file->GetUniqueId(buffer, sizeof(buffer)); + if (size == 0) { + auto end = EncodeVarint64(buffer, cc->NewId()); + size = end - buffer; + } + dst->assign(buffer, size); +} + +void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) { + dst->assign(prefix.data(), prefix.size()); + PutVarint64(dst, offset); +} + +void DeleteBlobCache(const Slice&, void* value) { + delete reinterpret_cast(value); +} + +} + Status BlobFileReader::Open(const TitanCFOptions& options, std::unique_ptr file, uint64_t file_size, @@ -18,25 +44,75 @@ Status BlobFileReader::Open(const TitanCFOptions& options, &footer_input, footer_space); if (!s.ok()) return s; - std::shared_ptr blob_file(new BlobFile); - s = DecodeInto(footer_input, &blob_file->footer_); + BlobFileFooter footer; + s = DecodeInto(footer_input, &footer); if (!s.ok()) return s; - result->reset(new BlobFileReader(options, blob_file, std::move(file))); + auto reader = new BlobFileReader(options, std::move(file)); + reader->footer_ = footer; + result->reset(reader); return s; } +BlobFileReader::BlobFileReader(const TitanCFOptions& options, + std::unique_ptr file) + : options_(options), + file_(std::move(file)), + cache_(options.blob_cache) { + if (cache_) { + GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file()); + } +} + Status BlobFileReader::Get(const ReadOptions& /*options*/, const BlobHandle& handle, BlobRecord* record, std::string* buffer) { + Status s; + std::string cache_key; + if (cache_) { + EncodeBlobCache(&cache_key, cache_prefix_, handle.offset); + auto cache_handle = cache_->Lookup(cache_key); + if (cache_handle) { + *buffer = *reinterpret_cast(cache_->Value(cache_handle)); + s = DecodeInto(*buffer, record); + cache_->Release(cache_handle); + return s; + } + } + Slice blob; - buffer->clear(); - buffer->reserve(handle.size); - Status s = file_->Read(handle.offset, handle.size, - &blob, const_cast(buffer->data())); + buffer->resize(handle.size); + s = file_->Read(handle.offset, handle.size, &blob, &(*buffer)[0]); if (!s.ok()) return s; + + if (cache_) { + auto cache_value = new std::string(*buffer); + cache_->Insert(cache_key, cache_value, cache_value->size(), + &DeleteBlobCache); + } + return DecodeInto(blob, record); } +Status BlobFilePrefetcher::Get(const ReadOptions& options, + const BlobHandle& handle, + BlobRecord* record, std::string* buffer) { + if (handle.offset == last_offset_) { + last_offset_ = handle.offset + handle.size; + if (handle.offset + handle.size > readahead_limit_) { + readahead_size_ = std::max(handle.size, readahead_size_); + reader_->file_->Prefetch(handle.offset, readahead_size_); + readahead_limit_ = handle.offset + readahead_size_; + readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2); + } + } else { + last_offset_ = handle.offset + handle.size; + readahead_size_ = 0; + readahead_limit_ = 0; + } + + return reader_->Get(options, handle, record, buffer); +} + } // namespace titandb } // namespace rocksdb diff --git a/utilities/titandb/blob_file_reader.h b/utilities/titandb/blob_file_reader.h index 392ec9abc85..7edd364de05 100644 --- a/utilities/titandb/blob_file_reader.h +++ b/utilities/titandb/blob_file_reader.h @@ -7,19 +7,6 @@ namespace rocksdb { namespace titandb { -// Represents the information of a blob file read from the file. -class BlobFile { - public: - const BlobFileFooter& footer() const { return footer_; } - - private: - friend class BlobFileReader; - - BlobFile() = default; - - BlobFileFooter footer_; -}; - class BlobFileReader { public: // Opens a blob file and read the necessary metadata from it. @@ -29,15 +16,6 @@ class BlobFileReader { uint64_t file_size, std::unique_ptr* result); - // Constructs a reader with the shared blob file. The provided blob - // file must be corresponding to the "file". - BlobFileReader(const TitanCFOptions& options, - std::shared_ptr blob_file, - std::unique_ptr file) - : options_(options), - blob_file_(blob_file), - file_(std::move(file)) {} - // Gets the blob record pointed by the handle in this file. The data // of the record is stored in the provided buffer, so the buffer // must be valid when the record is used. @@ -45,13 +23,39 @@ class BlobFileReader { const BlobHandle& handle, BlobRecord* record, std::string* buffer); - // Returns a shared reference to the blob file. - std::shared_ptr GetBlobFile() const { return blob_file_; } - private: + friend class BlobFilePrefetcher; + + BlobFileReader(const TitanCFOptions& options, + std::unique_ptr file); + TitanCFOptions options_; - std::shared_ptr blob_file_; std::unique_ptr file_; + + std::shared_ptr cache_; + std::string cache_prefix_; + + // Information read from the file. + BlobFileFooter footer_; +}; + +// Performs readahead on continuous reads. +class BlobFilePrefetcher : public Cleanable { + public: + // Constructs a prefetcher with the blob file reader. + // "*reader" must be valid when the prefetcher is used. + BlobFilePrefetcher(BlobFileReader* reader) + : reader_(reader) {} + + Status Get(const ReadOptions& options, + const BlobHandle& handle, + BlobRecord* record, std::string* buffer); + + private: + BlobFileReader* reader_; + uint64_t last_offset_ {0}; + uint64_t readahead_size_ {0}; + uint64_t readahead_limit_ {0}; }; } // namespace titandb diff --git a/utilities/titandb/blob_file_test.cc b/utilities/titandb/blob_file_test.cc index 1bf7036c831..7b824590f1f 100644 --- a/utilities/titandb/blob_file_test.cc +++ b/utilities/titandb/blob_file_test.cc @@ -52,8 +52,8 @@ class BlobFileTest : public testing::Test { ASSERT_OK(env_->GetFileSize(file_name_, &file_size)); ReadOptions ro; - std::unique_ptr reader; - ASSERT_OK(cache.NewReader(ro, file_number_, file_size, &reader)); + std::unique_ptr prefetcher; + ASSERT_OK(cache.NewPrefetcher(file_number_, file_size, &prefetcher)); for (int i = 0; i < n; i++) { auto id = std::to_string(i); BlobRecord expect; @@ -61,11 +61,16 @@ class BlobFileTest : public testing::Test { expect.value = id; BlobRecord record; std::string buffer; - ASSERT_OK(reader->Get(ro, handles[i], &record, &buffer)); + ASSERT_OK(cache.Get(ro, file_number_, file_size, handles[i], + &record, &buffer)); ASSERT_EQ(record, expect); ASSERT_OK(cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer)); ASSERT_EQ(record, expect); + ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer)); + ASSERT_EQ(record, expect); + ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer)); + ASSERT_EQ(record, expect); } } @@ -79,6 +84,8 @@ class BlobFileTest : public testing::Test { TEST_F(BlobFileTest, Basic) { TitanOptions options; TestBlobFile(options); + options.blob_cache = NewLRUCache(1 << 20); + TestBlobFile(options); } } // namespace titandb diff --git a/utilities/titandb/blob_format.h b/utilities/titandb/blob_format.h index 0f3e665d8fc..3be816cfce0 100644 --- a/utilities/titandb/blob_format.h +++ b/utilities/titandb/blob_format.h @@ -93,7 +93,7 @@ template Status DecodeInto(const Slice& src, T* target) { auto tmp = src; auto s = target->DecodeFrom(&tmp); - if (!s.ok() || !tmp.empty()) { + if (s.ok() && !tmp.empty()) { s = Status::Corruption(Slice()); } return s; diff --git a/utilities/titandb/db_iter.h b/utilities/titandb/db_iter.h index a1037f45021..6420f85a955 100644 --- a/utilities/titandb/db_iter.h +++ b/utilities/titandb/db_iter.h @@ -96,17 +96,12 @@ class TitanDBIterator : public Iterator { status_ = DecodeInto(iter_->value(), &index); if (!status_.ok()) return; - if (!options_.readahead_size) { - status_ = storage_->Get(options_, index, &record_, &buffer_); - return; - } - - auto it = cache_.find(index.file_number); - if (it == cache_.end()) { - std::unique_ptr reader; - status_ = storage_->NewReader(options_, index.file_number, &reader); + auto it = files_.find(index.file_number); + if (it == files_.end()) { + std::unique_ptr prefetcher; + status_ = storage_->NewPrefetcher(index.file_number, &prefetcher); if (!status_.ok()) return; - it = cache_.emplace(index.file_number, std::move(reader)).first; + it = files_.emplace(index.file_number, std::move(prefetcher)).first; } status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_); } @@ -119,7 +114,7 @@ class TitanDBIterator : public Iterator { std::shared_ptr storage_; std::shared_ptr snap_; std::unique_ptr iter_; - std::map> cache_; + std::map> files_; }; } // namespace titandb diff --git a/utilities/titandb/options.h b/utilities/titandb/options.h index 0e9b38eb30f..a5e620ab332 100644 --- a/utilities/titandb/options.h +++ b/utilities/titandb/options.h @@ -34,6 +34,11 @@ struct TitanCFOptions : public ColumnFamilyOptions { // Default: 256MB uint64_t blob_file_target_size {256 << 20}; + // If non-NULL use the specified cache for blob records. + // + // Default: nullptr + std::shared_ptr blob_cache; + TitanCFOptions() = default; explicit TitanCFOptions(const ColumnFamilyOptions& options) : ColumnFamilyOptions(options) {} diff --git a/utilities/titandb/table_builder_test.cc b/utilities/titandb/table_builder_test.cc index 3ee0118700c..a5a2fb8a27c 100644 --- a/utilities/titandb/table_builder_test.cc +++ b/utilities/titandb/table_builder_test.cc @@ -118,7 +118,6 @@ class TableBuilderTest : public testing::Test { void NewBlobFileReader(std::unique_ptr* result) { std::unique_ptr file; NewFileReader(blob_name_, &file); - std::unique_ptr blob_file; uint64_t file_size = 0; ASSERT_OK(env_->GetFileSize(blob_name_, &file_size)); ASSERT_OK(BlobFileReader::Open( diff --git a/utilities/titandb/version.cc b/utilities/titandb/version.cc index 41c672e0216..848e50777ad 100644 --- a/utilities/titandb/version.cc +++ b/utilities/titandb/version.cc @@ -14,14 +14,12 @@ Status BlobStorage::Get(const ReadOptions& options, record, buffer); } -Status BlobStorage::NewReader(const ReadOptions& options, - uint64_t file_number, - std::unique_ptr* result) { +Status BlobStorage::NewPrefetcher(uint64_t file_number, + std::unique_ptr* result) { const BlobFileMeta* file; Status s = FindFile(file_number, &file); if (!s.ok()) return s; - return file_cache_->NewReader( - options, file->file_number, file->file_size, result); + return file_cache_->NewPrefetcher(file->file_number, file->file_size, result); } Status BlobStorage::FindFile(uint64_t file_number, const BlobFileMeta** file) { diff --git a/utilities/titandb/version.h b/utilities/titandb/version.h index 73ab41a7184..92752b174bc 100644 --- a/utilities/titandb/version.h +++ b/utilities/titandb/version.h @@ -21,11 +21,9 @@ class BlobStorage { const BlobIndex& index, BlobRecord* record, std::string* buffer); - // Creates a new blob file reader for the specified file number. - // If successful, sets "*result" to the new blob file reader. - Status NewReader(const ReadOptions& options, - uint64_t file_number, - std::unique_ptr* result); + // Creates a prefetcher for the specified file number. + Status NewPrefetcher(uint64_t file_number, + std::unique_ptr* result); // Finds the blob file meta for the specified file number. It is a // corruption if the file doesn't exist in the specific version.