Skip to content

Commit

Permalink
fix read from ColumnFileBigReader (#5413)
Browse files Browse the repository at this point in the history
ref #5252
  • Loading branch information
lidezhu authored Jul 20, 2022
1 parent 1b81525 commit 764bd3c
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 2 deletions.
16 changes: 15 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void ColumnFileBigReader::initStream()
.setTracingID(context.tracing_id)
.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range});

header = file_stream->getHeader();
// If we only need to read pk and version columns, then cache columns data in memory.
if (pk_ver_only)
{
Expand Down Expand Up @@ -223,7 +224,20 @@ Block ColumnFileBigReader::readNextBlock()
{
initStream();

return file_stream->read();
if (pk_ver_only)
{
if (next_block_index_in_cache >= cached_pk_ver_columns.size())
{
return {};
}
auto & columns = cached_pk_ver_columns[next_block_index_in_cache];
next_block_index_in_cache += 1;
return header.cloneWithColumns(std::move(columns));
}
else
{
return file_stream->read();
}
}

ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
Expand Down
22 changes: 21 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class ColumnFileBigReader : public ColumnFileReader
const ColumnFileBig & column_file;
const ColumnDefinesPtr col_defs;

Block header;

bool pk_ver_only;

DMFileBlockInputStreamPtr file_stream;
Expand All @@ -112,6 +114,7 @@ class ColumnFileBigReader : public ColumnFileReader
// we cache them to minimize the cost.
std::vector<Columns> cached_pk_ver_columns;
std::vector<size_t> cached_block_rows_end;
size_t next_block_index_in_cache = 0;

// Data members used when reading all columns; in this mode the data can only be read once.
size_t rows_before_cur_block = 0;
Expand All @@ -131,7 +134,24 @@ class ColumnFileBigReader : public ColumnFileReader
, column_file(column_file_)
, col_defs(col_defs_)
{
pk_ver_only = col_defs->size() <= 2;
if (col_defs_->size() == 1)
{
if ((*col_defs)[0].id == EXTRA_HANDLE_COLUMN_ID)
{
pk_ver_only = true;
}
}
else if (col_defs_->size() == 2)
{
if ((*col_defs)[0].id == EXTRA_HANDLE_COLUMN_ID && (*col_defs)[1].id == VERSION_COLUMN_ID)
{
pk_ver_only = true;
}
}
else
{
pk_ver_only = false;
}
}

size_t readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ target_link_libraries(dm_test_delta_index_manager dbms gtest_main clickhouse_fun
add_executable(dm_test_delta_value_space EXCLUDE_FROM_ALL gtest_dm_delta_value_space.cpp)
target_link_libraries(dm_test_delta_value_space dbms gtest_main clickhouse_functions)

# Standalone gtest binary built from gtest_dm_column_file.cpp.
# EXCLUDE_FROM_ALL keeps it out of the default build; it must be requested explicitly as a target.
add_executable(dm_test_column_file EXCLUDE_FROM_ALL gtest_dm_column_file.cpp)
target_link_libraries(dm_test_column_file dbms gtest_main clickhouse_functions)

add_subdirectory (bank EXCLUDE_FROM_ALL)

add_subdirectory (stress EXCLUDE_FROM_ALL)
167 changes: 167 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/FunctionTestUtils.h>

#include <vector>

namespace DB
{
namespace DM
{
namespace tests
{
/// Test fixture for ColumnFile tests.
///
/// On top of TiFlashStorageTestBasic it prepares a path pool, a storage pool,
/// a column cache and a DMContext, and exposes the contexts through
/// dmContext() / dbContext().
class ColumnFileTest
    : public DB::base::TiFlashStorageTestBasic
{
public:
    ColumnFileTest() = default;

    static void SetUpTestCase() {}

    /// Runs the base-class setup, then rebuilds every helper object owned by this fixture.
    void SetUp() override
    {
        TiFlashStorageTestBasic::SetUp();

        parent_path = TiFlashStorageTestBasic::getTemporaryPath();
        path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "DMFile_Test", false));
        storage_pool = std::make_unique<StoragePool>(*db_context, /*ns_id*/ 100, *path_pool, "test.t1");
        column_cache = std::make_shared<ColumnCache>();
        // Built last: it is constructed from the pools and settings created above.
        dm_context = std::make_unique<DMContext>( //
            *db_context,
            *path_pool,
            *storage_pool,
            /*hash_salt*/ 0,
            0,
            settings.not_compress_columns,
            false,
            1,
            db_context->getSettingsRef());
    }

    /// Returns the DMContext created in SetUp(); only valid after SetUp() has run.
    DMContext & dmContext() { return *dm_context; }

    /// Returns the global Context owned by the base fixture.
    Context & dbContext() { return *db_context; }

private:
    /// The objects below live as references inside `dm_context`.
    /// They are declared BEFORE it on purpose: members are destroyed in reverse
    /// declaration order, so `dm_context` is torn down first and never holds
    /// dangling references to them.
    std::unique_ptr<StoragePathPool> path_pool;
    std::unique_ptr<StoragePool> storage_pool;
    DeltaMergeStore::Settings settings;
    std::unique_ptr<DMContext> dm_context;

protected:
    /// Temporary directory under which the test creates its DMFile.
    String parent_path;
    ColumnCachePtr column_cache;
};

/// Writes `batch_num` batches of rows into a DMFile, wraps it in a ColumnFileBig and
/// checks that ColumnFileBigReader::readNextBlock() returns every written row for
/// three different column sets:
///   1. (handle, version)
///   2. (handle, tag)
///   3. the full default table schema
TEST_F(ColumnFileTest, ColumnFileBigRead)
try
{
    auto table_columns = DMTestEnv::getDefaultColumns();
    auto dm_file = DMFile::create(1, parent_path, false, std::make_optional<DMChecksumConfig>());
    const size_t num_rows_write_per_batch = 8192;
    const size_t batch_num = 3;
    const UInt64 tso_value = 100;
    {
        auto stream = std::make_shared<DMFileBlockOutputStream>(dbContext(), dm_file, *table_columns);
        stream->writePrefix();
        for (size_t i = 0; i < batch_num; ++i)
        {
            // Batch i covers handles [i * rows_per_batch, (i + 1) * rows_per_batch).
            // Pass `tso_value` (instead of repeating the literal) so the value written
            // and the value asserted below cannot drift apart.
            Block block = DMTestEnv::prepareSimpleWriteBlock(
                num_rows_write_per_batch * i,
                num_rows_write_per_batch * (i + 1),
                false,
                tso_value);
            stream->write(block, {});
        }
        stream->writeSuffix();
    }

    // test read
    ColumnFileBig column_file_big(dmContext(), dm_file, RowKeyRange::newAll(false, 1));
    ColumnDefinesPtr column_defines = std::make_shared<ColumnDefines>();
    column_defines->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false));
    column_defines->emplace_back(getVersionColumnDefine());
    auto column_file_big_reader = column_file_big.getReader(dmContext(), /*storage_snap*/ nullptr, column_defines);

    // Case 1: read only the handle and version columns.
    {
        size_t num_rows_read = 0;
        while (Block in = column_file_big_reader->readNextBlock())
        {
            ASSERT_EQ(in.columns(), column_defines->size());
            ASSERT_TRUE(in.has(EXTRA_HANDLE_COLUMN_NAME));
            ASSERT_TRUE(in.has(VERSION_COLUMN_NAME));
            // Handles are expected to come back as the consecutive sequence 0, 1, 2, ...
            const auto & pk_column = in.getByName(EXTRA_HANDLE_COLUMN_NAME).column;
            for (size_t i = 0; i < pk_column->size(); ++i)
            {
                // Cast the expected value so both sides of the comparison are signed.
                ASSERT_EQ(pk_column->getInt(i), static_cast<Int64>(num_rows_read + i));
            }
            const auto & version_column = in.getByName(VERSION_COLUMN_NAME).column;
            for (size_t i = 0; i < version_column->size(); ++i)
            {
                ASSERT_EQ(version_column->getInt(i), static_cast<Int64>(tso_value));
            }
            num_rows_read += in.rows();
        }
        ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num);
    }

    // Case 2: two columns again, but the second one is the tag column rather than version.
    {
        ColumnDefinesPtr column_defines_pk_and_del = std::make_shared<ColumnDefines>();
        column_defines_pk_and_del->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false));
        column_defines_pk_and_del->emplace_back(getTagColumnDefine());
        auto column_file_big_reader2 = column_file_big_reader->createNewReader(column_defines_pk_and_del);
        size_t num_rows_read = 0;
        while (Block in = column_file_big_reader2->readNextBlock())
        {
            ASSERT_EQ(in.columns(), column_defines_pk_and_del->size());
            ASSERT_TRUE(in.has(EXTRA_HANDLE_COLUMN_NAME));
            ASSERT_TRUE(in.has(TAG_COLUMN_NAME));
            const auto & pk_column = in.getByName(EXTRA_HANDLE_COLUMN_NAME).column;
            for (size_t i = 0; i < pk_column->size(); ++i)
            {
                ASSERT_EQ(pk_column->getInt(i), static_cast<Int64>(num_rows_read + i));
            }
            // Every tag value is expected to read back as 0.
            const auto & del_column = in.getByName(TAG_COLUMN_NAME).column;
            for (size_t i = 0; i < del_column->size(); ++i)
            {
                ASSERT_EQ(del_column->getInt(i), 0);
            }
            num_rows_read += in.rows();
        }
        ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num);
    }

    // Case 3: read the full table schema; only column count and total rows are checked.
    {
        auto column_file_big_reader3 = column_file_big_reader->createNewReader(table_columns);
        size_t num_rows_read = 0;
        while (Block in = column_file_big_reader3->readNextBlock())
        {
            ASSERT_EQ(in.columns(), table_columns->size());
            num_rows_read += in.rows();
        }
        ASSERT_EQ(num_rows_read, num_rows_write_per_batch * batch_num);
    }
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit 764bd3c

Please sign in to comment.