diff --git a/dbms/src/Debug/MockRaftStoreProxy.cpp b/dbms/src/Debug/MockRaftStoreProxy.cpp index 97a24ecf524..8cdc3195af6 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.cpp +++ b/dbms/src/Debug/MockRaftStoreProxy.cpp @@ -14,10 +14,21 @@ #include #include +#include +#include #include +#include +#include +#include namespace DB { +namespace RegionBench +{ +extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &); +extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &); +} // namespace RegionBench + kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t start_ts) { kvrpcpb::ReadIndexRequest req; @@ -145,6 +156,22 @@ raft_serverpb::RaftApplyState MockProxyRegion::getApply() return apply; } +void MockProxyRegion::updateAppliedIndex(uint64_t index) +{ + auto _ = genLockGuard(); + this->apply.set_applied_index(index); +} + +uint64_t MockProxyRegion::getLatestAppliedIndex() +{ + return this->getApply().applied_index(); +} + +uint64_t MockProxyRegion::getLatestCommitTerm() +{ + return this->getApply().commit_term(); +} + uint64_t MockProxyRegion::getLatestCommitIndex() { return this->getApply().commit_index(); @@ -165,7 +192,11 @@ void MockProxyRegion::setSate(raft_serverpb::RegionLocalState s) MockProxyRegion::MockProxyRegion(uint64_t id_) : id(id_) { - apply.set_commit_index(5); + apply.set_commit_index(RAFT_INIT_LOG_INDEX); + apply.set_commit_term(RAFT_INIT_LOG_TERM); + apply.set_applied_index(RAFT_INIT_LOG_INDEX); + apply.mutable_truncated_state()->set_index(RAFT_INIT_LOG_INDEX); + apply.mutable_truncated_state()->set_term(RAFT_INIT_LOG_TERM); state.mutable_region()->set_id(id); } @@ -295,6 +326,185 @@ void MockRaftStoreProxy::unsafeInvokeForTest(std::function(region_id)); + + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = tests::makeRegion(region_id, RecordKVFormat::genKey(region_id, 0), RecordKVFormat::genKey(region_id, 10)); + lock.regions.emplace(region_id, region); + lock.index.add(region); + } +} + +std::tuple MockRaftStoreProxy::normalWrite( + UInt64 region_id, + std::vector && keys, + std::vector && vals, + std::vector && cmd_types, + std::vector && cmd_cf) +{ + uint64_t index = 0; + uint64_t term = 0; + { + auto region = getRegion(region_id); + assert(region != nullptr); + // We have a new entry. + index = region->getLatestCommitIndex() + 1; + term = region->getLatestCommitTerm(); + // The new entry is committed on Proxy's side. + region->updateCommitIndex(index); + // We record them, as persisted raft log, for potential recovery. + region->commands[index] = { + term, + MockProxyRegion::NormalWrite{ + keys, + vals, + cmd_types, + cmd_cf, + }}; + } + return std::make_tuple(index, term); +} + +std::tuple MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index) +{ + uint64_t index = 0; + uint64_t term = 0; + { + auto region = getRegion(region_id); + assert(region != nullptr); + // We have a new entry. + index = region->getLatestCommitIndex() + 1; + term = region->getLatestCommitTerm(); + // The new entry is committed on Proxy's side. + region->updateCommitIndex(index); + // We record them, as persisted raft log, for potential recovery. + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog); + request.mutable_compact_log()->set_compact_index(compact_index); + // Find compact term, otherwise log must have been compacted. + if (region->commands.count(compact_index)) + { + request.mutable_compact_log()->set_compact_term(region->commands[index].term); + } + region->commands[index] = { + term, + MockProxyRegion::AdminCommand{ + request, + response, + }}; + } + return std::make_tuple(index, term); +} + +void MockRaftStoreProxy::doApply( + KVStore & kvs, + TMTContext & tmt, + const FailCond & cond, + UInt64 region_id, + uint64_t index) +{ + auto region = getRegion(region_id); + assert(region != nullptr); + // We apply this committed entry. + raft_cmdpb::RaftCmdRequest request; + auto & cmd = region->commands[index]; + auto term = cmd.term; + if (cmd.has_write_request()) + { + auto & c = cmd.write(); + auto & keys = c.keys; + auto & vals = c.vals; + auto & cmd_types = c.cmd_types; + auto & cmd_cf = c.cmd_cf; + size_t n = keys.size(); + + assert(n == vals.size()); + assert(n == cmd_types.size()); + assert(n == cmd_cf.size()); + for (size_t i = 0; i < n; i++) + { + if (cmd_types[i] == WriteCmdType::Put) + { + auto cf_name = CFToName(cmd_cf[i]); + auto key = RecordKVFormat::genKey(1, keys[i], 1); + TiKVValue value = std::move(vals[i]); + RegionBench::setupPutRequest(request.add_requests(), cf_name, key, value); + } + else + { + auto cf_name = CFToName(cmd_cf[i]); + auto key = RecordKVFormat::genKey(1, keys[i], 1); + RegionBench::setupDelRequest(request.add_requests(), cf_name, key); + } + } + } + else if (cmd.has_admin_request()) + { + } + + if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_WRITE) + return; + + auto old_applied = kvs.getRegion(region_id)->appliedIndex(); + auto old_applied_term = kvs.getRegion(region_id)->appliedIndexTerm(); + if (cmd.has_write_request()) + { + // TiFlash write + kvs.handleWriteRaftCmd(std::move(request), region_id, index, term, tmt); + } + if (cmd.has_admin_request()) + { + kvs.handleAdminRaftCmd(std::move(cmd.admin().request), std::move(cmd.admin().response), region_id, index, term, tmt); + } + + if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_ADVANCE) + { + kvs.getRegion(region_id)->setApplied(old_applied, old_applied_term); + return; + } + + if (cmd.has_admin_request()) + { + if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::CompactLog) + { + auto i = cmd.admin().request.compact_log().compact_index(); + // TODO We should remove (0, index] here, it is enough to remove exactly index now. + region->commands.erase(i); + } + } + + // Proxy advance + if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE) + return; + region->updateAppliedIndex(index); +} + +void MockRaftStoreProxy::replay( + KVStore & kvs, + TMTContext & tmt, + uint64_t region_id, + uint64_t to) +{ + auto region = getRegion(region_id); + assert(region != nullptr); + FailCond cond; + for (uint64_t i = region->apply.applied_index() + 1; i <= to; i++) + { + doApply(kvs, tmt, cond, region_id, i); + } +} + void GCMonitor::add(RawObjType type, int64_t diff) { auto _ = genLockGuard(); diff --git a/dbms/src/Debug/MockRaftStoreProxy.h b/dbms/src/Debug/MockRaftStoreProxy.h index b5727eed5de..ac55e501ea8 100644 --- a/dbms/src/Debug/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockRaftStoreProxy.h @@ -14,8 +14,10 @@ #pragma once +#include #include #include +#include #include @@ -26,19 +28,63 @@ kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t star struct MockProxyRegion : MutexLockWrap { raft_serverpb::RegionLocalState getState(); - raft_serverpb::RaftApplyState getApply(); - + void updateAppliedIndex(uint64_t index); + uint64_t getLatestAppliedIndex(); + uint64_t getLatestCommitTerm(); uint64_t getLatestCommitIndex(); - void updateCommitIndex(uint64_t index); void setSate(raft_serverpb::RegionLocalState); - explicit MockProxyRegion(uint64_t id); + struct NormalWrite + { + std::vector keys; + std::vector vals; + std::vector cmd_types; + std::vector cmd_cf; + }; + + struct AdminCommand + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + raft_cmdpb::AdminCmdType cmd_type() const + { + return request.cmd_type(); + } + }; + + struct CachedCommand + { + uint64_t term; + std::variant inner; + + bool has_admin_request() const + { + return std::holds_alternative(inner); + } + + bool has_write_request() const + { + return std::holds_alternative(inner); + } + + AdminCommand & admin() + { + return std::get(inner); + } + + NormalWrite & write() + { + return std::get(inner); + } + }; + const uint64_t id; raft_serverpb::RegionLocalState state; raft_serverpb::RaftApplyState apply; + std::map commands; }; using MockProxyRegionPtr = std::shared_ptr; @@ -104,6 +150,49 @@ struct MockRaftStoreProxy : MutexLockWrap void unsafeInvokeForTest(std::function && cb); static TiFlashRaftProxyHelper SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr); + /// Mutation funcs. + struct FailCond + { + enum Type + { + NORMAL, + BEFORE_KVSTORE_WRITE, + BEFORE_KVSTORE_ADVANCE, + BEFORE_PROXY_ADVANCE, + }; + Type type = NORMAL; + }; + + /// We assume that we generate one command, and immediately commit. + /// boostrap a region + void bootstrap( + KVStore & kvs, + TMTContext & tmt, + UInt64 region_id); + + /// normal write to a region + std::tuple normalWrite( + UInt64 region_id, + std::vector && keys, + std::vector && vals, + std::vector && cmd_types, + std::vector && cmd_cf); + + std::tuple compactLog(UInt64 region_id, UInt64 compact_index); + + void doApply( + KVStore & kvs, + TMTContext & tmt, + const FailCond & cond, + UInt64 region_id, + uint64_t index); + + void replay( + KVStore & kvs, + TMTContext & tmt, + uint64_t region_id, + uint64_t to); + std::unordered_set region_id_to_drop; std::unordered_set region_id_to_error; diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 173cf8e3322..0315793e99f 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -167,6 +167,7 @@ class KVStore final : private boost::noncopyable #ifndef DBMS_PUBLIC_GTEST private: #endif + friend struct MockRaftStoreProxy; friend class MockTiDB; friend struct MockTiDBTable; friend struct MockRaftCommand; diff --git a/dbms/src/Storages/Transaction/Region.cpp b/dbms/src/Storages/Transaction/Region.cpp index a40b6e268e7..9fd45e6c744 100644 --- a/dbms/src/Storages/Transaction/Region.cpp +++ b/dbms/src/Storages/Transaction/Region.cpp @@ -96,6 +96,17 @@ UInt64 Region::appliedIndex() const return meta.appliedIndex(); } +UInt64 Region::appliedIndexTerm() const +{ + return meta.appliedIndexTerm(); +} + +void Region::setApplied(UInt64 index, UInt64 term) +{ + std::unique_lock lock(mutex); + meta.setApplied(index, term); +} + RegionPtr Region::splitInto(RegionMeta && meta) { RegionPtr new_region = std::make_shared(std::move(meta), proxy_helper); diff --git a/dbms/src/Storages/Transaction/Region.h b/dbms/src/Storages/Transaction/Region.h index 91d0f3c8b81..a20bb11ca43 100644 --- a/dbms/src/Storages/Transaction/Region.h +++ b/dbms/src/Storages/Transaction/Region.h @@ -167,8 +167,11 @@ class Region : public std::enable_shared_from_this std::tuple waitIndex(UInt64 index, const UInt64 timeout_ms, std::function && check_running); UInt64 appliedIndex() const; + UInt64 appliedIndexTerm() const; void notifyApplied() { meta.notifyAll(); } + // Export for tests. + void setApplied(UInt64 index, UInt64 term); RegionVersion version() const; RegionVersion confVer() const; diff --git a/dbms/src/Storages/Transaction/RegionMeta.cpp b/dbms/src/Storages/Transaction/RegionMeta.cpp index ad5a681d6ff..13a20a39a9b 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.cpp +++ b/dbms/src/Storages/Transaction/RegionMeta.cpp @@ -106,6 +106,12 @@ UInt64 RegionMeta::appliedIndex() const return apply_state.applied_index(); } +UInt64 RegionMeta::appliedIndexTerm() const +{ + std::lock_guard lock(mutex); + return applied_term; +} + RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.regionId()) { @@ -247,7 +253,7 @@ RegionMergeResult MetaRaftCommandDelegate::computeRegionMergeResult( const metapb::Region & source_region, const metapb::Region & target_region) { - RegionMergeResult res; + RegionMergeResult res{}; res.version = std::max(source_region.region_epoch().version(), target_region.region_epoch().version()) + 1; diff --git a/dbms/src/Storages/Transaction/RegionMeta.h b/dbms/src/Storages/Transaction/RegionMeta.h index ee64ab3c958..39616c1fa8d 100644 --- a/dbms/src/Storages/Transaction/RegionMeta.h +++ b/dbms/src/Storages/Transaction/RegionMeta.h @@ -70,6 +70,7 @@ class RegionMeta UInt64 storeId() const; UInt64 appliedIndex() const; + UInt64 appliedIndexTerm() const; ImutRegionRangePtr getRange() const; diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp index acde883846c..2ed63481e43 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -43,7 +43,6 @@ extern void ChangeRegionStateRange(RegionState & region_state, bool source_at_le namespace tests { - // TODO: Use another way to workaround calling the private methods on KVStore class RegionKVStoreTest : public ::testing::Test { @@ -65,9 +64,18 @@ class RegionKVStoreTest : public ::testing::Test proxy_instance = std::make_unique(); proxy_helper = std::make_unique(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper( RaftStoreProxyPtr{proxy_instance.get()})); - proxy_instance->init(100); - kvstore->restore(*path_pool, proxy_helper.get()); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvstore->setStore(store); + ASSERT_EQ(kvstore->getStoreID(), store.id()); + } + } + + void create_default_regions() + { + proxy_instance->init(100); } void TearDown() override {} @@ -117,15 +125,10 @@ class RegionKVStoreTest : public ::testing::Test TEST_F(RegionKVStoreTest, NewProxy) { + create_default_regions(); auto ctx = TiFlashTestEnv::getGlobalContext(); KVStore & kvs = getKVS(); - { - auto store = metapb::Store{}; - store.set_id(1234); - kvs.setStore(store); - ASSERT_EQ(kvs.getStoreID(), store.id()); - } { ASSERT_EQ(kvs.getRegion(0), nullptr); auto task_lock = kvs.genTaskLock(); @@ -160,6 +163,7 @@ TEST_F(RegionKVStoreTest, NewProxy) TEST_F(RegionKVStoreTest, ReadIndex) { + create_default_regions(); auto ctx = TiFlashTestEnv::getGlobalContext(); // start mock proxy in other thread @@ -755,6 +759,7 @@ void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) TEST_F(RegionKVStoreTest, Region) { + create_default_regions(); TableID table_id = 100; { auto meta = RegionMeta(createPeer(2, true), createRegionInfo(666, RecordKVFormat::genKey(0, 0), RecordKVFormat::genKey(0, 1000)), initialApplyState()); @@ -852,6 +857,7 @@ TEST_F(RegionKVStoreTest, Region) TEST_F(RegionKVStoreTest, KVStore) { + create_default_regions(); auto ctx = TiFlashTestEnv::getGlobalContext(); KVStore & kvs = getKVS(); @@ -1262,16 +1268,180 @@ TEST_F(RegionKVStoreTest, KVStore) } } -TEST_F(RegionKVStoreTest, KVStoreRestore) +TEST_F(RegionKVStoreTest, KVStoreFailRecovery) { + auto ctx = TiFlashTestEnv::getGlobalContext(); { - KVStore & kvs = getKVS(); + auto applied_index = 0; + auto region_id = 1; { - auto store = metapb::Store{}; - store.set_id(1234); - kvs.setStore(store); - ASSERT_EQ(kvs.getStoreID(), store.id()); + KVStore & kvs = getKVS(); + proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id); + MockRaftStoreProxy::FailCond cond; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_NE(r1, nullptr); + ASSERT_NE(kvr1, nullptr); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + auto [index, term] = proxy_instance->normalWrite(region_id, {33}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + kvs.tryPersist(region_id); + } + { + const KVStore & kvs = reloadKVSFromDisk(); + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), r1->getLatestCommitIndex()); + } + } + + { + auto applied_index = 0; + auto region_id = 2; + { + KVStore & kvs = getKVS(); + proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id); + MockRaftStoreProxy::FailCond cond; + cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_WRITE; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + auto [index, term] = proxy_instance->normalWrite(region_id, {34}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + // KVStore failed before write and advance. + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index); + kvs.tryPersist(region_id); + } + { + KVStore & kvs = reloadKVSFromDisk(); + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), r1->getLatestCommitIndex() - 1); + proxy_instance->replay(kvs, ctx.getTMTContext(), region_id, r1->getLatestCommitIndex()); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + } + } + + { + auto applied_index = 0; + auto region_id = 3; + { + KVStore & kvs = getKVS(); + proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id); + MockRaftStoreProxy::FailCond cond; + cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_ADVANCE; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + auto [index, term] = proxy_instance->normalWrite(region_id, {34}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + // KVStore failed before advance. + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index); + kvs.tryPersist(region_id); + } + { + KVStore & kvs = reloadKVSFromDisk(); + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), r1->getLatestCommitIndex() - 1); + EXPECT_THROW(proxy_instance->replay(kvs, ctx.getTMTContext(), region_id, r1->getLatestCommitIndex()), Exception); + } + } + + { + auto applied_index = 0; + auto region_id = 4; + { + KVStore & kvs = getKVS(); + proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id); + MockRaftStoreProxy::FailCond cond; + cond.type = MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + LOG_FMT_INFO(&Poco::Logger::get("kvstore"), "applied_index {}", applied_index); + auto [index, term] = proxy_instance->normalWrite(region_id, {35}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + // KVStore succeed. Proxy failed before advance. + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + kvs.tryPersist(region_id); + } + { + MockRaftStoreProxy::FailCond cond; + KVStore & kvs = reloadKVSFromDisk(); + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), r1->getLatestCommitIndex()); + // Proxy shall replay from handle 35. + proxy_instance->replay(kvs, ctx.getTMTContext(), region_id, r1->getLatestCommitIndex()); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + auto [index, term] = proxy_instance->normalWrite(region_id, {36}, {"v2"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 2); + } + } +} + +TEST_F(RegionKVStoreTest, KVStoreAdminCommands) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + { + auto applied_index = 0; + auto region_id = 1; + { + KVStore & kvs = getKVS(); + proxy_instance->bootstrap(kvs, ctx.getTMTContext(), region_id); + MockRaftStoreProxy::FailCond cond; + + auto kvr1 = kvs.getRegion(region_id); + auto r1 = proxy_instance->getRegion(region_id); + ASSERT_NE(r1, nullptr); + ASSERT_NE(kvr1, nullptr); + applied_index = r1->getLatestAppliedIndex(); + ASSERT_EQ(r1->getLatestAppliedIndex(), kvr1->appliedIndex()); + auto [index, term] = proxy_instance->normalWrite(region_id, {33}, {"v1"}, {WriteCmdType::Put}, {ColumnFamilyType::Default}); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 1); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 1); + + kvr1->markCompactLog(); + kvs.setRegionCompactLogConfig(0, 0, 0); + auto [index2, term2] = proxy_instance->compactLog(region_id, index); + // In tryFlushRegionData we will call handleWriteRaftCmd, which will already cause an advance. + ASSERT_TRUE(kvs.tryFlushRegionData(region_id, true, ctx.getTMTContext(), index2, term)); + proxy_instance->doApply(kvs, ctx.getTMTContext(), cond, region_id, index2); + ASSERT_EQ(r1->getLatestAppliedIndex(), applied_index + 2); + ASSERT_EQ(kvr1->appliedIndex(), applied_index + 2); } + } +} + +TEST_F(RegionKVStoreTest, KVStoreRestore) +{ + { + KVStore & kvs = getKVS(); { ASSERT_EQ(kvs.getRegion(0), nullptr); auto task_lock = kvs.genTaskLock();