Skip to content

Commit

Permalink
Refactor KVStore/RegionPersister test (#5679)
Browse files Browse the repository at this point in the history
ref #5170
  • Loading branch information
JaySon-Huang authored Aug 23, 2022
1 parent 112a51e commit 4ec074b
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 410 deletions.
10 changes: 6 additions & 4 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct MockSSTReader
Data() = default;
};

MockSSTReader(const Data & data_)
explicit MockSSTReader(const Data & data_)
: iter(data_.begin())
, end(data_.end())
, remained(iter != end)
Expand All @@ -70,16 +70,18 @@ struct MockSSTReader
};


class RegionMockTest
class RegionMockTest final
{
public:
RegionMockTest(KVStorePtr kvstore_, RegionPtr region_);
RegionMockTest(KVStore * kvstore_, RegionPtr region_);
~RegionMockTest();

DISALLOW_COPY_AND_MOVE(RegionMockTest);

private:
TiFlashRaftProxyHelper mock_proxy_helper{};
const TiFlashRaftProxyHelper * ori_proxy_helper{};
KVStorePtr kvstore;
KVStore * kvstore;
RegionPtr region;
};
} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void fn_gc(SSTReaderPtr ptr, ColumnFamilyType)
delete reader;
}

RegionMockTest::RegionMockTest(KVStorePtr kvstore_, RegionPtr region_)
RegionMockTest::RegionMockTest(KVStore * kvstore_, RegionPtr region_)
: kvstore(kvstore_)
, region(region_)
{
Expand Down Expand Up @@ -465,7 +465,7 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG

FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand);
// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, region);
RegionMockTest mock_test(kvstore.get(), region);

{
// Mocking ingest a SST for column family "Write"
Expand Down Expand Up @@ -646,7 +646,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, c
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle + 10000, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down Expand Up @@ -743,7 +743,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFilesWithHandles(Context &
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, region_start_handle, region_end_handle, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);
RegionMockTest mock_test(kvstore.get(), new_region);

std::vector<SSTView> sst_views;
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/DTTool/DTTool.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class ImitativeEnv

global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/);

global_context->getTMTContext().restore();
auto & path_pool = global_context->getPathPool();
global_context->getTMTContext().restore(path_pool);
return global_context;
}

Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
if (proxy_conf.is_proxy_runnable && !tiflash_instance_wrap.proxy_helper)
throw Exception("Raft Proxy Helper is not set, should not happen");
auto & path_pool = global_context->getPathPool();
/// initialize TMTContext
global_context->getTMTContext().restore(tiflash_instance_wrap.proxy_helper);
global_context->getTMTContext().restore(path_pool, tiflash_instance_wrap.proxy_helper);
}

/// setting up elastic thread pool
Expand Down Expand Up @@ -1307,14 +1308,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
assert(tiflash_instance_wrap.proxy_helper->getProxyStatus() == RaftProxyStatus::Running);
LOG_FMT_INFO(log, "store {}, tiflash proxy is ready to serve, try to wake up all regions' leader", tmt_context.getKVStore()->getStoreID(std::memory_order_seq_cst));
size_t runner_cnt = config().getUInt("flash.read_index_runner_count", 1); // if set 0, DO NOT enable read-index worker
tmt_context.getKVStore()->initReadIndexWorkers(
auto & kvstore_ptr = tmt_context.getKVStore();
kvstore_ptr->initReadIndexWorkers(
[&]() {
// get from tmt context
return std::chrono::milliseconds(tmt_context.readIndexWorkerTick());
},
/*running thread count*/ runner_cnt);
tmt_context.getKVStore()->asyncRunReadIndexWorkers();
WaitCheckRegionReady(tmt_context, terminate_signals_counter);
WaitCheckRegionReady(tmt_context, *kvstore_ptr, terminate_signals_counter);
}
SCOPE_EXIT({
if (proxy_conf.is_proxy_runnable && tiflash_instance_wrap.status != EngineStoreServerStatus::Running)
Expand Down Expand Up @@ -1391,4 +1393,4 @@ int mainEntryClickHouseServer(int argc, char ** argv)
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}
}
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_server_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,10 @@ dt_open_file_max_idle_seconds = 20
dt_page_gc_low_write_prob = 0.2
)"};
auto & global_ctx = TiFlashTestEnv::getGlobalContext();
auto & global_path_pool = global_ctx.getPathPool();
RegionManager region_manager;
RegionPersister persister(global_ctx, region_manager);
persister.restore(nullptr, PageStorage::Config{});
persister.restore(global_path_pool, nullptr, PageStorage::Config{});

auto verify_persister_reload_config = [&global_ctx](RegionPersister & persister) {
DB::Settings & settings = global_ctx.getSettingsRef();
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ KVStore::KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_met
// default config about compact-log: period 120s, rows 40k, bytes 32MB.
}

void KVStore::restore(const TiFlashRaftProxyHelper * proxy_helper)
void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper)
{
auto task_lock = genTaskLock();
auto manage_lock = genRegionWriteLock(task_lock);

this->proxy_helper = proxy_helper;
manage_lock.regions = region_persister->restore(proxy_helper);
manage_lock.regions = region_persister->restore(path_pool, proxy_helper);

LOG_FMT_INFO(log, "Restored {} regions", manage_lock.regions.size());

Expand Down Expand Up @@ -625,6 +625,7 @@ EngineStoreApplyRes KVStore::handleAdminRaftCmd(raft_cmdpb::AdminRequest && requ

void WaitCheckRegionReady(
const TMTContext & tmt,
KVStore & kvstore,
const std::atomic_size_t & terminate_signals_counter,
double wait_tick_time,
double max_wait_tick_time,
Expand All @@ -644,7 +645,7 @@ void WaitCheckRegionReady(
Stopwatch region_check_watch;
size_t total_regions_cnt = 0;
{
tmt.getKVStore()->traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
kvstore.traverseRegions([&remain_regions](RegionID region_id, const RegionPtr &) { remain_regions.emplace(region_id); });
total_regions_cnt = remain_regions.size();
}
while (region_check_watch.elapsedSeconds() < get_wait_region_ready_timeout_sec * batch_read_index_time_rate
Expand All @@ -654,7 +655,7 @@ void WaitCheckRegionReady(
for (auto it = remain_regions.begin(); it != remain_regions.end();)
{
auto region_id = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
batch_read_index_req.emplace_back(GenRegionReadIndexReq(*region));
it++;
Expand All @@ -664,7 +665,7 @@ void WaitCheckRegionReady(
it = remain_regions.erase(it);
}
}
auto read_index_res = tmt.getKVStore()->batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
auto read_index_res = kvstore.batchReadIndex(batch_read_index_req, tmt.batchReadIndexTimeout());
for (auto && [resp, region_id] : read_index_res)
{
bool need_retry = resp.read_index() == 0;
Expand Down Expand Up @@ -716,7 +717,7 @@ void WaitCheckRegionReady(
for (auto it = regions_to_check.begin(); it != regions_to_check.end();)
{
auto [region_id, latest_index] = *it;
if (auto region = tmt.getKVStore()->getRegion(region_id); region)
if (auto region = kvstore.getRegion(region_id); region)
{
if (region->appliedIndex() >= latest_index)
{
Expand Down Expand Up @@ -752,7 +753,7 @@ void WaitCheckRegionReady(
regions_to_check.begin(),
regions_to_check.end(),
[&](const auto & e, FmtBuffer & b) {
if (auto r = tmt.getKVStore()->getRegion(e.first); r)
if (auto r = kvstore.getRegion(e.first); r)
{
b.fmtAppend("{},{},{}", e.first, e.second, r->appliedIndex());
}
Expand All @@ -771,14 +772,14 @@ void WaitCheckRegionReady(
region_check_watch.elapsedSeconds());
}

void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & terminate_signals_counter)
void WaitCheckRegionReady(const TMTContext & tmt, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter)
{
// wait interval to check region ready, not recommended to modify only if for tesing
auto wait_region_ready_tick = tmt.getContext().getConfigRef().getUInt64("flash.wait_region_ready_tick", 0);
auto wait_region_ready_timeout_sec = static_cast<double>(tmt.waitRegionReadyTimeout());
const double max_wait_tick_time = 0 == wait_region_ready_tick ? 20.0 : wait_region_ready_timeout_sec;
double min_wait_tick_time = 0 == wait_region_ready_tick ? 2.5 : static_cast<double>(wait_region_ready_tick); // default tick in TiKV is about 2s (without hibernate-region)
return WaitCheckRegionReady(tmt, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
return WaitCheckRegionReady(tmt, kvstore, terminate_signals_counter, min_wait_tick_time, max_wait_tick_time, wait_region_ready_timeout_sec);
}

void KVStore::setStore(metapb::Store store_)
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ class ReadIndexWorkerManager;
using BatchReadIndexRes = std::vector<std::pair<kvrpcpb::ReadIndexResponse, uint64_t>>;
class ReadIndexStressTest;
struct FileUsageStatistics;
class PathPool;
class RegionPersister;

/// TODO: brief design document.
class KVStore final : private boost::noncopyable
{
public:
KVStore(Context & context, TiDB::SnapshotApplyMethod snapshot_apply_method_);
void restore(const TiFlashRaftProxyHelper *);
void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *);

RegionPtr getRegion(RegionID region_id) const;

Expand Down Expand Up @@ -162,7 +163,9 @@ class KVStore final : private boost::noncopyable

FileUsageStatistics getFileUsageStatistics() const;

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
friend class MockTiDB;
friend struct MockTiDBTable;
friend struct MockRaftCommand;
Expand Down Expand Up @@ -231,7 +234,9 @@ class KVStore final : private boost::noncopyable
void releaseReadIndexWorkers();
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
RegionManager region_manager;

std::unique_ptr<RegionPersister> region_persister;
Expand Down Expand Up @@ -275,7 +280,7 @@ class KVStoreTaskLock : private boost::noncopyable
std::lock_guard<std::mutex> lock;
};

void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, const std::atomic_size_t &, double, double, double);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t & terminate_signals_counter);
void WaitCheckRegionReady(const TMTContext &, KVStore & kvstore, const std::atomic_size_t &, double, double, double);

} // namespace DB
8 changes: 4 additions & 4 deletions dbms/src/Storages/Transaction/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Region : public std::enable_shared_from_this<Region>
class CommittedScanner : private boost::noncopyable
{
public:
CommittedScanner(const RegionPtr & store_, bool use_lock = true)
explicit CommittedScanner(const RegionPtr & store_, bool use_lock = true)
: store(store_)
{
if (use_lock)
Expand Down Expand Up @@ -97,7 +97,7 @@ class Region : public std::enable_shared_from_this<Region>
class CommittedRemover : private boost::noncopyable
{
public:
CommittedRemover(const RegionPtr & store_, bool use_lock = true)
explicit CommittedRemover(const RegionPtr & store_, bool use_lock = true)
: store(store_)
{
if (use_lock)
Expand Down Expand Up @@ -245,11 +245,11 @@ class RegionRaftCommandDelegate : public Region
const RegionRangeKeys & getRange();
UInt64 appliedIndex();

RegionRaftCommandDelegate() = delete;

private:
friend class tests::RegionKVStoreTest;

RegionRaftCommandDelegate() = delete;

Regions execBatchSplit(
const raft_cmdpb::AdminRequest & request,
const raft_cmdpb::AdminResponse & response,
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Transaction/RegionMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ class RegionMeta
metapb::Region getMetaRegion() const;
raft_serverpb::MergeState getMergeState() const;

private:
RegionMeta() = delete;

private:
friend class MetaRaftCommandDelegate;
friend class tests::RegionKVStoreTest;

Expand Down Expand Up @@ -157,8 +158,6 @@ class MetaRaftCommandDelegate
friend class RegionRaftCommandDelegate;
friend class tests::RegionKVStoreTest;

MetaRaftCommandDelegate() = delete;

const metapb::Peer & getPeer() const;
const raft_serverpb::RaftApplyState & applyState() const;
const RegionState & regionState() const;
Expand Down Expand Up @@ -192,6 +191,8 @@ class MetaRaftCommandDelegate
static RegionMergeResult computeRegionMergeResult(
const metapb::Region & source_region,
const metapb::Region & target_region);

MetaRaftCommandDelegate() = delete;
};

} // namespace DB
20 changes: 10 additions & 10 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ void RegionPersister::forceTransformKVStoreV2toV3()
page_writer->writeIntoV2(std::move(write_batch_del_v2), nullptr);
}

RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper, PageStorage::Config config)
RegionMap RegionPersister::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper, PageStorage::Config config)
{
{
auto & path_pool = global_context.getPathPool();
auto delegator = path_pool.getPSDiskDelegatorRaft();
auto provider = global_context.getFileProvider();
auto run_mode = global_context.getPageStorageRunMode();
const auto global_run_mode = global_context.getPageStorageRunMode();
auto run_mode = global_run_mode;

switch (run_mode)
switch (global_run_mode)
{
case PageStorageRunMode::ONLY_V2:
{
Expand All @@ -245,8 +245,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,
config,
provider);
page_storage_v2->restore();
page_writer = std::make_shared<PageWriter>(run_mode, page_storage_v2, /*storage_v3_*/ nullptr);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, page_storage_v2, /*storage_v3_*/ nullptr, /*readlimiter*/ global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, page_storage_v2, /*storage_v3_*/ nullptr);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, page_storage_v2, /*storage_v3_*/ nullptr, /*readlimiter*/ global_context.getReadLimiter());
}
else
{
Expand All @@ -270,8 +270,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,
config,
provider);
page_storage_v3->restore();
page_writer = std::make_shared<PageWriter>(run_mode, /*storage_v2_*/ nullptr, page_storage_v3);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, /*storage_v2_*/ nullptr, page_storage_v3);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, /*storage_v2_*/ nullptr, page_storage_v3, global_context.getReadLimiter());
break;
}
case PageStorageRunMode::MIX_MODE:
Expand All @@ -296,8 +296,8 @@ RegionMap RegionPersister::restore(const TiFlashRaftProxyHelper * proxy_helper,

if (const auto & kvstore_remain_pages = page_storage_v2->getNumberOfPages(); kvstore_remain_pages != 0)
{
page_writer = std::make_shared<PageWriter>(run_mode, page_storage_v2, page_storage_v3);
page_reader = std::make_shared<PageReader>(run_mode, ns_id, page_storage_v2, page_storage_v3, global_context.getReadLimiter());
page_writer = std::make_shared<PageWriter>(global_run_mode, page_storage_v2, page_storage_v3);
page_reader = std::make_shared<PageReader>(global_run_mode, ns_id, page_storage_v2, page_storage_v3, global_context.getReadLimiter());

LOG_FMT_INFO(log, "Current kvstore transform to V3 begin [pages_before_transform={}]", kvstore_remain_pages);
forceTransformKVStoreV2toV3();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Transaction/RegionPersister.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace DB
{
class Context;

class PathPool;
class Region;
using RegionPtr = std::shared_ptr<Region>;
using RegionMap = std::unordered_map<RegionID, RegionPtr>;
Expand All @@ -50,7 +51,7 @@ class RegionPersister final : private boost::noncopyable
void drop(RegionID region_id, const RegionTaskLock &);
void persist(const Region & region);
void persist(const Region & region, const RegionTaskLock & lock);
RegionMap restore(const TiFlashRaftProxyHelper * proxy_helper = nullptr, PageStorage::Config config = PageStorage::Config{});
RegionMap restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper = nullptr, PageStorage::Config config = PageStorage::Config{});
bool gc();

using RegionCacheWriteElement = std::tuple<RegionID, MemoryWriteBuffer, size_t, UInt64>;
Expand Down
Loading

0 comments on commit 4ec074b

Please sign in to comment.