diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 062fd31c5ce..5ab33efec85 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -107,20 +107,20 @@ namespace DB M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \ F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \ M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, \ - F(type_delta_merge, {"type", "delta_merge"}), \ - F(type_delta_merge_fg, {"type", "delta_merge_fg"}), \ - F(type_delta_merge_fg_rpc, {"type", "delta_merge_fg_rpc"}), \ + F(type_delta_merge_bg, {"type", "delta_merge_bg"}), \ F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \ + F(type_delta_merge_fg, {"type", "delta_merge_fg"}), \ + F(type_delta_merge_manual, {"type", "delta_merge_manual"}), \ F(type_delta_compact, {"type", "delta_compact"}), \ F(type_delta_flush, {"type", "delta_flush"}), \ F(type_seg_split, {"type", "seg_split"}), F(type_seg_split_fg, {"type", "seg_split_fg"}), \ F(type_seg_merge, {"type", "seg_merge"}), F(type_seg_merge_fg, {"type", "seg_merge_fg"}), \ F(type_place_index_update, {"type", "place_index_update"})) \ M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \ - F(type_delta_merge, {{"type", "delta_merge"}}, ExpBuckets{0.001, 2, 20}), \ - F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \ - F(type_delta_merge_fg_rpc, {{"type", "delta_merge_fg_rpc"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \ F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_delta_merge_manual, {{"type", "delta_merge_manual"}}, ExpBuckets{0.001, 2, 20}), \ F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.001, 2, 20}), \ F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.001, 2, 20}), \ F(type_seg_split, {{"type", "seg_split"}}, ExpBuckets{0.001, 2, 20}), \ diff --git a/dbms/src/Flash/Management/ManualCompact.cpp b/dbms/src/Flash/Management/ManualCompact.cpp index 2143be88cc3..d872ee02fdf 100644 --- a/dbms/src/Flash/Management/ManualCompact.cpp +++ b/dbms/src/Flash/Management/ManualCompact.cpp @@ -167,7 +167,7 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ // Repeatedly merge multiple segments as much as possible. while (true) { - auto compacted_range = dm_storage->mergeDeltaBySegment(global_context, start_key, DM::DeltaMergeStore::TaskRunThread::ForegroundRPC); + auto compacted_range = dm_storage->mergeDeltaBySegment(global_context, start_key); if (compacted_range == std::nullopt) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 658ef8d63bc..28efa07fe72 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -862,13 +862,13 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) for (auto & segment : all_segments) { - segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); + segmentMergeDelta(*dm_context, segment, MergeDeltaReason::Manual); } LOG_FMT_INFO(log, "Finish table mergeDeltaAll"); } -std::optional DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key, const TaskRunThread run_thread) +std::optional DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key) { LOG_FMT_INFO(log, "Table mergeDeltaBySegment, start={}", start_key.toDebugString()); @@ -895,7 +895,7 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex if (segment->flushCache(*dm_context)) { - const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread); + const auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::Manual); if (new_segment) { const auto segment_end = new_segment->getRowKeyRange().end; @@ -1412,7 +1412,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const GET_METRIC(tiflash_storage_write_stall_duration_seconds, type_delta_merge_by_delete_range).Observe(watch.elapsedSeconds()); }); - return segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); + return segmentMergeDelta(*dm_context, segment, MergeDeltaReason::ForegroundWrite); } return {}; }; @@ -1579,7 +1579,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) case TaskType::MergeDelta: { FAIL_POINT_PAUSE(FailPoints::pause_before_dt_background_delta_merge); - left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::BackgroundThreadPool); + left = segmentMergeDelta(*task.dm_context, task.segment, MergeDeltaReason::BackgroundThreadPool); type = ThreadType::BG_MergeDelta; // Wake up all waiting threads if failpoint is enabled FailPointHelper::disableFailPoint(FailPoints::pause_until_dt_background_delta_merge); @@ -1786,7 +1786,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) bool finish_gc_on_segment = false; if (should_compact) { - if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment) + if (segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap); segment) { // Continue to check whether we need to apply more tasks on this segment segment_snap = {}; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 356aff1f775..e8790eb986e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -180,13 +180,12 @@ class DeltaMergeStore : private boost::noncopyable PlaceIndex, }; - // TODO: Rename to MergeDeltaThreadType - enum TaskRunThread + enum MergeDeltaReason { BackgroundThreadPool, - Foreground, - ForegroundRPC, BackgroundGCThread, + ForegroundWrite, + Manual, }; static std::string toString(ThreadType type) @@ -237,18 +236,18 @@ class DeltaMergeStore : private boost::noncopyable } } - static std::string toString(TaskRunThread type) + static std::string toString(MergeDeltaReason type) { switch (type) { case BackgroundThreadPool: return "BackgroundThreadPool"; - case Foreground: - return "Foreground"; - case ForegroundRPC: - return "ForegroundRPC"; case BackgroundGCThread: return "BackgroundGCThread"; + case ForegroundWrite: + return "ForegroundWrite"; + case Manual: + return "Manual"; default: return "Unknown"; } @@ -397,7 +396,7 @@ class DeltaMergeStore : private boost::noncopyable /// If there is no segment found by the start key, nullopt is returned. /// /// This function is called when using `ALTER TABLE [TABLE] COMPACT ...` from TiDB. - std::optional mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key, TaskRunThread run_thread); + std::optional mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key); /// Compact the delta layer, merging multiple fragmented delta files into larger ones. /// This is a minor compaction as it does not merge the delta into stable layer. @@ -495,7 +494,7 @@ class DeltaMergeStore : private boost::noncopyable SegmentPtr segmentMergeDelta( DMContext & dm_context, const SegmentPtr & segment, - TaskRunThread thread, + MergeDeltaReason reason, SegmentSnapshotPtr segment_snap = nullptr); bool updateGCSafePoint(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index b2b04a747eb..5c467ddb962 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -195,7 +195,7 @@ void DeltaMergeStore::ingestFiles( updated_segments.push_back(segment); fiu_do_on(FailPoints::segment_merge_after_ingest_packs, { segment->flushCache(*dm_context); - segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundThreadPool); + segmentMergeDelta(*dm_context, segment, MergeDeltaReason::ForegroundWrite); storage_pool->gc(global_context.getSettingsRef(), StoragePool::Seconds(0)); }); break; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 71a26aebb9e..3a837e7baa4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -302,10 +302,10 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le SegmentPtr DeltaMergeStore::segmentMergeDelta( DMContext & dm_context, const SegmentPtr & segment, - const TaskRunThread run_thread, + const MergeDeltaReason reason, SegmentSnapshotPtr segment_snap) { - LOG_FMT_INFO(log, "MergeDelta - Begin, thread={} safe_point={} segment={}", toString(run_thread), dm_context.min_version, segment->info()); + LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", toString(reason), dm_context.min_version, segment->info()); ColumnDefinesPtr schema_snap; @@ -338,19 +338,19 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( CurrentMetrics::Increment cur_dm_total_bytes{CurrentMetrics::DT_DeltaMergeTotalBytes, static_cast(segment_snap->getBytes())}; CurrentMetrics::Increment cur_dm_total_rows{CurrentMetrics::DT_DeltaMergeTotalRows, static_cast(segment_snap->getRows())}; - switch (run_thread) + switch (reason) { - case TaskRunThread::BackgroundThreadPool: - GET_METRIC(tiflash_storage_subtask_count, type_delta_merge).Increment(); + case MergeDeltaReason::BackgroundThreadPool: + GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg).Increment(); break; - case TaskRunThread::Foreground: - GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg).Increment(); + case MergeDeltaReason::BackgroundGCThread: + GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment(); break; - case TaskRunThread::ForegroundRPC: - GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg_rpc).Increment(); + case MergeDeltaReason::ForegroundWrite: + GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg).Increment(); break; - case TaskRunThread::BackgroundGCThread: - GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment(); + case MergeDeltaReason::Manual: + GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_manual).Increment(); break; default: break; @@ -358,19 +358,19 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( Stopwatch watch_delta_merge; SCOPE_EXIT({ - switch (run_thread) + switch (reason) { - case TaskRunThread::BackgroundThreadPool: - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge).Observe(watch_delta_merge.elapsedSeconds()); + case MergeDeltaReason::BackgroundThreadPool: + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg).Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::Foreground: - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg).Observe(watch_delta_merge.elapsedSeconds()); + case MergeDeltaReason::BackgroundGCThread: + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc).Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::ForegroundRPC: - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg_rpc).Observe(watch_delta_merge.elapsedSeconds()); + case MergeDeltaReason::ForegroundWrite: + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg).Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::BackgroundGCThread: - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc).Observe(watch_delta_merge.elapsedSeconds()); + case MergeDeltaReason::Manual: + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_manual).Observe(watch_delta_merge.elapsedSeconds()); break; default: break; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index f3d1daa739a..bca226c8b7b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3095,13 +3095,13 @@ try if (store->isCommonHandle()) { // Specifies MAX_KEY. nullopt should be returned. - auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY); ASSERT_EQ(result, std::nullopt); } else { // Specifies MAX_KEY. nullopt should be returned. - auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MAX_KEY, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MAX_KEY); ASSERT_EQ(result, std::nullopt); } std::optional result_1; @@ -3109,11 +3109,11 @@ try // Specifies MIN_KEY. In this case, the first segment should be processed. if (store->isCommonHandle()) { - result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground); + result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY); } else { - result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground); + result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY); } // The returned range is the same as first segment's range. ASSERT_NE(result_1, std::nullopt); @@ -3125,7 +3125,7 @@ try } { // Compact the first segment again, nothing should change. - auto result = store->mergeDeltaBySegment(*db_context, result_1->start, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, result_1->start); ASSERT_EQ(*result, *result_1); helper->verifyExpectedRowsForAllSegments(); @@ -3133,7 +3133,7 @@ try std::optional result_2; { // Compact again using the end key just returned. The second segment should be processed. - result_2 = store->mergeDeltaBySegment(*db_context, result_1->end, DeltaMergeStore::TaskRunThread::Foreground); + result_2 = store->mergeDeltaBySegment(*db_context, result_1->end); ASSERT_NE(result_2, std::nullopt); ASSERT_EQ(*result_2, std::next(store->segments.begin())->second->getRowKeyRange()); @@ -3151,12 +3151,12 @@ TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, InvalidKey) if (store->isCommonHandle()) { // For common handle, give int handle key and have a try - store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground); + store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY); } else { // For int handle, give common handle key and have a try - store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground); + store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY); } }); } @@ -3172,13 +3172,13 @@ try ASSERT_NE(it, store->segments.end()); auto seg = it->second; - result = store->mergeDeltaBySegment(*db_context, seg->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + result = store->mergeDeltaBySegment(*db_context, seg->getRowKeyRange().start); ASSERT_NE(result, std::nullopt); helper->verifyExpectedRowsForAllSegments(); } { // As we are the last segment, compact "next segment" should result in failure. A nullopt is returned. - auto result2 = store->mergeDeltaBySegment(*db_context, result->end, DeltaMergeStore::TaskRunThread::Foreground); + auto result2 = store->mergeDeltaBySegment(*db_context, result->end); ASSERT_EQ(result2, std::nullopt); helper->verifyExpectedRowsForAllSegments(); } @@ -3207,7 +3207,7 @@ try auto range = std::next(store->segments.begin())->second->getRowKeyRange(); auto compact_key = range.start.toPrefixNext(); - auto result = store->mergeDeltaBySegment(*db_context, compact_key, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, compact_key); ASSERT_NE(result, std::nullopt); helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; @@ -3251,7 +3251,7 @@ try } { auto segment1 = std::next(store->segments.begin())->second; - auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start); ASSERT_NE(result, std::nullopt); segment1 = std::next(store->segments.begin())->second; @@ -3299,7 +3299,7 @@ try // Start a mergeDelta. It should hit retry immediately due to a flush is in progress. auto th_merge_delta = std::async([&]() { auto segment1 = std::next(store->segments.begin())->second; - auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start); ASSERT_NE(result, std::nullopt); // All rows in the delta layer should be merged into the stable layer. helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; @@ -3350,7 +3350,7 @@ try auto th_merge_delta = std::async([&] { // mergeDeltaBySegment for segment1 auto segment1 = std::next(store->segments.begin())->second; - auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start); ASSERT_NE(result, std::nullopt); // Although original segment1 has been split into 2, we still expect only segment1's delta diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 55e8348861a..03289f76839 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -789,9 +789,9 @@ void StorageDeltaMerge::mergeDelta(const Context & context) getAndMaybeInitStore()->mergeDeltaAll(context); } -std::optional StorageDeltaMerge::mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key, const DM::DeltaMergeStore::TaskRunThread run_thread) +std::optional StorageDeltaMerge::mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key) { - return getAndMaybeInitStore()->mergeDeltaBySegment(context, start_key, run_thread); + return getAndMaybeInitStore()->mergeDeltaBySegment(context, start_key); } void StorageDeltaMerge::deleteRange(const DM::RowKeyRange & range_to_delete, const Settings & settings) diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4fb1f3032da..886e198d336 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -14,12 +14,11 @@ #pragma once +#include #include #include #include #include -#include -#include #include #include #include @@ -27,16 +26,12 @@ #include -namespace Poco -{ -class Logger; -} // namespace Poco - namespace DB { namespace DM { struct RowKeyRange; +struct RowKeyValue; class DeltaMergeStore; using DeltaMergeStorePtr = std::shared_ptr; } // namespace DM @@ -85,7 +80,7 @@ class StorageDeltaMerge /// If there is no segment found by the start key, nullopt is returned. /// /// This function is called when using `ALTER TABLE [TABLE] COMPACT ...` from TiDB. - std::optional mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key, const DM::DeltaMergeStore::TaskRunThread run_thread); + std::optional mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key); void deleteRange(const DM::RowKeyRange & range_to_delete, const Settings & settings);