feat: support clean read based on fast mode (#5371)
ref #5252
hongyunyan authored Jul 15, 2022
1 parent 45c30df commit f9c001c
Showing 9 changed files with 217 additions and 31 deletions.
1 change: 0 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h
@@ -22,7 +22,6 @@

#include <unordered_set>


namespace DB
{
namespace DM
dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
@@ -61,6 +61,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
read_columns,
is_common_handle,
enable_clean_read,
is_fast_mode,
max_data_version,
std::move(pack_filter),
mark_cache,
17 changes: 11 additions & 6 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
@@ -69,14 +69,18 @@ class DMFileBlockInputStreamBuilder

// **** filters **** //

// Only set this param to true when
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// Only set the enable param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, bool is_fast_mode_, UInt64 max_data_version_)
{
enable_clean_read = enable;
is_fast_mode = is_fast_mode_;
max_data_version = max_data_version_;
return *this;
}
@@ -139,6 +143,7 @@ class DMFileBlockInputStreamBuilder

// clean read
bool enable_clean_read = false;
bool is_fast_mode = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
@@ -150,8 +155,8 @@
bool enable_column_cache = false;
ColumnCachePtr column_cache;
ReadLimiterPtr read_limiter;
size_t aio_threshold;
size_t max_read_buffer_size;
size_t aio_threshold{};
size_t max_read_buffer_size{};
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;

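To make the new flag concrete, here is a minimal usage sketch of the extended builder call (not part of this diff). It assumes the TiFlash DeltaMerge headers plus a `context` and `max_data_version` in scope, as in the StableValueSpace.cpp hunk further down; only calls that appear in this commit are used.

// Sketch only: the two ways a caller can configure clean read.
DMFileBlockInputStreamBuilder builder(context.db_context);

// Normal mode: clean read is only valid when there is no delta and the caller
// does not need the pk / version / delete_tag columns; the selected packs are
// additionally checked against max_data_version inside DMFileReader::read().
builder.enableCleanRead(/* enable */ true, /* is_fast_mode_ */ false, max_data_version);

// Fast mode: clean read only requires that the pk column is not needed.
// Segment::getInputStreamRaw passes UInt64 max here, since no MVCC filtering is wanted.
builder.enableCleanRead(/* enable */ true, /* is_fast_mode_ */ true, std::numeric_limits<UInt64>::max());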
34 changes: 30 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
@@ -25,6 +25,8 @@
#include <Storages/Page/PageUtil.h>
#include <fmt/format.h>

#include <string>

namespace CurrentMetrics
{
extern const Metric OpenFileForRead;
@@ -210,6 +212,7 @@ DMFileReader::DMFileReader(
bool is_common_handle_,
// clean read
bool enable_clean_read_,
bool is_fast_mode_,
UInt64 max_read_version_,
// filters
DMFilePackFilter && pack_filter_,
@@ -230,6 +233,7 @@
, read_one_pack_every_time(read_one_pack_every_time_)
, single_file_mode(dmfile_->isSingleFileMode())
, enable_clean_read(enable_clean_read_)
, is_fast_mode(is_fast_mode_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
, skip_packs_by_column(read_columns.size(), 0)
@@ -338,13 +342,16 @@ Block DMFileReader::read()
}

// TODO: this will need a better algorithm: we should separate the packs that can do clean read from those that cannot.
bool do_clean_read = enable_clean_read && expected_handle_res == All && not_clean_rows == 0;
if (do_clean_read)
bool do_clean_read_on_normal_mode = enable_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);

bool do_clean_read_on_handle = enable_clean_read && is_fast_mode && expected_handle_res == All;

if (do_clean_read_on_normal_mode)
{
UInt64 max_version = 0;
for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id)
max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version);
do_clean_read = max_version <= max_read_version;
do_clean_read_on_normal_mode = max_version <= max_read_version;
}

for (size_t i = 0; i < read_columns.size(); ++i)
Expand All @@ -353,7 +360,24 @@ Block DMFileReader::read()
{
// For clean read of the pk, version and tag columns, instead of loading data from disk, it is OK to just create a placeholder column.
auto & cd = read_columns[i];
if (do_clean_read && isExtraColumn(cd))
if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle)
{
// Return the first row's handle
ColumnPtr column;
if (is_common_handle)
{
StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size));
}
else
{
Handle min_handle = pack_filter.getMinHandle(start_pack_id);
column = cd.type->createColumnConst(read_rows, Field(min_handle));
}
res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id});
skip_packs_by_column[i] = read_packs;
}
else if (do_clean_read_on_normal_mode && isExtraColumn(cd))
{
ColumnPtr column;
if (cd.id == EXTRA_HANDLE_COLUMN_ID)
@@ -441,6 +465,7 @@ Block DMFileReader::read()
auto column = data_type->createColumn();
readFromDisk(cd, column, start_pack_id, read_rows, skip_packs_by_column[i], single_file_mode);
auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd);

res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
}
@@ -456,6 +481,7 @@ Block DMFileReader::read()
dmfile->path());
// New column after ddl is not exist in this DMFile, fill with default value
ColumnPtr column = createColumnWithDefaultValue(cd, read_rows);

res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id});
skip_packs_by_column[i] = 0;
}
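The decision logic added to DMFileReader::read() above can be restated as a small self-contained model. The struct, function and parameter names below are placeholders for illustration (they are not TiFlash symbols), and `expected_handle_res == All` is reduced to a boolean.

#include <algorithm>
#include <cstdint>
#include <vector>

// Placeholder model of the clean-read decision in DMFileReader::read().
struct CleanReadDecision
{
    bool on_normal_mode = false; // pk / version / tag become placeholder columns
    bool on_handle = false;      // fast mode: handle becomes a constant column
};

CleanReadDecision decideCleanRead(
    bool enable_clean_read,
    bool is_fast_mode,
    bool all_handles_in_range,                       // models expected_handle_res == All
    uint64_t not_clean_rows,
    const std::vector<uint64_t> & pack_max_versions, // max version of each selected pack
    uint64_t max_read_version)
{
    CleanReadDecision d;
    d.on_normal_mode = enable_clean_read && all_handles_in_range
        && not_clean_rows == 0 && !is_fast_mode;
    d.on_handle = enable_clean_read && is_fast_mode && all_handles_in_range;

    if (d.on_normal_mode)
    {
        uint64_t max_version = 0;
        for (uint64_t v : pack_max_versions)
            max_version = std::max(max_version, v);
        // Normal-mode clean read additionally requires every selected pack to be
        // fully visible at the read version.
        d.on_normal_mode = max_version <= max_read_version;
    }
    return d;
}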
5 changes: 4 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
@@ -76,6 +76,7 @@ class DMFileReader
// 2. You don't need pk, version and delete_tag columns
// If you have no idea what it means, then simply set it to false.
bool enable_clean_read_,
bool is_fast_mode_,
// The MVCC filter version. Used by the clean read check.
UInt64 max_read_version_,
// filters
@@ -122,8 +123,10 @@
const bool single_file_mode;

/// Clean read optimize
// If there is no delta for some packs in stable, we can try to do clean read.
// In normal mode, if there is no delta for some packs in stable, we can try to do clean read.
// In fast mode, we always try to do clean read.
const bool enable_clean_read;
const bool is_fast_mode;
const UInt64 max_read_version;

/// Filters
25 changes: 22 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -514,6 +514,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
bool filter_delete_mark,
size_t expected_block_size)
{
/// For now, we use filter_delete_mark to determine whether we are in fast mode or just serving a `selraw * xxxx` query.
/// But this does not seem robust enough; maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();


@@ -523,20 +525,37 @@
new_columns_to_read->push_back(getTagColumnDefine());
}

bool enable_clean_read = filter_delete_mark;

for (const auto & c : columns_to_read)
{
if (c.id != EXTRA_HANDLE_COLUMN_ID && (!(filter_delete_mark && c.id == TAG_COLUMN_ID)))
new_columns_to_read->push_back(c);
if (c.id != EXTRA_HANDLE_COLUMN_ID)
{
if (!(filter_delete_mark && c.id == TAG_COLUMN_ID))
new_columns_to_read->push_back(c);
}
else
{
enable_clean_read = false;
}
}

/// When we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID,
/// we can try to use clean read to optimize reading the stable part:
/// if a pack is fully covered by data_ranges and has no rows with del_mark = 1 --> we don't need to read handle_column/tag_column/version_column;
/// if a pack is fully covered by data_ranges but has rows with del_mark = 1 --> we don't need to read handle_column/version_column;
/// otherwise --> we don't need to read version_column.
/// Since the del min-max index still has some problems, for now we only optimize with the handle column.

BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
filter,
std::numeric_limits<UInt64>::max(),
expected_block_size,
false);
/* enable_clean_read */ enable_clean_read,
/* is_fast_mode */ filter_delete_mark);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

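A self-contained sketch of how getInputStreamRaw now derives the enable_clean_read flag it forwards to the stable stream; the function name and the plain integer column ids are placeholders for illustration rather than the real EXTRA_HANDLE_COLUMN_ID / ColumnDefine types.

#include <cstdint>
#include <vector>

// Placeholder model: clean read starts out enabled only when filter_delete_mark
// is set (the fast-mode path), and is turned off as soon as the caller
// explicitly asks for the handle column.
bool decideFastModeCleanRead(
    const std::vector<int64_t> & column_ids_to_read,
    bool filter_delete_mark,
    int64_t extra_handle_column_id)
{
    bool enable_clean_read = filter_delete_mark;
    for (const auto id : column_ids_to_read)
    {
        if (id == extra_handle_column_id)
            enable_clean_read = false;
    }
    return enable_clean_read;
}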
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
@@ -328,7 +328,8 @@ StableValueSpace::Snapshot::getInputStream(
const RSOperatorPtr & filter,
UInt64 max_data_version,
size_t expected_block_size,
bool enable_clean_read)
bool enable_clean_read,
bool is_fast_mode)
{
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_clean_read: {}", max_data_version, enable_clean_read);
SkippableBlockInputStreams streams;
@@ -337,7 +338,7 @@
{
DMFileBlockInputStreamBuilder builder(context.db_context);
builder
.enableCleanRead(enable_clean_read, max_data_version)
.enableCleanRead(enable_clean_read, is_fast_mode, max_data_version)
.setRSOperator(filter)
.setColumnCache(column_caches[i])
.setTracingID(context.tracing_id)
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
@@ -150,7 +150,8 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
const RSOperatorPtr & filter,
UInt64 max_data_version,
size_t expected_block_size,
bool enable_clean_read);
bool enable_clean_read,
bool is_fast_mode = false);

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;

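Defaulting is_fast_mode to false keeps every existing caller of Snapshot::getInputStream source-compatible with normal-mode behaviour; only the raw/fast path opts in. A rough sketch of the two call forms, assuming the surrounding variables (stable_snapshot, dm_context, columns_to_read, read_ranges, filter, max_data_version, expected_block_size, enable_clean_read) exist:

// Existing call sites: is_fast_mode defaults to false, so behaviour is unchanged.
auto normal_stream = stable_snapshot->getInputStream(
    dm_context, columns_to_read, read_ranges, filter,
    max_data_version, expected_block_size, enable_clean_read);

// Fast-mode raw read, as Segment::getInputStreamRaw now does.
auto fast_stream = stable_snapshot->getInputStream(
    dm_context, columns_to_read, read_ranges, filter,
    std::numeric_limits<UInt64>::max(), expected_block_size,
    /* enable_clean_read */ true, /* is_fast_mode */ true);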
