diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 3328f60b15d..da4372670be 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -91,6 +91,7 @@ void ColumnFileBigReader::initStream() .setTracingID(context.tracing_id) .build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); + header = file_stream->getHeader(); // If we only need to read pk and version columns, then cache columns data in memory. if (pk_ver_only) { @@ -223,7 +224,20 @@ Block ColumnFileBigReader::readNextBlock() { initStream(); - return file_stream->read(); + if (pk_ver_only) + { + if (next_block_index_in_cache >= cached_pk_ver_columns.size()) + { + return {}; + } + auto & columns = cached_pk_ver_columns[next_block_index_in_cache]; + next_block_index_in_cache += 1; + return header.cloneWithColumns(std::move(columns)); + } + else + { + return file_stream->read(); + } } ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index dba7eca7247..44302399960 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -104,6 +104,8 @@ class ColumnFileBigReader : public ColumnFileReader const ColumnFileBig & column_file; const ColumnDefinesPtr col_defs; + Block header; + bool pk_ver_only; DMFileBlockInputStreamPtr file_stream; @@ -112,6 +114,7 @@ class ColumnFileBigReader : public ColumnFileReader // we cache them to minimize the cost. std::vector cached_pk_ver_columns; std::vector cached_block_rows_end; + size_t next_block_index_in_cache = 0; // The data members for reading all columns, but can only read once. size_t rows_before_cur_block = 0; @@ -131,7 +134,24 @@ class ColumnFileBigReader : public ColumnFileReader , column_file(column_file_) , col_defs(col_defs_) { - pk_ver_only = col_defs->size() <= 2; + if (col_defs_->size() == 1) + { + if ((*col_defs)[0].id == EXTRA_HANDLE_COLUMN_ID) + { + pk_ver_only = true; + } + } + else if (col_defs_->size() == 2) + { + if ((*col_defs)[0].id == EXTRA_HANDLE_COLUMN_ID && (*col_defs)[1].id == VERSION_COLUMN_ID) + { + pk_ver_only = true; + } + } + else + { + pk_ver_only = false; + } } size_t readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; diff --git a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt index 14ecdfc5e79..44ca6ccb367 100644 --- a/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt +++ b/dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt @@ -63,6 +63,9 @@ target_link_libraries(dm_test_delta_index_manager dbms gtest_main clickhouse_fun add_executable(dm_test_delta_value_space EXCLUDE_FROM_ALL gtest_dm_delta_value_space.cpp) target_link_libraries(dm_test_delta_value_space dbms gtest_main clickhouse_functions) +add_executable(dm_test_column_file EXCLUDE_FROM_ALL gtest_dm_column_file.cpp) +target_link_libraries(dm_test_column_file dbms gtest_main clickhouse_functions) + add_subdirectory (bank EXCLUDE_FROM_ALL) add_subdirectory (stress EXCLUDE_FROM_ALL) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp new file mode 100644 index 00000000000..e8744f6fe22 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -0,0 +1,167 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ +class ColumnFileTest + : public DB::base::TiFlashStorageTestBasic +{ +public: + ColumnFileTest() = default; + + static void SetUpTestCase() {} + + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + parent_path = TiFlashStorageTestBasic::getTemporaryPath(); + path_pool = std::make_unique(db_context->getPathPool().withTable("test", "DMFile_Test", false)); + storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *path_pool, "test.t1"); + column_cache = std::make_shared(); + dm_context = std::make_unique( // + *db_context, + *path_pool, + *storage_pool, + /*hash_salt*/ 0, + 0, + settings.not_compress_columns, + false, + 1, + db_context->getSettingsRef()); + } + + DMContext & dmContext() { return *dm_context; } + + Context & dbContext() { return *db_context; } + +private: + std::unique_ptr dm_context; + /// all these var live as ref in dm_context + std::unique_ptr path_pool; + std::unique_ptr storage_pool; + DeltaMergeStore::Settings settings; + +protected: + String parent_path; + ColumnCachePtr column_cache; +}; + +TEST_F(ColumnFileTest, ColumnFileBigRead) +try +{ + auto table_columns = DMTestEnv::getDefaultColumns(); + auto dm_file = DMFile::create(1, parent_path, false, std::make_optional()); + const size_t num_rows_write_per_batch = 8192; + const size_t batch_num = 3; + const UInt64 tso_value = 100; + { + auto stream = std::make_shared(dbContext(), dm_file, *table_columns); + stream->writePrefix(); + for (size_t i = 0; i < batch_num; i += 1) + { + Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch * i, num_rows_write_per_batch * (i + 1), false, 100); + stream->write(block, {}); + } + stream->writeSuffix(); + } + + // test read + ColumnFileBig column_file_big(dmContext(), dm_file, RowKeyRange::newAll(false, 1)); + ColumnDefinesPtr column_defines = std::make_shared(); + column_defines->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false)); + column_defines->emplace_back(getVersionColumnDefine()); + auto column_file_big_reader = column_file_big.getReader(dmContext(), /*storage_snap*/ nullptr, column_defines); + { + size_t num_rows_read = 0; + while (Block in = column_file_big_reader->readNextBlock()) + { + ASSERT_EQ(in.columns(), column_defines->size()); + ASSERT_TRUE(in.has(EXTRA_HANDLE_COLUMN_NAME)); + ASSERT_TRUE(in.has(VERSION_COLUMN_NAME)); + auto & pk_column = in.getByName(EXTRA_HANDLE_COLUMN_NAME).column; + for (size_t i = 0; i < pk_column->size(); i++) + { + ASSERT_EQ(pk_column->getInt(i), num_rows_read + i); + } + auto & version_column = in.getByName(VERSION_COLUMN_NAME).column; + for (size_t i = 0; i < version_column->size(); i++) + { + ASSERT_EQ(version_column->getInt(i), tso_value); + } + num_rows_read += in.rows(); + } + ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num); + } + + { + ColumnDefinesPtr column_defines_pk_and_del = std::make_shared(); + column_defines_pk_and_del->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false)); + column_defines_pk_and_del->emplace_back(getTagColumnDefine()); + auto column_file_big_reader2 = column_file_big_reader->createNewReader(column_defines_pk_and_del); + size_t num_rows_read = 0; + while (Block in = column_file_big_reader2->readNextBlock()) + { + ASSERT_EQ(in.columns(), column_defines_pk_and_del->size()); + ASSERT_TRUE(in.has(EXTRA_HANDLE_COLUMN_NAME)); + ASSERT_TRUE(in.has(TAG_COLUMN_NAME)); + auto & pk_column = in.getByName(EXTRA_HANDLE_COLUMN_NAME).column; + for (size_t i = 0; i < pk_column->size(); i++) + { + ASSERT_EQ(pk_column->getInt(i), num_rows_read + i); + } + auto & del_column = in.getByName(TAG_COLUMN_NAME).column; + for (size_t i = 0; i < del_column->size(); i++) + { + ASSERT_EQ(del_column->getInt(i), 0); + } + num_rows_read += in.rows(); + } + ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num); + } + + { + auto column_file_big_reader3 = column_file_big_reader->createNewReader(table_columns); + size_t num_rows_read = 0; + while (Block in = column_file_big_reader3->readNextBlock()) + { + ASSERT_EQ(in.columns(), table_columns->size()); + num_rows_read += in.rows(); + } + ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num); + } +} +CATCH + +} // namespace tests +} // namespace DM +} // namespace DB