diff --git a/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h b/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h index 24c405496ff..7c1feb6cfaf 100644 --- a/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h +++ b/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h @@ -22,7 +22,6 @@ #include - namespace DB { namespace DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index c9212c4b81e..56b11b975a1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -61,6 +61,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_columns, is_common_handle, enable_clean_read, + is_fast_mode, max_data_version, std::move(pack_filter), mark_cache, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index a36bf50a937..a7f2fe9d556 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -69,14 +69,18 @@ class DMFileBlockInputStreamBuilder // **** filters **** // - // Only set this param to true when - // 1. There is no delta. - // 2. You don't need pk, version and delete_tag columns + // Only set enable param to true when + // in normal mode: + // 1. There is no delta. + // 2. You don't need pk, version and delete_tag columns + // in fast mode: + // 1. You don't need pk columns // If you have no idea what it means, then simply set it to false. // `max_data_version_` is the MVCC filter version for reading. Used by clean read check - DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, UInt64 max_data_version_) + DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, bool is_fast_mode_, UInt64 max_data_version_) { enable_clean_read = enable; + is_fast_mode = is_fast_mode_; max_data_version = max_data_version_; return *this; } @@ -139,6 +143,7 @@ class DMFileBlockInputStreamBuilder // clean read bool enable_clean_read = false; + bool is_fast_mode = false; UInt64 max_data_version = std::numeric_limits::max(); // Rough set filter RSOperatorPtr rs_filter; @@ -150,8 +155,8 @@ class DMFileBlockInputStreamBuilder bool enable_column_cache = false; ColumnCachePtr column_cache; ReadLimiterPtr read_limiter; - size_t aio_threshold; - size_t max_read_buffer_size; + size_t aio_threshold{}; + size_t max_read_buffer_size{}; size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; bool read_one_pack_every_time = false; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 423d8d4d031..779de36da63 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -25,6 +25,8 @@ #include #include +#include + namespace CurrentMetrics { extern const Metric OpenFileForRead; @@ -210,6 +212,7 @@ DMFileReader::DMFileReader( bool is_common_handle_, // clean read bool enable_clean_read_, + bool is_fast_mode_, UInt64 max_read_version_, // filters DMFilePackFilter && pack_filter_, @@ -230,6 +233,7 @@ DMFileReader::DMFileReader( , read_one_pack_every_time(read_one_pack_every_time_) , single_file_mode(dmfile_->isSingleFileMode()) , enable_clean_read(enable_clean_read_) + , is_fast_mode(is_fast_mode_) , max_read_version(max_read_version_) , pack_filter(std::move(pack_filter_)) , skip_packs_by_column(read_columns.size(), 0) @@ -338,13 +342,16 @@ Block DMFileReader::read() } // TODO: this will need better algorithm: we should separate those packs which can and can not do clean read. - bool do_clean_read = enable_clean_read && expected_handle_res == All && not_clean_rows == 0; - if (do_clean_read) + bool do_clean_read_on_normal_mode = enable_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode); + + bool do_clean_read_on_handle = enable_clean_read && is_fast_mode && expected_handle_res == All; + + if (do_clean_read_on_normal_mode) { UInt64 max_version = 0; for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id) max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version); - do_clean_read = max_version <= max_read_version; + do_clean_read_on_normal_mode = max_version <= max_read_version; } for (size_t i = 0; i < read_columns.size(); ++i) @@ -353,7 +360,24 @@ Block DMFileReader::read() { // For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK. auto & cd = read_columns[i]; - if (do_clean_read && isExtraColumn(cd)) + if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle) + { + // Return the first row's handle + ColumnPtr column; + if (is_common_handle) + { + StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id); + column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size)); + } + else + { + Handle min_handle = pack_filter.getMinHandle(start_pack_id); + column = cd.type->createColumnConst(read_rows, Field(min_handle)); + } + res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id}); + skip_packs_by_column[i] = read_packs; + } + else if (do_clean_read_on_normal_mode && isExtraColumn(cd)) { ColumnPtr column; if (cd.id == EXTRA_HANDLE_COLUMN_ID) @@ -441,6 +465,7 @@ Block DMFileReader::read() auto column = data_type->createColumn(); readFromDisk(cd, column, start_pack_id, read_rows, skip_packs_by_column[i], single_file_mode); auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); + res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); skip_packs_by_column[i] = 0; } @@ -456,6 +481,7 @@ Block DMFileReader::read() dmfile->path()); // New column after ddl is not exist in this DMFile, fill with default value ColumnPtr column = createColumnWithDefaultValue(cd, read_rows); + res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); skip_packs_by_column[i] = 0; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 9211918c2d0..c04c93871a7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -76,6 +76,7 @@ class DMFileReader // 2. You don't need pk, version and delete_tag columns // If you have no idea what it means, then simply set it to false. bool enable_clean_read_, + bool is_fast_mode_, // The the MVCC filter version. Used by clean read check. UInt64 max_read_version_, // filters @@ -122,8 +123,10 @@ class DMFileReader const bool single_file_mode; /// Clean read optimize - // If there is no delta for some packs in stable, we can try to do clean read. + // In normal mode, if there is no delta for some packs in stable, we can try to do clean read. + // In fast mode, we always try to do clean read. const bool enable_clean_read; + const bool is_fast_mode; const UInt64 max_read_version; /// Filters diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 891a49365e3..91d65b7fee2 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -514,6 +514,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, bool filter_delete_mark, size_t expected_block_size) { + /// Now, we use filter_delete_mark to determine whether it is in fast mode or just from `selraw * xxxx` + /// But this way seems not to be robustness enough, maybe we need another flag? auto new_columns_to_read = std::make_shared(); @@ -523,12 +525,28 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, new_columns_to_read->push_back(getTagColumnDefine()); } + bool enable_clean_read = filter_delete_mark; + for (const auto & c : columns_to_read) { - if (c.id != EXTRA_HANDLE_COLUMN_ID && (!(filter_delete_mark && c.id == TAG_COLUMN_ID))) - new_columns_to_read->push_back(c); + if (c.id != EXTRA_HANDLE_COLUMN_ID) + { + if (!(filter_delete_mark && c.id == TAG_COLUMN_ID)) + new_columns_to_read->push_back(c); + } + else + { + enable_clean_read = false; + } } + /// when we read in fast mode, if columns_to_read does not include EXTRA_HANDLE_COLUMN_ID, + /// we can try to use clean read to make optimization in stable part. + /// when the pack is under totally data_ranges and has no rows whose del_mark = 1 --> we don't need read handle_column/tag_column/version_column + /// when the pack is under totally data_ranges and has rows whose del_mark = 1 --> we don't need read handle_column/version_column + /// others --> we don't need read version_column + /// Considering the del min max index has some problem now, we first only optimize with handle column. + BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream( dm_context, *new_columns_to_read, @@ -536,7 +554,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, filter, std::numeric_limits::max(), expected_block_size, - false); + /* enable_clean_read */ enable_clean_read, + /* is_fast_mode */ filter_delete_mark); BlockInputStreamPtr delta_stream = std::make_shared(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index ed97bd8f421..4a6b653f3af 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -328,7 +328,8 @@ StableValueSpace::Snapshot::getInputStream( const RSOperatorPtr & filter, UInt64 max_data_version, size_t expected_block_size, - bool enable_clean_read) + bool enable_clean_read, + bool is_fast_mode) { LOG_FMT_DEBUG(log, "max_data_version: {}, enable_clean_read: {}", max_data_version, enable_clean_read); SkippableBlockInputStreams streams; @@ -337,7 +338,7 @@ StableValueSpace::Snapshot::getInputStream( { DMFileBlockInputStreamBuilder builder(context.db_context); builder - .enableCleanRead(enable_clean_read, max_data_version) + .enableCleanRead(enable_clean_read, is_fast_mode, max_data_version) .setRSOperator(filter) .setColumnCache(column_caches[i]) .setTracingID(context.tracing_id) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index 24384674d80..542b8ea7a62 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -150,7 +150,8 @@ class StableValueSpace : public std::enable_shared_from_this const RSOperatorPtr & filter, UInt64 max_data_version, size_t expected_block_size, - bool enable_clean_read); + bool enable_clean_read, + bool is_fast_mode = false); RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp index 0bc54cecf6f..360f3e96315 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_for_fast_mode.cpp @@ -290,7 +290,7 @@ try /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; - + in->readPrefix(); switch (mode) { case TestMode::V1_BlockOnly: @@ -364,7 +364,7 @@ try } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } } @@ -436,7 +436,7 @@ try /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; - + in->readPrefix(); switch (mode) { case TestMode::V1_BlockOnly: @@ -509,8 +509,7 @@ try break; } } - - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } } @@ -585,6 +584,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); switch (mode) { case TestMode::V1_BlockOnly: @@ -630,6 +630,7 @@ try { int block_index = 0; int begin_value = 0; + while (Block block = in->read()) { if (block_index == 1) @@ -657,8 +658,7 @@ try break; } } - - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } } @@ -735,6 +735,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -750,7 +751,7 @@ try } } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } @@ -829,6 +830,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); switch (mode) { case TestMode::V1_BlockOnly: @@ -866,6 +868,7 @@ try { auto block_index = 0; auto begin_value = 0; + while (Block block = in->read()) { if (block_index == 1) @@ -896,6 +899,7 @@ try { auto block_index = 0; auto begin_value = num_write_rows; + while (Block block = in->read()) { if (block_index == 1) @@ -920,17 +924,18 @@ try num_rows_read += block.rows(); block_index += 1; } + break; } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } } CATCH - +// Insert + Delete row TEST_P(DeltaMergeStoreRWTest, TestFastModeWithDeleteRow) try { @@ -1010,6 +1015,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); // filter del mark = 1, thus just read the insert data before delete while (Block block = in->read()) { @@ -1026,7 +1032,7 @@ try } } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write); } @@ -1050,6 +1056,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -1065,6 +1072,7 @@ try } } } + in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write); } @@ -1109,6 +1117,7 @@ try /* is_raw_read= */ true, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -1124,7 +1133,7 @@ try } } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write); } // Delete range [0, 64) @@ -1149,6 +1158,7 @@ try size_t num_rows_read = 0; // filter del mark = 1, thus just read the insert data before delete + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -1164,6 +1174,7 @@ try } } } + in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write); } @@ -1222,6 +1233,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -1237,6 +1249,7 @@ try } } } + in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows); } @@ -1322,6 +1335,7 @@ try /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + in->readPrefix(); switch (mode) { case TestMode::V1_BlockOnly: @@ -1416,7 +1430,7 @@ try break; } } - + in->readSuffix(); ASSERT_EQ(num_rows_read, 3 * num_write_rows); } @@ -1434,6 +1448,8 @@ try /* is_raw_read= */ false, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; + + in->readPrefix(); while (Block block = in->read()) { num_rows_read += block.rows(); @@ -1449,12 +1465,127 @@ try } } } + in->readSuffix(); ASSERT_EQ(num_rows_read, 1.5 * num_write_rows); } } CATCH +TEST_P(DeltaMergeStoreRWTest, TestFastModeForCleanRead) +try +{ + const size_t num_rows_write = 128; + { + // Create a block with sequential Int64 handle in range [0, 128) + Block block = DMTestEnv::prepareSimpleWriteBlock(0, 128, false); + + switch (mode) + { + case TestMode::V1_BlockOnly: + case TestMode::V2_BlockOnly: + store->write(*db_context, db_context->getSettingsRef(), block); + break; + default: + { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + auto [range, file_ids] = genDMFile(*dm_context, block); + store->ingestFiles(dm_context, range, file_ids, false); + break; + } + } + } + + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + store->mergeDeltaAll(*db_context); + + // could do clean read with no optimization + { + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* is_raw_read= */ true, + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == DMTestEnv::pk_name) + { + ASSERT_EQ(c->getInt(i), i); + } + } + } + } + in->readSuffix(); + + ASSERT_EQ(num_rows_read, num_rows_write); + } + + // Delete range [0, 64) + const size_t num_deleted_rows = 64; + { + HandleRange range(0, num_deleted_rows); + store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(range)); + } + + store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + store->mergeDeltaAll(*db_context); + + // could do clean read with handle optimization + { + const auto & columns = store->getTableColumns(); + ColumnDefines real_columns; + for (auto & col : columns) + { + if (col.name != EXTRA_HANDLE_COLUMN_NAME) + { + real_columns.emplace_back(col); + } + } + + BlockInputStreamPtr in = store->read(*db_context, + db_context->getSettingsRef(), + real_columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + TRACING_NAME, + /* is_raw_read= */ true, + /* expected_block_size= */ 1024)[0]; + size_t num_rows_read = 0; + + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + + ASSERT_EQ(num_rows_read, num_rows_write - num_deleted_rows); + } +} +CATCH } // namespace tests } // namespace DM } // namespace DB \ No newline at end of file