Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: do del column clean read optimization for fast mode #5470

Merged
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
91e72a5
do del column clean read optimization for fast mode
hongyunyan Jul 26, 2022
7be6539
fix typo
hongyunyan Jul 26, 2022
ad7085d
fix typo
hongyunyan Jul 26, 2022
9c2f4b1
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jul 26, 2022
44e5522
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jul 27, 2022
8e6c883
for performance test
hongyunyan Jul 28, 2022
b70a825
add bench test
hongyunyan Aug 12, 2022
8b4922d
remove the file
hongyunyan Aug 12, 2022
c9fdcef
remove code
hongyunyan Aug 12, 2022
b35ba5e
update bench
hongyunyan Aug 15, 2022
329cec5
update code
hongyunyan Aug 15, 2022
f39ee75
format code
hongyunyan Aug 15, 2022
0c2aa86
merge conflict
hongyunyan Aug 15, 2022
65c0d52
format code
hongyunyan Aug 15, 2022
6f34fa4
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Aug 15, 2022
1265813
format code
hongyunyan Aug 15, 2022
7c7fca2
update code
hongyunyan Aug 15, 2022
200812a
fix the compatibility problem
hongyunyan Aug 18, 2022
0f90f78
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Aug 18, 2022
2b41657
Merge branch 'master' into hongyunyan_fast_mode_enable_del_index
hongyunyan Aug 18, 2022
566bc24
revert useless change
hongyunyan Aug 18, 2022
9cc7b09
Merge branch 'hongyunyan_fast_mode_enable_del_index' of https://githu…
hongyunyan Aug 18, 2022
11cb42f
fix comments
hongyunyan Aug 24, 2022
d374601
fix typo
hongyunyan Aug 24, 2022
2c1de09
remove bench code
hongyunyan Aug 24, 2022
4bc077d
remove useless code
hongyunyan Aug 24, 2022
b08aba1
remove useless code
hongyunyan Aug 24, 2022
ae0c218
Merge branch 'master' into hongyunyan_fast_mode_enable_del_index
ti-chi-bot Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
if (!args.dry_mode)
output_stream.write(
block,
{stat_iter->not_clean, properties_iter->num_rows(), properties_iter->gc_hint_version()});
{stat_iter->not_clean, properties_iter->deleted_rows(), properties_iter->num_rows(), properties_iter->gc_hint_version()});
stat_iter++;
properties_iter++;
}
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class DMDeleteFilterBlockInputStream : public IBlockInputStream
if (block.rows() == 0)
continue;

/// if the pack is do clean read for del column, the del column returned is a const column, with size 1.
/// In this case, all the del_mark must be 0. Thus we don't need extra filter.
if (block.getByPosition(delete_col_pos).column->isColumnConst())
{
++total_blocks;
++complete_passed;
total_rows += block.rows();
passed_rows += block.rows();

return getNewBlockByHeader(header, block);
}

delete_col_data = getColumnVectorDataPtr<UInt8>(block, delete_col_pos);

size_t rows = block.rows();
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,30 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
}
}

// Let's set is_delete.
is_deleted.resize(rows);
{
UInt8 * is_deleted_pos = is_deleted.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*is_deleted_pos) = (*delete_pos);
++is_deleted_pos;
++delete_pos;
}
}

{
UInt8 * is_deleted_pos = is_deleted.data();
UInt8 * filter_pos = filter.data();
for (size_t i = 0; i < batch_rows; ++i)
{
(*is_deleted_pos) &= (*filter_pos);
++is_deleted_pos;
++filter_pos;
}
}

// Let's calculate gc_hint_version
gc_hint_version = std::numeric_limits<UInt64>::max();
{
Expand Down Expand Up @@ -307,6 +331,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
{
filter[rows - 1] = cur_version >= version_limit || !deleted;
not_clean[rows - 1] = filter[rows - 1] && deleted;
is_deleted[rows - 1] = filter[rows - 1] && deleted;
effective[rows - 1] = filter[rows - 1];
if (filter[rows - 1])
gc_hint_version = std::min(
Expand All @@ -332,6 +357,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
filter[rows - 1] = cur_version >= version_limit
|| ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) == 0 || deleted);
is_deleted[rows - 1] = filter[rows - 1] && deleted;
effective[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) != 0);
if (filter[rows - 1])
gc_hint_version
Expand All @@ -349,6 +375,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT)
{
not_clean_rows += countBytesInFilter(not_clean);
deleted_rows += countBytesInFilter(is_deleted);
effective_num_rows += countBytesInFilter(effective);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
LOG_FMT_DEBUG(log,
"Total rows: {}, pass: {:.2f}%"
", complete pass: {:.2f}%, complete not pass: {:.2f}%"
", not clean: {:.2f}%, effective: {:.2f}%"
", not clean: {:.2f}%, is deleted: {:.2f}%, effective: {:.2f}%"
", read tso: {}",
total_rows,
passed_rows * 100.0 / total_rows,
complete_passed * 100.0 / total_blocks,
complete_not_passed * 100.0 / total_blocks,
not_clean_rows * 100.0 / passed_rows,
deleted_rows * 100.0 / passed_rows,
effective_num_rows * 100.0 / passed_rows,
version_limit);
}
Expand All @@ -95,6 +96,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream

size_t getEffectiveNumRows() const { return effective_num_rows; }
size_t getNotCleanRows() const { return not_clean_rows; }
size_t getDeletedRows() const { return deleted_rows; }
UInt64 getGCHintVersion() const { return gc_hint_version; }

private:
Expand All @@ -114,6 +116,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
filter[i]
= cur_version >= version_limit || ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted);
is_deleted[i] = filter[i] && deleted;
effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0);
if (filter[i])
gc_hint_version = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
Expand Down Expand Up @@ -210,6 +213,8 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
IColumn::Filter effective{};
// not_clean = selected & (handle equals with next || deleted)
IColumn::Filter not_clean{};
// is_deleted = selected & deleted
IColumn::Filter is_deleted{};

// Calculate per block, when gc_safe_point exceed this version, there must be some data obsolete in this block
// First calculate the gc_hint_version of every pk according to the following rules,
Expand All @@ -236,6 +241,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t complete_not_passed = 0;
size_t not_clean_rows = 0;
size_t effective_num_rows = 0;
size_t deleted_rows = 0;

const LoggerPtr log;
};
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
// It is safe to ignore them here.
if (unlikely(rowkey_ranges.empty() && enable_clean_read))
if (unlikely(rowkey_ranges.empty() && enable_handle_clean_read))
throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR);

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;
Expand All @@ -62,7 +62,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
dmfile,
read_columns,
is_common_handle,
enable_clean_read,
enable_handle_clean_read,
enable_del_clean_read,
is_fast_mode,
max_data_version,
std::move(pack_filter),
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,20 @@ class DMFileBlockInputStreamBuilder

// **** filters **** //

// Only set enable param to true when
// Only set enable_handle_clean_read_ param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set is_fast_mode_ param to true when read in fast mode.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast mode.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, bool is_fast_mode_, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_mode_, bool enable_del_clean_read_, UInt64 max_data_version_)
{
enable_clean_read = enable;
enable_handle_clean_read = enable_handle_clean_read_;
enable_del_clean_read = enable_del_clean_read_;
is_fast_mode = is_fast_mode_;
max_data_version = max_data_version_;
return *this;
Expand Down Expand Up @@ -156,8 +159,9 @@ class DMFileBlockInputStreamBuilder
FileProviderPtr file_provider;

// clean read
bool enable_clean_read = false;
bool enable_handle_clean_read = false;
bool is_fast_mode = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
Expand Down
43 changes: 33 additions & 10 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
#include <Storages/DeltaMerge/convertColumnTypeHelpers.h>
#include <Storages/Page/PageUtil.h>
#include <fmt/format.h>

#include <string>

namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
Expand Down Expand Up @@ -211,7 +208,8 @@ DMFileReader::DMFileReader(
const ColumnDefines & read_columns_,
bool is_common_handle_,
// clean read
bool enable_clean_read_,
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
UInt64 max_read_version_,
// filters
Expand All @@ -233,7 +231,8 @@ DMFileReader::DMFileReader(
, is_common_handle(is_common_handle_)
, read_one_pack_every_time(read_one_pack_every_time_)
, single_file_mode(dmfile_->isSingleFileMode())
, enable_clean_read(enable_clean_read_)
, enable_handle_clean_read(enable_handle_clean_read_)
, enable_del_clean_read(enable_del_clean_read_)
, is_fast_mode(is_fast_mode_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
Expand Down Expand Up @@ -321,20 +320,35 @@ Block DMFileReader::read()
size_t read_pack_limit = (single_file_mode || read_one_pack_every_time) ? 1 : 0;

const auto & pack_stats = dmfile->getPackStats();

const auto & pack_properties = dmfile->getPackProperties();

size_t read_rows = 0;
size_t not_clean_rows = 0;
size_t deleted_rows = 0;

const std::vector<RSResult> & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter
RSResult expected_handle_res = handle_res[next_pack_id];
for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; ++next_pack_id)
{
if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit)
break;
if (enable_clean_read && handle_res[next_pack_id] != expected_handle_res)
if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res)
break;

read_rows += pack_stats[next_pack_id].rows;
not_clean_rows += pack_stats[next_pack_id].not_clean;
// Because deleted_rows is a new field in pack_properties, we need to check whehter this pack has this field.
// If this pack doesn't have this field, then we can't know whether this pack contains deleted rows.
// Thus we just deleted_rows += 1, to make sure we will not do the optimization with del column(just to make deleted_rows != 0).
if (static_cast<size_t>(pack_properties.property_size()) > next_pack_id && pack_properties.property(next_pack_id).has_deleted_rows())
{
deleted_rows += pack_properties.property(next_pack_id).deleted_rows();
}
else
{
deleted_rows += 1;
}
}

if (read_rows == 0)
Expand All @@ -350,9 +364,10 @@ Block DMFileReader::read()
}

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode = enable_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);
bool do_clean_read_on_normal_mode = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);

bool do_clean_read_on_handle = enable_clean_read && is_fast_mode && expected_handle_res == All;
bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_mode && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_mode && deleted_rows == 0;

if (do_clean_read_on_normal_mode)
{
Expand All @@ -368,7 +383,7 @@ Block DMFileReader::read()
{
// For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK.
auto & cd = read_columns[i];
if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle)
if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle_on_fast_mode)
{
// Return the first row's handle
ColumnPtr column;
Expand All @@ -385,6 +400,15 @@ Block DMFileReader::read()
res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id});
skip_packs_by_column[i] = read_packs;
}
else if (cd.id == TAG_COLUMN_ID && do_clean_read_on_del_on_fast_mode)
{
ColumnPtr column;

column = cd.type->createColumnConst(read_rows, Field(static_cast<UInt64>(pack_stats[start_pack_id].first_tag)));
res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id});

skip_packs_by_column[i] = read_packs;
}
else if (do_clean_read_on_normal_mode && isExtraColumn(cd))
{
ColumnPtr column;
Expand Down Expand Up @@ -503,7 +527,6 @@ Block DMFileReader::read()
e.rethrow();
}
}

return res;
}

Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class DMFileReader
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// If you have no idea what it means, then simply set it to false.
bool enable_clean_read_,
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
// The the MVCC filter version. Used by clean read check.
UInt64 max_read_version_,
Expand Down Expand Up @@ -142,9 +143,11 @@ class DMFileReader
const bool single_file_mode;

/// Clean read optimize
// In normal mode, if there is no delta for some packs in stable, we can try to do clean read.
// In fast mode, we always try to do clean read.
const bool enable_clean_read;
// In normal mode, if there is no delta for some packs in stable, we can try to do clean read (enable_handle_clean_read is true).
// In fast mode, if we don't need handle column, we will try to do clean read on handle_column(enable_handle_clean_read is true).
// if we don't need del column, we will try to do clean read on del_column(enable_del_clean_read is true).
const bool enable_handle_clean_read;
const bool enable_del_clean_read;
const bool is_fast_mode;
const UInt64 max_read_version;

Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper
auto * property = properties.add_property();
property->set_num_rows(block_property.effective_num_rows);
property->set_gc_hint_version(block_property.gc_hint_version);
property->set_deleted_rows(block_property.deleted_rows);
}

void DMFileWriter::finalize()
Expand Down Expand Up @@ -194,7 +195,8 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
{
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index.
// Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row.
iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
// For TAG Column, we also ignore del_mark when add minmax index.
iter->second->addPack(column, (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark);
}

auto offset_in_compressed_block = single_file_stream->original_layer.offset();
Expand Down Expand Up @@ -259,7 +261,8 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
{
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index.
// Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row.
stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
// For TAG Column, we also ignore del_mark when add minmax index.
stream->minmaxes->addPack(column, (col_id == EXTRA_HANDLE_COLUMN_ID || col_id == TAG_COLUMN_ID) ? nullptr : del_mark);
}

/// There could already be enough data to compress into the new block.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class DMFileWriter
struct BlockProperty
{
size_t not_clean_rows;
size_t deleted_rows;
size_t effective_num_rows;
size_t gc_hint_version;
};
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ message PackProperty {
required uint64 gc_hint_version = 1;
// effective rows(multiple versions of one row is count as one include delete)
required uint64 num_rows = 2;
// the number of rows in this pack which are deleted
optional uint64 deleted_rows = 3;
}

message PackProperties {
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ inline std::pair<size_t, size_t> minmax(const IColumn & column, const ColumnVect

for (size_t i = offset; i < offset + limit; ++i)
{
// del_mark_data == nullptr || (del_mark_data != nullptr && (*del_mark_data)[i] != 0)
if (!del_mark_data || !(*del_mark_data)[i])
{
if (batch_min_idx == NONE_EXIST || column.compareAt(i, batch_min_idx, column, -1) < 0)
Expand Down
Loading