Skip to content

Commit

Permalink
Storages: log version of records if enabled (#8874)
Browse files Browse the repository at this point in the history
ref #8864
  • Loading branch information
JinheLin authored Mar 27, 2024
1 parent 38e4272 commit 4440b5e
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 8 deletions.
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ struct Settings
M(SettingDouble, dt_prepare_stream_concurrency_scale, 2.0, "Concurrency of preparing streams of one query equals to num_streams * dt_prepare_stream_concurrency_scale.") \
M(SettingBool, dt_enable_delta_index_error_fallback, true, "Whether fallback to an empty delta index if a delta index error is detected") \
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
M(SettingBool, dt_log_record_version, false, "Whether log the version of records when write them to storage") \
M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata") \
M(SettingDouble, cpu_thread_count_scale, 1.0, "Number of thread of computation-intensive thread pool = number of logical cpu cores * cpu_thread_count_scale. Only takes effects at server startup.") \
\
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,17 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set

auto dm_context = newDMContext(db_context, db_settings, "write");

if (db_context.getSettingsRef().dt_log_record_version)
{
const auto & ver_col = block.getByName(VERSION_COLUMN_NAME).column;
const auto * ver = toColumnVectorDataPtr<UInt64>(ver_col);
std::unordered_set<UInt64> dedup_ver;
for (auto v : *ver)
{
dedup_ver.insert(v);
}
LOG_DEBUG(log, "Record count: {}, Versions: {}", block.rows(), dedup_ver);
}
const auto bytes = block.bytes();

{
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,21 +740,23 @@ String SegmentReadTask::toString() const
if (dm_context->keyspace_id == DB::NullspaceID)
{
return fmt::format(
"s{}_t{}_{}_{}_{}",
"s{}_t{}_{}_{}_{}_{}",
store_id,
dm_context->physical_table_id,
segment->segmentId(),
segment->segmentEpoch(),
read_snapshot->delta->getDeltaIndexEpoch());
read_snapshot->delta->getDeltaIndexEpoch(),
read_snapshot->getRows());
}
return fmt::format(
"s{}_ks{}_t{}_{}_{}_{}",
"s{}_ks{}_t{}_{}_{}_{}_{}",
store_id,
dm_context->keyspace_id,
dm_context->physical_table_id,
segment->segmentId(),
segment->segmentEpoch(),
read_snapshot->delta->getDeltaIndexEpoch());
read_snapshot->delta->getDeltaIndexEpoch(),
read_snapshot->getRows());
}

GlobalSegmentID SegmentReadTask::getGlobalSegmentID() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ class SegmentReadTasksPoolTest : public SegmentTestBasic
return std::make_shared<Segment>(Logger::get(), 0, RowKeyRange{}, seg_id, seg_id + 1, nullptr, nullptr);
}

static SegmentSnapshotPtr createSegmentSnapshot()
SegmentSnapshotPtr createSegmentSnapshot()
{
auto delta_snap = std::make_shared<DeltaValueSnapshot>(CurrentMetrics::Metric{});
delta_snap->delta = std::make_shared<DeltaValueSpace>(nullptr);
return std::make_shared<SegmentSnapshot>(std::move(delta_snap), /*stable*/ nullptr, Logger::get());
auto delta = std::make_shared<DeltaValueSpace>(1);
auto delta_snap = delta->createSnapshot(*createDMContext(), false, CurrentMetrics::Metric{});

auto stable = std::make_shared<StableValueSpace>(1);
auto stable_snap = stable->createSnapshot();
return std::make_shared<SegmentSnapshot>(std::move(delta_snap), std::move(stable_snap), Logger::get());
}

SegmentReadTaskPtr createSegmentReadTask(PageIdU64 seg_id)
Expand Down

0 comments on commit 4440b5e

Please sign in to comment.