Skip to content

Commit

Permalink
Enable blob caching for MultiGetBlob in RocksDB (#10272)
Browse files Browse the repository at this point in the history
Summary:
- [x] Enabled blob caching for MultiGetBlob in RocksDB
- [x] Refactored MultiGetBlob logic and interface in RocksDB
- [x] Cleaned up Version::MultiGetBlob() and moved 'blob'-related code snippets into BlobSource
- [x] Added end-to-end test cases in db_blob_basic_test and unit tests in blob_source_test

This task is a part of #10156

Pull Request resolved: #10272

Reviewed By: ltamasi

Differential Revision: D37558112

Pulled By: gangliao

fbshipit-source-id: a73a6a94ffdee0024d5b2a39e6d1c1a7d38664db
  • Loading branch information
gangliao authored and facebook-github-bot committed Jun 30, 2022
1 parent 20754b3 commit 056e08d
Show file tree
Hide file tree
Showing 11 changed files with 756 additions and 316 deletions.
90 changes: 55 additions & 35 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/file_system.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/multiget_context.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
#include "util/crc32c.h"
Expand Down Expand Up @@ -374,40 +375,50 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
return Status::OK();
}

void BlobFileReader::MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const {
const size_t num_blobs = user_keys.size();
void BlobFileReader::MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const {
const size_t num_blobs = blob_reqs.size();
assert(num_blobs > 0);
assert(num_blobs == offsets.size());
assert(num_blobs == value_sizes.size());
assert(num_blobs == statuses.size());
assert(num_blobs == values.size());
assert(num_blobs <= MultiGetContext::MAX_BATCH_SIZE);

#ifndef NDEBUG
for (size_t i = 0; i < offsets.size() - 1; ++i) {
assert(offsets[i] <= offsets[i + 1]);
for (size_t i = 0; i < num_blobs - 1; ++i) {
assert(blob_reqs[i]->offset <= blob_reqs[i + 1]->offset);
}
#endif // !NDEBUG

std::vector<FSReadRequest> read_reqs(num_blobs);
std::vector<FSReadRequest> read_reqs;
autovector<uint64_t> adjustments;
uint64_t total_len = 0;
read_reqs.reserve(num_blobs);
for (size_t i = 0; i < num_blobs; ++i) {
const size_t key_size = user_keys[i].get().size();
assert(IsValidBlobOffset(offsets[i], key_size, value_sizes[i], file_size_));
const size_t key_size = blob_reqs[i]->user_key->size();
const uint64_t offset = blob_reqs[i]->offset;
const uint64_t value_size = blob_reqs[i]->len;

if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
*blob_reqs[i]->status = Status::Corruption("Invalid blob offset");
continue;
}
if (blob_reqs[i]->compression != compression_type_) {
*blob_reqs[i]->status =
Status::Corruption("Compression type mismatch when reading a blob");
continue;
}

const uint64_t adjustment =
read_options.verify_checksums
? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
: 0;
assert(offsets[i] >= adjustment);
assert(blob_reqs[i]->offset >= adjustment);
adjustments.push_back(adjustment);
read_reqs[i].offset = offsets[i] - adjustment;
read_reqs[i].len = value_sizes[i] + adjustment;
total_len += read_reqs[i].len;

FSReadRequest read_req;
read_req.offset = blob_reqs[i]->offset - adjustment;
read_req.len = blob_reqs[i]->len + adjustment;
read_reqs.emplace_back(read_req);
total_len += read_req.len;
}

RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, total_len);
Expand Down Expand Up @@ -439,43 +450,52 @@ void BlobFileReader::MultiGetBlob(
for (auto& req : read_reqs) {
req.status.PermitUncheckedError();
}
for (size_t i = 0; i < num_blobs; ++i) {
assert(statuses[i]);
*statuses[i] = s;
for (auto& req : blob_reqs) {
assert(req->status);
if (!req->status->IsCorruption()) {
// Avoid overwriting corruption status.
*req->status = s;
}
}
return;
}

assert(s.ok());

uint64_t total_bytes = 0;
for (size_t i = 0; i < num_blobs; ++i) {
auto& req = read_reqs[i];
const auto& record_slice = req.result;
for (size_t i = 0, j = 0; i < num_blobs; ++i) {
assert(blob_reqs[i]->status);
if (!blob_reqs[i]->status->ok()) {
continue;
}

assert(statuses[i]);
assert(j < read_reqs.size());
auto& req = read_reqs[j++];
const auto& record_slice = req.result;
if (req.status.ok() && record_slice.size() != req.len) {
req.status = IOStatus::Corruption("Failed to read data from blob file");
}

*statuses[i] = req.status;
if (!statuses[i]->ok()) {
*blob_reqs[i]->status = req.status;
if (!blob_reqs[i]->status->ok()) {
continue;
}

// Verify checksums if enabled
if (read_options.verify_checksums) {
*statuses[i] = VerifyBlob(record_slice, user_keys[i], value_sizes[i]);
if (!statuses[i]->ok()) {
*blob_reqs[i]->status =
VerifyBlob(record_slice, *blob_reqs[i]->user_key, blob_reqs[i]->len);
if (!blob_reqs[i]->status->ok()) {
continue;
}
}

// Uncompress blob if needed
Slice value_slice(record_slice.data() + adjustments[i], value_sizes[i]);
*statuses[i] = UncompressBlobIfNeeded(value_slice, compression_type_,
clock_, statistics_, values[i]);
if (statuses[i]->ok()) {
Slice value_slice(record_slice.data() + adjustments[i], blob_reqs[i]->len);
*blob_reqs[i]->status =
UncompressBlobIfNeeded(value_slice, compression_type_, clock_,
statistics_, blob_reqs[i]->result);
if (blob_reqs[i]->status->ok()) {
total_bytes += record_slice.size();
}
}
Expand Down
10 changes: 4 additions & 6 deletions db/blob/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cinttypes>
#include <memory>

#include "db/blob/blob_read_request.h"
#include "file/random_access_file_reader.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/rocksdb_namespace.h"
Expand Down Expand Up @@ -47,12 +48,9 @@ class BlobFileReader {
uint64_t* bytes_read) const;

// offsets must be sorted in ascending order by caller.
void MultiGetBlob(
const ReadOptions& read_options,
const autovector<std::reference_wrapper<const Slice>>& user_keys,
const autovector<uint64_t>& offsets,
const autovector<uint64_t>& value_sizes, autovector<Status*>& statuses,
autovector<PinnableSlice*>& values, uint64_t* bytes_read) const;
void MultiGetBlob(const ReadOptions& read_options,
autovector<BlobReadRequest*>& blob_reqs,
uint64_t* bytes_read) const;

CompressionType GetCompressionType() const { return compression_type_; }

Expand Down
95 changes: 57 additions & 38 deletions db/blob/blob_file_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
// MultiGetBlob
bytes_read = 0;
size_t total_size = 0;
autovector<std::reference_wrapper<const Slice>> key_refs;
for (const auto& key_ref : keys) {
key_refs.emplace_back(std::cref(key_ref));
}
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};

std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;

for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
}

reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);

for (size_t i = 0; i < num_blobs; ++i) {
ASSERT_OK(statuses_buf[i]);
ASSERT_EQ(value_buf[i], blobs[i]);
Expand Down Expand Up @@ -300,15 +300,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
blob_offsets[0],
blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};

std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;
autovector<BlobReadRequest*> blob_reqs;

for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
blob_reqs.push_back(&requests_buf[i]);
}

reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);

for (size_t i = 0; i < num_blobs; ++i) {
if (i == 1) {
ASSERT_TRUE(statuses_buf[i].IsCorruption());
Expand Down Expand Up @@ -339,17 +345,21 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
key_refs[2] = std::cref(wrong_key_slice);

autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1], blob_sizes[2]};
std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;

for (size_t i = 0; i < num_blobs; ++i) {
requests_buf[i] =
BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
kNoCompression, &value_buf[i], &statuses_buf[i]);
}

autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};

reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);

for (size_t i = 0; i < num_blobs; ++i) {
if (i == num_blobs - 1) {
ASSERT_TRUE(statuses_buf[i].IsCorruption());
Expand All @@ -376,17 +386,26 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
for (const auto& key_ref : keys) {
key_refs.emplace_back(std::cref(key_ref));
}
autovector<uint64_t> offsets{blob_offsets[0], blob_offsets[1],
blob_offsets[2]};
autovector<uint64_t> sizes{blob_sizes[0], blob_sizes[1] + 1, blob_sizes[2]};

std::array<Status, num_blobs> statuses_buf;
autovector<Status*> statuses{&statuses_buf[0], &statuses_buf[1],
&statuses_buf[2]};
std::array<PinnableSlice, num_blobs> value_buf;
autovector<PinnableSlice*> values{&value_buf[0], &value_buf[1],
&value_buf[2]};
reader->MultiGetBlob(read_options, key_refs, offsets, sizes, statuses,
values, &bytes_read);
std::array<BlobReadRequest, num_blobs> requests_buf;

requests_buf[0] =
BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
kNoCompression, &value_buf[0], &statuses_buf[0]);
requests_buf[1] =
BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
kNoCompression, &value_buf[1], &statuses_buf[1]);
requests_buf[2] =
BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
kNoCompression, &value_buf[2], &statuses_buf[2]);

autovector<BlobReadRequest*> blob_reqs = {
&requests_buf[0], &requests_buf[1], &requests_buf[2]};

reader->MultiGetBlob(read_options, blob_reqs, &bytes_read);

for (size_t i = 0; i < num_blobs; ++i) {
if (i != 1) {
ASSERT_OK(statuses_buf[i]);
Expand Down
58 changes: 58 additions & 0 deletions db/blob/blob_read_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <tuple>

#include "rocksdb/compression_type.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/autovector.h"

namespace ROCKSDB_NAMESPACE {

// A read Blob request structure for use in BlobSource::MultiGetBlob and
// BlobFileReader::MultiGetBlob.
struct BlobReadRequest {
  // User key used to look up (and, when checksums are verified, validate)
  // the paired blob.
  // NOTE(review): stored as a non-owning pointer, so the referenced Slice
  // must outlive this request — a temporary passed to the constructor would
  // dangle. Confirm all call sites keep the key alive for the duration of
  // the MultiGetBlob call.
  const Slice* user_key = nullptr;

  // File offset in bytes
  uint64_t offset = 0;

  // Length to read in bytes
  size_t len = 0;

  // Blob compression type
  CompressionType compression = kNoCompression;

  // Output parameter set by MultiGetBlob() to point to the data buffer, and
  // the number of valid bytes
  PinnableSlice* result = nullptr;

  // Status of read
  Status* status = nullptr;

  BlobReadRequest(const Slice& _user_key, uint64_t _offset, size_t _len,
                  CompressionType _compression, PinnableSlice* _result,
                  Status* _status)
      : user_key(&_user_key),
        offset(_offset),
        len(_len),
        compression(_compression),
        result(_result),
        status(_status) {}

  // The user-defined constructor above suppresses the implicit default
  // constructor, so it is restored explicitly. Copy and move operations
  // follow the Rule of Zero: the compiler-generated ones are correct for
  // this plain aggregate of pointers and scalars (note that the previously
  // `=default`ed copy operations also suppressed the implicit moves).
  BlobReadRequest() = default;
};

// All read requests destined for a single blob file, bundled with that
// file's number and size: <file_number, file_size, requests>.
using BlobFileReadRequests =
    std::tuple<uint64_t /* file_number */, uint64_t /* file_size */,
               autovector<BlobReadRequest>>;

}  // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 056e08d

Please sign in to comment.