Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Delta flush background #7088

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ namespace DB
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
F(type_place_index_update, {"type", "place_index_update"}), \
F(type_compact_log_segment_bg, {"type", "compact_log_segment_bg"}), \
F(type_compact_log_segment_fg, {"type", "compact_log_segment_fg"}), \
F(type_compact_log_region_bg, {"type", "compact_log_region_bg"}), \
F(type_compact_log_region_fg, {"type", "compact_log_region_fg"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
Expand All @@ -146,7 +150,9 @@ namespace DB
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20}), \
F(type_compact_log_bg, {{"type", "compact_log_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_compact_log_fg, {{"type", "compact_log_fg"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
F(type_ingest, {"type", "ingest"}), /**/ \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <common/logger_useful.h>

Expand Down Expand Up @@ -116,6 +117,7 @@ std::pair<bool, bool> DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back
case TaskType::Compact:
case TaskType::Flush:
case TaskType::PlaceIndex:
case TaskType::NotifyCompactLog:
is_heavy = false;
// reserve some task space for heavy tasks
if (max_task_num > 1 && light_tasks.size() >= static_cast<size_t>(max_task_num * 0.9))
Expand Down Expand Up @@ -1173,6 +1175,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
auto delta_cache_limit_rows = dm_context->delta_cache_limit_rows;
auto delta_cache_limit_bytes = dm_context->delta_cache_limit_bytes;

bool should_background_compact_log = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes);
bool should_background_flush = (unsaved_rows >= delta_cache_limit_rows || unsaved_bytes >= delta_cache_limit_bytes) //
&& (delta_rows - delta_last_try_flush_rows >= delta_cache_limit_rows
|| delta_bytes - delta_last_try_flush_bytes >= delta_cache_limit_bytes);
Expand Down Expand Up @@ -1246,6 +1249,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
delta_last_try_flush_bytes = delta_bytes;
LOG_DEBUG(log, "Foreground flush cache in checkSegmentUpdate, thread={} segment={}", thread_type, segment->info());
segment->flushCache(*dm_context);
triggerCompactLog(dm_context, segment, false);
}
else if (should_background_flush)
{
Expand All @@ -1260,6 +1264,10 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
try_add_background_task(BackgroundTask{TaskType::Flush, dm_context, segment});
}
}
if (should_background_compact_log)
{
try_add_background_task(BackgroundTask{TaskType::NotifyCompactLog, dm_context, segment});
}
}

// Need to check the latest delta (maybe updated after foreground flush). If it is updating by another thread,
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class DeltaMergeStore : private boost::noncopyable
Compact,
Flush,
PlaceIndex,
NotifyCompactLog,
};

struct BackgroundTask
Expand Down Expand Up @@ -588,7 +589,7 @@ class DeltaMergeStore : private boost::noncopyable
private:
void dropAllSegments(bool keep_first_segment);
String getLogTracingId(const DMContext & dm_ctx);

void triggerCompactLog(const DMContextPtr & dm_context, const SegmentPtr & segment, bool is_background);
#ifndef DBMS_PUBLIC_GTEST
private:
#else
Expand Down
31 changes: 30 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>

#include <magic_enum.hpp>
Expand All @@ -41,7 +42,6 @@ extern const char pause_until_dt_background_delta_merge[];

namespace DM
{

// A callback class for scanning the DMFiles on local filesystem
class LocalDMFileGcScanner final
{
Expand Down Expand Up @@ -307,6 +307,8 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
left = task.segment;
type = ThreadType::BG_Flush;
break;
case TaskType::NotifyCompactLog:
triggerCompactLog(task.dm_context, task.segment, true);
case TaskType::PlaceIndex:
task.segment->placeDeltaIndex(*task.dm_context);
break;
Expand Down Expand Up @@ -829,5 +831,32 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
return gc_segments_num;
}

void DeltaMergeStore::triggerCompactLog(const DMContextPtr & dm_context, const SegmentPtr & segment, bool is_background)
{
auto & tmt = dm_context->db_context.getTMTContext();
auto & kv_store = tmt.getKVStore();

if (is_background)
{
GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_bg).Increment();
}
else
{
GET_METRIC(tiflash_storage_subtask_count, type_compact_log_segment_fg).Increment();
}

Stopwatch watch;
SCOPE_EXIT({
if (is_background)
{
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_bg).Observe(watch.elapsedSeconds());
}
else
{
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_compact_log_fg).Observe(watch.elapsedSeconds());
}
});
kv_store->copmactLogByRowKeyRange(tmt, segment->getRowKeyRange(), physical_table_id, is_background);
}
} // namespace DM
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/RowKeyRange.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ inline bool operator<(const RowKeyValueRef & a, const RowKeyValueRef & b)
return compare(a, b) < 0;
}

inline bool operator<=(const RowKeyValueRef & a, const RowKeyValueRef & b)
{
return compare(a, b) <= 0;
}

inline bool operator<(const StringRef & a, const RowKeyValueRef & b)
{
return compare(a, b) < 0;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Transaction/TMTContext.h>

#include <Common/Exception.h>
#include <Common/SyncPoint/SyncPoint.h>
#include <Common/TiFlashMetrics.h>
Expand All @@ -38,6 +40,8 @@
#include <Storages/DeltaMerge/StoragePool.h>
#include <Storages/DeltaMerge/WriteBatchesImpl.h>
#include <Storages/PathPool.h>
#include <Storages/Transaction/RegionTable.h>
#include <Storages/Transaction/KVStore.h>
#include <common/logger_useful.h>
#include <fiu.h>
#include <fmt/core.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ BlockInputStreamPtr StorageDeltaMerge::status()
auto & name_col = columns[0];
auto & value_col = columns[1];

StoreStats stat;
DM::StoreStats stat;
if (storeInited())
{
stat = _store->getStoreStats();
Expand Down
116 changes: 94 additions & 22 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include <Storages/Transaction/TMTContext.h>
#include <common/likely.h>

#include <tuple>

namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -98,6 +100,7 @@ RegionPtr KVStore::getRegion(RegionID region_id) const
return it->second;
return nullptr;
}
// TODO: may get regions not in segment?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if we also flush data from those obsolete regions?

RegionMap KVStore::getRegionsByRangeOverlap(const RegionRange & range) const
{
auto manage_lock = genRegionReadLock();
Expand Down Expand Up @@ -365,11 +368,11 @@ bool KVStore::tryFlushRegionData(UInt64 region_id, bool force_persist, bool try_
}
else
{
return canFlushRegionDataImpl(curr_region_ptr, true, try_until_succeed, tmt, region_task_lock, index, term);
return canFlushRegionDataImpl(curr_region_ptr, false, try_until_succeed, tmt, region_task_lock, index, term);
}
}

bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 flush_if_possible, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term)
bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8, bool, TMTContext &, const RegionTaskLock &, UInt64, UInt64)
{
if (curr_region_ptr == nullptr)
{
Expand All @@ -381,26 +384,7 @@ bool KVStore::canFlushRegionDataImpl(const RegionPtr & curr_region_ptr, UInt8 fl

LOG_DEBUG(log, "{} approx mem cache info: rows {}, bytes {}", curr_region.toString(false), rows, size_bytes);

bool can_flush = false;
if (rows >= region_compact_log_min_rows.load(std::memory_order_relaxed)
|| size_bytes >= region_compact_log_min_bytes.load(std::memory_order_relaxed))
{
// if rows or bytes more than threshold, flush cache and persist mem data.
can_flush = true;
}
else
{
// if there is little data in mem, wait until time interval reached threshold.
// use random period so that lots of regions will not be persisted at same time.
auto compact_log_period = std::rand() % region_compact_log_period.load(std::memory_order_relaxed); // NOLINT
can_flush = !(curr_region.lastCompactLogTime() + Seconds{compact_log_period} > Clock::now());
}
if (can_flush && flush_if_possible)
{
LOG_DEBUG(log, "{} flush region due to tryFlushRegionData, index {} term {}", curr_region.toString(false), index, term);
return forceFlushRegionDataImpl(curr_region, try_until_succeed, tmt, region_task_lock, index, term);
}
return can_flush;
return false;
}

bool KVStore::forceFlushRegionDataImpl(Region & curr_region, bool try_until_succeed, TMTContext & tmt, const RegionTaskLock & region_task_lock, UInt64 index, UInt64 term)
Expand Down Expand Up @@ -859,4 +843,92 @@ FileUsageStatistics KVStore::getFileUsageStatistics() const
return region_persister->getFileUsageStatistics();
}

// We need to get applied index before flushing cache, and can't hold region task lock when flush cache to avoid hang write cmd apply.
// 1. store applied index and applied term,
// 2. flush cache,
// 3. notify regions to compact log and store fushed state with applied index/term before flushing cache.
void KVStore::copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, TableID table_id, bool is_background)
{
auto storage = tmt.getStorages().get(table_id);
if (unlikely(storage == nullptr))
{
LOG_WARNING(log,
"tryFlushRegionCacheInStorage can not get table for table id {}, ignored",
table_id);
return;
}
auto range = std::make_pair(TiKVRangeKey::makeTiKVRangeKey<true>(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.start.toRegionKey(table_id))),
TiKVRangeKey::makeTiKVRangeKey<false>(RecordKVFormat::encodeAsTiKVKey(*rowkey_range.end.toRegionKey(table_id))));
std::unordered_map<UInt64, std::tuple<UInt64, UInt64, DM::RowKeyRange>> region_copmact_indexes;
{
auto task_lock = genTaskLock();
auto region_map = getRegionsByRangeOverlap(range);
for (const auto & overlapped_region : region_map)
{
auto region_rowkey_range = DM::RowKeyRange::fromRegionRange(
overlapped_region.second->getRange(),
table_id,
storage->isCommonHandle(),
storage->getRowKeyColumnSize());
auto region_task_lock = region_manager.genRegionTaskLock(overlapped_region.first);
region_copmact_indexes[overlapped_region.first] = {overlapped_region.second->appliedIndex(), overlapped_region.second->appliedIndexTerm(), region_rowkey_range};
persistRegion(*overlapped_region.second, region_task_lock, "triggerCompactLog");
}
}
storage->flushCache(tmt.getContext(), rowkey_range);
Copy link
Member

@CalvinNeo CalvinNeo May 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why flushCache here before the previous loop?

// flush all segments in the range of regions.
// TODO: combine continues range to do one flush.
for (const auto & region : region_copmact_indexes)
{
auto region_rowkey_range = std::get<2>(region.second);
if (rowkey_range.getStart() <= region_rowkey_range.getStart() && region_rowkey_range.getEnd() < rowkey_range.getEnd())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why < here?

{
// This segment has flushed, skip it.
LOG_DEBUG(log, "flushed segment of region {}", region.first);
continue;
}
LOG_DEBUG(log, "flush extra segment of region {}, region range:[{},{}], flushed segment range:[{},{}]", region.first, region_rowkey_range.getStart().toDebugString(), region_rowkey_range.getEnd().toDebugString(), rowkey_range.getStart().toDebugString(), rowkey_range.getEnd().toDebugString());
storage->flushCache(tmt.getContext(), std::get<2>(region.second));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flushCache is not protected by region task lock.

}
// forbid regions being removed.
auto task_lock = genTaskLock();
for (const auto & region : region_copmact_indexes)
{
auto reion_ptr = getRegion(region.first);
if (!reion_ptr)
{
LOG_INFO(log, "region {} has been removed, ignore", region.first);
continue;
}
notifyCompactLog(region.first, std::get<0>(region.second), std::get<1>(region.second), is_background);
}
}

// the caller guarantee that delta cache has been flushed. This function need to persiste region cache before trigger proxy to compact log.
void KVStore::notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background)
{
auto region = getRegion(region_id);
if (!region)
{
LOG_INFO(log, "region {} has been removed, ignore", region_id);
return;
}
if (region->lastCompactLogTime() + Seconds{region_compact_log_period.load(std::memory_order_relaxed)} > Clock::now())
{
return;
}
if (is_background)
{
GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_bg).Increment();
}
else
{
GET_METRIC(tiflash_storage_subtask_count, type_compact_log_region_fg).Increment();
}
auto region_task_lock = region_manager.genRegionTaskLock(region_id);
region->setFlushedState(compact_index, compact_term);
region->markCompactLog();
region->cleanApproxMemCacheInfo();
getProxyHelper()->notifyCompactLog(region_id, compact_index, compact_term);
}
} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/Transaction/RegionDataRead.h>
#include <Storages/Transaction/RegionManager.h>
#include <Storages/Transaction/StorageEngineType.h>
Expand Down Expand Up @@ -167,6 +168,8 @@ class KVStore final : private boost::noncopyable

FileUsageStatistics getFileUsageStatistics() const;

void copmactLogByRowKeyRange(TMTContext & tmt, const DM::RowKeyRange & rowkey_range, TableID table_id, bool is_background);
void notifyCompactLog(RegionID region_id, UInt64 compact_index, UInt64 compact_term, bool is_background);
#ifndef DBMS_PUBLIC_GTEST
private:
#endif
Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,16 +797,27 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint
return state;
}

void TiFlashRaftProxyHelper::notifyCompactLog(uint64_t region_id, uint64_t compact_index, uint64_t compact_term) const
{
this->fn_notify_compact_log(this->proxy_ptr, region_id, compact_index, compact_term, compact_index);
}

void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts)
{
RegionTable & region_table = server->tmt->getRegionTable();
region_table.updateSafeTS(region_id, leader_safe_ts, self_safe_ts);
}


std::string_view buffToStrView(const BaseBuffView & buf)
{
return std::string_view{buf.data, buf.len};
}

FlushedState GetFlushedState(EngineStoreServerWrap * server, uint64_t region_id)
{
auto & kvstore = server->tmt->getKVStore();
auto region_ptr = kvstore->getRegion(region_id);
return region_ptr->getFlushedState();
}

} // namespace DB
Loading