Skip to content

Commit

Permalink
feat: Support raft.node remove cmd (#221)
Browse files Browse the repository at this point in the history
* fix: Modify the string matching bug

---------

Co-authored-by: longfar <[email protected]>
  • Loading branch information
panlei-coder and longfar-ncy authored Apr 19, 2024
1 parent 74c584b commit 273a2d9
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 239 deletions.
38 changes: 19 additions & 19 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ static int ProcessMaster(const char* start, const char* end) {
// discard all requests before sync;
// or continue serve with old data? TODO
return static_cast<int>(end - start);

case kPReplStateWaitAuth:
if (end - start >= 5) {
if (strncasecmp(start, "+OK\r\n", 5) == 0) {
Expand Down Expand Up @@ -268,20 +267,20 @@ int PClient::handlePacket(const char* start, int bytes) {
const char* ptr = start;

if (isPeerMaster()) {
// check slave state
auto recved = ProcessMaster(start, end);
if (recved != -1) {
return recved;
}
}

if (isJoinCmdTarget()) {
// Proccees the packet at one turn.
auto [len, is_disconnect] = PRAFT.ProcessClusterJoinCmdResponse(this, start, bytes);
if (is_disconnect) {
conn->ActiveClose();
if (isClusterCmdTarget()) {
// Proccees the packet at one turn.
int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo
if (len > 0) {
return len;
}
} else {
// Proccees the packet at one turn.
// check slave state
auto recved = ProcessMaster(start, end);
if (recved != -1) {
return recved;
}
}
return len;
}

auto parseRet = parser_.ParseRequest(ptr, end);
Expand Down Expand Up @@ -458,9 +457,10 @@ void PClient::OnConnect() {
if (g_config.masterauth.empty()) {
SetAuth();
}
} else if (isJoinCmdTarget()) {
SetName("ClusterJoinCmdConnection");
PRAFT.SendNodeInfoRequest(this);

if (isClusterCmdTarget()) {
PRAFT.SendNodeRequest(this);
}
} else {
if (g_config.password.empty()) {
SetAuth();
Expand Down Expand Up @@ -533,8 +533,8 @@ bool PClient::isPeerMaster() const {
return repl_addr.GetIP() == PeerIP() && repl_addr.GetPort() == PeerPort();
}

bool PClient::isJoinCmdTarget() const {
return PRAFT.GetJoinCtx().GetPeerIp() == PeerIP() && PRAFT.GetJoinCtx().GetPort() == PeerPort();
bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
}

int PClient::uniqueID() const {
Expand Down
3 changes: 2 additions & 1 deletion src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
void SetAuth() { auth_ = true; }
bool GetAuth() const { return auth_; }
void RewriteCmd(std::vector<std::string>& params) { parser_.SetParams(params); }
void Reexecutecommand() { this->executeCommand(); }

// All parameters of this command (including the command itself)
// e.g:["set","key","value"]
Expand All @@ -210,7 +211,7 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
bool isPeerMaster() const;
int uniqueID() const;

bool isJoinCmdTarget() const;
bool isClusterCmdTarget() const;

// TcpConnection's life is undetermined, so use weak ptr for safety.
std::weak_ptr<TcpConnection> tcp_connection_;
Expand Down
106 changes: 63 additions & 43 deletions src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include "cmd_admin.h"

#include "braft/raft.h"
#include "rocksdb/version.h"

#include "praft/praft.h"

namespace pikiwidb {
Expand Down Expand Up @@ -84,6 +87,22 @@ InfoCmd::InfoCmd(const std::string& name, int16_t arity)

bool InfoCmd::DoInitial(PClient* client) { return true; }

// @todo The info raft command is only supported for the time being
void InfoCmd::DoCmd(PClient* client) {
if (client->argv_.size() <= 1) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

auto cmd = client->argv_[1];
if (!strcasecmp(cmd.c_str(), "RAFT")) {
InfoRaft(client);
} else if (!strcasecmp(cmd.c_str(), "data")) {
InfoData(client);
} else {
client->SetRes(CmdRes::kErrOther, "the cmd is not supported");
}
}

/*
* INFO raft
* Querying Node Information.
Expand All @@ -98,59 +117,60 @@ bool InfoCmd::DoInitial(PClient* client) { return true; }
raft_num_voting_nodes:2
raft_node1:id=1733428433,state=connected,voting=yes,addr=localhost,port=5001,last_conn_secs=5,conn_errors=0,conn_oks=1
*/
// @todo The info raft command is only supported for the time being
void InfoCmd::DoCmd(PClient* client) {
if (client->argv_.size() <= 1) {
void InfoCmd::InfoRaft(PClient* client) {
if (client->argv_.size() != 2) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

auto cmd = client->argv_[1];
if (!strcasecmp(cmd.c_str(), "RAFT")) {
if (client->argv_.size() != 2) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}
if (!PRAFT.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "Don't already cluster member");
}

if (!PRAFT.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "don't already cluster member");
}
auto node_status = PRAFT.GetNodeStatus();
if (node_status.state == braft::State::STATE_END) {
return client->SetRes(CmdRes::kErrOther, "Node is not initialized");
}

auto node_status = PRAFT.GetNodeStatus();
if (node_status.state == braft::State::STATE_END) {
return client->SetRes(CmdRes::kErrOther, "Node is not initialized");
std::string message;
message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n";
message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n";
message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n";
if (braft::is_active_state(node_status.state)) {
message += "raft_state:up\r\n";
} else {
message += "raft_state:down\r\n";
}
message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n";
message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n";
message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n";

if (PRAFT.IsLeader()) {
std::vector<braft::PeerId> peers;
auto status = PRAFT.GetListPeers(&peers);
if (!status.ok()) {
return client->SetRes(CmdRes::kErrOther, status.error_str());
}

std::string message("");
message += "raft_group_id:" + PRAFT.GetGroupId() + "\r\n";
message += "raft_node_id:" + PRAFT.GetNodeId() + "\r\n";
if (braft::is_active_state(node_status.state)) {
message += "raft_state:up\r\n";
} else {
message += "raft_state:down\r\n";
}
message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n";
// message += "raft_is_voting:" + node_status.is_voting + "\r\n";
message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n";
message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n";
// message += "raft_num_nodes:" + std::to_string(node_status.num_nodes) + "\r\n";
// message += "raft_num_voting_nodes:" + std::to_string(node_status.num_voting_nodes) + "\r\n";

if (PRAFT.IsLeader()) {
std::vector<braft::PeerId> peers;
auto status = PRAFT.GetListPeers(&peers);
if (!status.ok()) {
return client->SetRes(CmdRes::kErrOther, status.error_str());
}

for (int i = 0; i < peers.size(); i++) {
message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() +
",port=" + std::to_string(peers[i].addr.port) + "\r\n";
}
for (int i = 0; i < peers.size(); i++) {
message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() +
",port=" + std::to_string(peers[i].addr.port) + "\r\n";
}
}

client->AppendString(message);
} else {
client->SetRes(CmdRes::kErrOther, "ERR the cmd is not supported");
client->AppendString(message);
}

void InfoCmd::InfoData(PClient* client) {
if (client->argv_.size() != 2) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
}

std::string message;
message += DATABASES_NUM + std::string(":") + std::to_string(pikiwidb::g_config.databases) + "\r\n";
message += ROCKSDB_NUM + std::string(":") + std::to_string(pikiwidb::g_config.db_instance_num) + "\r\n";
message += ROCKSDB_VERSION + std::string(":") + ROCKSDB_NAMESPACE::GetRocksVersionAsString() + "\r\n";

client->AppendString(message);
}

} // namespace pikiwidb
3 changes: 3 additions & 0 deletions src/cmd_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class InfoCmd : public BaseCmd {

private:
void DoCmd(PClient* client) override;

void InfoRaft(PClient* client);
void InfoData(PClient* client);
};

} // namespace pikiwidb
Loading

0 comments on commit 273a2d9

Please sign in to comment.