Skip to content

Commit

Permalink
Showing 19 changed files with 191 additions and 57 deletions.
8 changes: 6 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
@@ -272,8 +272,8 @@ struct Settings
M(SettingUInt64, dt_segment_delta_small_column_file_size, 8388608, "Determine whether a column file in delta is small or not. 8MB by default.") \
M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \
M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \
M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \
M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \
M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \
M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all " \
"segments") \
M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.") \
@@ -294,6 +294,7 @@ struct Settings
"`dt_stroage_num_max_expect_legacy_files`") \
M(SettingFloat, dt_page_gc_low_write_prob, 0.10, "Probability to run gc when write there is few writes.") \
\
\
M(SettingUInt64, dt_storage_pool_log_write_slots, 4, "Max write concurrency for each StoragePool.log.") \
M(SettingUInt64, dt_storage_pool_log_gc_min_file_num, 10, "Min number of page files to compact") \
M(SettingUInt64, dt_storage_pool_log_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \
@@ -313,6 +314,9 @@ struct Settings
M(SettingFloat, dt_storage_pool_meta_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \
\
M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \
\
M(SettingDouble, dt_storage_blob_heavy_gc_valid_rate, 0.2, "Max valid rate of deciding a blob can be compact") \
\
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
M(SettingInt64, dt_compression_level, 1, "The compression level.") \
12 changes: 11 additions & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
@@ -165,6 +165,7 @@ max_rows_in_set = 455
dt_segment_limit_rows = 1000005
dt_enable_rough_set_filter = 0
max_memory_usage = 102000
dt_storage_blob_heavy_gc_valid_rate = 0.2
dt_storage_pool_data_gc_min_file_num = 8
dt_storage_pool_data_gc_min_legacy_num = 2
dt_storage_pool_data_gc_min_bytes = 256
@@ -221,6 +222,7 @@ dt_compression_level = 1
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_file_num, 8);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_legacy_num, 2);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_bytes, 256);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_blob_heavy_gc_valid_rate, 0.2);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_column_file_size, 8388608);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_delta_small_column_file_rows, 2048);
ASSERT_EQ(global_ctx.getSettingsRef().dt_segment_limit_size, 536870912);
@@ -274,6 +276,7 @@ try
max_rows_in_set = 455
dt_segment_limit_rows = 1000005
dt_enable_rough_set_filter = 0
dt_storage_blob_heavy_gc_valid_rate = 0.3
max_memory_usage = 102000
dt_storage_pool_data_gc_min_file_num = 8
dt_storage_pool_data_gc_min_legacy_num = 2
@@ -295,16 +298,18 @@ dt_page_gc_low_write_prob = 0.2
EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_NE(cfg.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_NE(cfg.blob_heavy_gc_valid_rate, settings.dt_storage_blob_heavy_gc_valid_rate);
EXPECT_NE(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_NE(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);

persister.gc();

cfg = persister.page_storage->getSettings();

EXPECT_NE(cfg.gc_min_files, settings.dt_storage_pool_data_gc_min_file_num);
EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_NE(cfg.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_EQ(cfg.blob_heavy_gc_valid_rate, settings.dt_storage_blob_heavy_gc_valid_rate);
EXPECT_EQ(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_EQ(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);
};
@@ -327,6 +332,7 @@ dt_page_gc_low_write_prob = 0.2
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_legacy_num, 2);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_bytes, 256);
ASSERT_FLOAT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_max_valid_rate, 0.5);
ASSERT_DOUBLE_EQ(global_ctx.getSettingsRef().dt_storage_blob_heavy_gc_valid_rate, 0.3);
ASSERT_EQ(global_ctx.getSettingsRef().dt_open_file_max_idle_seconds, 20);
ASSERT_FLOAT_EQ(global_ctx.getSettingsRef().dt_page_gc_low_write_prob, 0.2);
verify_persister_reload_config(persister);
@@ -346,6 +352,7 @@ max_rows_in_set = 455
dt_segment_limit_rows = 1000005
dt_enable_rough_set_filter = 0
max_memory_usage = 102000
dt_storage_blob_heavy_gc_valid_rate = 0.3
dt_storage_pool_data_gc_min_file_num = 8
dt_storage_pool_data_gc_min_legacy_num = 2
dt_storage_pool_data_gc_min_bytes = 256
@@ -366,6 +373,7 @@ dt_page_gc_low_write_prob = 0.2
EXPECT_NE(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_NE(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_NE(cfg.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_NE(cfg.blob_heavy_gc_valid_rate, settings.dt_storage_blob_heavy_gc_valid_rate);
EXPECT_NE(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_NE(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);

@@ -376,6 +384,7 @@ dt_page_gc_low_write_prob = 0.2
EXPECT_EQ(cfg.gc_min_legacy_num, settings.dt_storage_pool_data_gc_min_legacy_num);
EXPECT_EQ(cfg.gc_min_bytes, settings.dt_storage_pool_data_gc_min_bytes);
EXPECT_DOUBLE_EQ(cfg.gc_max_valid_rate, settings.dt_storage_pool_data_gc_max_valid_rate);
EXPECT_DOUBLE_EQ(cfg.blob_heavy_gc_valid_rate, settings.dt_storage_blob_heavy_gc_valid_rate);
EXPECT_EQ(cfg.open_file_max_idle_time, settings.dt_open_file_max_idle_seconds);
EXPECT_EQ(cfg.prob_do_gc_when_write_is_low, settings.dt_page_gc_low_write_prob * 1000);
};
@@ -398,6 +407,7 @@ dt_page_gc_low_write_prob = 0.2
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_legacy_num, 2);
ASSERT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_min_bytes, 256);
ASSERT_FLOAT_EQ(global_ctx.getSettingsRef().dt_storage_pool_data_gc_max_valid_rate, 0.5);
ASSERT_DOUBLE_EQ(global_ctx.getSettingsRef().dt_storage_blob_heavy_gc_valid_rate, 0.3);
ASSERT_EQ(global_ctx.getSettingsRef().dt_open_file_max_idle_seconds, 20);
ASSERT_FLOAT_EQ(global_ctx.getSettingsRef().dt_page_gc_low_write_prob, 0.2);
verify_storage_pool_reload_config(storage_pool);
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -42,11 +42,11 @@ PageStorage::Config extractConfig(const Settings & settings, StorageType subtype
config.gc_min_files = settings.dt_storage_pool_##NAME##_gc_min_file_num; \
config.gc_min_bytes = settings.dt_storage_pool_##NAME##_gc_min_bytes; \
config.gc_min_legacy_num = settings.dt_storage_pool_##NAME##_gc_min_legacy_num; \
config.gc_max_valid_rate = settings.dt_storage_pool_##NAME##_gc_max_valid_rate;
config.gc_max_valid_rate = settings.dt_storage_pool_##NAME##_gc_max_valid_rate; \
config.blob_heavy_gc_valid_rate = settings.dt_storage_blob_heavy_gc_valid_rate;

PageStorage::Config config = getConfigFromSettings(settings);


switch (subtype)
{
case StorageType::Log:
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/ConfigSettings.cpp
Original file line number Diff line number Diff line change
@@ -33,6 +33,9 @@ void mergeConfigFromSettings(const DB::Settings & settings, PageStorage::Config
config.gc_max_expect_legacy_files = settings.dt_page_num_max_expect_legacy_files;
if (settings.dt_page_num_max_gc_valid_rate > 0.0)
config.gc_max_valid_rate_bound = settings.dt_page_num_max_gc_valid_rate;

// V3 setting which export to global setting
config.blob_heavy_gc_valid_rate = settings.dt_storage_blob_heavy_gc_valid_rate;
}

PageStorage::Config getConfigFromSettings(const DB::Settings & settings)
18 changes: 9 additions & 9 deletions dbms/src/Storages/Page/PageDefines.h
Original file line number Diff line number Diff line change
@@ -30,21 +30,21 @@ using Seconds = std::chrono::seconds;
static constexpr UInt64 MB = 1ULL * 1024 * 1024;
static constexpr UInt64 GB = MB * 1024;


// PageStorage V2 define
static constexpr UInt64 PAGE_SIZE_STEP = (1 << 10) * 16; // 16 KB
static constexpr UInt64 PAGE_BUFFER_SIZE = DBMS_DEFAULT_BUFFER_SIZE;
static constexpr UInt64 PAGE_MAX_BUFFER_SIZE = 128 * MB;
static constexpr UInt64 PAGE_SPLIT_SIZE = 1 * MB;
static constexpr UInt64 PAGE_FILE_MAX_SIZE = 1024 * 2 * MB;
static constexpr UInt64 PAGE_FILE_SMALL_SIZE = 2 * MB;
static constexpr UInt64 PAGE_FILE_ROLL_SIZE = 128 * MB;
static constexpr UInt64 PAGE_META_ROLL_SIZE = 2 * MB;

static constexpr UInt64 BLOBFILE_LIMIT_SIZE = 512 * MB;
static constexpr UInt64 BLOBSTORE_CACHED_FD_SIZE = 100;

static_assert(PAGE_SIZE_STEP >= ((1 << 10) * 16), "PAGE_SIZE_STEP should be at least 16 KB");
static_assert((PAGE_SIZE_STEP & (PAGE_SIZE_STEP - 1)) == 0, "PAGE_SIZE_STEP should be power of 2");
static_assert(PAGE_BUFFER_SIZE % PAGE_SIZE_STEP == 0, "PAGE_BUFFER_SIZE should be dividable by PAGE_SIZE_STEP");

// PageStorage V3 define
static constexpr UInt64 BLOBFILE_LIMIT_SIZE = 512 * MB;
static constexpr UInt64 BLOBSTORE_CACHED_FD_SIZE = 100;
static constexpr UInt64 PAGE_META_ROLL_SIZE = 2 * MB;
static constexpr UInt64 MAX_PERSISTED_LOG_FILES = 4;

using NamespaceId = UInt64;
static constexpr NamespaceId MAX_NAMESPACE_ID = UINT64_MAX;
@@ -111,7 +111,7 @@ inline size_t alignPage(size_t n)
template <>
struct fmt::formatter<DB::PageIdV3Internal>
{
constexpr auto parse(format_parse_context & ctx) -> decltype(ctx.begin())
static constexpr auto parse(format_parse_context & ctx) -> decltype(ctx.begin())
{
return ctx.begin();
}
43 changes: 41 additions & 2 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
@@ -74,6 +74,9 @@ class PageStorage : private boost::noncopyable

struct Config
{
//==========================================================================================
// V2 config
//==========================================================================================
SettingBool sync_on_write = true;

SettingUInt64 file_roll_size = PAGE_FILE_ROLL_SIZE;
@@ -109,6 +112,18 @@ class PageStorage : private boost::noncopyable

MVCC::VersionSetConfig version_set_config;

//==========================================================================================
// V3 config
//==========================================================================================
SettingUInt64 blob_file_limit_size = BLOBFILE_LIMIT_SIZE;
SettingUInt64 blob_spacemap_type = 2;
SettingUInt64 blob_cached_fd_size = BLOBSTORE_CACHED_FD_SIZE;
SettingDouble blob_heavy_gc_valid_rate = 0.2;

SettingUInt64 wal_roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 wal_max_persisted_log_files = MAX_PERSISTED_LOG_FILES;

void reload(const Config & rhs)
{
// Reload is not atomic, but should be good enough
@@ -122,9 +137,18 @@ class PageStorage : private boost::noncopyable
prob_do_gc_when_write_is_low = rhs.prob_do_gc_when_write_is_low;
// Reload fd idle time
open_file_max_idle_time = rhs.open_file_max_idle_time;

// Reload V3 setting
blob_file_limit_size = rhs.blob_file_limit_size;
blob_spacemap_type = rhs.blob_spacemap_type;
blob_cached_fd_size = rhs.blob_cached_fd_size;
blob_heavy_gc_valid_rate = rhs.blob_heavy_gc_valid_rate;
wal_roll_size = rhs.wal_roll_size;
wal_recover_mode = rhs.wal_recover_mode;
wal_max_persisted_log_files = rhs.wal_max_persisted_log_files;
}

String toDebugString() const
String toDebugStringV2() const
{
return fmt::format(
"PageStorage::Config {{gc_min_files: {}, gc_min_bytes:{}, gc_force_hardlink_rate: {:.3f}, gc_max_valid_rate: {:.3f}, "
@@ -140,11 +164,26 @@ class PageStorage : private boost::noncopyable
prob_do_gc_when_write_is_low,
open_file_max_idle_time);
}

String toDebugStringV3() const
{
return fmt::format(
"PageStorage::Config V3 {{"
"blob_file_limit_size: {}, blob_spacemap_type: {}, "
"blob_cached_fd_size: {}, blob_heavy_gc_valid_rate: {:.3f}, "
"wal_roll_size: {}, wal_recover_mode: {}, wal_max_persisted_log_files: {}}}",
blob_file_limit_size.get(),
blob_spacemap_type.get(),
blob_cached_fd_size.get(),
blob_heavy_gc_valid_rate.get(),
wal_roll_size.get(),
wal_recover_mode.get(),
wal_max_persisted_log_files.get());
}
};
void reloadSettings(const Config & new_config) { config.reload(new_config); };
Config getSettings() const { return config; }


public:
static PageStoragePtr
create(
16 changes: 14 additions & 2 deletions dbms/src/Storages/Page/V2/gc/DataCompactor.cpp
Original file line number Diff line number Diff line change
@@ -81,7 +81,13 @@ DataCompactor<SnapshotPtr>::tryMigrate( //
}
else
{
LOG_FMT_DEBUG(log, "{} DataCompactor::tryMigrate exit without compaction [candidates size={}] [total byte size={}], [files without valid page={}] Config{{ {} }}", storage_name, result.candidate_size, result.bytes_migrate, candidates.files_without_valid_pages.size(), config.toDebugString());
LOG_FMT_DEBUG(log, "{} DataCompactor::tryMigrate exit without compaction [candidates size={}] [total byte size={}], "
"[files without valid page={}] Config{{ {} }}", //
storage_name,
result.candidate_size,
result.bytes_migrate,
candidates.files_without_valid_pages.size(),
config.toDebugStringV2());
}

return {result, std::move(migrate_entries_edit)};
@@ -555,7 +561,13 @@ void DataCompactor<SnapshotPtr>::logMigrationDetails(const MigrateInfos & infos,
}
migrate_stream << "]";
remove_stream << "]";
LOG_FMT_DEBUG(log, "{} Migrate pages to PageFile_{}_{}, migrate: {}, remove: {}, Config{{ {} }}", storage_name, migrate_file_id.first, migrate_file_id.second, migrate_stream.str(), remove_stream.str(), config.toDebugString());
LOG_FMT_DEBUG(log, "{} Migrate pages to PageFile_{}_{}, migrate: {}, remove: {}, Config{{ {} }}", //
storage_name,
migrate_file_id.first,
migrate_file_id.second,
migrate_stream.str(),
remove_stream.str(),
config.toDebugStringV2());
}


6 changes: 4 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
@@ -638,10 +638,12 @@ void VersionedPageEntries::collapseTo(const UInt64 seq, const PageIdV3Internal p
* PageDirectory methods *
*************************/

PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_)
PageDirectory::PageDirectory(String storage_name, WALStorePtr && wal_, UInt64 max_persisted_log_files_)
: sequence(0)
, wal(std::move(wal_))
, max_persisted_log_files(max_persisted_log_files_)
, log(Logger::get("PageDirectory", std::move(storage_name)))

{
}

@@ -1127,7 +1129,7 @@ bool PageDirectory::tryDumpSnapshot(const WriteLimiterPtr & write_limiter)
bool done_any_io = false;
// In order not to make read amplification too high, only apply compact logs when ...
auto files_snap = wal->getFilesSnapshot();
if (files_snap.needSave())
if (files_snap.needSave(max_persisted_log_files))
{
// The records persisted in `files_snap` is older than or equal to all records in `edit`
auto edit = dumpSnapshotToEdit();
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
@@ -286,7 +286,7 @@ using PageDirectoryPtr = std::unique_ptr<PageDirectory>;
class PageDirectory
{
public:
explicit PageDirectory(String storage_name, WALStorePtr && wal);
explicit PageDirectory(String storage_name, WALStorePtr && wal, UInt64 max_persisted_log_files_ = MAX_PERSISTED_LOG_FILES);

PageDirectorySnapshotPtr createSnapshot(const String & tracing_id = "") const;

@@ -398,7 +398,7 @@ class PageDirectory
mutable std::list<std::weak_ptr<PageIdV3Internal>> external_ids;

WALStorePtr wal;

const UInt64 max_persisted_log_files;
LoggerPtr log;
};

8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
@@ -22,10 +22,10 @@

namespace DB::PS::V3
{
PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator)
PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config)
{
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal));
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, config);
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal), config.max_persisted_log_files);
loadFromDisk(dir, std::move(reader));

// Reset the `sequence` to the maximum of persisted.
@@ -63,7 +63,7 @@ PageDirectoryPtr PageDirectoryFactory::create(String storage_name, FileProviderP

PageDirectoryPtr PageDirectoryFactory::createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit)
{
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator);
auto [wal, reader] = WALStore::create(storage_name, file_provider, delegator, WALStore::Config());
(void)reader;
PageDirectoryPtr dir = std::make_unique<PageDirectory>(std::move(storage_name), std::move(wal));
loadEdit(dir, edit);
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ class PageDirectoryFactory
return *this;
}

PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator);
PageDirectoryPtr create(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, WALStore::Config config);

// just for test
PageDirectoryPtr createFromEdit(String storage_name, FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, const PageEntriesEdit & edit);
6 changes: 3 additions & 3 deletions dbms/src/Storages/Page/V3/PageStorageImpl.cpp
Original file line number Diff line number Diff line change
@@ -36,13 +36,13 @@ PageStorageImpl::PageStorageImpl(
const FileProviderPtr & file_provider_)
: DB::PageStorage(name, delegator_, config_, file_provider_)
, log(Logger::get("PageStorage", name))
, blob_store(name, file_provider_, delegator, blob_config)
, blob_store(name, file_provider_, delegator, parseBlobConfig(config_))
{
LOG_FMT_INFO(log, "PageStorageImpl start. Config{{ {} }}", config.toDebugStringV3());
}

PageStorageImpl::~PageStorageImpl() = default;


void PageStorageImpl::restore()
{
// TODO: clean up blobstore.
@@ -52,7 +52,7 @@ void PageStorageImpl::restore()
PageDirectoryFactory factory;
page_directory = factory
.setBlobStore(blob_store)
.create(storage_name, file_provider, delegator);
.create(storage_name, file_provider, delegator, parseWALConfig(config));
// factory.max_applied_page_id // TODO: return it to outer function
}

26 changes: 24 additions & 2 deletions dbms/src/Storages/Page/V3/PageStorageImpl.h
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/WALStore.h>

namespace DB
{
@@ -35,6 +36,29 @@ class PageStorageImpl : public DB::PageStorage

~PageStorageImpl();

static BlobStore::Config parseBlobConfig(const Config & config)
{
BlobStore::Config blob_config;

blob_config.file_limit_size = config.blob_file_limit_size;
blob_config.cached_fd_size = config.blob_cached_fd_size;
blob_config.spacemap_type = config.blob_spacemap_type;
blob_config.heavy_gc_valid_rate = config.blob_heavy_gc_valid_rate;

return blob_config;
}

static WALStore::Config parseWALConfig(const Config & config)
{
WALStore::Config wal_config;

wal_config.roll_size = config.wal_roll_size;
wal_config.wal_recover_mode = config.wal_recover_mode;
wal_config.max_persisted_log_files = config.wal_max_persisted_log_files;

return wal_config;
}

void restore() override;

void drop() override;
@@ -90,8 +114,6 @@ class PageStorageImpl : public DB::PageStorage

PageDirectoryPtr page_directory;

BlobStore::Config blob_config;

BlobStore blob_store;

std::atomic<bool> gc_is_running = false;
22 changes: 17 additions & 5 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
@@ -122,27 +122,39 @@ WALStoreReader::findCheckpoint(LogFilenameSet && all_files)
WALStoreReaderPtr WALStoreReader::create(String storage_name,
FileProviderPtr & provider,
LogFilenameSet files,
WALRecoveryMode recovery_mode_,
const ReadLimiterPtr & read_limiter)
{
auto [checkpoint, files_to_read] = findCheckpoint(std::move(files));
auto reader = std::make_shared<WALStoreReader>(std::move(storage_name), provider, checkpoint, std::move(files_to_read), read_limiter);
auto reader = std::make_shared<WALStoreReader>(storage_name, provider, checkpoint, std::move(files_to_read), recovery_mode_, read_limiter);
reader->openNextFile();
return reader;
}

WALStoreReaderPtr WALStoreReader::create(String storage_name, FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter)
WALStoreReaderPtr WALStoreReader::create(
String storage_name,
FileProviderPtr & provider,
PSDiskDelegatorPtr & delegator,
WALRecoveryMode recovery_mode_,
const ReadLimiterPtr & read_limiter)
{
LogFilenameSet log_files = listAllFiles(delegator, Logger::get("WALStore", storage_name));
return create(storage_name, provider, std::move(log_files), read_limiter);
return create(std::move(storage_name), provider, std::move(log_files), recovery_mode_, read_limiter);
}

WALStoreReader::WALStoreReader(String storage_name, FileProviderPtr & provider_, std::optional<LogFilename> checkpoint, LogFilenameSet && files_, const ReadLimiterPtr & read_limiter_)
WALStoreReader::WALStoreReader(String storage_name,
FileProviderPtr & provider_,
std::optional<LogFilename> checkpoint,
LogFilenameSet && files_,
WALRecoveryMode recovery_mode_,
const ReadLimiterPtr & read_limiter_)
: provider(provider_)
, read_limiter(read_limiter_)
, checkpoint_read_done(!checkpoint.has_value())
, checkpoint_file(checkpoint)
, files_to_read(std::move(files_))
, next_reading_file(files_to_read.begin())
, recovery_mode(recovery_mode_)
, logger(Logger::get("WALStore", std::move(storage_name)))
{}

@@ -208,7 +220,7 @@ bool WALStoreReader::openNextFile()
&reporter,
/*verify_checksum*/ true,
log_num,
WALRecoveryMode::TolerateCorruptedTailRecords);
recovery_mode);
};

if (!checkpoint_read_done)
15 changes: 13 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

#include <Storages/Page/V3/LogFile/LogFilename.h>
#include <Storages/Page/V3/LogFile/LogReader.h>
#include <Storages/Page/V3/WALStore.h>

namespace DB
{
@@ -49,9 +50,17 @@ class WALStoreReader
static std::tuple<std::optional<LogFilename>, LogFilenameSet>
findCheckpoint(LogFilenameSet && all_files);

static WALStoreReaderPtr create(String storage_name, FileProviderPtr & provider, LogFilenameSet files, const ReadLimiterPtr & read_limiter = nullptr);
static WALStoreReaderPtr create(String storage_name,
FileProviderPtr & provider,
LogFilenameSet files,
WALRecoveryMode recovery_mode_ = WALRecoveryMode::TolerateCorruptedTailRecords,
const ReadLimiterPtr & read_limiter = nullptr);

static WALStoreReaderPtr create(String storage_name, FileProviderPtr & provider, PSDiskDelegatorPtr & delegator, const ReadLimiterPtr & read_limiter = nullptr);
static WALStoreReaderPtr create(String storage_name,
FileProviderPtr & provider,
PSDiskDelegatorPtr & delegator,
WALRecoveryMode recovery_mode_ = WALRecoveryMode::TolerateCorruptedTailRecords,
const ReadLimiterPtr & read_limiter = nullptr);

bool remained() const;

@@ -79,6 +88,7 @@ class WALStoreReader
FileProviderPtr & provider_,
std::optional<LogFilename> checkpoint,
LogFilenameSet && files_,
WALRecoveryMode recovery_mode_,
const ReadLimiterPtr & read_limiter_);

WALStoreReader(const WALStoreReader &) = delete;
@@ -97,6 +107,7 @@ class WALStoreReader
LogFilenameSet::const_iterator next_reading_file;
std::unique_ptr<LogReader> reader;

WALRecoveryMode recovery_mode;
LoggerPtr logger;
};

16 changes: 11 additions & 5 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
@@ -40,26 +40,32 @@ namespace DB::PS::V3
std::pair<WALStorePtr, WALStoreReaderPtr> WALStore::create(
String storage_name,
FileProviderPtr & provider,
PSDiskDelegatorPtr & delegator)
PSDiskDelegatorPtr & delegator,
WALStore::Config config)
{
auto reader = WALStoreReader::create(storage_name, provider, delegator);
auto reader = WALStoreReader::create(storage_name,
provider,
delegator,
static_cast<WALRecoveryMode>(config.wal_recover_mode.get()));
// Create a new LogFile for writing new logs
auto last_log_num = reader->lastLogNum() + 1; // TODO reuse old file
return {
std::unique_ptr<WALStore>(new WALStore(std::move(storage_name), delegator, provider, last_log_num)),
std::unique_ptr<WALStore>(new WALStore(std::move(storage_name), delegator, provider, last_log_num, std::move(config))),
reader};
}

WALStore::WALStore(
String storage_name,
const PSDiskDelegatorPtr & delegator_,
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_)
Format::LogNumberType last_log_num_,
WALStore::Config config_)
: delegator(delegator_)
, provider(provider_)
, last_log_num(last_log_num_)
, wal_paths_index(0)
, logger(Logger::get("WALStore", std::move(storage_name)))
, config(config_)
{
}

@@ -81,7 +87,7 @@ void WALStore::apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write
std::lock_guard lock(log_file_mutex);
// Roll to a new log file
// TODO: Make it configurable
if (log_file == nullptr || log_file->writtenBytes() > PAGE_META_ROLL_SIZE)
if (log_file == nullptr || log_file->writtenBytes() > config.roll_size)
{
auto log_num = last_log_num++;
auto [new_log_file, filename] = createLogWriter({log_num, 0}, false);
20 changes: 16 additions & 4 deletions dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
#pragma once

#include <Common/Checksum.h>
#include <Interpreters/SettingsCommon.h>
#include <Storages/Page/V3/LogFile/LogFilename.h>
#include <Storages/Page/V3/LogFile/LogFormat.h>
#include <Storages/Page/V3/LogFile/LogWriter.h>
@@ -82,13 +83,21 @@ using WALStoreReaderPtr = std::shared_ptr<WALStoreReader>;
class WALStore
{
public:
struct Config
{
SettingUInt64 roll_size = PAGE_META_ROLL_SIZE;
SettingUInt64 wal_recover_mode = 0;
SettingUInt64 max_persisted_log_files = MAX_PERSISTED_LOG_FILES;
};

constexpr static const char * wal_folder_prefix = "/wal";

static std::pair<WALStorePtr, WALStoreReaderPtr>
create(
String storage_name,
FileProviderPtr & provider,
PSDiskDelegatorPtr & delegator);
PSDiskDelegatorPtr & delegator,
WALStore::Config config);

void apply(PageEntriesEdit & edit, const PageVersionType & version, const WriteLimiterPtr & write_limiter = nullptr);
void apply(const PageEntriesEdit & edit, const WriteLimiterPtr & write_limiter = nullptr);
@@ -98,10 +107,10 @@ class WALStore
Format::LogNumberType current_writting_log_num;
LogFilenameSet persisted_log_files;

bool needSave() const
bool needSave(const size_t & max_size) const
{
// TODO: Make it configurable and check the reasonable of this number
return persisted_log_files.size() > 4;
return persisted_log_files.size() > max_size;
}
};

@@ -117,7 +126,8 @@ class WALStore
String storage_name,
const PSDiskDelegatorPtr & delegator_,
const FileProviderPtr & provider_,
Format::LogNumberType last_log_num_);
Format::LogNumberType last_log_num_,
WALStore::Config config);

std::tuple<std::unique_ptr<LogWriter>, LogFilename>
createLogWriter(
@@ -133,6 +143,8 @@ class WALStore
std::unique_ptr<LogWriter> log_file;

LoggerPtr logger;

WALStore::Config config;
};

} // namespace PS::V3
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ class PageDirectoryTest : public DB::base::TiFlashStorageTestBasic
FileProviderPtr provider = ctx.getFileProvider();
PSDiskDelegatorPtr delegator = std::make_shared<DB::tests::MockDiskDelegatorSingle>(path);
PageDirectoryFactory factory;
dir = factory.create("PageDirectoryTest", provider, delegator);
dir = factory.create("PageDirectoryTest", provider, delegator, WALStore::Config());
}

protected:
17 changes: 9 additions & 8 deletions dbms/src/Storages/Page/V3/tests/gtest_wal_store.cpp
Original file line number Diff line number Diff line change
@@ -249,6 +249,7 @@ class WALStoreTest

protected:
PSDiskDelegatorPtr delegator;
WALStore::Config config;
};

TEST_P(WALStoreTest, FindCheckpointFile)
@@ -307,7 +308,7 @@ TEST_P(WALStoreTest, Empty)
auto provider = ctx.getFileProvider();
auto path = getTemporaryPath();
size_t num_callback_called = 0;
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator);
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator, config);
ASSERT_NE(wal, nullptr);
while (reader->remained())
{
@@ -333,7 +334,7 @@ try

// Stage 1. empty
std::vector<size_t> size_each_edit;
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator);
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator, config);
{
size_t num_applied_edit = 0;
auto reader = WALStoreReader::create(getCurrentTestName(), provider, delegator);
@@ -361,7 +362,7 @@ try
wal.reset();
reader.reset();

std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator);
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator, config);
{
size_t num_applied_edit = 0;
while (reader->remained())
@@ -393,7 +394,7 @@ try
wal.reset();
reader.reset();

std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator);
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator, config);
{
size_t num_applied_edit = 0;
while (reader->remained())
@@ -451,7 +452,7 @@ try
auto provider = ctx.getFileProvider();
auto path = getTemporaryPath();

auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator);
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator, config);
ASSERT_NE(wal, nullptr);

std::vector<size_t> size_each_edit;
@@ -515,7 +516,7 @@ try

{
size_t num_applied_edit = 0;
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator);
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator, config);
while (reader->remained())
{
auto [ok, edit] = reader->next();
@@ -542,7 +543,7 @@ try
auto path = getTemporaryPath();

// Stage 1. empty
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator);
auto [wal, reader] = WALStore::create(getCurrentTestName(), provider, delegator, config);
ASSERT_NE(wal, nullptr);

std::mt19937 rd;
@@ -575,7 +576,7 @@ try

size_t num_edits_read = 0;
size_t num_pages_read = 0;
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator);
std::tie(wal, reader) = WALStore::create(getCurrentTestName(), provider, delegator, config);
while (reader->remained())
{
auto [ok, edit] = reader->next();

0 comments on commit 63cae97

Please sign in to comment.