Skip to content

Commit

Permalink
Merge branch 'master' into astToExecutorToBinder
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy authored Sep 19, 2022
2 parents 6a9c7e0 + 014e55c commit eb32b6e
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 110 deletions.
5 changes: 2 additions & 3 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ namespace DB
F(type_seg_split_bg, {"type", "seg_split_bg"}), \
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge, {"type", "seg_merge"}), F(type_seg_merge_fg, {"type", "seg_merge_fg"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
Expand All @@ -128,8 +128,7 @@ namespace DB
F(type_seg_split_bg, {{"type", "seg_split_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge, {{"type", "seg_merge"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_fg, {{"type", "seg_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,18 @@ class DeltaMergeStore : private boost::noncopyable
*/
SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, SegmentSplitReason reason, std::optional<RowKeyValue> opt_split_at = std::nullopt, SegmentSplitMode opt_split_mode = SegmentSplitMode::Auto);

// Identifies what triggered a call to segmentMerge(). The value is printed in
// the "Merge - Begin" / "Merge - Finish" log lines and selects which
// tiflash_storage_subtask_* metric series the merge is recorded under.
enum class SegmentMergeReason
{
// Passed by gcTrySegmentMerge(); recorded under the `type_seg_merge_bg_gc` metrics.
BackgroundGCThread,
};

/**
* Merge multiple continuous segments (order by segment start key) into one.
* Throw exception if < 2 segments are given.
* Fail if given segments are not continuous or not valid.
* After merging, all specified segments will be abandoned (with `segment->hasAbandoned() == true`).
*/
SegmentPtr segmentMerge(DMContext & dm_context, const std::vector<SegmentPtr> & ordered_segments, bool is_foreground);
SegmentPtr segmentMerge(DMContext & dm_context, const std::vector<SegmentPtr> & ordered_segments, SegmentMergeReason reason);

enum class MergeDeltaReason
{
Expand Down
66 changes: 56 additions & 10 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ bool shouldCompactStableWithTooManyInvalidVersion(const SegmentPtr & seg, DB::Ti
return false;
}

bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentPtr & segment, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double invalid_data_ratio_threshold, const LoggerPtr & log)
{
auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range);
if (actual_delete_range.none())
Expand All @@ -347,16 +347,27 @@ bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapsh
auto stable_rows = snap->stable->getRows();
auto stable_bytes = snap->stable->getBytes();

LOG_FMT_TRACE(log, "delete range rows [{}], delete_bytes [{}] stable_rows [{}] stable_bytes [{}]", delete_rows, delete_bytes, stable_rows, stable_bytes);

// 1. For small tables, the data may reside only in delta and stable_rows may be 0,
// so the `=` in `>=` is needed to cover the scenario where the tiflash replica of a small table is set to 0
// (i.e. `actual_delete_range` is not none, but `delete_rows` and `stable_rows` are both 0).
// 2. The disadvantage of the `=` in `>=` is that it may trigger an extra gc when applying a snapshot file to an empty segment,
// because a delete range is written before the snapshot file is applied, and that delete range meets the following gc criteria.
// But the cost should be really minor because merge delta on an empty segment should be very fast.
// What's more, we can ignore this kind of delete range in the future to avoid this extra gc.
return (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold);
auto check_result = (delete_rows >= stable_rows * invalid_data_ratio_threshold) || (delete_bytes >= stable_bytes * invalid_data_ratio_threshold);

LOG_FMT_TRACE(
log,
"GC - Checking shouldCompactDeltaWithStable, "
"check_result={} delete_rows={}, delete_bytes={} stable_rows={} stable_bytes={} segment={}",
check_result,
delete_rows,
delete_bytes,
stable_rows,
stable_bytes,
segment->simpleInfo());

return check_result;
}

std::unordered_set<UInt64> getDMFileIDs(const SegmentPtr & seg)
Expand Down Expand Up @@ -385,15 +396,20 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
double invalid_data_ratio_threshold,
const LoggerPtr & log)
{
std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);
auto [first_pack_included, last_pack_included] = snap->stable->isFirstAndLastPackIncludedInRange(context, seg->getRowKeyRange());
// Do a quick check about whether the DTFile is completely included in the segment range
if (first_pack_included && last_pack_included)
{
LOG_FMT_TRACE(log, "GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange marking "
"segment as valid data ratio checked because all packs are included, segment={}",
seg->info());
seg->setValidDataRatioChecked();
return false;
}

std::unordered_set<UInt64> prev_segment_file_ids = getDMFileIDs(prev_seg);
std::unordered_set<UInt64> next_segment_file_ids = getDMFileIDs(next_seg);

bool contains_invalid_data = false;
const auto & dt_files = snap->stable->getDMFiles();
if (!first_pack_included)
Expand All @@ -415,6 +431,26 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
// Only try to compact the segment when there is data out of this segment range and is also not shared by neighbor segments.
if (!contains_invalid_data)
{
LOG_FMT_TRACE(
log,
"GC - shouldCompactStableWithTooMuchDataOutOfSegmentRange checked false because no invalid data, "
"segment={} first_pack_included={} last_pack_included={} prev_seg_files=[{}] next_seg_files=[{}] my_files=[{}]",
seg->simpleInfo(),
first_pack_included,
last_pack_included,
fmt::join(prev_segment_file_ids, ","),
fmt::join(next_segment_file_ids, ","),
[&] {
FmtBuffer fmt_buf;
fmt_buf.joinStr(
dt_files.begin(),
dt_files.end(),
[](const DMFilePtr & dt_file, FmtBuffer & fb) {
fb.fmtAppend("{}", dt_file->fileId());
},
",");
return fmt_buf.toString();
}());
return false;
}

Expand All @@ -428,9 +464,18 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
auto valid_rows = snap->stable->getRows();
auto valid_bytes = snap->stable->getBytes();

LOG_FMT_TRACE(log, "valid_rows [{}], valid_bytes [{}] total_rows [{}] total_bytes [{}]", valid_rows, valid_bytes, total_rows, total_bytes);
auto check_result = (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
LOG_FMT_TRACE(
log,
"GC - Checking shouldCompactStableWithTooMuchDataOutOfSegmentRange, "
"check_result={} valid_rows={} valid_bytes={} file_rows={} file_bytes={}",
check_result,
valid_rows,
valid_bytes,
total_rows,
total_bytes);
seg->setValidDataRatioChecked();
return (valid_rows < total_rows * (1 - invalid_data_ratio_threshold)) || (valid_bytes < total_bytes * (1 - invalid_data_ratio_threshold));
return check_result;
}

} // namespace GC
Expand Down Expand Up @@ -465,7 +510,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
"GC - Trigger Merge, segment={} table={}",
segment->simpleInfo(),
table_name);
auto new_segment = segmentMerge(*dm_context, segments_to_merge, false);
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC);
Expand Down Expand Up @@ -510,6 +555,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte

if (GC::shouldCompactDeltaWithStable(
*dm_context,
segment,
segment_snap,
segment_range,
invalid_data_ratio_threshold,
Expand All @@ -519,7 +565,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(const DMContextPtr & dm_conte
compact_reason = GC::MergeDeltaReason::TooManyDeleteRange;
}

if (!should_compact && segment->isValidDataRatioChecked())
if (!should_compact && !segment->isValidDataRatioChecked())
{
if (GC::shouldCompactStableWithTooMuchDataOutOfSegmentRange(
*dm_context,
Expand Down
47 changes: 32 additions & 15 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,14 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP
return {new_left, new_right};
}

SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vector<SegmentPtr> & ordered_segments, bool is_foreground)
SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vector<SegmentPtr> & ordered_segments, SegmentMergeReason reason)
{
RUNTIME_CHECK(ordered_segments.size() >= 2, ordered_segments.size());

LOG_FMT_INFO(
log,
"Merge - Begin, is_foreground={} safe_point={} segments_to_merge={}",
is_foreground,
"Merge - Begin, reason={} safe_point={} segments_to_merge={}",
magic_enum::enum_name(reason),
dm_context.min_version,
Segment::simpleInfo(ordered_segments));

Expand Down Expand Up @@ -289,16 +289,24 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect
}

CurrentMetrics::Increment cur_dm_segments{CurrentMetrics::DT_SegmentMerge};
if (is_foreground)
GET_METRIC(tiflash_storage_subtask_count, type_seg_merge_fg).Increment();
else
GET_METRIC(tiflash_storage_subtask_count, type_seg_merge).Increment();
switch (reason)
{
case SegmentMergeReason::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_count, type_seg_merge_bg_gc).Increment();
break;
default:
break;
}
Stopwatch watch_seg_merge;
SCOPE_EXIT({
if (is_foreground)
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge_fg).Observe(watch_seg_merge.elapsedSeconds());
else
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge).Observe(watch_seg_merge.elapsedSeconds());
switch (reason)
{
case SegmentMergeReason::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_seg_merge_bg_gc).Observe(watch_seg_merge.elapsedSeconds());
break;
default:
break;
}
});

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());
Expand Down Expand Up @@ -344,9 +352,9 @@ SegmentPtr DeltaMergeStore::segmentMerge(DMContext & dm_context, const std::vect

LOG_FMT_INFO(
log,
"Merge - Finish, {} segments are merged into one, is_foreground={} merged={} segments_to_merge={}",
"Merge - Finish, {} segments are merged into one, reason={} merged={} segments_to_merge={}",
ordered_segments.size(),
is_foreground,
magic_enum::enum_name(reason),
merged->info(),
Segment::info(ordered_segments));
}
Expand All @@ -368,7 +376,12 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
const MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap)
{
LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", magic_enum::enum_name(reason), dm_context.min_version, segment->info());
LOG_FMT_INFO(
log,
"MergeDelta - Begin, reason={} safe_point={} segment={}",
magic_enum::enum_name(reason),
dm_context.min_version,
segment->info());

ColumnDefinesPtr schema_snap;

Expand Down Expand Up @@ -479,7 +492,11 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
new_segment->check(dm_context, "After segmentMergeDelta");
}

LOG_FMT_INFO(log, "MergeDelta - Finish, delta is merged, old_segment={} new_segment={}", segment->info(), new_segment->info());
LOG_FMT_INFO(
log,
"MergeDelta - Finish, delta is merged, old_segment={} new_segment={}",
segment->info(),
new_segment->info());
}

wbs.writeRemoves();
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ std::vector<DM::ExternalDTFileInfo> KVStore::preHandleSSTsToDTFiles(
}
physical_table_id = storage->getTableInfo().id;

auto & global_settings = context.getGlobalContext().getSettingsRef();

// Read from SSTs and refine the boundary of blocks output to DTFiles
auto sst_stream = std::make_shared<DM::SSTFilesToBlockInputStream>(
new_region,
Expand All @@ -345,9 +347,9 @@ std::vector<DM::ExternalDTFileInfo> KVStore::preHandleSSTsToDTFiles(
schema_snap,
snapshot_apply_method,
job_type,
/* split_after_rows */ 0,
/* split_after_size */ 0,
tmt.getContext());
/* split_after_rows */ global_settings.dt_segment_limit_rows,
/* split_after_size */ global_settings.dt_segment_limit_size,
context);

stream->writePrefix();
stream->write();
Expand Down
7 changes: 2 additions & 5 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,8 @@ raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint
return state;
}

void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts)
// Just a dummy function, kept only to avoid a panic on the proxy's `fn_handle_safe_ts_update.is_some()` check.
void HandleSafeTSUpdate(EngineStoreServerWrap *, uint64_t, uint64_t, uint64_t)
{
RegionTable & region_table = server->tmt->getRegionTable();
region_table.updateSelfSafeTS(region_id, self_safe_ts);
region_table.updateLeaderSafeTS(region_id, leader_safe_ts);
LOG_FMT_TRACE(&Poco::Logger::get(__FUNCTION__), "update safe ts in region_id={}, leader_safe_ts={}, self_safe_ts={}", region_id, leader_safe_ts, self_safe_ts);
}
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ BaseBuffView strIntoView(const std::string * str_ptr);
CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full);
void SetStore(EngineStoreServerWrap *, BaseBuffView);
void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view);
void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts);
}
void HandleSafeTSUpdate(EngineStoreServerWrap * server, uint64_t region_id, uint64_t self_safe_ts, uint64_t leader_safe_ts);

inline EngineStoreServerHelper GetEngineStoreServerHelper(
EngineStoreServerWrap * tiflash_instance_wrap)
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Storages/Transaction/ProxyFFIStatusService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,11 @@ HttpRequestRes HandleHttpRequestSyncStatus(
// if storage is not created in ch, flash replica should not be available.
if (tmt.getStorages().get(table_id))
{
RegionTable & region_table = tmt.getRegionTable();
region_table.handleInternalRegionsByTable(table_id, [&](const RegionTable::InternalRegions & regions) {
tmt.getRegionTable().handleInternalRegionsByTable(table_id, [&](const RegionTable::InternalRegions & regions) {
count = regions.size();
region_list.reserve(regions.size());
for (const auto & region : regions)
{
if (!region_table.isSafeTSLag(region.first))
{
region_list.push_back(region.first);
}
}
count = region_list.size();
LOG_FMT_DEBUG(&Poco::Logger::get(__FUNCTION__), "table_id={}, total_region_count={}, ready_region_count={}", table_id, regions.size(), count);
region_list.push_back(region.first);
});
}
ss << count << std::endl;
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@ void RegionTable::removeTable(TableID table_id)

// Remove from region list.
for (const auto & region_info : table.regions)
{
regions.erase(region_info.first);
leader_safe_ts.erase(region_info.first);
self_safe_ts.erase(region_info.first);
}

// Remove from table map.
tables.erase(it);
Expand Down Expand Up @@ -268,8 +264,6 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const
handle_range = internal_region_it->second.range_in_table;

regions.erase(it);
leader_safe_ts.erase(region_id);
self_safe_ts.erase(region_id);
table.regions.erase(internal_region_it);
if (table.regions.empty())
{
Expand Down
Loading

0 comments on commit eb32b6e

Please sign in to comment.