diff --git a/dbms/src/Storages/DeltaMerge/tests/DMTestEnv.h b/dbms/src/Storages/DeltaMerge/tests/DMTestEnv.h new file mode 100644 index 00000000000..6c6343cb96a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/DMTestEnv.h @@ -0,0 +1,503 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ +#define GET_REGION_RANGE(start, end, table_id) RowKeyRange::fromHandleRange(::DB::DM::HandleRange((start), (end))).toRegionRange((table_id)) + +// Add this so that we can call typeFromString under namespace DB::DM::tests +using DB::tests::typeFromString; + +using namespace DB::tests; + +/// helper functions for comparing HandleRange +inline ::testing::AssertionResult HandleRangeCompare( + const char * lhs_expr, + const char * rhs_expr, + const HandleRange & lhs, + const HandleRange & rhs) +{ + if (lhs == rhs) + return ::testing::AssertionSuccess(); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); +} +/// helper functions for comparing HandleRange +inline ::testing::AssertionResult RowKeyRangeCompare( + const char * lhs_expr, + const char * rhs_expr, + const RowKeyRange & lhs, + const RowKeyRange & rhs) +{ + if (lhs == rhs) + return ::testing::AssertionSuccess(); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toDebugString(), rhs.toDebugString(), false); +} +#define ASSERT_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) +#define ASSERT_ROWKEY_RANGE_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::DM::tests::RowKeyRangeCompare, val1, val2) +#define EXPECT_RANGE_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::DM::tests::HandleRangeCompare, val1, val2) +#define GET_GTEST_FULL_NAME \ + (String() + ::testing::UnitTest::GetInstance()->current_test_info()->test_case_name() + "." \ + + ::testing::UnitTest::GetInstance()->current_test_info()->name()) + +inline Strings createNumberStrings(size_t beg, size_t end) +{ + Strings values; + for (size_t i = beg; i < end; ++i) + values.emplace_back(DB::toString(i)); + return values; +} + +template +inline std::vector createNumbers(size_t beg, size_t end, bool reversed = false) +{ + std::vector values; + size_t num_rows = end - beg; + for (size_t i = 0; i < num_rows; ++i) + values.emplace_back(reversed ? static_cast(end - 1 - i) : static_cast(beg + i)); + return values; +} + +inline std::vector createSignedNumbers(size_t beg, size_t end) +{ + std::vector values; + for (size_t i = beg; i < end; ++i) + values.emplace_back(i * (i % 2 == 0 ? -1 : 1)); + return values; +} + +// Mock a common_pk_col that composed by number `rowkey_column_size` of int64 value +inline String genMockCommonHandle(Int64 value, size_t rowkey_column_size) +{ + WriteBufferFromOwnString ss; + for (size_t index = 0; index < rowkey_column_size; ++index) + { + ::DB::EncodeUInt(static_cast(TiDB::CodecFlagInt), ss); + ::DB::EncodeInt64(value, ss); + } + return ss.releaseStr(); +} + +class DMTestEnv +{ +public: + static Context getContext(const ::DB::Settings & settings = DB::Settings()) + { + return ::DB::tests::TiFlashTestEnv::getContext(settings); + } + + static constexpr const char * pk_name = "_tidb_rowid"; + + static constexpr const char * PK_NAME_PK_IS_HANDLE = "id"; + + static constexpr ColId PK_ID_PK_IS_HANDLE = 2; + + enum class PkType + { + // If the primary key is composed of multiple columns and non-clustered-index, + // or users don't define the primary key, TiDB will add a hidden "_tidb_rowid" column + // as the handle column + HiddenTiDBRowID, + // Common handle for clustered-index since 5.0.0 + CommonHandle, + // If user define the primary key that is compatibility with UInt64, use that column + // as the handle column + PkIsHandleInt64, + PkIsHandleInt32, + }; + + static String PkTypeToString(PkType type) + { + switch (type) + { + case PkType::HiddenTiDBRowID: + return "HiddenTiDBRowID"; + case PkType::CommonHandle: + return "CommonHandle"; + case PkType::PkIsHandleInt64: + return "PkIsHandleInt64"; + case PkType::PkIsHandleInt32: + return "PkIsHandleInt32"; + } + return ""; + } + + static ColumnDefinesPtr getDefaultColumns(PkType pk_type = PkType::HiddenTiDBRowID) + { + // Return [handle, ver, del] column defines + ColumnDefinesPtr columns = std::make_shared(); + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + columns->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false)); + break; + case PkType::CommonHandle: + columns->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/true)); + break; + case PkType::PkIsHandleInt64: + columns->emplace_back(ColumnDefine{PK_ID_PK_IS_HANDLE, PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE}); + break; + case PkType::PkIsHandleInt32: + columns->emplace_back(ColumnDefine{PK_ID_PK_IS_HANDLE, PK_NAME_PK_IS_HANDLE, DataTypeFactory::instance().get("Int32")}); + break; + default: + throw Exception("Unknown pk type for test"); + } + columns->emplace_back(getVersionColumnDefine()); + columns->emplace_back(getTagColumnDefine()); + return columns; + } + + /// Returns a NamesAndTypesList that can be used to construct StorageDeltaMerge. + static NamesAndTypesList getDefaultTableColumns(PkType pk_type = PkType::HiddenTiDBRowID) + { + NamesAndTypesList columns; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + columns.push_back({EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE}); + break; + case PkType::CommonHandle: + columns.push_back({PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_STRING_TYPE}); // For common handle, there must be a user-given primary key. + columns.push_back({EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_STRING_TYPE}); // For common handle, a _tidb_rowid is also constructed. + break; + case PkType::PkIsHandleInt64: + columns.emplace_back(PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE); + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + return columns; + } + + /// Returns a TableInfo that can be used to construct StorageDeltaMerge. + static TiDB::TableInfo getMinimalTableInfo(TableID table_id, PkType pk_type = PkType::HiddenTiDBRowID) + { + TiDB::TableInfo table_info; + table_info.id = table_id; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + table_info.is_common_handle = false; + table_info.pk_is_handle = false; + break; + case PkType::CommonHandle: + { + table_info.is_common_handle = true; + table_info.pk_is_handle = false; + ColumnInfo pk_column; // For common handle, there must be a user-given primary key. + pk_column.id = PK_ID_PK_IS_HANDLE; + pk_column.name = PK_NAME_PK_IS_HANDLE; + pk_column.setPriKeyFlag(); + table_info.columns.push_back(pk_column); + break; + } + case PkType::PkIsHandleInt64: + { + table_info.is_common_handle = false; + table_info.pk_is_handle = true; + ColumnInfo pk_column; + pk_column.id = PK_ID_PK_IS_HANDLE; + pk_column.name = PK_NAME_PK_IS_HANDLE; + pk_column.setPriKeyFlag(); + table_info.columns.push_back(pk_column); + break; + } + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + return table_info; + } + + /// Return a ASTPtr that can be used to construct StorageDeltaMerge. + static ASTPtr getPrimaryKeyExpr(const String & table_name, PkType pk_type = PkType::HiddenTiDBRowID) + { + ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); + String name; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + name = EXTRA_HANDLE_COLUMN_NAME; + break; + case PkType::CommonHandle: + name = EXTRA_HANDLE_COLUMN_NAME; + break; + case PkType::PkIsHandleInt64: + name = PK_NAME_PK_IS_HANDLE; + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + astptr->children.emplace_back(new ASTIdentifier(name)); + return astptr; + } + + /** + * Create a simple block with 3 columns: + * * `pk` - Int64 / `version` / `tag` + * @param beg `pk`'s value begin + * @param end `pk`'s value end (not included) + * @param reversed increasing/decreasing insert `pk`'s value + * @return + */ + static Block prepareSimpleWriteBlock(size_t beg, + size_t end, + bool reversed, + UInt64 tso = 2, + const String & pk_name_ = pk_name, + ColumnID pk_col_id = EXTRA_HANDLE_COLUMN_ID, + DataTypePtr pk_type = EXTRA_HANDLE_COLUMN_INT_TYPE, + bool is_common_handle = false, + size_t rowkey_column_size = 1, + bool with_internal_columns = true, + bool is_deleted = false) + { + Block block; + const size_t num_rows = (end - beg); + if (is_common_handle) + { + // common_pk_col + Strings values; + for (size_t i = 0; i < num_rows; i++) + { + Int64 value = reversed ? end - 1 - i : beg + i; + values.emplace_back(genMockCommonHandle(value, rowkey_column_size)); + } + block.insert(DB::tests::createColumn( + std::move(values), + pk_name_, + pk_col_id)); + } + else + { + // int-like pk_col + block.insert(ColumnWithTypeAndName{ + DB::tests::makeColumn(pk_type, createNumbers(beg, end, reversed)), + pk_type, + pk_name_, + pk_col_id}); + // add extra column if need + if (pk_col_id != EXTRA_HANDLE_COLUMN_ID) + { + block.insert(ColumnWithTypeAndName{ + DB::tests::makeColumn(EXTRA_HANDLE_COLUMN_INT_TYPE, createNumbers(beg, end, reversed)), + EXTRA_HANDLE_COLUMN_INT_TYPE, + EXTRA_HANDLE_COLUMN_NAME, + EXTRA_HANDLE_COLUMN_ID}); + } + } + if (with_internal_columns) + { + // version_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, tso), + VERSION_COLUMN_NAME, + VERSION_COLUMN_ID)); + // tag_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, is_deleted), + TAG_COLUMN_NAME, + TAG_COLUMN_ID)); + } + return block; + } + + /** + * Create a simple block with 3 columns: + * * `pk` - Int64 / `version` / `tag` + * @param beg `pk`'s value begin + * @param end `pk`'s value end (not included) + * @param reversed increasing/decreasing insert `pk`'s value + * @return + */ + static Block prepareSimpleWriteBlock(size_t beg, + size_t end, + bool reversed, + PkType pk_type, + UInt64 tso = 2, + bool with_internal_columns = true) + { + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + return prepareSimpleWriteBlock(beg, end, reversed, tso, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_INT_TYPE, false, 1, with_internal_columns); + case PkType::CommonHandle: + return prepareSimpleWriteBlock(beg, end, reversed, tso, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_STRING_TYPE, true, 1, with_internal_columns); + case PkType::PkIsHandleInt64: + return prepareSimpleWriteBlock(beg, end, reversed, tso, PK_NAME_PK_IS_HANDLE, PK_ID_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE, false, 1, with_internal_columns); + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + } + + /** + * Create a simple block with 3 columns: + * * `pk` - Int64 / `version` / `tag` + * @param pk `pk`'s value + * @param ts_beg `timestamp`'s value begin + * @param ts_end `timestamp`'s value end (not included) + * @param reversed increasing/decreasing insert `timestamp`'s value + * @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1 + * @return + */ + static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false) + { + Block block; + const size_t num_rows = (ts_end - ts_beg); + // int64 pk_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, pk), + pk_name, + EXTRA_HANDLE_COLUMN_ID)); + // version_col + block.insert(DB::tests::createColumn( + createNumbers(ts_beg, ts_end, reversed), + VERSION_COLUMN_NAME, + VERSION_COLUMN_ID)); + // tag_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, deleted ? 1 : 0), + TAG_COLUMN_NAME, + TAG_COLUMN_ID)); + return block; + } + + /// prepare a row like this: + /// {"pk":pk, "version":tso, "delete_mark":mark, "colname":value} + static Block prepareOneRowBlock( + Int64 pk, + UInt64 tso, + UInt8 mark, + const String & colname, + const String & value, + bool is_common_handle, + size_t rowkey_column_size) + { + Block block; + const size_t num_rows = 1; + if (is_common_handle) + { + Strings values{genMockCommonHandle(pk, rowkey_column_size)}; + block.insert(DB::tests::createColumn( + std::move(values), + pk_name, + EXTRA_HANDLE_COLUMN_ID)); + } + else + { + // int64 pk_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, pk), + pk_name, + EXTRA_HANDLE_COLUMN_ID)); + } + // version_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, tso), + VERSION_COLUMN_NAME, + VERSION_COLUMN_ID)); + // tag_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, mark), + TAG_COLUMN_NAME, + TAG_COLUMN_ID)); + // string column + block.insert(DB::tests::createColumn( + Strings{value}, + colname)); + return block; + } + + static void verifyClusteredIndexValue(const String & value, Int64 ans, size_t rowkey_column_size) + { + size_t cursor = 0; + size_t k = 0; + for (; cursor < value.size() && k < rowkey_column_size; k++) + { + cursor++; + Int64 i_value = DB::DecodeInt64(cursor, value); + EXPECT_EQ(i_value, ans); + } + EXPECT_EQ(k, rowkey_column_size); + EXPECT_EQ(cursor, value.size()); + } + + static RowKeyRange getRowKeyRangeForClusteredIndex(Int64 start, Int64 end, size_t rowkey_column_size) + { + RowKeyValue start_key = RowKeyValue(true, std::make_shared(genMockCommonHandle(start, rowkey_column_size))); + RowKeyValue end_key = RowKeyValue(true, std::make_shared(genMockCommonHandle(end, rowkey_column_size))); + return RowKeyRange(start_key, end_key, true, rowkey_column_size); + } + + static Block prepareBlockWithIncreasingPKAndTs(size_t rows, Int64 start_pk, UInt64 start_ts) + { + Block block; + // int64 pk_col + block.insert(DB::tests::createColumn( + createNumbers(start_pk, start_pk + rows), + EXTRA_HANDLE_COLUMN_NAME, + EXTRA_HANDLE_COLUMN_ID)); + // version_col + block.insert(DB::tests::createColumn( + createNumbers(start_ts, start_ts + rows), + VERSION_COLUMN_NAME, + VERSION_COLUMN_ID)); + // tag_col + block.insert(DB::tests::createColumn( + std::vector(rows, 0), + TAG_COLUMN_NAME, + TAG_COLUMN_ID)); + return block; + } + + static int getPseudoRandomNumber() + { + static int num = 0; + return num++; + } +}; + +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index b35dae0cbe2..84fafbc46ef 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -273,7 +273,8 @@ class DMTestEnv DataTypePtr pk_type = EXTRA_HANDLE_COLUMN_INT_TYPE, bool is_common_handle = false, size_t rowkey_column_size = 1, - bool with_internal_columns = true) + bool with_internal_columns = true, + bool is_deleted = false) { Block block; const size_t num_rows = (end - beg); @@ -324,7 +325,7 @@ class DMTestEnv VERSION_COLUMN_ID)); // tag_col block.insert(DB::tests::createColumn( - std::vector(num_rows, 0), + std::vector(num_rows, is_deleted), TAG_COLUMN_NAME, TAG_COLUMN_ID)); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp new file mode 100644 index 00000000000..1c68ba3bb2a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -0,0 +1,86 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace DM +{ +namespace tests +{ +class SegmentOperationTest : public SegmentTestBasic +{ +protected: + static void SetUpTestCase() {} +}; + +TEST_F(SegmentOperationTest, Issue4956) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + + // flush data, make the segment can be split. + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + // write data to cache, reproduce the https://github.com/pingcap/tiflash/issues/4956 + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + deleteRangeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + auto segment_id = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(segment_id.has_value()); + + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, *segment_id); +} +CATCH + +TEST_F(SegmentOperationTest, TestSegment) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + auto segment_id = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_TRUE(segment_id.has_value()); + + size_t origin_rows = getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID); + + writeSegment(*segment_id); + flushSegmentCache(*segment_id); + deleteRangeSegment(*segment_id); + writeSegmentWithDeletedPack(*segment_id); + mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, *segment_id); + + EXPECT_EQ(getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID), origin_rows); +} +CATCH + +TEST_F(SegmentOperationTest, TestSegmentRandom) +try +{ + SegmentTestOptions options; + options.is_common_handle = true; + reloadWithOptions(options); + randomSegmentTest(100); +} +CATCH +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp new file mode 100644 index 00000000000..c676f2e08d5 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -0,0 +1,430 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ +void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) +{ + TiFlashStorageTestBasic::SetUp(); + options = config; + table_columns = std::make_shared(); + + root_segment = reload(config.is_common_handle); + ASSERT_EQ(root_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); + segments.clear(); + segments[DELTA_MERGE_FIRST_SEGMENT_ID] = root_segment; +} + +PageId SegmentTestBasic::createNewSegmentWithSomeData() +{ + SegmentPtr new_segment; + std::tie(root_segment, new_segment) = root_segment->split(dmContext(), tableColumns()); + + const size_t num_rows_write_per_batch = 100; + { + // write to segment and flush + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false); + new_segment->write(dmContext(), std::move(block), true); + } + { + // write to segment and don't flush + Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false); + new_segment->write(dmContext(), std::move(block), false); + } + return new_segment->segmentId(); +} + +size_t SegmentTestBasic::getSegmentRowNumWithoutMVCC(PageId segment_id) +{ + auto segment = segments[segment_id]; + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + return num_rows_read; +} + +size_t SegmentTestBasic::getSegmentRowNum(PageId segment_id) +{ + auto segment = segments[segment_id]; + auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()}); + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + return num_rows_read; +} + +void SegmentTestBasic::checkSegmentRow(PageId segment_id, size_t expected_row_num) +{ + auto segment = segments[segment_id]; + // read written data + auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()}); + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, expected_row_num); +} + +std::optional SegmentTestBasic::splitSegment(PageId segment_id) +{ + auto origin_segment = segments[segment_id]; + size_t origin_segment_row_num = getSegmentRowNum(segment_id); + SegmentPtr segment, new_segment; + std::tie(segment, new_segment) = origin_segment->split(dmContext(), tableColumns()); + if (new_segment) + { + segments[new_segment->segmentId()] = new_segment; + segments[segment_id] = segment; + + EXPECT_EQ(origin_segment_row_num, getSegmentRowNum(segment_id) + getSegmentRowNum(new_segment->segmentId())); + return new_segment->segmentId(); + } + return std::nullopt; +} + +void SegmentTestBasic::mergeSegment(PageId left_segment_id, PageId right_segment_id) +{ + auto left_segment = segments[left_segment_id]; + auto right_segment = segments[right_segment_id]; + + size_t left_segment_row_num = getSegmentRowNum(left_segment_id); + size_t right_segment_row_num = getSegmentRowNum(right_segment_id); + LOG_FMT_TRACE(&Poco::Logger::root(), "merge in segment:{}:{} and {}:{}", left_segment->segmentId(), left_segment_row_num, right_segment->segmentId(), right_segment_row_num); + + SegmentPtr merged_segment = Segment::merge(dmContext(), tableColumns(), left_segment, right_segment); + segments[merged_segment->segmentId()] = merged_segment; + auto it = segments.find(right_segment->segmentId()); + if (it != segments.end()) + { + segments.erase(it); + } + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), left_segment_row_num + right_segment_row_num); +} + +void SegmentTestBasic::mergeSegmentDelta(PageId segment_id) +{ + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNum(segment_id); + SegmentPtr merged_segment = segment->mergeDelta(dmContext(), tableColumns()); + segments[merged_segment->segmentId()] = merged_segment; + EXPECT_EQ(getSegmentRowNum(merged_segment->segmentId()), segment_row_num); +} + +void SegmentTestBasic::flushSegmentCache(PageId segment_id) +{ + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNum(segment_id); + segment->flushCache(dmContext()); + EXPECT_EQ(getSegmentRowNum(segment_id), segment_row_num); +} + +std::pair SegmentTestBasic::getSegmentKeyRange(SegmentPtr segment) +{ + Int64 start_key, end_key; + if (!options.is_common_handle) + { + start_key = segment->getRowKeyRange().getStart().int_value; + end_key = segment->getRowKeyRange().getEnd().int_value; + return {start_key, end_key}; + } + EXPECT_EQ(segment->getRowKeyRange().getStart().data[0], TiDB::CodecFlagInt); + EXPECT_EQ(segment->getRowKeyRange().getEnd().data[0], TiDB::CodecFlagInt); + { + size_t cursor = 1; + start_key = DecodeInt64(cursor, String(segment->getRowKeyRange().getStart().data, segment->getRowKeyRange().getStart().size)); + } + { + size_t cursor = 1; + end_key = DecodeInt64(cursor, String(segment->getRowKeyRange().getEnd().data, segment->getRowKeyRange().getEnd().size)); + } + return {start_key, end_key}; +} + +void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows) +{ + if (write_rows == 0) + { + return; + } + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); + std::pair keys = getSegmentKeyRange(segment); + Int64 start_key = keys.first; + Int64 end_key = keys.second; + UInt64 remain_row_num = 0; + if (static_cast(end_key - start_key) > write_rows) + { + end_key = start_key + write_rows; + } + else + { + remain_row_num = write_rows - static_cast(end_key - start_key); + } + { + // write to segment and not flush + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + segment->write(dmContext(), std::move(block), false); + LOG_FMT_TRACE(&Poco::Logger::root(), "write key range [{}, {})", start_key, end_key); + version++; + } + while (remain_row_num > 0) + { + UInt64 write_num = std::min(remain_row_num, static_cast(end_key - start_key)); + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); + segment->write(dmContext(), std::move(block), false); + remain_row_num -= write_num; + LOG_FMT_TRACE(&Poco::Logger::root(), "write key range [{}, {})", start_key, write_num + start_key); + version++; + } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); +} + +void SegmentTestBasic::writeSegmentWithDeletedPack(PageId segment_id) +{ + UInt64 write_rows = DEFAULT_MERGE_BLOCK_SIZE; + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); + std::pair keys = getSegmentKeyRange(segment); + Int64 start_key = keys.first; + Int64 end_key = keys.second; + UInt64 remain_row_num = 0; + if (static_cast(end_key - start_key) > write_rows) + { + end_key = start_key + write_rows; + } + else + { + remain_row_num = write_rows - static_cast(end_key - start_key); + } + { + // write to segment and not flush + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle, 1, true, true); + segment->write(dmContext(), std::move(block), true); + LOG_FMT_TRACE(&Poco::Logger::root(), "write key range [{}, {})", start_key, end_key); + version++; + } + while (remain_row_num > 0) + { + UInt64 write_num = std::min(remain_row_num, static_cast(end_key - start_key)); + Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle, 1, true, true); + segment->write(dmContext(), std::move(block), true); + remain_row_num -= write_num; + LOG_FMT_TRACE(&Poco::Logger::root(), "write key range [{}, {})", start_key, write_num + start_key); + version++; + } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); +} + +void SegmentTestBasic::deleteRangeSegment(PageId segment_id) +{ + auto segment = segments[segment_id]; + segment->write(dmContext(), /*delete_range*/ segment->getRowKeyRange()); + EXPECT_EQ(getSegmentRowNum(segment_id), 0); +} + +void SegmentTestBasic::writeRandomSegment() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start write segment:{}", random_segment_id); + writeSegment(random_segment_id); +} +void SegmentTestBasic::writeRandomSegmentWithDeletedPack() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start write segment with deleted pack:{}", random_segment_id); + writeSegmentWithDeletedPack(random_segment_id); +} + +void SegmentTestBasic::deleteRangeRandomSegment() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start delete range segment:{}", random_segment_id); + deleteRangeSegment(random_segment_id); +} + +void SegmentTestBasic::splitRandomSegment() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start split segment:{}", random_segment_id); + splitSegment(random_segment_id); +} + +void SegmentTestBasic::mergeRandomSegment() +{ + if (segments.empty() || segments.size() == 1) + { + return; + } + std::pair segment_pair; + segment_pair = getRandomMergeablePair(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start merge segment:{} and {}", segment_pair.first, segment_pair.second); + mergeSegment(segment_pair.first, segment_pair.second); +} + +void SegmentTestBasic::mergeDeltaRandomSegment() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start merge delta in segment:{}", random_segment_id); + mergeSegmentDelta(random_segment_id); +} + +void SegmentTestBasic::flushCacheRandomSegment() +{ + if (segments.empty()) + { + return; + } + PageId random_segment_id = getRandomSegmentId(); + LOG_FMT_TRACE(&Poco::Logger::root(), "start flush cache in segment:{}", random_segment_id); + flushSegmentCache(random_segment_id); +} + +void SegmentTestBasic::randomSegmentTest(size_t operator_count) +{ + for (size_t i = 0; i < operator_count; i++) + { + auto op = static_cast(random() % SegmentOperaterMax); + segment_operator_entries[op](); + } +} + +PageId SegmentTestBasic::getRandomSegmentId() +{ + auto max_segment_id = segments.rbegin()->first; + PageId random_segment_id = random() % (max_segment_id + 1); + auto it = segments.find(random_segment_id); + while (it == segments.end()) + { + random_segment_id = random() % (max_segment_id + 1); + it = segments.find(random_segment_id); + } + return random_segment_id; +} + +std::pair SegmentTestBasic::getRandomMergeablePair() +{ + while (true) + { + PageId random_left_segment_id = getRandomSegmentId(); + PageId random_right_segment_id = random_left_segment_id; + while (random_right_segment_id == random_left_segment_id) + { + random_right_segment_id = getRandomSegmentId(); + } + auto left_segment = segments[random_left_segment_id]; + auto right_segment = segments[random_right_segment_id]; + if (compare(left_segment->getRowKeyRange().getEnd(), right_segment->getRowKeyRange().getStart()) != 0 || left_segment->nextSegmentId() != right_segment->segmentId()) + { + continue; + } + return {random_left_segment_id, random_right_segment_id}; + } +} + +RowKeyRange SegmentTestBasic::commanHandleKeyRange() +{ + String start_key, end_key; + { + WriteBufferFromOwnString ss; + ::DB::EncodeUInt(static_cast(TiDB::CodecFlagInt), ss); + ::DB::EncodeInt64(std::numeric_limits::min(), ss); + start_key = ss.releaseStr(); + } + { + WriteBufferFromOwnString ss; + ::DB::EncodeUInt(static_cast(TiDB::CodecFlagInt), ss); + ::DB::EncodeInt64(std::numeric_limits::max(), ss); + end_key = ss.releaseStr(); + } + return RowKeyRange(RowKeyValue(true, std::make_shared(start_key), 0), RowKeyValue(true, std::make_shared(end_key), 0), true, 1); +} + +SegmentPtr SegmentTestBasic::reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns, DB::Settings && db_settings) +{ + TiFlashStorageTestBasic::reload(std::move(db_settings)); + storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); + storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *storage_path_pool, "test.t1"); + storage_pool->restore(); + ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns(is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID) : pre_define_columns; + setColumns(cols); + + return Segment::newSegment(*dm_context, table_columns, is_common_handle ? commanHandleKeyRange() : RowKeyRange::newAll(is_common_handle, 1), storage_pool->newMetaPageId(), 0); +} + +void SegmentTestBasic::setColumns(const ColumnDefinesPtr & columns) +{ + *table_columns = *columns; + + dm_context = std::make_unique(*db_context, + *storage_path_pool, + *storage_pool, + 0, + /*min_version_*/ 0, + settings.not_compress_columns, + options.is_common_handle, + 1, + db_context->getSettingsRef()); +} +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h new file mode 100644 index 00000000000..ab0c7d6d0be --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -0,0 +1,123 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ +class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic +{ +public: + struct SegmentTestOptions + { + bool is_common_handle = false; + }; + +public: + void reloadWithOptions(SegmentTestOptions config); + + std::optional splitSegment(PageId segment_id); + void mergeSegment(PageId left_segment_id, PageId right_segment_id); + void mergeSegmentDelta(PageId segment_id); + void flushSegmentCache(PageId segment_id); + void writeSegment(PageId segment_id, UInt64 write_rows = 100); + void writeSegmentWithDeletedPack(PageId segment_id); + void deleteRangeSegment(PageId segment_id); + + + void writeRandomSegment(); + void writeRandomSegmentWithDeletedPack(); + void deleteRangeRandomSegment(); + void splitRandomSegment(); + void mergeRandomSegment(); + void mergeDeltaRandomSegment(); + void flushCacheRandomSegment(); + + void randomSegmentTest(size_t operator_count); + + PageId createNewSegmentWithSomeData(); + size_t getSegmentRowNumWithoutMVCC(PageId segment_id); + size_t getSegmentRowNum(PageId segment_id); + void checkSegmentRow(PageId segment_id, size_t expected_row_num); + std::pair getSegmentKeyRange(SegmentPtr segment); + +protected: + // + std::map segments; + + enum SegmentOperaterType + { + Write = 0, + DeleteRange, + Split, + Merge, + MergeDelta, + FlushCache, + WriteDeletedPack, + SegmentOperaterMax + }; + + const std::vector> segment_operator_entries = { + [this] { writeRandomSegment(); }, + [this] { deleteRangeRandomSegment(); }, + [this] { splitRandomSegment(); }, + [this] { mergeRandomSegment(); }, + [this] { mergeDeltaRandomSegment(); }, + [this] { flushCacheRandomSegment(); }, + [this] { + writeRandomSegmentWithDeletedPack(); + }}; + + PageId getRandomSegmentId(); + + std::pair getRandomMergeablePair(); + + RowKeyRange commanHandleKeyRange(); + + SegmentPtr reload(bool is_common_handle, const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()); + + // setColumns should update dm_context at the same time + void setColumns(const ColumnDefinesPtr & columns); + + const ColumnDefinesPtr & tableColumns() const { return table_columns; } + + DMContext & dmContext() { return *dm_context; } + +protected: + /// all these var lives as ref in dm_context + std::unique_ptr storage_path_pool; + std::unique_ptr storage_pool; + /// dm_context + std::unique_ptr dm_context; + ColumnDefinesPtr table_columns; + DM::DeltaMergeStore::Settings settings; + + SegmentPtr root_segment; + UInt64 version = 0; + SegmentTestOptions options; +}; +} // namespace tests +} // namespace DM +} // namespace DB \ No newline at end of file