diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 65d98815196..09834073b34 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -261,6 +261,7 @@ struct Settings M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\ M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\ M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\ + M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.")\ M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\ M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \ M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\ @@ -362,4 +363,4 @@ struct Settings }; -} // namespace DB +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index c27a0deb15b..581315ffb8a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_thistryToDeleteRange(); dp_delete) + squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange()); + } + return squashed_delete_range; +} + // ================================================ // DeltaValueReader // ================================================ - DeltaValueReader::DeltaValueReader(const DMContext & context, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index da6515fa9d3..7b1c7eff457 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -94,14 +94,14 @@ bool DeltaMergeStore::MergeDeltaTaskPool::addTask(const BackgroundTask & task, c std::scoped_lock lock(mutex); switch (task.type) { - case Split: - case Merge: - case MergeDelta: + case TaskType::Split: + case TaskType::Merge: + case TaskType::MergeDelta: heavy_tasks.push(task); return true; - case Compact: - case Flush: - case PlaceIndex: + case TaskType::Compact: + case TaskType::Flush: + case TaskType::PlaceIndex: light_tasks.push(task); return false; default: @@ -701,7 +701,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, updated_segments.push_back(segment); fiu_do_on(FailPoints::segment_merge_after_ingest_packs, { segment->flushCache(*dm_context); - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_Thread_Pool); + segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundThreadPool); storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0)); }); break; @@ -874,7 +874,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) for (auto & segment : all_segments) { - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); } } @@ -1074,7 +1074,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const { if (segment->hasAbandoned()) return; - auto & delta = segment->getDelta(); size_t delta_saved_rows = delta->getRows(/* use_unsaved */ false); @@ -1118,8 +1117,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) // && (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows - || delta_bytes - delta_last_try_merge_delta_bytes >= delta_cache_limit_bytes)) - || delta_deletes >= 2; + || delta_bytes - delta_last_try_merge_delta_bytes >= delta_cache_limit_bytes)); bool should_foreground_merge_delta_by_rows_or_bytes = delta_check_rows >= forceMergeDeltaRows(dm_context) || delta_check_bytes >= forceMergeDeltaBytes(dm_context); bool should_foreground_merge_delta_by_deletes = delta_deletes >= forceMergeDeltaDeletes(dm_context); @@ -1176,6 +1174,8 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const } } + // Need to check the latest delta (maybe updated after foreground flush). If it is updating by another thread, + // give up adding more tasks on this version of delta. if (segment->getDelta()->isUpdating()) return; @@ -1202,14 +1202,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const if (it == segments.end()) return {}; next_segment = it->second; - - auto limit = dm_context->segment_limit_rows / 5; + auto limit = dm_context->segment_limit_rows / 5; if (next_segment->getEstimatedRows() >= limit) return {}; } return next_segment; }; - SegmentPtr merge_sibling; auto try_fg_merge_delta = [&]() -> SegmentPtr { if (should_foreground_merge_delta_by_rows_or_bytes || should_foreground_merge_delta_by_deletes) @@ -1228,7 +1226,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const .Observe(watch.elapsedSeconds()); }); - return segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + return segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); } return {}; }; @@ -1264,6 +1262,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const return false; }; auto try_bg_merge = [&]() { + SegmentPtr merge_sibling; if (should_merge && (merge_sibling = getMergeSibling())) { try_add_background_task(BackgroundTask{TaskType::Merge, dm_context, segment, merge_sibling}); @@ -1299,8 +1298,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const if (try_fg_split(segment)) return; - SegmentPtr new_segment; - if ((new_segment = try_fg_merge_delta())) + if (SegmentPtr new_segment = try_fg_merge_delta(); new_segment) { // After merge delta, we better check split immediately. if (try_bg_split(new_segment)) @@ -1359,35 +1357,35 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) { switch (task.type) { - case Split: + case TaskType::Split: std::tie(left, right) = segmentSplit(*task.dm_context, task.segment, false); type = ThreadType::BG_Split; break; - case Merge: + case TaskType::Merge: segmentMerge(*task.dm_context, task.segment, task.next_segment, false); type = ThreadType::BG_Merge; break; - case MergeDelta: { + case TaskType::MergeDelta: { FAIL_POINT_PAUSE(FailPoints::pause_before_dt_background_delta_merge); - left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::Thread_BG_Thread_Pool); + left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::BackgroundThreadPool); type = ThreadType::BG_MergeDelta; // Wake up all waiting threads if failpoint is enabled FailPointHelper::disableFailPoint(FailPoints::pause_until_dt_background_delta_merge); break; } - case Compact: + case TaskType::Compact: task.segment->compactDelta(*task.dm_context); left = task.segment; type = ThreadType::BG_Compact; break; - case Flush: + case TaskType::Flush: task.segment->flushCache(*task.dm_context); // After flush cache, better place delta index. task.segment->placeDeltaIndex(*task.dm_context); left = task.segment; type = ThreadType::BG_Flush; break; - case PlaceIndex: + case TaskType::PlaceIndex: task.segment->placeDeltaIndex(*task.dm_context); break; default: @@ -1397,7 +1395,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) catch (const Exception & e) { LOG_ERROR(log, - "Task " << toString(task.type) << " on Segment [" << task.segment->segmentId() + "Task " << DeltaMergeStore::toString(task.type) << " on Segment [" << task.segment->segmentId() << ((bool)task.next_segment ? ("] and [" + DB::toString(task.next_segment->segmentId())) : "") << "] failed. Error msg: " << e.message()); e.rethrow(); @@ -1408,6 +1406,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) throw; } + // continue to check whether we need to apply more tasks after this task is ended. if (left) checkSegmentUpdate(task.dm_context, left, type); if (right) @@ -1420,14 +1419,14 @@ namespace GC { // Returns true if it needs gc. // This is for optimization purpose, does not mean to be accurate. -bool shouldCompact(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Logger * log) +bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Logger * log) { // Always GC. if (ratio_threshold < 1.0) return true; auto & property = seg->getStable()->getStableProperty(); - LOG_DEBUG(log, property.toDebugString()); + LOG_TRACE(log, __PRETTY_FUNCTION__ << property.toDebugString()); // No data older than safe_point to GC. if (property.gc_hint_version > gc_safepoint) return false; @@ -1439,6 +1438,33 @@ bool shouldCompact(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ra return true; return false; } + +bool shouldCompactDeltaWithStable( + const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, Logger * log) +{ + auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); + if (actual_delete_range.none()) + return false; + + auto [delete_rows, delete_bytes] = snap->stable->getApproxRowsAndBytes(context, actual_delete_range); + + auto stable_rows = snap->stable->getRows(); + auto stable_bytes = snap->stable->getBytes(); + + LOG_TRACE(log, + __PRETTY_FUNCTION__ << " delete range rows [" << delete_rows << "], delete_bytes [" << delete_bytes << "] stable_rows [" + << stable_rows << "] stable_bytes [" << stable_bytes << "]"); + + // 1. for small tables, the data may just reside in delta and stable_rows may be 0, + // so the `=` in `>=` is needed to cover the scenario when set tiflash replica of small tables to 0. + // (i.e. `actual_delete_range` is not none, but `delete_rows` and `stable_rows` are both 0). + // 2. the disadvantage of `=` in `>=` is that it may trigger an extra gc when write apply snapshot file to an empty segment, + // because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria. + // But the cost should be really minor because merge delta on an empty segment should be very fast. + // What's more, we can ignore this kind of delete range in future to avoid this extra gc. + bool should_compact = (delete_rows >= stable_rows * ratio_threshold) || (delete_bytes >= stable_bytes * ratio_threshold); + return should_compact; +} } // namespace GC UInt64 DeltaMergeStore::onSyncGc(Int64 limit) @@ -1455,7 +1481,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segments.size() == 1) { const auto & seg = segments.begin()->second; - if (seg->getStable()->getRows() == 0) + if (seg->getEstimatedRows() == 0) return 0; } } @@ -1463,16 +1489,19 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire); LOG_DEBUG(log, "GC on table " << table_name << " start with key: " << next_gc_check_key.toDebugString() - << ", gc_safe_point: " << gc_safe_point); + << ", gc_safe_point: " << gc_safe_point << ", max gc limit: " << limit); UInt64 check_segments_num = 0; Int64 gc_segments_num = 0; while (gc_segments_num < limit) { - SegmentPtr segment; // If the store is shut down, give up running GC on it. if (shutdown_called.load(std::memory_order_relaxed)) break; + + auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); + SegmentPtr segment; + SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); @@ -1487,21 +1516,20 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) segment = segment_it->second; next_gc_check_key = segment_it->first.toRowKeyValue(); + segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); } - if (segment->hasAbandoned()) - continue; - - if (segment->getLastCheckGCSafePoint() >= gc_safe_point) + assert(segment != nullptr); + if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr) continue; const auto segment_id = segment->segmentId(); RowKeyRange segment_range = segment->getRowKeyRange(); - if (segment->getDelta()->isUpdating()) + + // meet empty segment, try merge it + if (segment_snap->getRows() == 0) { - LOG_DEBUG(log, - "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name - << "]"); + checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); continue; } @@ -1511,9 +1539,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process, // it's still worth to wait another gc_safe_point to check this segment again. segment->setLastCheckGCSafePoint(gc_safe_point); - - auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); dm_context->min_version = gc_safe_point; + // calculate StableProperty if needed if (!segment->getStable()->isStablePropertyCached()) segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); @@ -1522,22 +1549,32 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) { // Check whether we should apply gc on this segment const bool should_compact - = GC::shouldCompact(segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log); + = GC::shouldCompactStable( + segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log) + || GC::shouldCompactDeltaWithStable(*dm_context, + segment_snap, + segment_range, + global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc, + log); bool finish_gc_on_segment = false; if (should_compact) { - ThreadType type = ThreadType::BG_GC; - segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_GC); - if (segment) + if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment) { // Continue to check whether we need to apply more tasks on this segment - checkSegmentUpdate(dm_context, segment, type); + checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); gc_segments_num++; finish_gc_on_segment = true; LOG_INFO(log, "GC-merge-delta done Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); } + else + { + LOG_INFO(log, + "GC aborted on Segment [" << segment_id << "] [range=" << segment_range.toDebugString() + << "] [table=" << table_name << "]"); + } } if (!finish_gc_on_segment) LOG_DEBUG(log, @@ -1568,7 +1605,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up segment [" << segment->segmentId() << "] split"); return {}; @@ -1620,7 +1657,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP { std::unique_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up segment [" << segment->segmentId() << "] split"); wbs.setRollback(); @@ -1691,12 +1728,12 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(left)) + if (!isSegmentValid(lock, left)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); return; } - if (!isSegmentValid(right)) + if (!isSegmentValid(lock, right)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); return; @@ -1735,7 +1772,7 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le { std::unique_lock lock(read_write_mutex); - if (!isSegmentValid(left) || !isSegmentValid(right)) + if (!isSegmentValid(lock, left) || !isSegmentValid(lock, right)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); wbs.setRollback(); @@ -1778,23 +1815,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le check(dm_context.db_context); } -SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread run_thread) +SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, + const SegmentPtr & segment, + const TaskRunThread run_thread, + SegmentSnapshotPtr segment_snap) { LOG_DEBUG(log, toString(run_thread) << " merge delta, segment [" << segment->segmentId() << "], safe point:" << dm_context.min_version); - SegmentSnapshotPtr segment_snap; - ColumnDefinesPtr schema_snap; + ColumnDefinesPtr schema_snap; + { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); return {}; } - segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); + // Try to generate a new snapshot if there is no pre-allocated one if (!segment_snap) + segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); + + if (unlikely(!segment_snap)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); return {}; @@ -1812,13 +1855,13 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm switch (run_thread) { - case TaskRunThread::Thread_BG_Thread_Pool: + case TaskRunThread::BackgroundThreadPool: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge).Increment(); break; - case TaskRunThread::Thread_FG: + case TaskRunThread::Foreground: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_fg).Increment(); break; - case TaskRunThread::Thread_BG_GC: + case TaskRunThread::BackgroundGCThread: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment(); break; default: @@ -1829,15 +1872,15 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm SCOPE_EXIT({ switch (run_thread) { - case TaskRunThread::Thread_BG_Thread_Pool: + case TaskRunThread::BackgroundThreadPool: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge) .Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::Thread_FG: + case TaskRunThread::Foreground: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_fg) .Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::Thread_BG_GC: + case TaskRunThread::BackgroundGCThread: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc) .Observe(watch_delta_merge.elapsedSeconds()); break; @@ -1856,7 +1899,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm { std::unique_lock read_write_lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(read_write_lock, segment)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); wbs.setRollback(); @@ -1901,7 +1944,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm return new_segment; } -bool DeltaMergeStore::isSegmentValid(const SegmentPtr & segment) +bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment) { if (segment->hasAbandoned()) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 2901cb390af..5186459b680 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -172,9 +172,9 @@ class DeltaMergeStore : private boost::noncopyable enum TaskRunThread { - Thread_BG_Thread_Pool, - Thread_FG, - Thread_BG_GC, + BackgroundThreadPool, + Foreground, + BackgroundGCThread, }; static std::string toString(ThreadType type) @@ -204,21 +204,6 @@ class DeltaMergeStore : private boost::noncopyable } } - static std::string toString(TaskRunThread type) - { - switch (type) - { - case Thread_BG_Thread_Pool: - return "BackgroundThreadPool"; - case Thread_FG: - return "Foreground"; - case Thread_BG_GC: - return "BackgroundGCThread"; - default: - return "Unknown"; - } - } - static std::string toString(TaskType type) { switch (type) @@ -240,6 +225,21 @@ class DeltaMergeStore : private boost::noncopyable } } + static std::string toString(TaskRunThread type) + { + switch (type) + { + case BackgroundThreadPool: + return "BackgroundThreadPool"; + case Foreground: + return "Foreground"; + case BackgroundGCThread: + return "BackgroundGCThread"; + default: + return "Unknown"; + } + } + struct BackgroundTask { TaskType type; @@ -403,7 +403,7 @@ class DeltaMergeStore : private boost::noncopyable private: #endif - DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id=""); + DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = ""); static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } @@ -414,13 +414,19 @@ class DeltaMergeStore : private boost::noncopyable SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground); void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground); - SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread); + SegmentPtr segmentMergeDelta(DMContext & dm_context, + const SegmentPtr & segment, + const TaskRunThread thread, + SegmentSnapshotPtr segment_snap = nullptr); bool updateGCSafePoint(); bool handleBackgroundTask(bool heavy); - bool isSegmentValid(const SegmentPtr & segment); + // isSegmentValid should be protected by lock on `read_write_mutex` + inline bool isSegmentValid(std::shared_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } + inline bool isSegmentValid(std::unique_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } + bool doIsSegmentValid(const SegmentPtr & segment); void restoreStableFiles(); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index c2b24042c0b..a405030be25 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -138,13 +138,13 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE); /// Return a stream which is suitable for exporting data. - /// reorgize_block: put those rows with the same pk rows into the same block or not. + /// reorganize_block: put those rows with the same pk rows into the same block or not. BlockInputStreamPtr getInputStreamForDataExport(const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRange & data_range, size_t expected_block_size = DEFAULT_BLOCK_SIZE, - bool reorgnize_block = true) const; + bool reorganize_block = true) const; BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, diff --git a/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp b/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp index 38b03d3de5e..824eb8e9340 100644 --- a/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp +++ b/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp @@ -16,6 +16,7 @@ namespace DB namespace ErrorCodes { extern const int BAD_ARGUMENTS; +extern const int NOT_IMPLEMENTED; } namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 1c249720754..a2d7c821ff1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -138,7 +138,6 @@ try { // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -357,9 +356,9 @@ try if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -381,6 +380,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + in->readPrefix(); + size_t num_rows = 0; + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -410,9 +425,8 @@ try } { - // flush segment + // merge delta to create stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -424,14 +438,13 @@ try // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -453,6 +466,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + in->readPrefix(); + size_t num_rows = 0; + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -466,7 +495,6 @@ try segment->write(dmContext(), std::move(block)); // flush [0, 50) to segment's stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } auto [read_before_delete, merge_delta_after_delete] = GetParam(); @@ -501,9 +529,9 @@ try if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -525,6 +553,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + size_t num_rows = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -540,18 +584,25 @@ try } { - // flush segment + // do delta-merge move data to stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } + auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) { + // set `is_update=false` to get full squash delete range + auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); + auto squash_range = snap->delta->getSquashDeleteRange(); + ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range)); + }; + { // Test delete range [70, 100) HandleRange del{70, 100}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); // Add trace msg when ASSERT failed + check_segment_squash_delete_range(segment, HandleRange{70, 100}); + segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -581,9 +632,10 @@ try // Test delete range [63, 70) HandleRange del{63, 70}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{63, 100}); + segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -612,9 +664,9 @@ try // Test delete range [1, 32) HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{1, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -642,9 +694,9 @@ try // delete should be idempotent HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{1, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -672,9 +724,9 @@ try // There is an overlap range [0, 1) HandleRange del{0, 2}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{0, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -695,6 +747,36 @@ try } in->readSuffix(); } + + { + Block block = DMTestEnv::prepareSimpleWriteBlock(9, 16, false); + segment->write(dmContext(), std::move(block)); + SCOPED_TRACE("check after write"); + // if we write some new data, we can still get the delete range + check_segment_squash_delete_range(segment, HandleRange{0, 100}); + } + + { + // Read after new write + auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 33 + 7); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == DMTestEnv::pk_name) + { + EXPECT_EQ(c->getInt(0), 9); + EXPECT_EQ(c->getInt(6), 15); + EXPECT_EQ(c->getInt(7), 32); + EXPECT_EQ(c->getInt(block.rows() - 1), 62); + } + } + } + in->readSuffix(); + } } CATCH @@ -873,7 +955,6 @@ try segment->write(dmContext(), std::move(block)); // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } SegmentPtr new_segment = Segment::restoreSegment(dmContext(), segment->segmentId()); @@ -940,7 +1021,6 @@ try { // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } for (size_t i = (num_batches_written - 1) * num_rows_per_write + 2; i < num_batches_written * num_rows_per_write; i++) @@ -1033,10 +1113,10 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface> genDMFile(DMContext & context, const Block & block) { - auto delegator = context.path_pool.getStableDiskDelegator(); - auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - auto store_path = delegator.choosePath(); + auto store_path = delegator.choosePath(); DMFileBlockOutputStream::Flags flags; flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2); @@ -1046,9 +1126,9 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterfacegetBytesOnDisk(), store_path); - auto & pk_column = block.getByPosition(0).column; - auto min_pk = pk_column->getInt(0); - auto max_pk = pk_column->getInt(block.rows() - 1); + auto & pk_column = block.getByPosition(0).column; + auto min_pk = pk_column->getInt(0); + auto max_pk = pk_column->getInt(block.rows() - 1); HandleRange range(min_pk, max_pk + 1); return {RowKeyRange::fromHandleRange(range), {file_id}}; @@ -1073,13 +1153,13 @@ try segment->write(dmContext(), std::move(block)); break; case Segment_test_Mode::V2_FileOnly: { - auto delegate = dmContext().path_pool.getStableDiskDelegator(); - auto file_provider = dmContext().db_context.getFileProvider(); - auto [range, file_ids] = genDMFile(dmContext(), block); - auto file_id = file_ids[0]; - auto file_parent_path = delegate.getDTFilePath(file_id); - auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path); - auto pack = std::make_shared(dmContext(), file, range); + auto delegate = dmContext().path_pool.getStableDiskDelegator(); + auto file_provider = dmContext().db_context.getFileProvider(); + auto [range, file_ids] = genDMFile(dmContext(), block); + auto file_id = file_ids[0]; + auto file_parent_path = delegate.getDTFilePath(file_id); + auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path); + auto pack = std::make_shared(dmContext(), file, range); WriteBatches wbs(*storage_pool); wbs.data.putExternal(file_id, 0); wbs.writeLogAndData(); diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index bf4f89ac3aa..81997249cc5 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -11,10 +11,15 @@ extern const int TABLE_IS_DROPPED; bool GCManager::work() { auto & global_settings = global_context.getSettingsRef(); + if (gc_check_stop_watch.elapsedSeconds() < global_settings.dt_bg_gc_check_interval) + return false; Int64 gc_segments_limit = global_settings.dt_bg_gc_max_segments_to_check_every_round; // limit less than or equal to 0 means no gc if (gc_segments_limit <= 0) + { + gc_check_stop_watch.restart(); return false; + } LOG_INFO(log, "Start GC with table id: " << next_table_id); // Get a storage snapshot with weak_ptrs first @@ -72,6 +77,7 @@ bool GCManager::work() iter = storages.begin(); next_table_id = iter->first; LOG_INFO(log, "End GC and next gc will start with table id: " << next_table_id); + gc_check_stop_watch.restart(); // Always return false return false; } diff --git a/dbms/src/Storages/GCManager.h b/dbms/src/Storages/GCManager.h index 81199d447e9..db809d76f7c 100644 --- a/dbms/src/Storages/GCManager.h +++ b/dbms/src/Storages/GCManager.h @@ -19,6 +19,8 @@ class GCManager TableID next_table_id = InvalidTableID; + AtomicStopwatch gc_check_stop_watch; + Logger * log; }; } // namespace DB diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index d932fcde1f3..69da9770cdb 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -4,25 +4,24 @@ #include #include #include +#include #include #include -#include #include - #include #include #ifdef __linux__ -#include #include +#include #endif namespace CurrentMetrics { - extern const Metric BackgroundPoolTask; - extern const Metric MemoryTrackingInBackgroundProcessingPool; -} +extern const Metric BackgroundPoolTask; +extern const Metric MemoryTrackingInBackgroundProcessingPool; +} // namespace CurrentMetrics namespace DB { @@ -128,9 +127,9 @@ void BackgroundProcessingPool::threadFunction() const auto name = "BkgPool" + std::to_string(tid++); setThreadName(name.data()); is_background_thread = true; - #ifdef __linux__ +#ifdef __linux__ addThreadId(syscall(SYS_gettid)); - #endif +#endif } MemoryTracker memory_tracker; @@ -174,8 +173,8 @@ void BackgroundProcessingPool::threadFunction() { std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, - std::chrono::duration(sleep_seconds - + std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); + std::chrono::duration( + sleep_seconds + std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); continue; } @@ -184,8 +183,9 @@ void BackgroundProcessingPool::threadFunction() if (min_time > current_time) { std::unique_lock lock(tasks_mutex); - wake_event.wait_for(lock, std::chrono::microseconds( - min_time - current_time + std::uniform_int_distribution(0, sleep_seconds_random_part * 1000000)(rng))); + wake_event.wait_for(lock, + std::chrono::microseconds( + min_time - current_time + std::uniform_int_distribution(0, sleep_seconds_random_part * 1000000)(rng))); } std::shared_lock rlock(task->rwlock); @@ -269,4 +269,4 @@ void BackgroundProcessingPool::addThreadId(pid_t tid) thread_ids.push_back(tid); } -} +} // namespace DB diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h index 68dc72219d5..5386d7a3a2b 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h @@ -1,17 +1,18 @@ #pragma once -#include -#include -#include -#include +#include +#include +#include + +#include #include +#include +#include +#include #include +#include #include -#include -#include -#include -#include -#include +#include namespace DB { @@ -49,11 +50,11 @@ class BackgroundProcessingPool /// Read lock is hold when task is executed. std::shared_mutex rwlock; - std::atomic removed {false}; + std::atomic removed{false}; /// only can be invoked by one thread at same time. const bool multi; - std::atomic_bool occupied {false}; + std::atomic_bool occupied{false}; const uint64_t interval_milliseconds; @@ -65,14 +66,19 @@ class BackgroundProcessingPool BackgroundProcessingPool(int size_); - size_t getNumberOfThreads() const - { - return size; - } + size_t getNumberOfThreads() const { return size; } /// if multi == false, this task can only be called by one thread at same time. /// If interval_ms is zero, this task will be scheduled with `sleep_seconds`. /// If interval_ms is not zero, this task will be scheduled with `interval_ms`. + /// + /// But at each scheduled time, there may be multiple threads try to run the same task, + /// and then execute the same task one by one in sequential order(not simultaneously) even if `multi` is false. + /// For example, consider the following case when it's time to schedule a task, + /// 1. thread A get the task, mark the task as occupied and begin to execute it + /// 2. thread B also get the same task + /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task + /// 4. thread B find the task is not occupied and execute the task again almost immediately TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); void removeTask(const TaskHandle & task); @@ -80,22 +86,23 @@ class BackgroundProcessingPool std::vector getThreadIds(); void addThreadId(pid_t tid); + private: - using Tasks = std::multimap; /// key is desired next time to execute (priority). + using Tasks = std::multimap; /// key is desired next time to execute (priority). using Threads = std::vector; const size_t size; static constexpr double sleep_seconds = 10; static constexpr double sleep_seconds_random_part = 1.0; - Tasks tasks; /// Ordered in priority. + Tasks tasks; /// Ordered in priority. std::mutex tasks_mutex; Threads threads; - std::vector thread_ids; // Linux Thread ID + std::vector thread_ids; // Linux Thread ID std::mutex thread_ids_mtx; - std::atomic shutdown {false}; + std::atomic shutdown{false}; std::condition_variable wake_event; @@ -104,4 +111,4 @@ class BackgroundProcessingPool using BackgroundProcessingPoolPtr = std::shared_ptr; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 74779bef7d6..eee67e8403e 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -285,9 +285,12 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const table.regions.erase(internal_region_it); if (table.regions.empty()) { - /// All regions of this table is removed, the storage maybe drop or pd - /// move it to another node, we can optimize outdated data. - table_to_optimize.insert(table_id); + if (auto & tmt = context->getTMTContext(); !tmt.isBgFlushDisabled()) + { + /// All regions of this table is removed, the storage maybe drop or pd + /// move it to another node, we can optimize outdated data. + table_to_optimize.insert(table_id); + } tables.erase(table_id); } LOG_INFO(log, __FUNCTION__ << ": remove [region " << region_id << "] in RegionTable done");