Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the potential concurrency problem when clone the shared delta index (#2030) #2033

Merged
merged 2 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
if (deletes_count == deletes_offset)
{
if (unlikely(rows_count != rows_offset))
throw Exception("deletes_offset and rows_offset are not matched");
throw Exception("rows_count and rows_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));
return {pack_index, 0};
}
++deletes_count;
Expand All @@ -36,14 +39,19 @@ std::pair<size_t, size_t> findPack(const DeltaPacks & packs, size_t rows_offset,
if (rows_count > rows_offset)
{
if (unlikely(deletes_count != deletes_offset))
throw Exception("deletes_offset and rows_offset are not matched");
throw Exception("deletes_count and deletes_offset are expected to be equal. pack_index: " + DB::toString(pack_index)
+ ", pack_size: " + DB::toString(packs.size()) + ", rows_count: " + DB::toString(rows_count)
+ ", rows_offset: " + DB::toString(rows_offset) + ", deletes_count: " + DB::toString(deletes_count)
+ ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, pack->getRows() - (rows_count - rows_offset)};
}
}
}
if (rows_count != rows_offset || deletes_count != deletes_offset)
throw Exception("illegal rows_offset(" + DB::toString(rows_offset) + "), deletes_count(" + DB::toString(deletes_count) + ")");
throw Exception("illegal rows_offset and deletes_offset. pack_size: " + DB::toString(packs.size())
+ ", rows_count: " + DB::toString(rows_count) + ", rows_offset: " + DB::toString(rows_offset)
+ ", deletes_count: " + DB::toString(deletes_count) + ", deletes_offset: " + DB::toString(deletes_offset));

return {pack_index, 0};
}
Expand Down
68 changes: 34 additions & 34 deletions dbms/src/Storages/DeltaMerge/DeltaIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,43 @@ class DeltaIndex
}
}

public:
DeltaIndex() : id(++NEXT_DELTA_INDEX_ID), delta_tree(std::make_shared<DefaultDeltaTree>()), placed_rows(0), placed_deletes(0) {}

DeltaIndex(const DeltaIndex & o) : id(++NEXT_DELTA_INDEX_ID)
DeltaIndexPtr tryCloneInner(size_t placed_deletes_limit, const Updates * updates = nullptr)
{
DeltaTreePtr delta_tree_copy;
size_t placed_rows_copy = 0;
size_t placed_deletes_copy = 0;
// Make sure the delta index do not place more deletes than `placed_deletes_limit`.
// Because delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);
// Safe to reuse the copy of the existing DeltaIndex
if (placed_deletes <= placed_deletes_limit)
{
delta_tree_copy = delta_tree;
placed_rows_copy = placed_rows;
placed_deletes_copy = placed_deletes;
}
}

if (delta_tree_copy)
{
std::scoped_lock lock(o.mutex);
delta_tree_copy = o.delta_tree;
placed_rows = o.placed_rows;
placed_deletes = o.placed_deletes;
auto new_delta_tree = std::make_shared<DefaultDeltaTree>(*delta_tree_copy);
auto new_index = std::make_shared<DeltaIndex>(new_delta_tree, placed_rows_copy, placed_deletes_copy);
// try to do some updates before return it if need
if (updates)
new_index->applyUpdates(*updates);
return new_index;
}
else
{
// Otherwise, create an empty new DeltaIndex.
return std::make_shared<DeltaIndex>();
}
delta_tree = std::make_shared<DefaultDeltaTree>(*delta_tree_copy);
}

// For test cases.
public:
DeltaIndex() : id(++NEXT_DELTA_INDEX_ID), delta_tree(std::make_shared<DefaultDeltaTree>()), placed_rows(0), placed_deletes(0) {}

DeltaIndex(const DeltaTreePtr & delta_tree_, size_t placed_rows_, size_t placed_deletes_)
: id(++NEXT_DELTA_INDEX_ID), delta_tree(delta_tree_), placed_rows(placed_rows_), placed_deletes(placed_deletes_)
{
Expand Down Expand Up @@ -150,37 +171,16 @@ class DeltaIndex
return false;
}

DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes)
{
// Delete ranges can break MVCC view.
{
std::scoped_lock lock(mutex);

if (placed_deletes > deletes)
return std::make_shared<DeltaIndex>();
}
// Otherwise, clone it.
return std::make_shared<DeltaIndex>(*this);
}
DeltaIndexPtr tryClone(size_t /*rows*/, size_t deletes) { return tryCloneInner(deletes); }

DeltaIndexPtr cloneWithUpdates(const Updates & updates)
{
if (unlikely(updates.empty()))
throw Exception("Unexpected empty updates");

{
std::scoped_lock lock(mutex);
// If inserts shuffled before delete range, the old index cannot used any more.
if (placed_deletes > updates.front().delete_ranges_offset)
return std::make_shared<DeltaIndex>();
}

// Otherwise clone a new index, and do some updates.
auto new_index = std::make_shared<DeltaIndex>(*this);
new_index->applyUpdates(updates);
return new_index;
return tryCloneInner(updates.front().delete_ranges_offset, &updates);
}
};

} // namespace DM
} // namespace DB
} // namespace DB