Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the potential concurrency problem when clone the shared delta index #2030

Merged
merged 7 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
if (deletes_count == deletes_offset)
{
if (unlikely(rows_count != rows_offset))
throw Exception("deletes_offset and rows_offset are not matched");
throw Exception("rows_count and rows_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
return {pack_index, 0};
}
++deletes_count;
Expand All @@ -36,14 +39,19 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
if (rows_count > rows_offset)
{
if (unlikely(deletes_count != deletes_offset))
throw Exception("deletes_offset and rows_offset are not matched");
throw Exception("deletes_count and deletes_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, pack->getRows() - (rows_count - rows_offset)};
}
}
}
if (rows_count != rows_offset || deletes_count != deletes_offset)
throw Exception("illegal rows_offset(" + DB::toString(rows_offset) + "), deletes_count(" + DB::toString(deletes_count) + ")");
throw Exception("illegal rows_offset and deletes_offset. pack_size: " + DB::toString(packs.size())
+ ", rows_count: " + DB::toString(rows_count) + ", rows_offset: " + DB::toString(rows_offset)
+ ", deletes_count: " + DB::toString(deletes_count) + ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, 0};
}
Expand Down
77 changes: 52 additions & 25 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,6 @@ class DeltaIndex
public:
DeltaIndex() : id(++NEXT_DELTA_INDEX_ID), delta_tree(std::make_shared<DefaultDeltaTree>()), placed_rows(0), placed_deletes(0) {}

DeltaIndex(const DeltaIndex & o) : id(++NEXT_DELTA_INDEX_ID)
{
DeltaTreePtr delta_tree_copy;
{
std::scoped_lock lock(o.mutex);
delta_tree_copy = o.delta_tree;
placed_rows = o.placed_rows;
placed_deletes = o.placed_deletes;
}
delta_tree = std::make_shared<DefaultDeltaTree>(*delta_tree_copy);
}

// For test cases.
DeltaIndex(const DeltaTreePtr & delta_tree_, size_t placed_rows_, size_t placed_deletes_)
: id(++NEXT_DELTA_INDEX_ID), delta_tree(delta_tree_), placed_rows(placed_rows_), placed_deletes(placed_deletes_)
{
Expand Down Expand Up @@ -152,33 +139,73 @@ class DeltaIndex

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes)
{
// Delete ranges can break MVCC view.
bool safe_to_copy = false;
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy;
size_t placed_deletes_copy;
// Make sure `placed_deletes` is smaller than the required `deletes`,
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);

if (placed_deletes > deletes)
return std::make_shared<DeltaIndex>();
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= deletes)
{
safe_to_copy = true;
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
placed_deletes_copy = placed_deletes;
}
}

if (safe_to_copy)
{
auto new_delta_tree = std::make_shared<DefaultDeltaTree>(*delta_tree_copy);
return std::make_shared<DeltaIndex>(new_delta_tree, placed_rows_copy, placed_deletes_copy);
}
else
{
// Otherwise, create an empty new DeltaIndex.
return std::make_shared<DeltaIndex>();
}
// Otherwise, clone it.
return std::make_shared<DeltaIndex>(*this);
}

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

bool safe_to_copy = false;
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy;
size_t placed_deletes_copy;
// Make sure the delta index has only placed deletes in front of the `updates`
{
std::scoped_lock lock(mutex);
// If inserts shuffled before delete range, the old index cannot used any more.
if (placed_deletes > updates.front().delete_ranges_offset)
return std::make_shared<DeltaIndex>();
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= updates.front().delete_ranges_offset)
{
safe_to_copy = true;
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
placed_deletes_copy = placed_deletes;
}
}

// Otherwise clone a new index, and do some updates.
auto new_index = std::make_shared<DeltaIndex>(*this);
new_index->applyUpdates(updates);
return new_index;
if (safe_to_copy)
{
auto new_delta_tree = std::make_shared<DefaultDeltaTree>(*delta_tree_copy);
auto new_index = std::make_shared<DeltaIndex>(new_delta_tree, placed_rows_copy, placed_deletes_copy);
// try to do some updates before return it
new_index->applyUpdates(updates);
return new_index;
}
else
{
// Otherwise, the `updates` shuffled before placed delete range,
// so we need create an empty new DeltaIndex.
return std::make_shared<DeltaIndex>();
}
}
};

Expand Down