Skip to content

Commit

Permalink
Storage: (Refactor) Avoid referencing DeltaMergeStore.h (#5835)
Browse files Browse the repository at this point in the history
ref #5831
  • Loading branch information
breezewish authored Sep 13, 2022
1 parent e9e43d9 commit 6d0cbc8
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 69 deletions.
12 changes: 6 additions & 6 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,20 @@ namespace DB
M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \
F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \
M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, \
F(type_delta_merge, {"type", "delta_merge"}), \
F(type_delta_merge_fg, {"type", "delta_merge_fg"}), \
F(type_delta_merge_fg_rpc, {"type", "delta_merge_fg_rpc"}), \
F(type_delta_merge_bg, {"type", "delta_merge_bg"}), \
F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \
F(type_delta_merge_fg, {"type", "delta_merge_fg"}), \
F(type_delta_merge_manual, {"type", "delta_merge_manual"}), \
F(type_delta_compact, {"type", "delta_compact"}), \
F(type_delta_flush, {"type", "delta_flush"}), \
F(type_seg_split, {"type", "seg_split"}), F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_merge, {"type", "seg_merge"}), F(type_seg_merge_fg, {"type", "seg_merge_fg"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
M(tiflash_storage_subtask_duration_seconds, "Bucketed histogram of storage's sub task duration", Histogram, \
F(type_delta_merge, {{"type", "delta_merge"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_fg_rpc, {{"type", "delta_merge_fg_rpc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_bg, {{"type", "delta_merge_bg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_bg_gc, {{"type", "delta_merge_bg_gc"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_fg, {{"type", "delta_merge_fg"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_merge_manual, {{"type", "delta_merge_manual"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_compact, {{"type", "delta_compact"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delta_flush, {{"type", "delta_flush"}}, ExpBuckets{0.001, 2, 20}), \
F(type_seg_split, {{"type", "seg_split"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Management/ManualCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ
// Repeatedly merge multiple segments as much as possible.
while (true)
{
auto compacted_range = dm_storage->mergeDeltaBySegment(global_context, start_key, DM::DeltaMergeStore::TaskRunThread::ForegroundRPC);
auto compacted_range = dm_storage->mergeDeltaBySegment(global_context, start_key);

if (compacted_range == std::nullopt)
{
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,13 +862,13 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context)

for (auto & segment : all_segments)
{
segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground);
segmentMergeDelta(*dm_context, segment, MergeDeltaReason::Manual);
}

LOG_FMT_INFO(log, "Finish table mergeDeltaAll");
}

std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key, const TaskRunThread run_thread)
std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key)
{
LOG_FMT_INFO(log, "Table mergeDeltaBySegment, start={}", start_key.toDebugString());

Expand All @@ -895,7 +895,7 @@ std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Contex

if (segment->flushCache(*dm_context))
{
const auto new_segment = segmentMergeDelta(*dm_context, segment, run_thread);
const auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::Manual);
if (new_segment)
{
const auto segment_end = new_segment->getRowKeyRange().end;
Expand Down Expand Up @@ -1412,7 +1412,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const
GET_METRIC(tiflash_storage_write_stall_duration_seconds, type_delta_merge_by_delete_range).Observe(watch.elapsedSeconds());
});

return segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground);
return segmentMergeDelta(*dm_context, segment, MergeDeltaReason::ForegroundWrite);
}
return {};
};
Expand Down Expand Up @@ -1579,7 +1579,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy)
case TaskType::MergeDelta:
{
FAIL_POINT_PAUSE(FailPoints::pause_before_dt_background_delta_merge);
left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::BackgroundThreadPool);
left = segmentMergeDelta(*task.dm_context, task.segment, MergeDeltaReason::BackgroundThreadPool);
type = ThreadType::BG_MergeDelta;
// Wake up all waiting threads if failpoint is enabled
FailPointHelper::disableFailPoint(FailPoints::pause_until_dt_background_delta_merge);
Expand Down Expand Up @@ -1786,7 +1786,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit)
bool finish_gc_on_segment = false;
if (should_compact)
{
if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment)
if (segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap); segment)
{
// Continue to check whether we need to apply more tasks on this segment
segment_snap = {};
Expand Down
21 changes: 10 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,12 @@ class DeltaMergeStore : private boost::noncopyable
PlaceIndex,
};

// TODO: Rename to MergeDeltaThreadType
enum TaskRunThread
enum MergeDeltaReason
{
BackgroundThreadPool,
Foreground,
ForegroundRPC,
BackgroundGCThread,
ForegroundWrite,
Manual,
};

static std::string toString(ThreadType type)
Expand Down Expand Up @@ -237,18 +236,18 @@ class DeltaMergeStore : private boost::noncopyable
}
}

static std::string toString(TaskRunThread type)
static std::string toString(MergeDeltaReason type)
{
switch (type)
{
case BackgroundThreadPool:
return "BackgroundThreadPool";
case Foreground:
return "Foreground";
case ForegroundRPC:
return "ForegroundRPC";
case BackgroundGCThread:
return "BackgroundGCThread";
case ForegroundWrite:
return "ForegroundWrite";
case Manual:
return "Manual";
default:
return "Unknown";
}
Expand Down Expand Up @@ -397,7 +396,7 @@ class DeltaMergeStore : private boost::noncopyable
/// If there is no segment found by the start key, nullopt is returned.
///
/// This function is called when using `ALTER TABLE [TABLE] COMPACT ...` from TiDB.
std::optional<DM::RowKeyRange> mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key, TaskRunThread run_thread);
std::optional<DM::RowKeyRange> mergeDeltaBySegment(const Context & context, const DM::RowKeyValue & start_key);

/// Compact the delta layer, merging multiple fragmented delta files into larger ones.
/// This is a minor compaction as it does not merge the delta into stable layer.
Expand Down Expand Up @@ -495,7 +494,7 @@ class DeltaMergeStore : private boost::noncopyable
SegmentPtr segmentMergeDelta(
DMContext & dm_context,
const SegmentPtr & segment,
TaskRunThread thread,
MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap = nullptr);

bool updateGCSafePoint();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void DeltaMergeStore::ingestFiles(
updated_segments.push_back(segment);
fiu_do_on(FailPoints::segment_merge_after_ingest_packs, {
segment->flushCache(*dm_context);
segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundThreadPool);
segmentMergeDelta(*dm_context, segment, MergeDeltaReason::ForegroundWrite);
storage_pool->gc(global_context.getSettingsRef(), StoragePool::Seconds(0));
});
break;
Expand Down
40 changes: 20 additions & 20 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
SegmentPtr DeltaMergeStore::segmentMergeDelta(
DMContext & dm_context,
const SegmentPtr & segment,
const TaskRunThread run_thread,
const MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap)
{
LOG_FMT_INFO(log, "MergeDelta - Begin, thread={} safe_point={} segment={}", toString(run_thread), dm_context.min_version, segment->info());
LOG_FMT_INFO(log, "MergeDelta - Begin, reason={} safe_point={} segment={}", toString(reason), dm_context.min_version, segment->info());

ColumnDefinesPtr schema_snap;

Expand Down Expand Up @@ -338,39 +338,39 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
CurrentMetrics::Increment cur_dm_total_bytes{CurrentMetrics::DT_DeltaMergeTotalBytes, static_cast<Int64>(segment_snap->getBytes())};
CurrentMetrics::Increment cur_dm_total_rows{CurrentMetrics::DT_DeltaMergeTotalRows, static_cast<Int64>(segment_snap->getRows())};

switch (run_thread)
switch (reason)
{
case TaskRunThread::BackgroundThreadPool:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge).Increment();
case MergeDeltaReason::BackgroundThreadPool:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg).Increment();
break;
case TaskRunThread::Foreground:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg).Increment();
case MergeDeltaReason::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment();
break;
case TaskRunThread::ForegroundRPC:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg_rpc).Increment();
case MergeDeltaReason::ForegroundWrite:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_fg).Increment();
break;
case TaskRunThread::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment();
case MergeDeltaReason::Manual:
GET_METRIC(tiflash_storage_subtask_count, type_delta_merge_manual).Increment();
break;
default:
break;
}

Stopwatch watch_delta_merge;
SCOPE_EXIT({
switch (run_thread)
switch (reason)
{
case TaskRunThread::BackgroundThreadPool:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge).Observe(watch_delta_merge.elapsedSeconds());
case MergeDeltaReason::BackgroundThreadPool:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg).Observe(watch_delta_merge.elapsedSeconds());
break;
case TaskRunThread::Foreground:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg).Observe(watch_delta_merge.elapsedSeconds());
case MergeDeltaReason::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc).Observe(watch_delta_merge.elapsedSeconds());
break;
case TaskRunThread::ForegroundRPC:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg_rpc).Observe(watch_delta_merge.elapsedSeconds());
case MergeDeltaReason::ForegroundWrite:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_fg).Observe(watch_delta_merge.elapsedSeconds());
break;
case TaskRunThread::BackgroundGCThread:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc).Observe(watch_delta_merge.elapsedSeconds());
case MergeDeltaReason::Manual:
GET_METRIC(tiflash_storage_subtask_duration_seconds, type_delta_merge_manual).Observe(watch_delta_merge.elapsedSeconds());
break;
default:
break;
Expand Down
28 changes: 14 additions & 14 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3095,25 +3095,25 @@ try
if (store->isCommonHandle())
{
// Specifies MAX_KEY. nullopt should be returned.
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY);
ASSERT_EQ(result, std::nullopt);
}
else
{
// Specifies MAX_KEY. nullopt should be returned.
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MAX_KEY, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MAX_KEY);
ASSERT_EQ(result, std::nullopt);
}
std::optional<RowKeyRange> result_1;
{
// Specifies MIN_KEY. In this case, the first segment should be processed.
if (store->isCommonHandle())
{
result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground);
result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY);
}
else
{
result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground);
result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY);
}
// The returned range is the same as first segment's range.
ASSERT_NE(result_1, std::nullopt);
Expand All @@ -3125,15 +3125,15 @@ try
}
{
// Compact the first segment again, nothing should change.
auto result = store->mergeDeltaBySegment(*db_context, result_1->start, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, result_1->start);
ASSERT_EQ(*result, *result_1);

helper->verifyExpectedRowsForAllSegments();
}
std::optional<RowKeyRange> result_2;
{
// Compact again using the end key just returned. The second segment should be processed.
result_2 = store->mergeDeltaBySegment(*db_context, result_1->end, DeltaMergeStore::TaskRunThread::Foreground);
result_2 = store->mergeDeltaBySegment(*db_context, result_1->end);
ASSERT_NE(result_2, std::nullopt);
ASSERT_EQ(*result_2, std::next(store->segments.begin())->second->getRowKeyRange());

Expand All @@ -3151,12 +3151,12 @@ TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, InvalidKey)
if (store->isCommonHandle())
{
// For common handle, give int handle key and have a try
store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground);
store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY);
}
else
{
// For int handle, give common handle key and have a try
store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY, DeltaMergeStore::TaskRunThread::Foreground);
store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY);
}
});
}
Expand All @@ -3172,13 +3172,13 @@ try
ASSERT_NE(it, store->segments.end());
auto seg = it->second;

result = store->mergeDeltaBySegment(*db_context, seg->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground);
result = store->mergeDeltaBySegment(*db_context, seg->getRowKeyRange().start);
ASSERT_NE(result, std::nullopt);
helper->verifyExpectedRowsForAllSegments();
}
{
// As we are the last segment, compact "next segment" should result in failure. A nullopt is returned.
auto result2 = store->mergeDeltaBySegment(*db_context, result->end, DeltaMergeStore::TaskRunThread::Foreground);
auto result2 = store->mergeDeltaBySegment(*db_context, result->end);
ASSERT_EQ(result2, std::nullopt);
helper->verifyExpectedRowsForAllSegments();
}
Expand Down Expand Up @@ -3207,7 +3207,7 @@ try
auto range = std::next(store->segments.begin())->second->getRowKeyRange();
auto compact_key = range.start.toPrefixNext();

auto result = store->mergeDeltaBySegment(*db_context, compact_key, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, compact_key);
ASSERT_NE(result, std::nullopt);

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
Expand Down Expand Up @@ -3251,7 +3251,7 @@ try
}
{
auto segment1 = std::next(store->segments.begin())->second;
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start);
ASSERT_NE(result, std::nullopt);

segment1 = std::next(store->segments.begin())->second;
Expand Down Expand Up @@ -3299,7 +3299,7 @@ try
// Start a mergeDelta. It should hit retry immediately due to a flush is in progress.
auto th_merge_delta = std::async([&]() {
auto segment1 = std::next(store->segments.begin())->second;
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start);
ASSERT_NE(result, std::nullopt);
// All rows in the delta layer should be merged into the stable layer.
helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
Expand Down Expand Up @@ -3350,7 +3350,7 @@ try
auto th_merge_delta = std::async([&] {
// mergeDeltaBySegment for segment1
auto segment1 = std::next(store->segments.begin())->second;
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground);
auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start);
ASSERT_NE(result, std::nullopt);

// Although original segment1 has been split into 2, we still expect only segment1's delta
Expand Down
Loading

0 comments on commit 6d0cbc8

Please sign in to comment.