From dbfec4a525feabdc6ec05ea1a715ad5b7ebba8ff Mon Sep 17 00:00:00 2001 From: hehechen Date: Wed, 1 Mar 2023 10:54:07 +0800 Subject: [PATCH 1/2] background delta flush Signed-off-by: hehechen --- contrib/tiflash-proxy | 2 +- dbms/src/Common/TiFlashMetrics.h | 10 ++- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 8 ++ .../src/Storages/DeltaMerge/DeltaMergeStore.h | 3 +- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 31 ++++++- dbms/src/Storages/DeltaMerge/RowKeyRange.h | 5 ++ dbms/src/Storages/DeltaMerge/Segment.cpp | 4 + dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- dbms/src/Storages/Transaction/KVStore.cpp | 80 ++++++++++++++++++- dbms/src/Storages/Transaction/KVStore.h | 3 + dbms/src/Storages/Transaction/ProxyFFI.cpp | 13 ++- dbms/src/Storages/Transaction/ProxyFFI.h | 3 + dbms/src/Storages/Transaction/Region.h | 12 +++ dbms/src/Storages/Transaction/RegionTable.h | 1 + 14 files changed, 168 insertions(+), 9 deletions(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 2d3b2b033cd..75cc2d64469 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 2d3b2b033cd8b2e06894ed14a6334b248f324a3d +Subproject commit 75cc2d6446940f27c0b62fe9e77c290bd7e75bac diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 6d8ea8fd81c..5a406e62a84 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -134,7 +134,11 @@ namespace DB F(type_seg_split_fg, {"type", "seg_split_fg"}), \ F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \ F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \ - F(type_place_index_update, {"type", "place_index_update"})) \ + F(type_place_index_update, {"type", "place_index_update"}), \ + F(type_compact_log_segment_bg, {"type", "compact_log_segment_bg"}), \ + F(type_compact_log_segment_fg, {"type", "compact_log_segment_fg"}), \ + F(type_compact_log_region_bg, {"type", "compact_log_region_bg"}), \ + F(type_compact_log_region_fg, {"type", "compact_log_region_fg"})) \ M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \ F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \ F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ @@ -146,7 +150,9 @@ namespace DB F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \ F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \ F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ - F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \ + F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_compact_log_bg, {{"type", "compact_log_bg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_compact_log_fg, {{"type", "compact_log_fg"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \ F(type_write, {"type", "write"}), /**/ \ F(type_ingest, {"type", "ingest"}), /**/ \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5f07d6c7fd4..a10bacf2809 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -116,6 +117,7 @@ std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back case TaskType::Compact: case TaskType::Flush: case TaskType::PlaceIndex: + case TaskType::NotifyCompactLog: is_heavy = false; // reserve some task space for heavy tasks if (max_task_num > 1 && light_tasks.size() >= static_cast(max_task_num * 0.9)) @@ -1173,6 +1175,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const auto delta_cache_limit_rows = dm_context->delta_cache_limit_rows; auto delta_cache_limit_bytes = dm_context->delta_cache_limit_bytes; + bool should_background_compact_log = (unsaved_rows >= delta_cache_limit_rows / 4 || unsaved_bytes >= delta_cache_limit_bytes / 4); bool should_background_flush = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes) // && (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows || delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes); @@ -1246,6 +1249,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const delta_last_try_flush_bytes = delta_bytes; LOG_DEBUG(log, "Foreground flush cache in checkSegmentUpdate, thread={} segment={}", thread_type, segment->info()); segment->flushCache(*dm_context); + triggerCompactLog(dm_context, segment, false); } else if (should_background_flush) { @@ -1260,6 +1264,10 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment}); } } + if (should_background_compact_log) + { + try_add_background_task(BackgroundTask{TaskType::NotifyCompactLog, dm_context, segment}); + } } // Need to check the latest delta (maybe updated after foreground flush). If it is updating by another thread, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 0f5de36e872..d98be5e307e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -193,6 +193,7 @@ class DeltaMergeStore : private boost::noncopyable Compact, Flush, PlaceIndex, + NotifyCompactLog, }; struct BackgroundTask @@ -588,7 +589,7 @@ class DeltaMergeStore : private boost::noncopyable private: void dropAllSegments(bool keep_first_segment); String getLogTracingId(const DMContext & dm_ctx); - + void triggerCompactLog(const DMContextPtr & dm_context, const SegmentPtr & segment, bool is_background); #ifndef DBMS_PUBLIC_GTEST private: #else diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index db071013477..b48c28b77cf 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -41,7 +42,6 @@ extern const char pause_until_dt_background_delta_merge[]; namespace DM { - // A callback class for scanning the DMFiles on local filesystem class LocalDMFileGcScanner final { @@ -307,6 +307,8 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) left = task.segment; type = ThreadType::BG_Flush; break; + case TaskType::NotifyCompactLog: + triggerCompactLog(task.dm_context, task.segment, true); case TaskType::PlaceIndex: task.segment->placeDeltaIndex(*task.dm_context); break; @@ -829,5 +831,32 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options) return gc_segments_num; } +void DeltaMergeStore::triggerCompactLog(const DMContextPtr & dm_context, const SegmentPtr & segment, bool is_background) +{ + auto & tmt = dm_context->db_context.getTMTContext(); + auto & kv_store = tmt.getKVStore(); + + if (is_background) + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_bg).Increment(); + } + else + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_fg).Increment(); + } + + Stopwatch watch; + SCOPE_EXIT({ + if (is_background) + { + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_bg).Observe(watch.elapsedSeconds()); + } + else + { + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_fg).Observe(watch.elapsedSeconds()); + } + }); + kv_store->copmactLogByRowKeyRange(tmt, segment->getRowKeyRange(), physical_table_id, is_background); +} } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRange.h b/dbms/src/Storages/DeltaMerge/RowKeyRange.h index 96491471875..8c394d76e2c 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRange.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRange.h @@ -277,6 +277,11 @@ inline bool operator<(const RowKeyValueRef & a, const RowKeyValueRef & b) return compare(a, b) < 0; } +inline bool operator<=(const RowKeyValueRef & a, const RowKeyValueRef & b) +{ + return compare(a, b) <= 0; +} + inline bool operator<(const StringRef & a, const RowKeyValueRef & b) { return compare(a, b) < 0; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 85bbfcb0ac6..bd943ee6fa6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include #include @@ -38,6 +40,8 @@ #include #include #include +#include +#include #include #include #include diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 8a76f1d90da..7a6fb20c43a 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1418,7 +1418,7 @@ BlockInputStreamPtr StorageDeltaMerge::status() auto & name_col = columns[0]; auto & value_col = columns[1]; - StoreStats stat; + DM::StoreStats stat; if (storeInited()) { stat = _store->getStoreStats(); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index 48738cb31af..b7508eabda4 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -98,6 +98,7 @@ RegionPtr KVStore::getRegion(RegionID region_id) const return it->second; return nullptr; } +// TODO: may get regions not in segment? RegionMap KVStore::getRegionsByRangeOverlap(const RegionRange & range) const { auto manage_lock = genRegionReadLock(); @@ -365,7 +366,7 @@ bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_ } else { - return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock, index, term); + return canFlushRegionDataImpl(curr_region_ptr, false, try_until_succeed, tmt, region_task_lock, index, term); } } @@ -400,7 +401,7 @@ bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 fl LOG_DEBUG(log, "{} flush region due to tryFlushRegionData, index {} term {}", curr_region.toString(false), index, term); return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term); } - return can_flush; + return false; } bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term) @@ -859,4 +860,79 @@ FileUsageStatistics KVStore::getFileUsageStatistics() const return region_persister->getFileUsageStatistics(); } +// We need to get applied index before flushing cache, and can't hold region task lock when flush cache to avoid hang write cmd apply. +void KVStore::copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, TableID table_id, bool is_background) +{ + auto range = std::make_pair(TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.start.toRegionKey(table_id))), + TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.end.toRegionKey(table_id)))); + std::unordered_map> region_copmact_indexes; + auto region_read_lock = genRegionReadLock(); + auto region_map = getRegionsByRangeOverlap(range); + for (const auto & overlapped_region : region_map) + { + region_copmact_indexes[overlapped_region.first] = {overlapped_region.second->appliedIndex(), overlapped_region.second->appliedIndexTerm()}; + } + auto storage = tmt.getStorages().get(table_id); + if (unlikely(storage == nullptr)) + { + LOG_WARNING(log, + "tryFlushRegionCacheInStorage can not get table for table id {}, ignored", + table_id); + return; + } + storage->flushCache(tmt.getContext(), rowkey_range); + for (const auto & region : region_copmact_indexes) + { + auto reion_ptr = getRegion(region.first); + if (!reion_ptr) + { + LOG_INFO(log, "region {} has been removed, ignore", region.first); + continue; + } + auto region_rowkey_range = DM::RowKeyRange::fromRegionRange( + reion_ptr->getRange(), + table_id, + storage->isCommonHandle(), + storage->getRowKeyColumnSize()); + if (rowkey_range.getStart() <= region_rowkey_range.getStart() && region_rowkey_range.getEnd() <= rowkey_range.getEnd()) + { + notifyCompactLog(region.first, region.second.first, region.second.second, is_background); + reion_ptr->setFlushedState(region.second.first, region.second.second); + } + else + { + storage->flushCache(tmt.getContext(), region_rowkey_range, true); + notifyCompactLog(region.first, region.second.first, region.second.second, is_background); + reion_ptr->setFlushedState(region.second.first, region.second.second); + } + } +} + +// the caller guarantee that delta cache has been flushed. This function need to persiste region cache before trigger proxy to compact log. +void KVStore::notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background) +{ + auto region = getRegion(region_id); + if (!region) + { + LOG_INFO(log, "region {} has been removed, ignore", region_id); + return; + } + if (region->lastCompactLogTime() + Seconds{region_compact_log_period.load(std::memory_order_relaxed)} > Clock::now()) + { + return; + } + if (is_background) + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_bg).Increment(); + } + else + { + GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_fg).Increment(); + } + auto region_task_lock = region_manager.genRegionTaskLock(region_id); + persistRegion(*region, region_task_lock, "tryFlushRegionData"); + region->markCompactLog(); + region->cleanApproxMemCacheInfo(); + getProxyHelper()->notifyCompactLog(region_id, compact_index, compact_term); +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 9b3f3286695..276bc0f5a0c 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -167,6 +168,8 @@ class KVStore final : private boost::noncopyable FileUsageStatistics getFileUsageStatistics() const; + void copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, TableID table_id, bool is_background); + void notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background); #ifndef DBMS_PUBLIC_GTEST private: #endif diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 7177822644c..216be36f8de 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -797,16 +797,27 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint return state; } +void TiFlashRaftProxyHelper::notifyCompactLog(uint64_t region_id, uint64_t compact_index, uint64_t compact_term) const +{ + this->fn_notify_compact_log(this->proxy_ptr, region_id, compact_index, compact_term, compact_index); +} + void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts) { RegionTable & region_table = server->tmt->getRegionTable(); region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts); } - std::string_view buffToStrView(const BaseBuffView & buf) { return std::string_view{buf.data, buf.len}; } +FlushedState GetFlushedState(EngineStoreServerWrap * server, uint64_t region_id) +{ + auto & kvstore = server->tmt->getKVStore(); + auto region_ptr = kvstore->getRegion(region_id); + return region_ptr->getFlushedState(); +} + } // namespace DB diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index bc71e47c529..7b5de2ad101 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -116,6 +116,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper TimerTask makeTimerTask(uint64_t time_ms) const; bool pollTimerTask(TimerTask & task, RawVoidPtr waker = nullptr) const; raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const; + void notifyCompactLog(uint64_t region_id, uint64_t compact_index, uint64_t compact_term) const; }; extern "C" { @@ -169,6 +170,7 @@ CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full); void SetStore(EngineStoreServerWrap *, BaseBuffView); void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view); void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts); +FlushedState GetFlushedState(EngineStoreServerWrap * server, uint64_t region_id); } inline EngineStoreServerHelper GetEngineStoreServerHelper( @@ -215,6 +217,7 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper( .fn_set_store = SetStore, .fn_set_pb_msg_by_bytes = SetPBMsByBytes, .fn_handle_safe_ts_update = HandleSafeTSUpdate, + .fn_get_flushed_state = GetFlushedState, }; } diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index f4aead264c9..be5ac2f29ca 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -202,6 +203,16 @@ class Region : public std::enable_shared_from_this std::pair getApproxMemCacheInfo() const; void cleanApproxMemCacheInfo() const; + void setFlushedState(uint64_t flushed_index, uint64_t flushed_term) + { + flushed_state.applied_index = flushed_index; + flushed_state.applied_term = flushed_term; + } + FlushedState getFlushedState() + { + return flushed_state; + } + private: Region() = delete; friend class RegionRaftCommandDelegate; @@ -237,6 +248,7 @@ class Region : public std::enable_shared_from_this mutable std::atomic last_compact_log_time{Timepoint::min()}; mutable std::atomic approx_mem_cache_rows{0}; mutable std::atomic approx_mem_cache_bytes{0}; + FlushedState flushed_state{0, 0}; }; class RegionRaftCommandDelegate : public Region diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 36686b44d90..c28f61e82bb 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include From 2f873bfea6094a071de735f550fe1ecdaa592c2c Mon Sep 17 00:00:00 2001 From: hehechen Date: Wed, 1 Mar 2023 17:12:13 +0800 Subject: [PATCH 2/2] fix lock Signed-off-by: hehechen --- contrib/tiflash-proxy | 2 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- dbms/src/Storages/Transaction/KVStore.cpp | 88 +++++++++---------- dbms/src/Storages/Transaction/Region.cpp | 15 +++- 4 files changed, 58 insertions(+), 49 deletions(-) diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 75cc2d64469..dc302d3e04b 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 75cc2d6446940f27c0b62fe9e77c290bd7e75bac +Subproject commit dc302d3e04b89ff617e2cd3dfddb5eb5c03b2cde diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index a10bacf2809..81dd5e95f91 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1175,7 +1175,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const auto delta_cache_limit_rows = dm_context->delta_cache_limit_rows; auto delta_cache_limit_bytes = dm_context->delta_cache_limit_bytes; - bool should_background_compact_log = (unsaved_rows >= delta_cache_limit_rows / 4 || unsaved_bytes >= delta_cache_limit_bytes / 4); + bool should_background_compact_log = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes); bool should_background_flush = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes) // && (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows || delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes); diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index b7508eabda4..995a563eec9 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -30,6 +30,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -370,7 +372,7 @@ bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_ } } -bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term) +bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8, bool, TMTContext &, const RegionTaskLock &, UInt64, UInt64) { if (curr_region_ptr == nullptr) { @@ -382,25 +384,6 @@ bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 fl LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes); - bool can_flush = false; - if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed) - || size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed)) - { - // if rows or bytes more than threshold, flush cache and persist mem data. - can_flush = true; - } - else - { - // if there is little data in mem, wait until time interval reached threshold. - // use random period so that lots of regions will not be persisted at same time. - auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT - can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now()); - } - if (can_flush && flush_if_possible) - { - LOG_DEBUG(log, "{} flush region due to tryFlushRegionData, index {} term {}", curr_region.toString(false), index, term); - return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term); - } return false; } @@ -861,17 +844,11 @@ FileUsageStatistics KVStore::getFileUsageStatistics() const } // We need to get applied index before flushing cache, and can't hold region task lock when flush cache to avoid hang write cmd apply. +// 1. store applied index and applied term, +// 2. flush cache, +// 3. notify regions to compact log and store fushed state with applied index/term before flushing cache. void KVStore::copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, TableID table_id, bool is_background) { - auto range = std::make_pair(TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.start.toRegionKey(table_id))), - TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.end.toRegionKey(table_id)))); - std::unordered_map> region_copmact_indexes; - auto region_read_lock = genRegionReadLock(); - auto region_map = getRegionsByRangeOverlap(range); - for (const auto & overlapped_region : region_map) - { - region_copmact_indexes[overlapped_region.first] = {overlapped_region.second->appliedIndex(), overlapped_region.second->appliedIndexTerm()}; - } auto storage = tmt.getStorages().get(table_id); if (unlikely(storage == nullptr)) { @@ -880,7 +857,41 @@ void KVStore::copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & table_id); return; } + auto range = std::make_pair(TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.start.toRegionKey(table_id))), + TiKVRangeKey::makeTiKVRangeKey(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.end.toRegionKey(table_id)))); + std::unordered_map> region_copmact_indexes; + { + auto task_lock = genTaskLock(); + auto region_map = getRegionsByRangeOverlap(range); + for (const auto & overlapped_region : region_map) + { + auto region_rowkey_range = DM::RowKeyRange::fromRegionRange( + overlapped_region.second->getRange(), + table_id, + storage->isCommonHandle(), + storage->getRowKeyColumnSize()); + auto region_task_lock = region_manager.genRegionTaskLock(overlapped_region.first); + region_copmact_indexes[overlapped_region.first] = {overlapped_region.second->appliedIndex(), overlapped_region.second->appliedIndexTerm(), region_rowkey_range}; + persistRegion(*overlapped_region.second, region_task_lock, "triggerCompactLog"); + } + } storage->flushCache(tmt.getContext(), rowkey_range); + // flush all segments in the range of regions. + // TODO: combine continues range to do one flush. + for (const auto & region : region_copmact_indexes) + { + auto region_rowkey_range = std::get<2>(region.second); + if (rowkey_range.getStart() <= region_rowkey_range.getStart() && region_rowkey_range.getEnd() < rowkey_range.getEnd()) + { + // This segment has flushed, skip it. + LOG_DEBUG(log, "flushed segment of region {}", region.first); + continue; + } + LOG_DEBUG(log, "flush extra segment of region {}, region range:[{},{}], flushed segment range:[{},{}]", region.first, region_rowkey_range.getStart().toDebugString(), region_rowkey_range.getEnd().toDebugString(), rowkey_range.getStart().toDebugString(), rowkey_range.getEnd().toDebugString()); + storage->flushCache(tmt.getContext(), std::get<2>(region.second)); + } + // forbid regions being removed. + auto task_lock = genTaskLock(); for (const auto & region : region_copmact_indexes) { auto reion_ptr = getRegion(region.first); @@ -889,22 +900,7 @@ void KVStore::copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & LOG_INFO(log, "region {} has been removed, ignore", region.first); continue; } - auto region_rowkey_range = DM::RowKeyRange::fromRegionRange( - reion_ptr->getRange(), - table_id, - storage->isCommonHandle(), - storage->getRowKeyColumnSize()); - if (rowkey_range.getStart() <= region_rowkey_range.getStart() && region_rowkey_range.getEnd() <= rowkey_range.getEnd()) - { - notifyCompactLog(region.first, region.second.first, region.second.second, is_background); - reion_ptr->setFlushedState(region.second.first, region.second.second); - } - else - { - storage->flushCache(tmt.getContext(), region_rowkey_range, true); - notifyCompactLog(region.first, region.second.first, region.second.second, is_background); - reion_ptr->setFlushedState(region.second.first, region.second.second); - } + notifyCompactLog(region.first, std::get<0>(region.second), std::get<1>(region.second), is_background); } } @@ -930,7 +926,7 @@ void KVStore::notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_fg).Increment(); } auto region_task_lock = region_manager.genRegionTaskLock(region_id); - persistRegion(*region, region_task_lock, "tryFlushRegionData"); + region->setFlushedState(compact_index, compact_term); region->markCompactLog(); region->cleanApproxMemCacheInfo(); getProxyHelper()->notifyCompactLog(region_id, compact_index, compact_term); diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index 56134925ba6..d5ee9329045 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -50,7 +50,20 @@ RegionData::WriteCFIter Region::removeDataByWriteIt(const RegionData::WriteCFIte RegionDataReadInfo Region::readDataByWriteIt(const RegionData::ConstWriteCFIter & write_it, bool need_value) const { - return data.readDataByWriteIt(write_it, need_value); + try + { + return data.readDataByWriteIt(write_it, need_value); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(region id {}, applied_index:{}, applied_term:{}, flushed_index:{}, flushed_term:{})", + meta.regionId(), + appliedIndex(), + appliedIndexTerm(), + flushed_state.applied_index, + flushed_state.applied_term)); + throw; + } } DecodedLockCFValuePtr Region::getLockInfo(const RegionLockReadQuery & query) const