Skip to content

Commit

Permalink
Fix kBlockCacheTier read for on-disk blob value
Browse files Browse the repository at this point in the history
  • Loading branch information
ajkr committed Mar 20, 2024
1 parent 6ddfa5f commit afe445f
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 24 deletions.
17 changes: 13 additions & 4 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2335,11 +2335,18 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
biter.key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
break;
}

if (!get_context->SaveValue(
parsed_key, biter.value(), &matched,
biter.IsValuePinned() ? &biter : nullptr)) {
Status read_status;
bool ret = get_context->SaveValue(
parsed_key, biter.value(), &matched, &read_status,
biter.IsValuePinned() ? &biter : nullptr);
if (!read_status.ok()) {
s = read_status;
break;
}
if (!ret) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size = biter.key().size() + biter.value().size();
Expand All @@ -2348,7 +2355,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
break;
}
}
s = biter.status();
if (s.ok()) {
s = biter.status();
}
if (!s.ok()) {
break;
}
Expand Down
15 changes: 12 additions & 3 deletions table/block_based/block_based_table_reader_sync_and_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,9 +735,16 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
biter->key(), &parsed_key, false /* log_err_key */); // TODO
if (!pik_status.ok()) {
s = pik_status;
break;
}
Status read_status;
bool ret = get_context->SaveValue(
parsed_key, biter->value(), &matched, &read_status, value_pinner);
if (!read_status.ok()) {
s = read_status;
break;
}
if (!get_context->SaveValue(parsed_key, biter->value(), &matched,
value_pinner)) {
if (!ret) {
if (get_context->State() == GetContext::GetState::kFound) {
does_referenced_key_exist = true;
referenced_data_size =
Expand All @@ -746,7 +753,9 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet)
done = true;
break;
}
s = biter->status();
if (s.ok()) {
s = biter->status();
}
}
// Write the block cache access.
// XXX: There appear to be 'break' statements above that bypass this
Expand Down
5 changes: 4 additions & 1 deletion table/cuckoo/cuckoo_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,
return s;
}
bool dont_care __attribute__((__unused__));
get_context->SaveValue(found_ikey, value, &dont_care);
get_context->SaveValue(found_ikey, value, &dont_care, &s);
if (!s.ok()) {
return s;
}
}
// We don't support merge operations. So, we return here.
return Status::OK();
Expand Down
24 changes: 13 additions & 11 deletions table/get_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ void GetContext::ReportCounters() {

bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value, bool* matched,
Cleanable* value_pinner) {
Status* read_status, Cleanable* value_pinner) {
assert(matched);
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr);
Expand Down Expand Up @@ -356,8 +356,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
// merge_context_->operand_list
if (type == kTypeBlobIndex) {
PinnableSlice pin_val;
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
false) {
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
read_status) == false) {
return false;
}
Slice blob_value(pin_val);
Expand All @@ -383,8 +383,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
assert(merge_operator_ != nullptr);
if (type == kTypeBlobIndex) {
PinnableSlice pin_val;
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val) ==
false) {
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
read_status) == false) {
return false;
}
Slice blob_value(pin_val);
Expand Down Expand Up @@ -547,14 +547,14 @@ void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
}

bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value) {
PinnableSlice* blob_value, Status* read_status) {
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
constexpr uint64_t* bytes_read = nullptr;

Status status = blob_fetcher_->FetchBlob(
user_key, blob_index, prefetch_buffer, blob_value, bytes_read);
if (!status.ok()) {
if (status.IsIncomplete()) {
*read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
blob_value, bytes_read);
if (read_status->ok()) {
if (read_status->IsIncomplete()) {
// FIXME: this code is not covered by unit tests
MarkKeyMayExist();
return false;
Expand Down Expand Up @@ -610,7 +610,9 @@ void replayGetContextLog(const Slice& replay_log, const Slice& user_key,

(void)ret;

get_context->SaveValue(ikey, value, &dont_care, value_pinner);
Status read_status;
get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
read_status.PermitUncheckedError(); // TODO
}
}

Expand Down
5 changes: 3 additions & 2 deletions table/get_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ class GetContext {
// Returns True if more keys need to be read (due to merges) or
// False if the complete value has been found.
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
bool* matched, Cleanable* value_pinner = nullptr);
bool* matched, Status* read_status,
Cleanable* value_pinner = nullptr);

// Simplified version of the previous function. Should only be used when we
// know that the operation is a Put.
Expand Down Expand Up @@ -204,7 +205,7 @@ class GetContext {
void MergeWithWideColumnBaseValue(const Slice& entity);

bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
PinnableSlice* blob_value);
PinnableSlice* blob_value, Status* read_status);

void appendToReplayLog(ValueType type, Slice value, Slice ts);

Expand Down
8 changes: 7 additions & 1 deletion table/mock_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,13 @@ Status MockTableReader::Get(const ReadOptions&, const Slice& key,
}

bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(parsed_key, iter->value(), &dont_care)) {
Status read_status;
bool ret = get_context->SaveValue(parsed_key, iter->value(), &read_status,
&dont_care);
if (!read_status.ok()) {
return read_status;
}
if (!ret) {
break;
}
}
Expand Down
8 changes: 6 additions & 2 deletions table/plain/plain_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,12 @@ Status PlainTableReader::Get(const ReadOptions& /*ro*/, const Slice& target,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
bool dont_care __attribute__((__unused__));
if (!get_context->SaveValue(found_key, found_value, &dont_care,
dummy_cleanable_.get())) {
bool ret = get_context->SaveValue(found_key, found_value, &dont_care, &s,
dummy_cleanable_.get());
if (!s.ok()) {
return s;
}
if (!ret) {
break;
}
}
Expand Down

0 comments on commit afe445f

Please sign in to comment.