diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h
index 7496107d889..99ed06ac1f3 100644
--- a/dbms/src/Common/TiFlashMetrics.h
+++ b/dbms/src/Common/TiFlashMetrics.h
@@ -537,6 +537,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
         "Raft read index events counter", \
         Counter, \
         F(type_bypass_lock, {{"type", "bypass_lock"}}), \
+        F(type_zero_read_tso, {{"type", "zero_read_tso"}}), \
         F(type_use_histroy, {{"type", "use_histroy"}}), \
         F(type_use_cache, {{"type", "use_cache"}})) \
     M(tiflash_raft_learner_read_failures_count, \
diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.cpp b/dbms/src/Interpreters/SharedContexts/Disagg.cpp
index eee4d0f4547..c4aa7fbf650 100644
--- a/dbms/src/Interpreters/SharedContexts/Disagg.cpp
+++ b/dbms/src/Interpreters/SharedContexts/Disagg.cpp
@@ -105,4 +105,13 @@ void SharedContextDisagg::initFastAddPeerContext(UInt64 fap_concur)
     fap_context = std::make_shared<FastAddPeerContext>(fap_concur);
 }
 
+
+SharedContextDisagg::~SharedContextDisagg()
+{
+    if (fap_context)
+    {
+        fap_context->shutdown();
+    }
+}
+
 } // namespace DB
diff --git a/dbms/src/Interpreters/SharedContexts/Disagg.h b/dbms/src/Interpreters/SharedContexts/Disagg.h
index 07a619a4907..2924a36dd58 100644
--- a/dbms/src/Interpreters/SharedContexts/Disagg.h
+++ b/dbms/src/Interpreters/SharedContexts/Disagg.h
@@ -79,6 +79,8 @@ struct SharedContextDisagg : private boost::noncopyable
         : global_context(global_context_)
     {}
 
+    ~SharedContextDisagg();
+
     void initReadNodePageCache(const PathPool & path_pool, const String & cache_dir, size_t cache_capacity);
 
     /// Note that the unit of max_size is quantity, not byte size. It controls how
diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp
index 9ff701d1a45..b16499c4dee 100644
--- a/dbms/src/Storages/KVStore/KVStore.cpp
+++ b/dbms/src/Storages/KVStore/KVStore.cpp
@@ -438,6 +438,7 @@ KVStore::~KVStore()
     LOG_INFO(log, "Destroy KVStore");
     stopThreadAllocInfo();
     releaseReadIndexWorkers();
+    LOG_INFO(log, "Destroy KVStore Finished");
 }
 
 FileUsageStatistics KVStore::getFileUsageStatistics() const
@@ -611,6 +612,7 @@ void KVStore::stopThreadAllocInfo()
         is_terminated = true;
         monitoring_cv.notify_all();
     }
+    LOG_INFO(log, "KVStore shutdown, wait thread alloc monitor join");
     monitoring_thread->join();
     delete monitoring_thread;
     monitoring_thread = nullptr;
diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp
index 588ec80a9f3..62e9bcb7962 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.cpp
@@ -43,6 +43,11 @@ FastAddPeerContext::FastAddPeerContext(uint64_t thread_count)
     tasks_trace = std::make_shared<FAPAsyncTasks>(thread_count, thread_count, 1000);
 }
 
+void FastAddPeerContext::shutdown() const
+{
+    tasks_trace->shutdown();
+}
+
 ParsedCheckpointDataHolderPtr FastAddPeerContext::CheckpointCacheElement::getParsedCheckpointData(Context & context)
 {
     std::scoped_lock lock(mu);
diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h
index 6c1411e0ab9..0ec62668cbc 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h
+++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h
@@ -22,6 +22,7 @@
 
 namespace DB
 {
+class FastAddPeerContext;
 using FAPAsyncTasks = AsyncTasks<uint64_t, std::function<FastAddPeerRes()>, FastAddPeerRes>;
 struct CheckpointInfo;
 using CheckpointInfoPtr = std::shared_ptr<CheckpointInfo>;
@@ -34,6 +35,7 @@ class FastAddPeerContext
 {
 public:
     explicit FastAddPeerContext(uint64_t thread_count = 0);
+    void shutdown() const;
     // Return parsed checkpoint data and its corresponding seq which is newer than `required_seq` if exists, otherwise return pair<nullptr, 0>
     std::pair<UInt64, ParsedCheckpointDataHolderPtr> getNewerCheckpointData(
diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
index 03b9219ed32..0b17fd156aa 100644
--- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
+++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
@@ -117,6 +117,10 @@ std::vector<kvrpcpb::ReadIndexRequest> LearnerReadWorker::buildBatchReadIndexReq
     // If using `std::numeric_limits<UInt64>::max()`, set `start-ts` 0 to get the latest index but let read-index-worker do not record as history.
     auto read_index_tso
         = mvcc_query_info.read_tso == std::numeric_limits<UInt64>::max() ? 0 : mvcc_query_info.read_tso;
+    if (read_index_tso == 0)
+    {
+        GET_METRIC(tiflash_raft_read_index_events_count, type_zero_read_tso).Increment();
+    }
     for (const auto & region_to_query : regions_info)
     {
         const RegionID region_id = region_to_query.region_id;
@@ -325,13 +329,14 @@ RegionsReadIndexResult LearnerReadWorker::readIndex(
         log_lvl,
         "[Learner Read] Batch read index, num_regions={} num_requests={} num_stale_read={} num_cached_index={} "
         "num_unavailable={} "
-        "cost={}ms",
+        "cost={}ms, read_tso={}",
         stats.num_regions,
         stats.num_read_index_request,
         stats.num_stale_read,
         stats.num_cached_read_index,
         unavailable_regions.size(),
-        stats.read_index_elapsed_ms);
+        stats.read_index_elapsed_ms,
+        mvcc_query_info.read_tso);
 
     return batch_read_index_result;
 }
@@ -427,10 +432,11 @@ void LearnerReadWorker::waitIndex(
     LOG_IMPL(
         log,
         log_lvl,
-        "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}",
+        "[Learner Read] Finish wait index and resolve locks, wait_cost={}ms n_regions={} n_unavailable={}, read_tso={}",
         stats.wait_index_elapsed_ms,
         stats.num_regions,
-        unavailable_regions.size());
+        unavailable_regions.size(),
+        mvcc_query_info.read_tso);
 
     auto bypass_formatter = [&](const RegionQueryInfo & query_info) -> String {
         if (query_info.bypass_lock_ts == nullptr)
@@ -464,9 +470,10 @@ void LearnerReadWorker::waitIndex(
 
     LOG_DEBUG(
         log,
-        "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}",
+        "[Learner Read] Learner Read Summary, regions_info={}, unavailable_regions_info={}, read_tso={}",
         region_info_formatter(),
-        unavailable_regions.toDebugString());
+        unavailable_regions.toDebugString(),
+        mvcc_query_info.read_tso);
 }
 
 std::tuple<Clock::time_point, Clock::time_point> //
@@ -505,13 +512,14 @@ LearnerReadWorker::waitUntilDataAvailable(
         log,
         log_lvl,
         "[Learner Read] batch read index | wait index"
-        " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}",
+        " total_cost={} read_cost={} wait_cost={} n_regions={} n_stale_read={} n_unavailable={}, read_tso={}",
         time_elapsed_ms,
         stats.read_index_elapsed_ms,
         stats.wait_index_elapsed_ms,
         stats.num_regions,
         stats.num_stale_read,
-        unavailable_regions.size());
+        unavailable_regions.size(),
+        mvcc_query_info.read_tso);
 
     return {start_time, end_time};
 }
diff --git a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp
index 273e85a58c8..e6309d1e5b1 100644
--- a/dbms/src/Storages/KVStore/Read/ReadIndex.cpp
+++ b/dbms/src/Storages/KVStore/Read/ReadIndex.cpp
@@ -368,6 +368,7 @@ void KVStore::stopReadIndexWorkers() const
 
 void KVStore::releaseReadIndexWorkers()
 {
+    LOG_INFO(log, "KVStore shutdown, deleting read index worker");
     if (read_index_worker_manager)
     {
         delete read_index_worker_manager;
diff --git a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h
index de5e0839bb2..e497a230f89 100644
--- a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h
+++ b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h
@@ -53,7 +53,25 @@ struct AsyncTasks
         , log(DB::Logger::get())
     {}
 
-    ~AsyncTasks() { LOG_INFO(log, "Pending {} tasks when destructing", count()); }
+    void shutdown()
+    {
+        LOG_INFO(log, "Pending {} tasks when shutting down", count());
+        // Refuse new submissions BEFORE draining, otherwise a task enqueued
+        // during `wait()` is neither rejected nor awaited.
+        shut.store(true);
+        // To avoid the "last owner" problem in worker thread.
+        thread_pool->wait();
+        LOG_INFO(log, "Finish finalize thread pool");
+    }
+
+    ~AsyncTasks()
+    {
+        if (!shut.load())
+        {
+            LOG_INFO(log, "Destruct without shutdown");
+            // Potential deadlock if the instance is held and released directly or indirectly by a task in its worker.
+            shutdown();
+        }
+    }
 
     using TaskState = AsyncTaskHelper::TaskState;
 
@@ -241,6 +259,8 @@ struct AsyncTasks
     // 1. There is already a task registered with the same name and not canceled or fetched.
     bool addTaskWithCancel(Key k, Func f, CancelFunc cf)
    {
+        if (shut.load())
+            return false;
         std::scoped_lock l(mtx);
         RUNTIME_CHECK(!tasks.contains(k));
         using P = std::packaged_task<R()>;
@@ -399,5 +419,6 @@ struct AsyncTasks
     std::unique_ptr<ThreadPool> thread_pool;
     mutable std::mutex mtx;
     LoggerPtr log;
+    std::atomic_bool shut = false;
 };
 } // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
index ff5e230169c..16426df352f 100644
--- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
+++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
@@ -121,7 +121,8 @@ class RegionKVStoreTestFAP : public KVStoreTestBase
     {
         auto & global_context = TiFlashTestEnv::getGlobalContext();
         KVStoreTestBase::TearDown();
-        global_context.getSharedContextDisagg()->fap_context.reset();
+        global_context.getSharedContextDisagg()->fap_context->shutdown();
+        global_context.getSharedContextDisagg()->fap_context.reset();
         if (!already_initialize_data_store)
         {
             global_context.getSharedContextDisagg()->remote_data_store = nullptr;