diff --git a/contrib/tiflash-proxy b/contrib/tiflash-proxy index 7e2e6374bae..ac4614fdfbb 160000 --- a/contrib/tiflash-proxy +++ b/contrib/tiflash-proxy @@ -1 +1 @@ -Subproject commit 7e2e6374baec77e667daec3c9f1c900a8097e959 +Subproject commit ac4614fdfbb9ac88beb09d5044d707bd703d65d6 diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 20ad1e2482f..3385b075295 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -116,7 +116,7 @@ namespace DB F(type_seg_split_bg, {"type", "seg_split_bg"}), \ F(type_seg_split_fg, {"type", "seg_split_fg"}), \ F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \ - F(type_seg_merge, {"type", "seg_merge"}), F(type_seg_merge_fg, {"type", "seg_merge_fg"}), \ + F(type_seg_merge_bg_gc, {"type", "type_seg_merge_bg_gc"}), \ F(type_place_index_update, {"type", "place_index_update"})) \ M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \ F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \ @@ -128,8 +128,7 @@ namespace DB F(type_seg_split_bg, {{"type", "seg_split_bg"}}, ExpBuckets{0.001, 2, 20}), \ F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \ F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \ - F(type_seg_merge, {{"type", "seg_merge"}}, ExpBuckets{0.001, 2, 20}), \ - F(type_seg_merge_fg, {{"type", "seg_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \ + F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \ F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \ M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \ F(type_write, {"type", "write"}), /**/ \ diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 3dc2f25fb15..f2f6b829874 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -472,13 +472,18 @@ class DeltaMergeStore : private boost::noncopyable */ SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, SegmentSplitReason reason, std::optional opt_split_at = std::nullopt, SegmentSplitMode opt_split_mode = SegmentSplitMode::Auto); + enum class SegmentMergeReason + { + BackgroundGCThread, + }; + /** * Merge multiple continuous segments (order by segment start key) into one. * Throw exception if < 2 segments are given. * Fail if given segments are not continuous or not valid. * After merging, all specified segments will be abandoned (with `segment->hasAbandoned() == true`). */ - SegmentPtr segmentMerge(DMContext & dm_context, const std::vector & ordered_segments, bool is_foreground); + SegmentPtr segmentMerge(DMContext & dm_context, const std::vector & ordered_segments, SegmentMergeReason reason); enum class MergeDeltaReason { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 4979bc793f3..ea387529198 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -336,7 +336,7 @@ bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Ti return false; } -bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log) +bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentPtr & segment, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log) { auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); if (actual_delete_range.none()) @@ -347,8 +347,6 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh auto stable_rows = snap->stable->getRows(); auto stable_bytes = snap->stable->getBytes(); - LOG_FMT_TRACE(log, "delete range rows [{}], delete_bytes [{}] stable_rows [{}] stable_bytes [{}]", delete_rows, delete_bytes, stable_rows, stable_bytes); - // 1. for small tables, the data may just reside in delta and stable_rows may be 0, // so the `=` in `>=` is needed to cover the scenario when set tiflash replica of small tables to 0. // (i.e. `actual_delete_range` is not none, but `delete_rows` and `stable_rows` are both 0). @@ -356,7 +354,20 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh // because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria. // But the cost should be really minor because merge delta on an empty segment should be very fast. // What's more, we can ignore this kind of delete range in future to avoid this extra gc. - return (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold); + auto check_result = (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold); + + LOG_FMT_TRACE( + log, + "GC - Checking shouldCompactDeltaWithStable, " + "check_result={} delete_rows={}, delete_bytes={} stable_rows={} stable_bytes={} segment={}", + check_result, + delete_rows, + delete_bytes, + stable_rows, + stable_bytes, + segment->simpleInfo()); + + return check_result; } std::unordered_set getDMFileIDs(const SegmentPtr & seg) @@ -385,15 +396,20 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte double invalid_data_ratio_threshold, const LoggerPtr & log) { - std::unordered_set prev_segment_file_ids = getDMFileIDs(prev_seg); - std::unordered_set next_segment_file_ids = getDMFileIDs(next_seg); auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange()); // Do a quick check about whether the DTFile is completely included in the segment range if (first_pack_included && last_pack_included) { + LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange marking " + "segment as valid data ratio checked because all packs are included, segment={}", + seg->info()); seg->setValidDataRatioChecked(); return false; } + + std::unordered_set prev_segment_file_ids = getDMFileIDs(prev_seg); + std::unordered_set next_segment_file_ids = getDMFileIDs(next_seg); + bool contains_invalid_data = false; const auto & dt_files = snap->stable->getDMFiles(); if (!first_pack_included) @@ -415,6 +431,26 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte // Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments. if (!contains_invalid_data) { + LOG_FMT_TRACE( + log, + "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false because no invalid data, " + "segment={} first_pack_included={} last_pack_included={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]", + seg->simpleInfo(), + first_pack_included, + last_pack_included, + fmt::join(prev_segment_file_ids, ","), + fmt::join(next_segment_file_ids, ","), + [&] { + FmtBuffer fmt_buf; + fmt_buf.joinStr( + dt_files.begin(), + dt_files.end(), + [](const DMFilePtr & dt_file, FmtBuffer & fb) { + fb.fmtAppend("{}", dt_file->fileId()); + }, + ","); + return fmt_buf.toString(); + }()); return false; } @@ -428,9 +464,18 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte auto valid_rows = snap->stable->getRows(); auto valid_bytes = snap->stable->getBytes(); - LOG_FMT_TRACE(log, "valid_rows [{}], valid_bytes [{}] total_rows [{}] total_bytes [{}]", valid_rows, valid_bytes, total_rows, total_bytes); + auto check_result = (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold)); + LOG_FMT_TRACE( + log, + "GC - Checking shouldCompactStableWithTooMuchDataOutOfSegmentRange, " + "check_result={} valid_rows={} valid_bytes={} file_rows={} file_bytes={}", + check_result, + valid_rows, + valid_bytes, + total_rows, + total_bytes); seg->setValidDataRatioChecked(); - return (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold)); + return check_result; } } // namespace GC @@ -465,7 +510,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c "GC - Trigger Merge, segment={} table={}", segment->simpleInfo(), table_name); - auto new_segment = segmentMerge(*dm_context, segments_to_merge, false); + auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread); if (new_segment) { checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); @@ -510,6 +555,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte if (GC::shouldCompactDeltaWithStable( *dm_context, + segment, segment_snap, segment_range, invalid_data_ratio_threshold, @@ -519,7 +565,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte compact_reason = GC::MergeDeltaReason::TooManyDeleteRange; } - if (!should_compact && segment->isValidDataRatioChecked()) + if (!should_compact && !segment->isValidDataRatioChecked()) { if (GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange( *dm_context, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 19bd366854e..ba13bc72a70 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -208,14 +208,14 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP return {new_left, new_right}; } -SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vector & ordered_segments, bool is_foreground) +SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vector & ordered_segments, SegmentMergeReason reason) { RUNTIME_CHECK(ordered_segments.size() >= 2, ordered_segments.size()); LOG_FMT_INFO( log, - "Merge - Begin, is_foreground={} safe_point={} segments_to_merge={}", - is_foreground, + "Merge - Begin, reason={} safe_point={} segments_to_merge={}", + magic_enum::enum_name(reason), dm_context.min_version, Segment::simpleInfo(ordered_segments)); @@ -289,16 +289,24 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect } CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_SegmentMerge}; - if (is_foreground) - GET_METRIC(tiflash_storage_subtask_count, type_seg_merge_fg).Increment(); - else - GET_METRIC(tiflash_storage_subtask_count, type_seg_merge).Increment(); + switch (reason) + { + case SegmentMergeReason::BackgroundGCThread: + GET_METRIC(tiflash_storage_subtask_count, type_seg_merge_bg_gc).Increment(); + break; + default: + break; + } Stopwatch watch_seg_merge; SCOPE_EXIT({ - if (is_foreground) - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge_fg).Observe(watch_seg_merge.elapsedSeconds()); - else - GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge).Observe(watch_seg_merge.elapsedSeconds()); + switch (reason) + { + case SegmentMergeReason::BackgroundGCThread: + GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge_bg_gc).Observe(watch_seg_merge.elapsedSeconds()); + break; + default: + break; + } }); WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); @@ -344,9 +352,9 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect LOG_FMT_INFO( log, - "Merge - Finish, {} segments are merged into one, is_foreground={} merged={} segments_to_merge={}", + "Merge - Finish, {} segments are merged into one, reason={} merged={} segments_to_merge={}", ordered_segments.size(), - is_foreground, + magic_enum::enum_name(reason), merged->info(), Segment::info(ordered_segments)); } @@ -368,7 +376,12 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( const MergeDeltaReason reason, SegmentSnapshotPtr segment_snap) { - LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", magic_enum::enum_name(reason), dm_context.min_version, segment->info()); + LOG_FMT_INFO( + log, + "MergeDelta - Begin, reason={} safe_point={} segment={}", + magic_enum::enum_name(reason), + dm_context.min_version, + segment->info()); ColumnDefinesPtr schema_snap; @@ -479,7 +492,11 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( new_segment->check(dm_context, "After segmentMergeDelta"); } - LOG_FMT_INFO(log, "MergeDelta - Finish, delta is merged, old_segment={} new_segment={}", segment->info(), new_segment->info()); + LOG_FMT_INFO( + log, + "MergeDelta - Finish, delta is merged, old_segment={} new_segment={}", + segment->info(), + new_segment->info()); } wbs.writeRemoves(); diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index f7ee3c060e1..1d90c411130 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -328,6 +328,8 @@ std::vector KVStore::preHandleSSTsToDTFiles( } physical_table_id = storage->getTableInfo().id; + auto & global_settings = context.getGlobalContext().getSettingsRef(); + // Read from SSTs and refine the boundary of blocks output to DTFiles auto sst_stream = std::make_shared( new_region, @@ -345,9 +347,9 @@ std::vector KVStore::preHandleSSTsToDTFiles( schema_snap, snapshot_apply_method, job_type, - /* split_after_rows */ 0, - /* split_after_size */ 0, - tmt.getContext()); + /* split_after_rows */ global_settings.dt_segment_limit_rows, + /* split_after_size */ global_settings.dt_segment_limit_size, + context); stream->writePrefix(); stream->write(); diff --git a/dbms/src/Storages/Transaction/ProxyFFI.cpp b/dbms/src/Storages/Transaction/ProxyFFI.cpp index 87cedb59b97..d66875c268d 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFI.cpp @@ -560,11 +560,8 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint return state; } -void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts) +// just a dummy function to avoid proxy fn_handle_safe_ts_update.is_some() panic. +void HandleSafeTSUpdate(EngineStoreServerWrap *, uint64_t, uint64_t, uint64_t) { - RegionTable & region_table = server->tmt->getRegionTable(); - region_table.updateSelfSafeTS(region_id, self_safe_ts); - region_table.updateLeaderSafeTS(region_id, leader_safe_ts); - LOG_FMT_TRACE(&Poco::Logger::get(__FUNCTION__), "update safe ts in region_id={}, leader_safe_ts={}, self_safe_ts={}", region_id, leader_safe_ts, self_safe_ts); } } // namespace DB diff --git a/dbms/src/Storages/Transaction/ProxyFFI.h b/dbms/src/Storages/Transaction/ProxyFFI.h index 7107d9f0c5c..fde97041873 100644 --- a/dbms/src/Storages/Transaction/ProxyFFI.h +++ b/dbms/src/Storages/Transaction/ProxyFFI.h @@ -147,8 +147,8 @@ BaseBuffView strIntoView(const std::string * str_ptr); CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full); void SetStore(EngineStoreServerWrap *, BaseBuffView); void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view); -void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts); } +void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts); inline EngineStoreServerHelper GetEngineStoreServerHelper( EngineStoreServerWrap * tiflash_instance_wrap) diff --git a/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp b/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp index d2eb4454b05..792f149f588 100644 --- a/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp +++ b/dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp @@ -55,18 +55,11 @@ HttpRequestRes HandleHttpRequestSyncStatus( // if storage is not created in ch, flash replica should not be available. if (tmt.getStorages().get(table_id)) { - RegionTable & region_table = tmt.getRegionTable(); - region_table.handleInternalRegionsByTable(table_id, [&](const RegionTable::InternalRegions & regions) { + tmt.getRegionTable().handleInternalRegionsByTable(table_id, [&](const RegionTable::InternalRegions & regions) { + count = regions.size(); region_list.reserve(regions.size()); for (const auto & region : regions) - { - if (!region_table.isSafeTSLag(region.first)) - { - region_list.push_back(region.first); - } - } - count = region_list.size(); - LOG_FMT_DEBUG(&Poco::Logger::get(__FUNCTION__), "table_id={}, total_region_count={}, ready_region_count={}", table_id, regions.size(), count); + region_list.push_back(region.first); }); } ss << count << std::endl; diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index f575c39c1a9..487f581ab5d 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -186,11 +186,7 @@ void RegionTable::removeTable(TableID table_id) // Remove from region list. for (const auto & region_info : table.regions) - { regions.erase(region_info.first); - leader_safe_ts.erase(region_info.first); - self_safe_ts.erase(region_info.first); - } // Remove from table map. tables.erase(it); @@ -268,8 +264,6 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const handle_range = internal_region_it->second.range_in_table; regions.erase(it); - leader_safe_ts.erase(region_id); - self_safe_ts.erase(region_id); table.regions.erase(internal_region_it); if (table.regions.empty()) { diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 5e488e36826..adc6c5659fc 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -23,11 +23,9 @@ #include #include #include -#include #include #include -#include #include #include #include @@ -62,12 +60,6 @@ namespace DM struct ExternalDTFileInfo; } -using SafeTS = UInt64; -enum : SafeTS -{ - InvalidSafeTS = std::numeric_limits::max() -}; - class RegionTable : private boost::noncopyable { public: @@ -89,7 +81,7 @@ class RegionTable : private boost::noncopyable struct Table : boost::noncopyable { - explicit Table(const TableID table_id_) + Table(const TableID table_id_) : table_id(table_id_) {} TableID table_id; @@ -98,7 +90,6 @@ class RegionTable : private boost::noncopyable using TableMap = std::unordered_map; using RegionInfoMap = std::unordered_map; - using SafeTsMap = std::unordered_map; using DirtyRegions = std::unordered_set; using TableToOptimize = std::unordered_set; @@ -107,7 +98,7 @@ class RegionTable : private boost::noncopyable { using FlushThresholdsData = std::vector>; - explicit FlushThresholds(FlushThresholdsData && data_) + FlushThresholds(FlushThresholdsData && data_) : data(std::make_shared(std::move(data_))) {} @@ -131,7 +122,7 @@ class RegionTable : private boost::noncopyable mutable std::mutex mutex; }; - explicit RegionTable(Context & context_); + RegionTable(Context & context_); void restore(); void setFlushThresholds(const FlushThresholds::FlushThresholdsData & flush_thresholds_); @@ -141,14 +132,14 @@ class RegionTable : private boost::noncopyable /// This functional only shrink the table range of this region_id void shrinkRegionRange(const Region & region); - void removeRegion(RegionID region_id, bool remove_data, const RegionTaskLock &); + void removeRegion(const RegionID region_id, bool remove_data, const RegionTaskLock &); bool tryFlushRegions(); RegionDataReadInfoList tryFlushRegion(RegionID region_id, bool try_persist = false); RegionDataReadInfoList tryFlushRegion(const RegionPtrWithBlock & region, bool try_persist); - void handleInternalRegionsByTable(TableID table_id, std::function && callback) const; - std::vector> getRegionsByTable(TableID table_id) const; + void handleInternalRegionsByTable(const TableID table_id, std::function && callback) const; + std::vector> getRegionsByTable(const TableID table_id) const; /// Write the data of the given region into the table with the given table ID, fill the data list for outer to remove. /// Will trigger schema sync on read error for only once, @@ -173,49 +164,17 @@ class RegionTable : private boost::noncopyable Poco::Logger * log); /// extend range for possible InternalRegion or add one. - void extendRegionRange(RegionID region_id, const RegionRangeKeys & region_range_keys); - void updateSelfSafeTS(UInt64 region_id, UInt64 safe_ts) - { - if (safe_ts == InvalidSafeTS) - { - return; - } - std::lock_guard lock(mutex); - self_safe_ts[region_id] = safe_ts; - } - void updateLeaderSafeTS(UInt64 region_id, UInt64 safe_ts) - { - if (safe_ts == InvalidSafeTS) - { - return; - } - std::lock_guard lock(mutex); - leader_safe_ts[region_id] = safe_ts; - } - - // unit: ms. If safe_ts diff is larger than 2min, we think the data synchronization progress is far behind the leader. - static const UInt64 SafeTsDiffThreshold = 2 * 60 * 1000; - bool isSafeTSLag(UInt64 region_id) - { - auto self_it = self_safe_ts.find(region_id); - auto leader_it = leader_safe_ts.find(region_id); - if (self_it == self_safe_ts.end() || leader_it == leader_safe_ts.end()) - { - return false; - } - LOG_FMT_TRACE(log, "region_id:{}, table_id:{}, leader_safe_ts:{}, self_safe_ts:{}", region_id, regions[region_id], leader_it->second, self_it->second); - return (leader_it->second > self_it->second) && (leader_it->second - self_it->second > SafeTsDiffThreshold); - } + void extendRegionRange(const RegionID region_id, const RegionRangeKeys & region_range_keys); private: friend class MockTiDB; friend class StorageDeltaMerge; - Table & getOrCreateTable(TableID table_id); + Table & getOrCreateTable(const TableID table_id); void removeTable(TableID table_id); InternalRegion & insertRegion(Table & table, const Region & region); InternalRegion & getOrInsertRegion(const Region & region); - InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, RegionID region_id); + InternalRegion & insertRegion(Table & table, const RegionRangeKeys & region_range_keys, const RegionID region_id); InternalRegion & doGetInternalRegion(TableID table_id, RegionID region_id); RegionDataReadInfoList flushRegion(const RegionPtrWithBlock & region, bool try_persist) const; @@ -225,11 +184,6 @@ class RegionTable : private boost::noncopyable private: TableMap tables; RegionInfoMap regions; - // safe_ts is maintained by check_leader RPC (https://github.com/tikv/tikv/blob/1ea26a2ac8761af356cc5c0825eb89a0b8fc9749/components/resolved_ts/src/advance.rs#L262), - // leader_safe_ts is the safe_ts in leader, leader will send to learner to advance safe_ts of learner, and TiFlash will record the safe_ts in check_leader RPC as leader_safe_ts. - SafeTsMap leader_safe_ts; - // self_safe_ts is the safe_ts in learner. When TiFlash proxy receive from leader, TiFlash will update self_safe_ts when TiFlash has applied the raft log to applied_index. - SafeTsMap self_safe_ts; DirtyRegions dirty_regions; FlushThresholds flush_thresholds;