Skip to content

Commit

Permalink
test: Simplify comparing the stream read result (#5435)
Browse files Browse the repository at this point in the history
close #5452
  • Loading branch information
JaySon-Huang authored Jul 29, 2022
1 parent f6f7b0f commit b4ee52e
Show file tree
Hide file tree
Showing 26 changed files with 2,092 additions and 3,167 deletions.
18 changes: 16 additions & 2 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ std::string Block::dumpNames() const
out << ", ";
out << it->name;
}
return out.str();
return out.releaseStr();
}


Expand All @@ -313,7 +313,21 @@ std::string Block::dumpStructure() const
out << ", ";
it->dumpStructure(out);
}
return out.str();
return out.releaseStr();
}

std::string Block::dumpJsonStructure() const
{
WriteBufferFromOwnString out;
out << "[";
for (auto it = data.begin(); it != data.end(); ++it)
{
if (it != data.begin())
out << ",";
it->dumpJsonStructure(out);
}
out << "]";
return out.releaseStr();
}


Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class Block
/** List of names, types and lengths of columns. Designed for debugging. */
std::string dumpStructure() const;

std::string dumpJsonStructure() const;

/** Get the same block, but empty. */
Block cloneEmpty() const;

Expand Down
21 changes: 18 additions & 3 deletions dbms/src/Core/ColumnWithTypeAndName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
Expand Down Expand Up @@ -48,15 +49,29 @@ void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
out << ' ' << column->dumpStructure();
else
out << " nullptr";

out << " " << column_id;
}

/// Convenience overload: runs dumpStructure(WriteBuffer &) against an in-memory
/// buffer and returns the accumulated text.
String ColumnWithTypeAndName::dumpStructure() const
{
WriteBufferFromOwnString out;
dumpStructure(out);
// NOTE(review): the two returns below look like the old/new pair of a diff hunk;
// read literally, only the first one is reachable - confirm which the real file keeps.
return out.str();
return out.releaseStr();
}

/// Write this column to `out` as a single JSON object of the form
///   {"name":"...","id":...,"type":"...","column":"..."}
/// `type` and `column` are written as JSON `null` when the corresponding pointer is unset.
/// String values are JSON-escaped, so a name, type name or column dump containing
/// quotes, backslashes or control characters still yields a well-formed document.
void ColumnWithTypeAndName::dumpJsonStructure(WriteBuffer & out) const
{
    // Escape the characters that may not appear raw inside a JSON string literal.
    auto escape = [](const String & raw) {
        String escaped;
        escaped.reserve(raw.size());
        for (const char ch : raw)
        {
            switch (ch)
            {
            case '"':
                escaped += "\\\"";
                break;
            case '\\':
                escaped += "\\\\";
                break;
            case '\n':
                escaped += "\\n";
                break;
            case '\r':
                escaped += "\\r";
                break;
            case '\t':
                escaped += "\\t";
                break;
            default:
                // Remaining control characters must use the \uXXXX form; every other
                // byte (including multi-byte UTF-8 sequences) is passed through as is.
                if (static_cast<unsigned char>(ch) < 0x20)
                    escaped += fmt::format("\\u{:04x}", static_cast<unsigned>(static_cast<unsigned char>(ch)));
                else
                    escaped += ch;
                break;
            }
        }
        return escaped;
    };
    // Wrap an escaped value in double quotes to form a JSON string literal.
    auto quoted = [&escape](const String & raw) {
        return "\"" + escape(raw) + "\"";
    };

    out << fmt::format(R"json({{"name":{},"id":{},"type":{},"column":{}}})json",
                       quoted(name),
                       column_id,
                       (type ? quoted(type->getName()) : String("null")),
                       (column ? quoted(column->dumpStructure()) : String("null")));
}

/// Same as dumpJsonStructure(WriteBuffer &), but collects the output into an
/// in-memory buffer and hands the resulting string back to the caller.
String ColumnWithTypeAndName::dumpJsonStructure() const
{
    WriteBufferFromOwnString json_buf;
    dumpJsonStructure(json_buf);
    return json_buf.releaseStr();
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Core/ColumnWithTypeAndName.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ struct ColumnWithTypeAndName

void dumpStructure(WriteBuffer & out) const;
String dumpStructure() const;

void dumpJsonStructure(WriteBuffer & out) const;
String dumpJsonStructure() const;
};

} // namespace DB
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
cur_segment = task->segment;
if (is_raw)
{
cur_stream = cur_segment->getInputStreamRaw(*dm_context, columns_to_read, task->read_snapshot, task->ranges, filter, do_delete_mark_filter_for_raw);
cur_stream = cur_segment->getInputStreamRaw(
*dm_context,
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
do_delete_mark_filter_for_raw);
}
else
{
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ inline Handle encodeToPK(T v)
inline size_t getPosByColumnId(const Block & block, ColId col_id)
{
size_t pos = 0;
for (auto & c : block)
for (const auto & c : block)
{
if (c.column_id == col_id)
return pos;
Expand All @@ -57,7 +57,7 @@ inline size_t getPosByColumnId(const Block & block, ColId col_id)

inline ColumnWithTypeAndName tryGetByColumnId(const Block & block, ColId col_id)
{
for (auto & c : block)
for (const auto & c : block)
{
if (c.column_id == col_id)
return c;
Expand All @@ -68,7 +68,7 @@ inline ColumnWithTypeAndName tryGetByColumnId(const Block & block, ColId col_id)
// TODO: we should later optimize getByColumnId.
inline const ColumnWithTypeAndName & getByColumnId(const Block & block, ColId col_id)
{
for (auto & c : block)
for (const auto & c : block)
{
if (c.column_id == col_id)
return c;
Expand Down Expand Up @@ -105,7 +105,7 @@ inline PaddedPODArray<T> const * toColumnVectorDataPtr(const ColumnPtr & column)
{
if (column->isColumnConst())
{
auto * const_col = static_cast<const ColumnConst *>(column.get());
const auto * const_col = static_cast<const ColumnConst *>(column.get());

const ColumnVector<T> & c = assert_cast<const ColumnVector<T> &>(const_col->getDataColumn());
return &c.getData();
Expand Down Expand Up @@ -191,7 +191,7 @@ inline Block genBlock(const ColumnDefines & column_defines, const Columns & colu
Block block;
for (size_t i = 0; i < column_defines.size(); ++i)
{
auto & c = column_defines[i];
const auto & c = column_defines[i];
addColumnToBlock(block, c.id, c.name, c.type, columns[i], c.default_value);
}
return block;
Expand All @@ -200,7 +200,7 @@ inline Block genBlock(const ColumnDefines & column_defines, const Columns & colu
inline Block getNewBlockByHeader(const Block & header, const Block & block)
{
Block new_block;
for (auto & c : header)
for (const auto & c : header)
new_block.insert(block.getByName(c.name));
return new_block;
}
Expand All @@ -215,7 +215,7 @@ inline ColumnDefines getColumnDefinesFromBlock(const Block & block)

inline bool hasColumn(const ColumnDefines & columns, const ColId & col_id)
{
for (auto & c : columns)
for (const auto & c : columns)
{
if (c.id == col_id)
return true;
Expand All @@ -231,8 +231,8 @@ inline bool isSameSchema(const Block & a, const Block & b)
return false;
for (size_t i = 0; i < a.columns(); ++i)
{
auto & ca = a.getByPosition(i);
auto & cb = b.getByPosition(i);
const auto & ca = a.getByPosition(i);
const auto & cb = b.getByPosition(i);

bool col_ok = ca.column_id == cb.column_id;
bool name_ok = ca.name == cb.name;
Expand Down
49 changes: 18 additions & 31 deletions dbms/src/Storages/DeltaMerge/tests/DMTestEnv.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ namespace tests
// Add this so that we can call typeFromString under namespace DB::DM::tests
using DB::tests::typeFromString;

using namespace DB::tests;

/// helper functions for comparing HandleRange
inline ::testing::AssertionResult HandleRangeCompare(
const char * lhs_expr,
Expand Down Expand Up @@ -96,6 +98,18 @@ inline std::vector<Int64> createSignedNumbers(size_t beg, size_t end)
return values;
}

// Mock a common-handle primary key: the encoded form of `rowkey_column_size`
// Int64 key columns, every one of them holding the same `value`.
inline String genMockCommonHandle(Int64 value, size_t rowkey_column_size)
{
    WriteBufferFromOwnString encoded;
    for (size_t remaining = rowkey_column_size; remaining != 0; --remaining)
    {
        // Each key column is written as the int codec flag followed by the encoded value.
        ::DB::EncodeUInt(static_cast<UInt8>(TiDB::CodecFlagInt), encoded);
        ::DB::EncodeInt64(value, encoded);
    }
    return encoded.releaseStr();
}

class DMTestEnv
{
public:
Expand Down Expand Up @@ -285,13 +299,7 @@ class DMTestEnv
for (size_t i = 0; i < num_rows; i++)
{
Int64 value = reversed ? end - 1 - i : beg + i;
WriteBufferFromOwnString ss;
for (size_t index = 0; index < rowkey_column_size; index++)
{
::DB::EncodeUInt(static_cast<UInt8>(TiDB::CodecFlagInt), ss);
::DB::EncodeInt64(value, ss);
}
values.emplace_back(ss.releaseStr());
values.emplace_back(genMockCommonHandle(value, rowkey_column_size));
}
block.insert(DB::tests::createColumn<String>(
std::move(values),
Expand Down Expand Up @@ -410,16 +418,7 @@ class DMTestEnv
const size_t num_rows = 1;
if (is_common_handle)
{
Strings values;
{
WriteBufferFromOwnString ss;
for (size_t index = 0; index < rowkey_column_size; index++)
{
::DB::EncodeUInt(static_cast<UInt8>(TiDB::CodecFlagInt), ss);
::DB::EncodeInt64(pk, ss);
}
values.emplace_back(ss.releaseStr());
}
Strings values{genMockCommonHandle(pk, rowkey_column_size)};
block.insert(DB::tests::createColumn<String>(
std::move(values),
pk_name,
Expand Down Expand Up @@ -466,20 +465,8 @@ class DMTestEnv

// Build a RowKeyRange whose start and end keys are encoded from `start` and `end`,
// each value repeated over `rowkey_column_size` key columns.
// NOTE(review): this listing appears to interleave the removed inline encoding
// (the two EncodeUInt/EncodeInt64 loops) with the added genMockCommonHandle calls;
// as shown, `start_key` and `end_key` are each declared twice - confirm against the real file.
static RowKeyRange getRowKeyRangeForClusteredIndex(Int64 start, Int64 end, size_t rowkey_column_size)
{
WriteBufferFromOwnString ss;
for (size_t i = 0; i < rowkey_column_size; i++)
{
EncodeUInt(static_cast<UInt8>(TiDB::CodecFlagInt), ss);
EncodeInt64(start, ss);
}
RowKeyValue start_key = RowKeyValue(true, std::make_shared<String>(ss.releaseStr()));
// The buffer is restarted so it can be reused for encoding the end key.
ss.restart();
for (size_t i = 0; i < rowkey_column_size; i++)
{
EncodeUInt(static_cast<UInt8>(TiDB::CodecFlagInt), ss);
EncodeInt64(end, ss);
}
RowKeyValue end_key = RowKeyValue(true, std::make_shared<String>(ss.releaseStr()));
RowKeyValue start_key = RowKeyValue(true, std::make_shared<String>(genMockCommonHandle(start, rowkey_column_size)));
RowKeyValue end_key = RowKeyValue(true, std::make_shared<String>(genMockCommonHandle(end, rowkey_column_size)));
// presumably the `true` arguments mark the keys/range as common-handle - TODO confirm
return RowKeyRange(start_key, end_key, true, rowkey_column_size);
}

Expand Down
89 changes: 24 additions & 65 deletions dbms/src/Storages/DeltaMerge/tests/gtest_column_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <DataStreams/BlocksListBlockInputStream.h>
#include <Storages/DeltaMerge/DMDecoratorStreams.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/InputStreamTestUtils.h>

namespace DB
{
Expand Down Expand Up @@ -45,7 +47,7 @@ class DebugBlockInputStream : public BlocksListBlockInputStream
bool is_common_handle;
};

BlockInputStreamPtr genColumnFilterInputStream(BlocksList & blocks, const ColumnDefines & columns, bool is_common_handle)
BlockInputStreamPtr genColumnProjInputStream(BlocksList & blocks, const ColumnDefines & columns, bool is_common_handle)
{
ColumnDefine handle_define(
TiDBPkColumnID,
Expand Down Expand Up @@ -85,34 +87,16 @@ TEST(DeleteFilterTest, NormalCase)

ColumnDefines columns = getColumnDefinesFromBlock(blocks.back());

{
auto in = genDeleteFilterInputStream(blocks, columns, false);
in->readPrefix();
Block block = in->read();
ASSERT_EQ(block.rows(), 1);
auto col = block.getByName(str_col_name);
auto val = col.column->getDataAt(0);
ASSERT_EQ(val, "hello");

block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "world");

block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "TiFlash");

block = in->read();
ASSERT_FALSE(block); // ensure the stream is ended
in->readSuffix();
}
auto in = genDeleteFilterInputStream(blocks, columns, false);
ASSERT_INPUTSTREAM_COLS_UR(
in,
Strings({str_col_name}),
createColumns({
createColumn<String>({"hello", "world", "TiFlash"}),
}));
}

TEST(ColumnFilterTest, NormalCase)
TEST(ColumnProjectionTest, NormalCase)
{
BlocksList blocks;

Expand All @@ -125,47 +109,22 @@ TEST(ColumnFilterTest, NormalCase)
blocks.push_back(DMTestEnv::prepareOneRowBlock(pk_value, 40, 1, str_col_name, "Storage", false, 1));
}

// Only keep the column `str_col_name`
ColumnDefines columns = getColumnDefinesFromBlock(blocks.back());

for (auto iter = columns.begin(); iter != columns.end(); /**/)
{
auto in = genColumnFilterInputStream(blocks, columns, false);
in->readPrefix();
Block block = in->read();
ASSERT_EQ(block.rows(), 1);
auto col = block.getByName(str_col_name);
auto val = col.column->getDataAt(0);
ASSERT_EQ(val, "hello");

block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "world");

block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "");


block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "TiFlash");

block = in->read();
ASSERT_EQ(block.rows(), 1);
col = block.getByName(str_col_name);
val = col.column->getDataAt(0);
ASSERT_EQ(val, "Storage");

block = in->read();
ASSERT_FALSE(block); // ensure the stream is ended
in->readSuffix();
if (iter->name != str_col_name)
iter = columns.erase(iter);
else
iter++;
}

ASSERT_INPUTSTREAM_BLOCK_UR(
genColumnProjInputStream(blocks, columns, false),
Block({
createColumn<String>({"hello", "world", "", "TiFlash", "Storage"}, str_col_name),
}));
}
} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB
Loading

0 comments on commit b4ee52e

Please sign in to comment.