Skip to content

Commit

Permalink
test(KVStore): Support Proxy's apply pattern on normal write and comp…
Browse files Browse the repository at this point in the history
…act log (#5623)

ref #5170
  • Loading branch information
CalvinNeo authored Sep 26, 2022
1 parent 5609e68 commit 3bdb49e
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 21 deletions.
212 changes: 211 additions & 1 deletion dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@

#include <Common/Exception.h>
#include <Debug/MockRaftStoreProxy.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ProxyFFICommon.h>
#include <Storages/Transaction/RegionMeta.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/tests/region_helper.h>

namespace DB
{
namespace RegionBench
{
extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &);
extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &);
} // namespace RegionBench

kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t start_ts)
{
kvrpcpb::ReadIndexRequest req;
Expand Down Expand Up @@ -145,6 +156,22 @@ raft_serverpb::RaftApplyState MockProxyRegion::getApply()
return apply;
}

void MockProxyRegion::updateAppliedIndex(uint64_t index)
{
auto _ = genLockGuard();
this->apply.set_applied_index(index);
}

uint64_t MockProxyRegion::getLatestAppliedIndex()
{
return this->getApply().applied_index();
}

uint64_t MockProxyRegion::getLatestCommitTerm()
{
return this->getApply().commit_term();
}

uint64_t MockProxyRegion::getLatestCommitIndex()
{
return this->getApply().commit_index();
Expand All @@ -165,7 +192,11 @@ void MockProxyRegion::setSate(raft_serverpb::RegionLocalState s)
MockProxyRegion::MockProxyRegion(uint64_t id_)
: id(id_)
{
apply.set_commit_index(5);
apply.set_commit_index(RAFT_INIT_LOG_INDEX);
apply.set_commit_term(RAFT_INIT_LOG_TERM);
apply.set_applied_index(RAFT_INIT_LOG_INDEX);
apply.mutable_truncated_state()->set_index(RAFT_INIT_LOG_INDEX);
apply.mutable_truncated_state()->set_term(RAFT_INIT_LOG_TERM);
state.mutable_region()->set_id(id);
}

Expand Down Expand Up @@ -295,6 +326,185 @@ void MockRaftStoreProxy::unsafeInvokeForTest(std::function<void(MockRaftStorePro
cb(*this);
}

void MockRaftStoreProxy::bootstrap(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id)
{
UNUSED(tmt);
auto _ = genLockGuard();
regions.emplace(region_id, std::make_shared<MockProxyRegion>(region_id));

auto task_lock = kvs.genTaskLock();
auto lock = kvs.genRegionWriteLock(task_lock);
{
auto region = tests::makeRegion(region_id, RecordKVFormat::genKey(region_id, 0), RecordKVFormat::genKey(region_id, 10));
lock.regions.emplace(region_id, region);
lock.index.add(region);
}
}

std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::normalWrite(
UInt64 region_id,
std::vector<HandleID> && keys,
std::vector<std::string> && vals,
std::vector<WriteCmdType> && cmd_types,
std::vector<ColumnFamilyType> && cmd_cf)
{
uint64_t index = 0;
uint64_t term = 0;
{
auto region = getRegion(region_id);
assert(region != nullptr);
// We have a new entry.
index = region->getLatestCommitIndex() + 1;
term = region->getLatestCommitTerm();
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
// We record them, as persisted raft log, for potential recovery.
region->commands[index] = {
term,
MockProxyRegion::NormalWrite{
keys,
vals,
cmd_types,
cmd_cf,
}};
}
return std::make_tuple(index, term);
}

std::tuple<uint64_t, uint64_t> MockRaftStoreProxy::compactLog(UInt64 region_id, UInt64 compact_index)
{
uint64_t index = 0;
uint64_t term = 0;
{
auto region = getRegion(region_id);
assert(region != nullptr);
// We have a new entry.
index = region->getLatestCommitIndex() + 1;
term = region->getLatestCommitTerm();
// The new entry is committed on Proxy's side.
region->updateCommitIndex(index);
// We record them, as persisted raft log, for potential recovery.
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.mutable_compact_log();
request.set_cmd_type(raft_cmdpb::AdminCmdType::CompactLog);
request.mutable_compact_log()->set_compact_index(compact_index);
// Find compact term, otherwise log must have been compacted.
if (region->commands.count(compact_index))
{
request.mutable_compact_log()->set_compact_term(region->commands[index].term);
}
region->commands[index] = {
term,
MockProxyRegion::AdminCommand{
request,
response,
}};
}
return std::make_tuple(index, term);
}

void MockRaftStoreProxy::doApply(
KVStore & kvs,
TMTContext & tmt,
const FailCond & cond,
UInt64 region_id,
uint64_t index)
{
auto region = getRegion(region_id);
assert(region != nullptr);
// We apply this committed entry.
raft_cmdpb::RaftCmdRequest request;
auto & cmd = region->commands[index];
auto term = cmd.term;
if (cmd.has_write_request())
{
auto & c = cmd.write();
auto & keys = c.keys;
auto & vals = c.vals;
auto & cmd_types = c.cmd_types;
auto & cmd_cf = c.cmd_cf;
size_t n = keys.size();

assert(n == vals.size());
assert(n == cmd_types.size());
assert(n == cmd_cf.size());
for (size_t i = 0; i < n; i++)
{
if (cmd_types[i] == WriteCmdType::Put)
{
auto cf_name = CFToName(cmd_cf[i]);
auto key = RecordKVFormat::genKey(1, keys[i], 1);
TiKVValue value = std::move(vals[i]);
RegionBench::setupPutRequest(request.add_requests(), cf_name, key, value);
}
else
{
auto cf_name = CFToName(cmd_cf[i]);
auto key = RecordKVFormat::genKey(1, keys[i], 1);
RegionBench::setupDelRequest(request.add_requests(), cf_name, key);
}
}
}
else if (cmd.has_admin_request())
{
}

if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_WRITE)
return;

auto old_applied = kvs.getRegion(region_id)->appliedIndex();
auto old_applied_term = kvs.getRegion(region_id)->appliedIndexTerm();
if (cmd.has_write_request())
{
// TiFlash write
kvs.handleWriteRaftCmd(std::move(request), region_id, index, term, tmt);
}
if (cmd.has_admin_request())
{
kvs.handleAdminRaftCmd(std::move(cmd.admin().request), std::move(cmd.admin().response), region_id, index, term, tmt);
}

if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_KVSTORE_ADVANCE)
{
kvs.getRegion(region_id)->setApplied(old_applied, old_applied_term);
return;
}

if (cmd.has_admin_request())
{
if (cmd.admin().cmd_type() == raft_cmdpb::AdminCmdType::CompactLog)
{
auto i = cmd.admin().request.compact_log().compact_index();
// TODO We should remove (0, index] here, it is enough to remove exactly index now.
region->commands.erase(i);
}
}

// Proxy advance
if (cond.type == MockRaftStoreProxy::FailCond::Type::BEFORE_PROXY_ADVANCE)
return;
region->updateAppliedIndex(index);
}

void MockRaftStoreProxy::replay(
KVStore & kvs,
TMTContext & tmt,
uint64_t region_id,
uint64_t to)
{
auto region = getRegion(region_id);
assert(region != nullptr);
FailCond cond;
for (uint64_t i = region->apply.applied_index() + 1; i <= to; i++)
{
doApply(kvs, tmt, cond, region_id, i);
}
}

void GCMonitor::add(RawObjType type, int64_t diff)
{
auto _ = genLockGuard();
Expand Down
97 changes: 93 additions & 4 deletions dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

#pragma once

#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ReadIndexWorker.h>
#include <kvproto/raft_serverpb.pb.h>
#include <raft_cmdpb.pb.h>

#include <ext/singleton.h>

Expand All @@ -26,19 +28,63 @@ kvrpcpb::ReadIndexRequest make_read_index_reqs(uint64_t region_id, uint64_t star
struct MockProxyRegion : MutexLockWrap
{
raft_serverpb::RegionLocalState getState();

raft_serverpb::RaftApplyState getApply();

void updateAppliedIndex(uint64_t index);
uint64_t getLatestAppliedIndex();
uint64_t getLatestCommitTerm();
uint64_t getLatestCommitIndex();

void updateCommitIndex(uint64_t index);
void setSate(raft_serverpb::RegionLocalState);

explicit MockProxyRegion(uint64_t id);

struct NormalWrite
{
std::vector<HandleID> keys;
std::vector<std::string> vals;
std::vector<WriteCmdType> cmd_types;
std::vector<ColumnFamilyType> cmd_cf;
};

struct AdminCommand
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
raft_cmdpb::AdminCmdType cmd_type() const
{
return request.cmd_type();
}
};

struct CachedCommand
{
uint64_t term;
std::variant<NormalWrite, AdminCommand> inner;

bool has_admin_request() const
{
return std::holds_alternative<AdminCommand>(inner);
}

bool has_write_request() const
{
return std::holds_alternative<NormalWrite>(inner);
}

AdminCommand & admin()
{
return std::get<AdminCommand>(inner);
}

NormalWrite & write()
{
return std::get<NormalWrite>(inner);
}
};

const uint64_t id;
raft_serverpb::RegionLocalState state;
raft_serverpb::RaftApplyState apply;
std::map<uint64_t, CachedCommand> commands;
};

using MockProxyRegionPtr = std::shared_ptr<MockProxyRegion>;
Expand Down Expand Up @@ -104,6 +150,49 @@ struct MockRaftStoreProxy : MutexLockWrap
void unsafeInvokeForTest(std::function<void(MockRaftStoreProxy &)> && cb);

static TiFlashRaftProxyHelper SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr);
/// Mutation funcs.
struct FailCond
{
enum Type
{
NORMAL,
BEFORE_KVSTORE_WRITE,
BEFORE_KVSTORE_ADVANCE,
BEFORE_PROXY_ADVANCE,
};
Type type = NORMAL;
};

/// We assume that we generate one command, and immediately commit.
/// boostrap a region
void bootstrap(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id);

/// normal write to a region
std::tuple<uint64_t, uint64_t> normalWrite(
UInt64 region_id,
std::vector<HandleID> && keys,
std::vector<std::string> && vals,
std::vector<WriteCmdType> && cmd_types,
std::vector<ColumnFamilyType> && cmd_cf);

std::tuple<uint64_t, uint64_t> compactLog(UInt64 region_id, UInt64 compact_index);

void doApply(
KVStore & kvs,
TMTContext & tmt,
const FailCond & cond,
UInt64 region_id,
uint64_t index);

void replay(
KVStore & kvs,
TMTContext & tmt,
uint64_t region_id,
uint64_t to);


std::unordered_set<uint64_t> region_id_to_drop;
std::unordered_set<uint64_t> region_id_to_error;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class KVStore final : private boost::noncopyable
#ifndef DBMS_PUBLIC_GTEST
private:
#endif
friend struct MockRaftStoreProxy;
friend class MockTiDB;
friend struct MockTiDBTable;
friend struct MockRaftCommand;
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/Transaction/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ UInt64 Region::appliedIndex() const
return meta.appliedIndex();
}

UInt64 Region::appliedIndexTerm() const
{
return meta.appliedIndexTerm();
}

void Region::setApplied(UInt64 index, UInt64 term)
{
std::unique_lock lock(mutex);
meta.setApplied(index, term);
}

RegionPtr Region::splitInto(RegionMeta && meta)
{
RegionPtr new_region = std::make_shared<Region>(std::move(meta), proxy_helper);
Expand Down
Loading

0 comments on commit 3bdb49e

Please sign in to comment.