Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: do del column clean read optimization for fast mode #5470

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
91e72a5
do del column clean read optimization for fast mode
hongyunyan Jul 26, 2022
7be6539
fix typo
hongyunyan Jul 26, 2022
ad7085d
fix typo
hongyunyan Jul 26, 2022
9c2f4b1
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jul 26, 2022
44e5522
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Jul 27, 2022
8e6c883
for performance test
hongyunyan Jul 28, 2022
b70a825
add bench test
hongyunyan Aug 12, 2022
8b4922d
remove the file
hongyunyan Aug 12, 2022
c9fdcef
remove code
hongyunyan Aug 12, 2022
b35ba5e
update bench
hongyunyan Aug 15, 2022
329cec5
update code
hongyunyan Aug 15, 2022
f39ee75
format code
hongyunyan Aug 15, 2022
0c2aa86
merge conflict
hongyunyan Aug 15, 2022
65c0d52
format code
hongyunyan Aug 15, 2022
6f34fa4
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Aug 15, 2022
1265813
format code
hongyunyan Aug 15, 2022
7c7fca2
update code
hongyunyan Aug 15, 2022
200812a
fix the compatibility problem
hongyunyan Aug 18, 2022
0f90f78
Merge branch 'master' of https://github.com/pingcap/tiflash into hong…
hongyunyan Aug 18, 2022
2b41657
Merge branch 'master' into hongyunyan_fast_mode_enable_del_index
hongyunyan Aug 18, 2022
566bc24
revert useless change
hongyunyan Aug 18, 2022
9cc7b09
Merge branch 'hongyunyan_fast_mode_enable_del_index' of https://githu…
hongyunyan Aug 18, 2022
11cb42f
fix comments
hongyunyan Aug 24, 2022
d374601
fix typo
hongyunyan Aug 24, 2022
2c1de09
remove bench code
hongyunyan Aug 24, 2022
4bc077d
remove useless code
hongyunyan Aug 24, 2022
b08aba1
remove useless code
hongyunyan Aug 24, 2022
ae0c218
Merge branch 'master' into hongyunyan_fast_mode_enable_del_index
ti-chi-bot Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
if (!args.dry_mode)
output_stream.write(
block,
{stat_iter->not_clean, properties_iter->num_rows(), properties_iter->gc_hint_version()});
{stat_iter->not_clean, properties_iter->deleted_rows(), properties_iter->num_rows(), properties_iter->gc_hint_version()});
stat_iter++;
properties_iter++;
}
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ class DMDeleteFilterBlockInputStream : public IBlockInputStream
if (block.rows() == 0)
continue;

/// if the pack is do clean read for del column, the del column returned is a const column, with size 1.
/// In this case, all the del_mark must be 0. Thus we don't need extra filter.
if (block.getByPosition(delete_col_pos).column->isColumnConst())
{
++total_blocks;
++complete_passed;
total_rows += block.rows();
passed_rows += block.rows();

return getNewBlockByHeader(header, block);
}

delete_col_data = getColumnVectorDataPtr<UInt8>(block, delete_col_pos);

size_t rows = block.rows();
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
bool do_delete_mark_filter_for_raw_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
const String & req_id,
bool use_del_optimization_)
: dm_context(dm_context_)
, task_pool(task_pool_)
, after_segment_read(after_segment_read_)
Expand All @@ -63,8 +64,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
, is_raw(is_raw_)
, do_delete_mark_filter_for_raw(do_delete_mark_filter_for_raw_)
, extra_table_id_index(extra_table_id_index)
, use_del_optimization(use_del_optimization_)
, physical_table_id(physical_table_id)
, log(Logger::get(NAME, req_id))

{
if (extra_table_id_index != InvalidColumnID)
{
Expand Down Expand Up @@ -110,7 +113,9 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
task->read_snapshot,
task->ranges,
filter,
do_delete_mark_filter_for_raw);
do_delete_mark_filter_for_raw,
DEFAULT_BLOCK_SIZE,
use_del_optimization);
}
else
{
Expand Down Expand Up @@ -176,6 +181,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const bool do_delete_mark_filter_for_raw;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;
const bool use_del_optimization;

bool done = false;

Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,30 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
}
}

// Let's set is_delete.
is_deleted.resize(rows);
{
UInt8 * is_deleted_pos = is_deleted.data();
auto * delete_pos = const_cast<UInt8 *>(delete_col_data->data());
for (size_t i = 0; i < batch_rows; ++i)
{
(*is_deleted_pos) = (*delete_pos);
++is_deleted_pos;
++delete_pos;
}
}

{
UInt8 * is_deleted_pos = is_deleted.data();
UInt8 * filter_pos = filter.data();
for (size_t i = 0; i < batch_rows; ++i)
{
(*is_deleted_pos) &= (*filter_pos);
++is_deleted_pos;
++filter_pos;
}
}

// Let's calculate gc_hint_version
gc_hint_version = std::numeric_limits<UInt64>::max();
{
Expand Down Expand Up @@ -307,6 +331,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
{
filter[rows - 1] = cur_version >= version_limit || !deleted;
not_clean[rows - 1] = filter[rows - 1] && deleted;
is_deleted[rows - 1] = filter[rows - 1] && deleted;
effective[rows - 1] = filter[rows - 1];
if (filter[rows - 1])
gc_hint_version = std::min(
Expand All @@ -332,6 +357,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
filter[rows - 1] = cur_version >= version_limit
|| ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) == 0 || deleted);
is_deleted[rows - 1] = filter[rows - 1] && deleted;
effective[rows - 1] = filter[rows - 1] && (compare(cur_handle, next_handle) != 0);
if (filter[rows - 1])
gc_hint_version
Expand All @@ -349,6 +375,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r
if constexpr (MODE == DM_VERSION_FILTER_MODE_COMPACT)
{
not_clean_rows += countBytesInFilter(not_clean);
deleted_rows += countBytesInFilter(is_deleted);
effective_num_rows += countBytesInFilter(effective);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
LOG_FMT_DEBUG(log,
"Total rows: {}, pass: {:.2f}%"
", complete pass: {:.2f}%, complete not pass: {:.2f}%"
", not clean: {:.2f}%, effective: {:.2f}%"
", not clean: {:.2f}%, is deleted: {:.2f}%, effective: {:.2f}%"
", read tso: {}",
total_rows,
passed_rows * 100.0 / total_rows,
complete_passed * 100.0 / total_blocks,
complete_not_passed * 100.0 / total_blocks,
not_clean_rows * 100.0 / passed_rows,
deleted_rows * 100.0 / passed_rows,
effective_num_rows * 100.0 / passed_rows,
version_limit);
}
Expand All @@ -95,6 +96,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream

size_t getEffectiveNumRows() const { return effective_num_rows; }
size_t getNotCleanRows() const { return not_clean_rows; }
size_t getDeletedRows() const { return deleted_rows; }
UInt64 getGCHintVersion() const { return gc_hint_version; }

private:
Expand All @@ -114,6 +116,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
filter[i]
= cur_version >= version_limit || ((compare(cur_handle, next_handle) != 0 || next_version > version_limit) && !deleted);
not_clean[i] = filter[i] && (compare(cur_handle, next_handle) == 0 || deleted);
is_deleted[i] = filter[i] && deleted;
effective[i] = filter[i] && (compare(cur_handle, next_handle) != 0);
if (filter[i])
gc_hint_version = std::min(gc_hint_version, calculateRowGcHintVersion(cur_handle, cur_version, next_handle, true, deleted));
Expand Down Expand Up @@ -210,6 +213,8 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
IColumn::Filter effective{};
// not_clean = selected & (handle equals with next || deleted)
IColumn::Filter not_clean{};
// is_deleted = selected & deleted
IColumn::Filter is_deleted{};

// Calculate per block, when gc_safe_point exceed this version, there must be some data obsolete in this block
// First calculate the gc_hint_version of every pk according to the following rules,
Expand All @@ -236,6 +241,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream
size_t complete_not_passed = 0;
size_t not_clean_rows = 0;
size_t effective_num_rows = 0;
size_t deleted_rows = 0;

const LoggerPtr log;
};
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,12 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context)

for (auto & segment : all_segments)
{
segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground);
auto res = segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground);
// just for the bench test
while (res == nullptr)
{
res = segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground);
}
hongyunyan marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -1227,7 +1232,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
/* do_delete_mark_filter_for_raw_ */ false, // don't do filter based on del_mark = 1
extra_table_id_index,
physical_table_id,
req_info);
req_info,
false);
}
res.push_back(stream);
}
Expand All @@ -1250,7 +1256,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
bool is_fast_mode,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
size_t extra_table_id_index,
bool use_del_optimization)
{
// Use the id from MPP/Coprocessor level as tracing_id
auto dm_context = newDMContext(db_context, db_settings, tracing_id);
Expand Down Expand Up @@ -1317,7 +1324,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
/* do_delete_mark_filter_for_raw_ */ is_fast_mode,
extra_table_id_index,
physical_table_id,
req_info);
req_info,
use_del_optimization);
}
res.push_back(stream);
}
Expand Down Expand Up @@ -2297,6 +2305,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

auto new_stable = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs);

wbs.writeLogAndData();
new_stable->enableDMFilesGC();

Expand Down
44 changes: 34 additions & 10 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,14 @@ class DeltaMergeStore : private boost::noncopyable

void setUpBackgroundTask(const DMContextPtr & dm_context);

const String & getDatabaseName() const { return db_name; }
const String & getTableName() const { return table_name; }
const String & getDatabaseName() const
{
return db_name;
}
const String & getTableName() const
{
return table_name;
}

void rename(String new_path, bool clean_rename, String new_database_name, String new_table_name);

Expand Down Expand Up @@ -368,15 +374,15 @@ class DeltaMergeStore : private boost::noncopyable
bool is_fast_mode = false, // set true when read in fast mode
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);
size_t extra_table_id_index = InvalidColumnID,
bool use_del_optimization = false);

/// Try flush all data in `range` to disk and return whether the task succeed.
bool flushCache(const Context & context, const RowKeyRange & range, bool try_until_succeed = true)
{
auto dm_context = newDMContext(context, context.getSettingsRef());
return flushCache(dm_context, range, try_until_succeed);
}

bool flushCache(const DMContextPtr & dm_context, const RowKeyRange & range, bool try_until_succeed = true);

/// Merge delta into the stable layer for all segments.
Expand Down Expand Up @@ -409,18 +415,36 @@ class DeltaMergeStore : private boost::noncopyable
std::shared_lock lock(read_write_mutex);
return store_columns;
}
const ColumnDefines & getTableColumns() const { return original_table_columns; }
const ColumnDefine & getHandle() const { return original_table_handle_define; }
const ColumnDefines & getTableColumns() const
{
return original_table_columns;
}
const ColumnDefine & getHandle() const
{
return original_table_handle_define;
}
BlockPtr getHeader() const;
const Settings & getSettings() const { return settings; }
DataTypePtr getPKDataType() const { return original_table_handle_define.type; }
const Settings & getSettings() const
{
return settings;
}
DataTypePtr getPKDataType() const
{
return original_table_handle_define.type;
}
SortDescription getPrimarySortDescription() const;

void check(const Context & db_context);
DeltaMergeStoreStat getStat();
SegmentStats getSegmentStats();
bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }
bool isCommonHandle() const
{
return is_common_handle;
}
size_t getRowKeyColumnSize() const
{
return rowkey_column_size;
}

public:
/// Methods mainly used by region split.
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
// It is safe to ignore them here.
if (unlikely(rowkey_ranges.empty() && enable_clean_read))
if (unlikely(rowkey_ranges.empty() && enable_handle_clean_read))
throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR);

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;
Expand All @@ -62,7 +62,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
dmfile,
read_columns,
is_common_handle,
enable_clean_read,
enable_handle_clean_read,
enable_del_clean_read,
is_fast_mode,
max_data_version,
std::move(pack_filter),
Expand Down
12 changes: 8 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,20 @@ class DMFileBlockInputStreamBuilder

// **** filters **** //

// Only set enable param to true when
// Only set enable_handle_clean_read_ param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set is_fast_mode_ param to true when read in fast mode.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast mode.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, bool is_fast_mode_, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_mode_, bool enable_del_clean_read_, UInt64 max_data_version_)
{
enable_clean_read = enable;
enable_handle_clean_read = enable_handle_clean_read_;
enable_del_clean_read = enable_del_clean_read_;
is_fast_mode = is_fast_mode_;
max_data_version = max_data_version_;
return *this;
Expand Down Expand Up @@ -156,8 +159,9 @@ class DMFileBlockInputStreamBuilder
FileProviderPtr file_provider;

// clean read
bool enable_clean_read = false;
bool enable_handle_clean_read = false;
bool is_fast_mode = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
Expand Down
Loading