Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support clean read base on fast mode #5371

Merged
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b29ee6b
add fast mode logic
hongyunyan Jul 5, 2022
c60185e
fix code format
hongyunyan Jul 5, 2022
14266f7
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jul 7, 2022
0dff68b
compatible old code
hongyunyan Jul 7, 2022
3039aab
fix bug
hongyunyan Jul 7, 2022
732ab10
Merge branch 'master' into hongyunyan_fast_mode_non_schema_part
hongyunyan Jul 7, 2022
7898ae8
fix test
hongyunyan Jul 7, 2022
e8c52be
add test for fast mode
hongyunyan Jul 10, 2022
67aed53
Merge branch 'master' into hongyunyan_fast_mode_non_schema_part
hongyunyan Jul 10, 2022
e01ad7c
fix format
hongyunyan Jul 10, 2022
377734f
support dtworkload in fast mode
hongyunyan Jul 10, 2022
acb5080
remove useless code
hongyunyan Jul 11, 2022
94a3bc6
fix delete column error
hongyunyan Jul 11, 2022
506bc15
support clean read in fast mode
hongyunyan Jul 12, 2022
49b6cf9
fix comments
hongyunyan Jul 12, 2022
5a8fd0d
update fast mode
hongyunyan Jul 12, 2022
29cf500
Merge branch 'hongyunyan_fast_mode_non_schema_part' into hongyunyan_c…
hongyunyan Jul 12, 2022
d23e0e8
for test
hongyunyan Jul 12, 2022
ef60b22
Merge branch 'hongyunyan_clean_read_base_on_fast_mode' of https://git…
hongyunyan Jul 12, 2022
191ac02
include delele handle
hongyunyan Jul 13, 2022
47a053b
support handle optimization
hongyunyan Jul 13, 2022
f641530
hongyunyan_clean_read_base_on_fast_mode
hongyunyan Jul 13, 2022
6648b68
fix comment
hongyunyan Jul 13, 2022
3d64809
Merge branch 'hongyunyan_fast_mode_non_schema_part' into hongyunyan_c…
hongyunyan Jul 13, 2022
1187896
fix bug
hongyunyan Jul 13, 2022
391ebd9
merge master
hongyunyan Jul 13, 2022
97e7191
fix bug
hongyunyan Jul 13, 2022
f2ce510
merge conflict
hongyunyan Jul 14, 2022
dd56b02
merge master
hongyunyan Jul 14, 2022
15ea2f4
add test
hongyunyan Jul 14, 2022
620060f
fix format
hongyunyan Jul 14, 2022
d48eed7
change name
hongyunyan Jul 15, 2022
242a4f8
fix comments
hongyunyan Jul 15, 2022
9746854
Update dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_…
JaySon-Huang Jul 15, 2022
cf8d717
Merge branch 'master' into hongyunyan_clean_read_base_on_fast_mode
ti-chi-bot Jul 15, 2022
8d14cbf
Merge branch 'master' into hongyunyan_clean_read_base_on_fast_mode
ti-chi-bot Jul 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include <unordered_set>


namespace DB
{
namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
read_columns,
is_common_handle,
enable_clean_read,
is_fast_mode,
max_data_version,
std::move(pack_filter),
mark_cache,
Expand Down
17 changes: 11 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,18 @@ class DMFileBlockInputStreamBuilder

// **** filters **** //

// Only set this param to true when
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// Only set the `enable` param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, bool is_fast_mode_, UInt64 max_data_version_)
{
enable_clean_read = enable;
is_fast_mode = is_fast_mode_;
max_data_version = max_data_version_;
return *this;
}
Expand Down Expand Up @@ -139,6 +143,7 @@ class DMFileBlockInputStreamBuilder

// clean read
bool enable_clean_read = false;
bool is_fast_mode = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
Expand All @@ -150,8 +155,8 @@ class DMFileBlockInputStreamBuilder
bool enable_column_cache = false;
ColumnCachePtr column_cache;
ReadLimiterPtr read_limiter;
size_t aio_threshold;
size_t max_read_buffer_size;
size_t aio_threshold{};
size_t max_read_buffer_size{};
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;

Expand Down
35 changes: 31 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <Storages/Page/PageUtil.h>
#include <fmt/format.h>

#include <string>

namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
Expand Down Expand Up @@ -210,6 +212,7 @@ DMFileReader::DMFileReader(
bool is_common_handle_,
// clean read
bool enable_clean_read_,
bool is_fast_mode_,
UInt64 max_read_version_,
// filters
DMFilePackFilter && pack_filter_,
Expand All @@ -230,6 +233,7 @@ DMFileReader::DMFileReader(
, read_one_pack_every_time(read_one_pack_every_time_)
, single_file_mode(dmfile_->isSingleFileMode())
, enable_clean_read(enable_clean_read_)
, is_fast_mode(is_fast_mode_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
, skip_packs_by_column(read_columns.size(), 0)
Expand Down Expand Up @@ -338,13 +342,16 @@ Block DMFileReader::read()
}

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read = enable_clean_read && expected_handle_res == All && not_clean_rows == 0;
if (do_clean_read)
bool do_clean_read_on_normal_mode = enable_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);

bool do_clean_read_on_handle = enable_clean_read && is_fast_mode && expected_handle_res == All;

if (do_clean_read_on_normal_mode)
{
UInt64 max_version = 0;
for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id)
max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version);
do_clean_read = max_version <= max_read_version;
do_clean_read_on_normal_mode = max_version <= max_read_version;
}

for (size_t i = 0; i < read_columns.size(); ++i)
Expand All @@ -353,7 +360,24 @@ Block DMFileReader::read()
{
// For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK.
auto & cd = read_columns[i];
if (do_clean_read && isExtraColumn(cd))
if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle)
{
// Return the first row's handle
ColumnPtr column;
if (is_common_handle)
{
StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size));
}
else
{
Handle min_handle = pack_filter.getMinHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle));
}
res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id});
skip_packs_by_column[i] = read_packs;
}
else if (do_clean_read_on_normal_mode && isExtraColumn(cd))
{
ColumnPtr column;
if (cd.id == EXTRA_HANDLE_COLUMN_ID)
Expand Down Expand Up @@ -433,6 +457,7 @@ Block DMFileReader::read()
}
// Cast column's data from DataType in disk to what we need now
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(result_column), cd);

hongyunyan marked this conversation as resolved.
Show resolved Hide resolved
res.insert(ColumnWithTypeAndName{converted_column, cd.type, cd.name, cd.id});
}
else
Expand All @@ -441,6 +466,7 @@ Block DMFileReader::read()
auto column = data_type->createColumn();
readFromDisk(cd, column, start_pack_id, read_rows, skip_packs_by_column[i], single_file_mode);
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd);

res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
}
Expand All @@ -456,6 +482,7 @@ Block DMFileReader::read()
dmfile->path());
// New column after ddl is not exist in this DMFile, fill with default value
ColumnPtr column = createColumnWithDefaultValue(cd, read_rows);

res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
}
Expand Down
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class DMFileReader
// 2. You don't need pk, version and delete_tag columns
// If you have no idea what it means, then simply set it to false.
bool enable_clean_read_,
bool is_fast_mode_,
// The MVCC filter version. Used by clean read check.
UInt64 max_read_version_,
// filters
Expand Down Expand Up @@ -122,8 +123,10 @@ class DMFileReader
const bool single_file_mode;

/// Clean read optimize
// If there is no delta for some packs in stable, we can try to do clean read.
// In normal mode, if there is no delta for some packs in stable, we can try to do clean read.
// In fast mode, we always try to do clean read.
const bool enable_clean_read;
const bool is_fast_mode;
const UInt64 max_read_version;

/// Filters
Expand Down
25 changes: 22 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
bool filter_delete_mark,
size_t expected_block_size)
{
/// For now, we use filter_delete_mark to decide whether we are in fast mode or just serving a `selraw * xxxx` query.
/// This approach does not seem robust enough; maybe we need a dedicated flag.
auto new_columns_to_read = std::make_shared<ColumnDefines>();


Expand All @@ -523,20 +525,37 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
new_columns_to_read->push_back(getTagColumnDefine());
}

bool enable_clean_read = filter_delete_mark;

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID && (!(filter_delete_mark && c.id == TAG_COLUMN_ID)))
new_columns_to_read->push_back(c);
if (c.id != EXTRA_HANDLE_COLUMN_ID)
{
if (!(filter_delete_mark && c.id == TAG_COLUMN_ID))
new_columns_to_read->push_back(c);
}
else
{
enable_clean_read = false;
}
}

/// When we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID,
/// we can try to use clean read to optimize reading in the stable part:
/// if the pack is fully contained in data_ranges and has no rows with del_mark = 1 --> we don't need to read handle_column/tag_column/version_column
/// if the pack is fully contained in data_ranges but has rows with del_mark = 1 --> we don't need to read handle_column/version_column
/// otherwise --> we don't need to read version_column
/// Since the del min-max index still has some problems, we currently only optimize for the handle column.
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved

BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
filter,
std::numeric_limits<UInt64>::max(),
expected_block_size,
false);
/* enable_clean_read */ enable_clean_read,
/* is_fast_mode */ filter_delete_mark);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ StableValueSpace::Snapshot::getInputStream(
const RSOperatorPtr & filter,
UInt64 max_data_version,
size_t expected_block_size,
bool enable_clean_read)
bool enable_clean_read,
bool is_fast_mode)
{
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_clean_read: {}", max_data_version, enable_clean_read);
SkippableBlockInputStreams streams;
Expand All @@ -337,7 +338,7 @@ StableValueSpace::Snapshot::getInputStream(
{
DMFileBlockInputStreamBuilder builder(context.db_context);
builder
.enableCleanRead(enable_clean_read, max_data_version)
.enableCleanRead(enable_clean_read, is_fast_mode, max_data_version)
.setRSOperator(filter)
.setColumnCache(column_caches[i])
.setTracingID(context.tracing_id)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
const RSOperatorPtr & filter,
UInt64 max_data_version,
size_t expected_block_size,
bool enable_clean_read);
bool enable_clean_read,
bool is_fast_mode = false);

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ try
}
CATCH


// Insert + Delete row
TEST_P(DeltaMergeStoreRWTest, TestFastModeWithDeleteRow)
try
{
Expand Down Expand Up @@ -1455,6 +1455,116 @@ try
}
CATCH

// Verifies clean read under fast mode (is_raw_read = true):
// 1) write 128 rows, push everything into the stable layer, and check a raw
//    read returns all rows with sequential handles (clean read, no optimization);
// 2) delete the first 64 rows, merge again, and read WITHOUT the handle column,
//    which is the precondition for the handle-column clean-read optimization
//    (see Segment::getInputStreamRaw in this PR) — the row count must reflect
//    the deletion.
// NOTE(review): runs under each TestMode parameterization; the non-BlockOnly
// modes ingest the data as an external DMFile instead of writing via delta.
TEST_P(DeltaMergeStoreRWTest, TestFastModeForCleanRead)
try
{
const size_t num_rows_write = 128;
{
// Create a block with sequential Int64 handle in range [0, 128)
Block block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false);

switch (mode)
{
case TestMode::V1_BlockOnly:
case TestMode::V2_BlockOnly:
store->write(*db_context, db_context->getSettingsRef(), block);
break;
default:
{
// Other modes: build a DMFile from the block and ingest it directly.
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range, file_ids] = genDMFile(*dm_context, block);
store->ingestFiles(dm_context, range, file_ids, false);
break;
}
}
}

// Flush + compact + merge delta over the whole key range so all data ends up
// in the stable layer — clean read only applies to stable packs.
store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->mergeDeltaAll(*db_context);

// could do clean read with no optimization
{
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* is_raw_read= */ true,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;

while (Block block = in->read())
{
num_rows_read += block.rows();
for (auto && iter : block)
{
auto c = iter.column;
for (Int64 i = 0; i < Int64(c->size()); ++i)
{
// Handles were written sequentially from 0, so the pk column must
// read back as 0, 1, 2, ...
if (iter.name == DMTestEnv::pk_name)
{
ASSERT_EQ(c->getInt(i), i);
}
}
}
}

ASSERT_EQ(num_rows_read, num_rows_write);
}

// Delete range [0, 64)
const size_t num_deleted_rows = 64;
{
HandleRange range(0, num_deleted_rows);
store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range));
}

// Merge the delete range into stable as well before the optimized read.
store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->mergeDeltaAll(*db_context);

// could do clean read with handle optimization
{
// Read every column EXCEPT the handle column, so the fast-mode clean read
// can skip loading the handle from disk.
const auto & columns = store->getTableColumns();
ColumnDefines real_columns;
for (auto & col : columns)
{
if (col.name != EXTRA_HANDLE_COLUMN_NAME)
{
real_columns.emplace_back(col);
}
}

BlockInputStreamPtr in = store->read(*db_context,
db_context->getSettingsRef(),
real_columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* max_version= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* is_raw_read= */ true,
/* expected_block_size= */ 1024)[0];
size_t num_rows_read = 0;

while (Block block = in->read())
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved
{
num_rows_read += block.rows();
}

// The 64 deleted rows must be filtered out by the raw read.
ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows);
}
}
CATCH
} // namespace tests
} // namespace DM
} // namespace DB