Skip to content

Commit

Permalink
Storage: Fix getSquashDeleteRange does not return correctly squashe…
Browse files Browse the repository at this point in the history
…d key-range when using common-handle (#9530)

close #9529

Storage: Fix `getSquashDeleteRange` does not return correctly squashed key-range when using common-handle
  • Loading branch information
JaySon-Huang authored Oct 15, 2024
1 parent ce0d411 commit 5e09f4a
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace DB::DM
{
RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange() const
RowKeyRange ColumnFileSetSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const
{
RowKeyRange squashed_delete_range = RowKeyRange::newNone(is_common_handle, rowkey_column_size);
for (const auto & column_file : column_files)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class BlockOrDelete
{}

bool isBlock() { return static_cast<bool>(block); }
auto & getBlock() { return block; };
auto & getBlock() { return block; }
auto getBlockOffset() const { return block_offset; }
auto & getDeleteRange() { return delete_range; }
};
Expand All @@ -64,9 +64,6 @@ class ColumnFileSetSnapshot
size_t bytes{0};
size_t deletes{0};

bool is_common_handle{false};
size_t rowkey_column_size{0};

public:
/// This field is public writeable intentionally. It allows us to build a snapshot first,
/// then change how these data can be read later.
Expand All @@ -87,8 +84,6 @@ class ColumnFileSetSnapshot
c->rows = rows;
c->bytes = bytes;
c->deletes = deletes;
c->is_common_handle = is_common_handle;
c->rowkey_column_size = rowkey_column_size;

return c;
}
Expand All @@ -100,7 +95,7 @@ class ColumnFileSetSnapshot
size_t getBytes() const { return bytes; }
size_t getDeletes() const { return deletes; }

RowKeyRange getSquashDeleteRange() const;
RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const;

const auto & getDataProvider() const { return data_provider; }
};
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,14 @@ class DeltaValueSnapshot
size_t getMemTableSetRowsOffset() const { return persisted_files_snap->getRows(); }
size_t getMemTableSetDeletesOffset() const { return persisted_files_snap->getDeletes(); }

RowKeyRange getSquashDeleteRange() const;
/**
* Get a "squash" delete range in the delta layer. We can use this to ** estimate ** how many rows in the
* stable layer is pending for delete.
* The squash delete range may be larger than that will actually removed rows. For example, if there are
* two delete range [0, 30) and [100, 200), the squash delete range will be [0, 200). Actually after applying
* the two delete range, the rows [30, 100) will not be removed.
*/
RowKeyRange getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const;

const auto & getSharedDeltaIndex() const { return shared_delta_index; }
size_t getDeltaIndexEpoch() const { return delta_index_epoch; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(
return snap;
}

RowKeyRange DeltaValueSnapshot::getSquashDeleteRange() const
RowKeyRange DeltaValueSnapshot::getSquashDeleteRange(bool is_common_handle, size_t rowkey_column_size) const
{
auto delete_range1 = mem_table_snap->getSquashDeleteRange();
auto delete_range2 = persisted_files_snap->getSquashDeleteRange();
auto delete_range1 = mem_table_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size);
auto delete_range2 = persisted_files_snap->getSquashDeleteRange(is_common_handle, rowkey_column_size);
return delete_range1.merge(delete_range2);
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ bool shouldCompactDeltaWithStable(
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
auto actual_delete_range
= snap->delta->getSquashDeleteRange(segment_range.is_common_handle, segment_range.rowkey_column_size)
.shrink(segment_range);
if (actual_delete_range.none())
return false;

Expand Down
31 changes: 31 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/DeltaMerge/File/DMFilePackFilter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <magic_enum.hpp>

namespace DB::DM
{

Expand All @@ -31,6 +35,33 @@ void DMFilePackFilter::init(ReadTag read_tag)
std::vector<RSOperatorPtr> handle_filters;
for (auto & rowkey_range : rowkey_ranges)
handle_filters.emplace_back(toFilter(rowkey_range));
#ifndef NDEBUG
// sanity check under debug mode to ensure the rowkey_range is correct common-handle or int64-handle
if (!rowkey_ranges.empty())
{
bool is_common_handle = rowkey_ranges.begin()->is_common_handle;
auto handle_col_type = dmfile->getColumnStat(EXTRA_HANDLE_COLUMN_ID).type;
if (is_common_handle)
RUNTIME_CHECK_MSG(
handle_col_type->getTypeId() == TypeIndex::String,
"handle_col_type_id={}",
magic_enum::enum_name(handle_col_type->getTypeId()));
else
RUNTIME_CHECK_MSG(
handle_col_type->getTypeId() == TypeIndex::Int64,
"handle_col_type_id={}",
magic_enum::enum_name(handle_col_type->getTypeId()));
for (size_t i = 1; i < rowkey_ranges.size(); ++i)
{
RUNTIME_CHECK_MSG(
is_common_handle == rowkey_ranges[i].is_common_handle,
"i={} is_common_handle={} ith.is_common_handle={}",
i,
is_common_handle,
rowkey_ranges[i].is_common_handle);
}
}
#endif
for (size_t i = 0; i < pack_count; ++i)
{
handle_res[i] = RSResult::None;
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Storages/DeltaMerge/Filter/FilterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ namespace DB::DM

inline RSOperatorPtr toFilter(RowKeyRange & rowkey_range)
{
Attr handle_attr
= {EXTRA_HANDLE_COLUMN_NAME,
EXTRA_HANDLE_COLUMN_ID,
rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE};
Attr handle_attr = {
EXTRA_HANDLE_COLUMN_NAME,
EXTRA_HANDLE_COLUMN_ID,
rowkey_range.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE,
};
if (rowkey_range.is_common_handle)
{
auto left = createGreaterEqual(
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <Common/nocopyable.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
#include <Storages/DeltaMerge/Remote/Proto/remote.pb.h>
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ ColumnFileSetSnapshotPtr Serializer::deserializeColumnFileSet(
{
auto empty_data_provider = std::make_shared<ColumnFileDataProviderNop>();
auto ret = std::make_shared<ColumnFileSetSnapshot>(empty_data_provider);
ret->is_common_handle = segment_range.is_common_handle;
ret->rowkey_column_size = segment_range.rowkey_column_size;
ret->column_files.reserve(proto.size());
for (const auto & remote_column_file : proto)
{
Expand Down
46 changes: 26 additions & 20 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/DeltaMerge/Range.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -2122,28 +2123,33 @@ try
RowKeyRange range;
Int64 start, end;
};
std::vector<QueryRangeInfo> ranges;
ranges.emplace_back(
DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size),
0,
span_per_part); // only first part
ranges.emplace_back(
DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size),
800,
num_rows_write);
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700); //
ranges.emplace_back(DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0); // none
ranges.emplace_back(
DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size),
0,
num_rows_write); // full range
ranges.emplace_back(
DMTestEnv::getRowKeyRangeForClusteredIndex(
std::vector<QueryRangeInfo> ranges{
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(0, span_per_part, rowkey_column_size),
0,
span_per_part // only first part
},
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(800, num_rows_write, rowkey_column_size),
800,
num_rows_write},
QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(256, 700, rowkey_column_size), 256, 700},
QueryRangeInfo{DMTestEnv::getRowKeyRangeForClusteredIndex(0, 0, rowkey_column_size), 0, 0}, //none
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(0, num_rows_write, rowkey_column_size),
0,
num_rows_write,
}, // full range
QueryRangeInfo{
DMTestEnv::getRowKeyRangeForClusteredIndex(
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max(),
rowkey_column_size),
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max(),
rowkey_column_size),
std::numeric_limits<Int64>::min(),
std::numeric_limits<Int64>::max()); // full range
}, // full range
};

for (const auto & range : ranges)
{
// Test read
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ try
auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) {
// set `is_update=false` to get full squash delete range
auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead);
auto squash_range = snap->delta->getSquashDeleteRange();
auto squash_range = snap->delta->getSquashDeleteRange(/*is_common_handle=*/false, /*rowkey_column_size=*/1);
ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range));
};

Expand Down Expand Up @@ -957,6 +957,7 @@ try
HandleRange del{1, 32};
segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)});
SCOPED_TRACE("check after range: " + del.toDebugString());
// suqash_delete_range will consider [1, 100) maybe deleted
check_segment_squash_delete_range(segment, HandleRange{1, 100});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@
#include <ctime>
#include <memory>

namespace DB
namespace CurrentMetrics
{
namespace DM
{
namespace tests
extern const Metric DT_SnapshotOfRead;
extern const Metric DT_SnapshotOfReadRaw;
extern const Metric DT_SnapshotOfSegmentSplit;
extern const Metric DT_SnapshotOfSegmentMerge;
extern const Metric DT_SnapshotOfDeltaMerge;
extern const Metric DT_SnapshotOfPlaceIndex;
} // namespace CurrentMetrics
namespace DB::DM::tests
{

class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic
{
public:
Expand Down Expand Up @@ -582,16 +588,27 @@ try
}

{
// flush segment
// do delta-merge move data to stable
segment = segment->mergeDelta(dmContext(), tableColumns());
}

auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const RowKeyRange & expect_range) {
// set `is_update=false` to get full squash delete range
auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead);
auto squash_range = snap->delta->getSquashDeleteRange(is_common_handle, rowkey_column_size);
ASSERT_ROWKEY_RANGE_EQ(squash_range, expect_range);
};

{
// Test delete range [70, 100)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size)});
// flush segment
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(70, 100, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
// mem-table
segment->write(dmContext(), {del_row_range});
check_segment_squash_delete_range(segment, del_row_range);
// persisted-file
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
check_segment_squash_delete_range(segment, del_row_range);
}

{
Expand Down Expand Up @@ -621,10 +638,15 @@ try

{
// Test delete range [63, 70)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size)});
// flush segment
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 70, rowkey_column_size);
auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(63, 100, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
// mem-table
segment->write(dmContext(), {del_row_range});
check_segment_squash_delete_range(segment, merged_del_range);
// persisted-file
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
check_segment_squash_delete_range(segment, merged_del_range);
}

{
Expand Down Expand Up @@ -654,10 +676,14 @@ try

{
// Test delete range [1, 32)
segment->write(dmContext(), {DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size)});
// flush segment
segment->flushCache(dmContext());
segment = segment->mergeDelta(dmContext(), tableColumns());
auto del_row_range = DMTestEnv::getRowKeyRangeForClusteredIndex(1, 32, rowkey_column_size);
SCOPED_TRACE("check after range: " + del_row_range.toDebugString()); // Add trace msg when ASSERT failed
segment->write(dmContext(), {del_row_range});
auto merged_del_range = DMTestEnv::getRowKeyRangeForClusteredIndex(
1,
100,
rowkey_column_size); // suqash_delete_range will consider [1, 100) maybe deleted
check_segment_squash_delete_range(segment, merged_del_range);
}

{
Expand Down Expand Up @@ -871,7 +897,7 @@ try

ASSERT_EQ(c1->size(), c2->size());

for (Int64 i = 0; i < Int64(c1->size()); i++)
for (Int64 i = 0; i < static_cast<Int64>(c1->size()); i++)
{
if (iter1->name == DMTestEnv::pk_name)
{
Expand Down Expand Up @@ -972,8 +998,8 @@ try
segment->write(
dmContext(),
DMTestEnv::getRowKeyRangeForClusteredIndex(
Int64((num_batches_written - 1) * num_rows_per_write),
Int64((num_batches_written - 1) * num_rows_per_write + 2),
static_cast<Int64>((num_batches_written - 1) * num_rows_per_write),
static_cast<Int64>((num_batches_written - 1) * num_rows_per_write + 2),
rowkey_column_size));
}

Expand All @@ -986,7 +1012,7 @@ try
i < num_batches_written * num_rows_per_write;
i++)
{
temp.push_back(Int64(i));
temp.push_back(static_cast<Int64>(i));
}

{
Expand Down Expand Up @@ -1025,6 +1051,4 @@ try
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB::DM::tests

0 comments on commit 5e09f4a

Please sign in to comment.