Skip to content

Commit

Permalink
[BugFix] The usage and size of pindex is not update during deletion (…
Browse files Browse the repository at this point in the history
…backport #45597) (#45652)

Signed-off-by: zhangqiang <[email protected]>
Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev authored May 15, 2024
1 parent 8c90c73 commit f62aa6a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
21 changes: 11 additions & 10 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3270,6 +3270,8 @@ Status PersistentIndex::_flush_advance_or_append_wal(size_t n, const Slice* keys
return Status::OK();
}

// 1. insert/upsert: kv num and usage in add_usage_and_size is greater than 0
// 2. erase: kv num and usage in add_usage_and_size is less than 0
Status PersistentIndex::_update_usage_and_size_by_key_length(
std::vector<std::pair<int64_t, int64_t>>& add_usage_and_size) {
if (_key_size > 0) {
Expand All @@ -3285,16 +3287,14 @@ Status PersistentIndex::_update_usage_and_size_by_key_length(
}
} else {
for (int key_size = 1; key_size <= kSliceMaxFixLength; key_size++) {
if (add_usage_and_size[key_size].second > 0) {
auto iter = _usage_and_size_by_key_length.find(key_size);
if (iter == _usage_and_size_by_key_length.end()) {
std::string msg = strings::Substitute("no key_size: $0 in usage info", key_size);
LOG(WARNING) << msg;
return Status::InternalError(msg);
} else {
iter->second.first = std::max(0L, iter->second.first + add_usage_and_size[key_size].first);
iter->second.second = std::max(0L, iter->second.second + add_usage_and_size[key_size].second);
}
auto iter = _usage_and_size_by_key_length.find(key_size);
if (iter == _usage_and_size_by_key_length.end()) {
std::string msg = strings::Substitute("no key_size: $0 in usage info", key_size);
LOG(WARNING) << msg;
return Status::InternalError(msg);
} else {
iter->second.first = std::max(0L, iter->second.first + add_usage_and_size[key_size].first);
iter->second.second = std::max(0L, iter->second.second + add_usage_and_size[key_size].second);
}
}

Expand Down Expand Up @@ -3389,6 +3389,7 @@ Status PersistentIndex::erase(size_t n, const Slice* keys, IndexValue* old_value
}
std::vector<std::pair<int64_t, int64_t>> add_usage_and_size(kFixedMaxKeySize + 1,
std::pair<int64_t, int64_t>(0, 0));
// decrease kv num and usage, the value in add_usage_and_size is less than 0
for (size_t i = 0; i < n; i++) {
if (old_values[i].get_value() != NullIndexValue) {
_size--;
Expand Down
13 changes: 13 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class PersistentIndex {
size_t key_size() const { return _key_size; }

size_t size() const { return _size; }
size_t usage() const { return _usage; }
size_t memory_usage() const { return _memory_usage.load(); }

EditVersion version() const { return _version; }
Expand Down Expand Up @@ -647,6 +648,18 @@ class PersistentIndex {
return res;
}

// not thread safe, just for unit test
std::pair<int64_t, int64_t> kv_stat_in_estimate_stats() {
std::pair<int64_t, int64_t> res;
for (auto [_, stat] : _usage_and_size_by_key_length) {
res.first += stat.first;
res.second += stat.second;
}
return res;
}

void clear_kv_stat() { _usage_and_size_by_key_length.clear(); }

Status reset(Tablet* tablet, EditVersion version, PersistentIndexMetaPB* index_meta);

void reset_cancel_major_compaction();
Expand Down
27 changes: 27 additions & 0 deletions be/test/storage/persistent_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2267,6 +2267,8 @@ PARALLEL_TEST(PersistentIndexTest, test_index_keep_delete) {
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(0, index.kv_num_in_immutable_index());
ASSERT_EQ(0, index.kv_stat_in_estimate_stats().first);
ASSERT_EQ(0, index.kv_stat_in_estimate_stats().second);

ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
// erase non-exist keys
Expand All @@ -2279,6 +2281,31 @@ PARALLEL_TEST(PersistentIndexTest, test_index_keep_delete) {
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(N, index.kv_num_in_immutable_index());
ASSERT_EQ(N, index.kv_stat_in_estimate_stats().second);
ASSERT_EQ(index.usage(), index.kv_stat_in_estimate_stats().first);

std::vector<IndexValue> old_values2(keys.size(), IndexValue(NullIndexValue));
ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
// flush advance
config::l0_max_mem_usage = 1024;
ASSERT_TRUE(index.upsert(keys.size(), key_slices.data(), values.data(), old_values2.data()).ok());
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(N, index.kv_num_in_immutable_index());
ASSERT_EQ(N, index.kv_stat_in_estimate_stats().second);
ASSERT_EQ(index.usage(), index.kv_stat_in_estimate_stats().first);

vector<IndexValue> erase_old_values2(erase_keys.size());
ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
ASSERT_TRUE(index.erase(erase_keys.size(), erase_key_slices.data(), erase_old_values2.data()).ok());
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(0, index.kv_num_in_immutable_index());
ASSERT_EQ(0, index.kv_stat_in_estimate_stats().first);
ASSERT_EQ(0, index.kv_stat_in_estimate_stats().second);

index.clear_kv_stat();
ASSERT_FALSE(index.upsert(keys.size(), key_slices.data(), values.data(), old_values.data()).ok());
}
ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}
Expand Down

0 comments on commit f62aa6a

Please sign in to comment.