diff --git a/src/client.cc b/src/client.cc index 3cf827975..5e7530a4b 100644 --- a/src/client.cc +++ b/src/client.cc @@ -198,7 +198,6 @@ static int ProcessMaster(const char* start, const char* end) { // discard all requests before sync; // or continue serve with old data? TODO return static_cast(end - start); - case kPReplStateWaitAuth: if (end - start >= 5) { if (strncasecmp(start, "+OK\r\n", 5) == 0) { @@ -268,20 +267,20 @@ int PClient::handlePacket(const char* start, int bytes) { const char* ptr = start; if (isPeerMaster()) { - // check slave state - auto recved = ProcessMaster(start, end); - if (recved != -1) { - return recved; - } - } - - if (isJoinCmdTarget()) { - // Proccees the packet at one turn. - auto [len, is_disconnect] = PRAFT.ProcessClusterJoinCmdResponse(this, start, bytes); - if (is_disconnect) { - conn->ActiveClose(); + if (isClusterCmdTarget()) { + // Proccees the packet at one turn. + int len = PRAFT.ProcessClusterCmdResponse(this, start, bytes); // @todo + if (len > 0) { + return len; + } + } else { + // Proccees the packet at one turn. 
+ // check slave state + auto recved = ProcessMaster(start, end); + if (recved != -1) { + return recved; + } } - return len; } auto parseRet = parser_.ParseRequest(ptr, end); @@ -458,9 +457,10 @@ void PClient::OnConnect() { if (g_config.masterauth.empty()) { SetAuth(); } - } else if (isJoinCmdTarget()) { - SetName("ClusterJoinCmdConnection"); - PRAFT.SendNodeInfoRequest(this); + + if (isClusterCmdTarget()) { + PRAFT.SendNodeRequest(this); + } } else { if (g_config.password.empty()) { SetAuth(); @@ -533,8 +533,8 @@ bool PClient::isPeerMaster() const { return repl_addr.GetIP() == PeerIP() && repl_addr.GetPort() == PeerPort(); } -bool PClient::isJoinCmdTarget() const { - return PRAFT.GetJoinCtx().GetPeerIp() == PeerIP() && PRAFT.GetJoinCtx().GetPort() == PeerPort(); +bool PClient::isClusterCmdTarget() const { + return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort(); } int PClient::uniqueID() const { diff --git a/src/client.h b/src/client.h index dc95d8c27..a890697af 100644 --- a/src/client.h +++ b/src/client.h @@ -196,6 +196,7 @@ class PClient : public std::enable_shared_from_this, public CmdRes { void SetAuth() { auth_ = true; } bool GetAuth() const { return auth_; } void RewriteCmd(std::vector& params) { parser_.SetParams(params); } + void Reexecutecommand() { this->executeCommand(); } // All parameters of this command (including the command itself) // e.g:["set","key","value"] @@ -210,7 +211,7 @@ class PClient : public std::enable_shared_from_this, public CmdRes { bool isPeerMaster() const; int uniqueID() const; - bool isJoinCmdTarget() const; + bool isClusterCmdTarget() const; // TcpConnection's life is undetermined, so use weak ptr for safety. 
std::weak_ptr tcp_connection_; diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index b43575f60..1057bd3fc 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -7,6 +7,9 @@ #include "cmd_admin.h" +#include "braft/raft.h" +#include "rocksdb/version.h" + #include "praft/praft.h" namespace pikiwidb { @@ -84,6 +87,22 @@ InfoCmd::InfoCmd(const std::string& name, int16_t arity) bool InfoCmd::DoInitial(PClient* client) { return true; } +// @todo The info raft command is only supported for the time being +void InfoCmd::DoCmd(PClient* client) { + if (client->argv_.size() <= 1) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + } + + auto cmd = client->argv_[1]; + if (!strcasecmp(cmd.c_str(), "RAFT")) { + InfoRaft(client); + } else if (!strcasecmp(cmd.c_str(), "data")) { + InfoData(client); + } else { + client->SetRes(CmdRes::kErrOther, "the cmd is not supported"); + } +} + /* * INFO raft * Querying Node Information. @@ -98,59 +117,60 @@ bool InfoCmd::DoInitial(PClient* client) { return true; } raft_num_voting_nodes:2 raft_node1:id=1733428433,state=connected,voting=yes,addr=localhost,port=5001,last_conn_secs=5,conn_errors=0,conn_oks=1 */ -// @todo The info raft command is only supported for the time being -void InfoCmd::DoCmd(PClient* client) { - if (client->argv_.size() <= 1) { +void InfoCmd::InfoRaft(PClient* client) { + if (client->argv_.size() != 2) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - auto cmd = client->argv_[1]; - if (!strcasecmp(cmd.c_str(), "RAFT")) { - if (client->argv_.size() != 2) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); - } + if (!PRAFT.IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); + } - if (!PRAFT.IsInitialized()) { - return client->SetRes(CmdRes::kErrOther, "don't already cluster member"); - } + auto node_status = PRAFT.GetNodeStatus(); + if (node_status.state == braft::State::STATE_END) { + return client->SetRes(CmdRes::kErrOther, "Node is 
not initialized"); + } - auto node_status = PRAFT.GetNodeStatus(); - if (node_status.state == braft::State::STATE_END) { - return client->SetRes(CmdRes::kErrOther, "Node is not initialized"); + std::string message; + message += "raft_group_id:" + PRAFT.GetGroupID() + "\r\n"; + message += "raft_node_id:" + PRAFT.GetNodeID() + "\r\n"; + message += "raft_peer_id:" + PRAFT.GetPeerID() + "\r\n"; + if (braft::is_active_state(node_status.state)) { + message += "raft_state:up\r\n"; + } else { + message += "raft_state:down\r\n"; + } + message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; + message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; + message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; + + if (PRAFT.IsLeader()) { + std::vector peers; + auto status = PRAFT.GetListPeers(&peers); + if (!status.ok()) { + return client->SetRes(CmdRes::kErrOther, status.error_str()); } - std::string message(""); - message += "raft_group_id:" + PRAFT.GetGroupId() + "\r\n"; - message += "raft_node_id:" + PRAFT.GetNodeId() + "\r\n"; - if (braft::is_active_state(node_status.state)) { - message += "raft_state:up\r\n"; - } else { - message += "raft_state:down\r\n"; - } - message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; - // message += "raft_is_voting:" + node_status.is_voting + "\r\n"; - message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; - message += "raft_current_term:" + std::to_string(node_status.term) + "\r\n"; - // message += "raft_num_nodes:" + std::to_string(node_status.num_nodes) + "\r\n"; - // message += "raft_num_voting_nodes:" + std::to_string(node_status.num_voting_nodes) + "\r\n"; - - if (PRAFT.IsLeader()) { - std::vector peers; - auto status = PRAFT.GetListPeers(&peers); - if (!status.ok()) { - return client->SetRes(CmdRes::kErrOther, status.error_str()); - } - - for (int i = 0; i < peers.size(); i++) { - message += "raft_node" + 
std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + - ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; - } + for (int i = 0; i < peers.size(); i++) { + message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; } + } - client->AppendString(message); - } else { - client->SetRes(CmdRes::kErrOther, "ERR the cmd is not supported"); + client->AppendString(message); +} + +void InfoCmd::InfoData(PClient* client) { + if (client->argv_.size() != 2) { + return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } + + std::string message; + message += DATABASES_NUM + std::string(":") + std::to_string(pikiwidb::g_config.databases) + "\r\n"; + message += ROCKSDB_NUM + std::string(":") + std::to_string(pikiwidb::g_config.db_instance_num) + "\r\n"; + message += ROCKSDB_VERSION + std::string(":") + ROCKSDB_NAMESPACE::GetRocksVersionAsString() + "\r\n"; + + client->AppendString(message); } } // namespace pikiwidb diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 8ef4d6e58..270ccbb9a 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -93,6 +93,9 @@ class InfoCmd : public BaseCmd { private: void DoCmd(PClient* client) override; + + void InfoRaft(PClient* client); + void InfoData(PClient* client); }; } // namespace pikiwidb diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 27f349fa9..ebbef035e 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -5,45 +5,56 @@ * of patent rights can be found in the PATENTS file in the same directory. 
*/ +#include "cmd_raft.h" + #include #include #include +#include "praft/praft.h" +#include "pstd/log.h" +#include "pstd/pstd_string.h" + #include "client.h" -#include "cmd_raft.h" -#include "event_loop.h" -#include "log.h" +#include "config.h" #include "pikiwidb.h" -#include "praft.h" -#include "pstd_string.h" +#include "replication.h" namespace pikiwidb { RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} -bool RaftNodeCmd::DoInitial(PClient* client) { return true; } - -void RaftNodeCmd::DoCmd(PClient* client) { - // Check whether it is a leader. If it is not a leader, return the leader information - if (!PRAFT.IsLeader()) { - return client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderId()); +bool RaftNodeCmd::DoInitial(PClient* client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd != kAddCmd && cmd != kRemoveCmd) { + client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only"); + return false; } + return true; +} +void RaftNodeCmd::DoCmd(PClient* client) { auto cmd = client->argv_[1]; pstd::StringToUpper(cmd); - if (!strcasecmp(cmd.c_str(), "ADD")) { + if (cmd == kAddCmd) { DoCmdAdd(client); - } else if (!strcasecmp(cmd.c_str(), "REMOVE")) { - DoCmdRemove(client); } else { - client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only"); + DoCmdRemove(client); } } void RaftNodeCmd::DoCmdAdd(PClient* client) { + // Check whether it is a leader. If it is not a leader, return the leader information + if (!PRAFT.IsLeader()) { + client->SetRes(CmdRes::kWrongLeader, PRAFT.GetLeaderID()); + return; + } + if (client->argv_.size() != 4) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; } // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. 
@@ -57,11 +68,45 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { } void RaftNodeCmd::DoCmdRemove(PClient* client) { + // If the node has been initialized, it needs to close the previous initialization and rejoin the other group + if (!PRAFT.IsInitialized()) { + client->SetRes(CmdRes::kErrOther, "Don't already cluster member"); + return; + } + if (client->argv_.size() != 3) { - return client->SetRes(CmdRes::kWrongNum, client->CmdName()); + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return; + } + + // Check whether it is a leader. If it is not a leader, send remove request to leader + if (!PRAFT.IsLeader()) { + // Get the leader information + braft::PeerId leader_peer_id(PRAFT.GetLeaderID()); + // @todo There will be an unreasonable address, need to consider how to deal with it + if (leader_peer_id.is_empty()) { + client->SetRes(CmdRes::kErrOther, + "The leader address of the cluster is incorrect, try again or delete the node from another node"); + return; + } + + // Connect target + std::string peer_ip = butil::ip2str(leader_peer_id.addr.ip).c_str(); + auto port = leader_peer_id.addr.port - pikiwidb::g_config.raft_port_offset; + auto peer_id = client->argv_[2]; + auto ret = + PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id)); + if (!ret) { // other clients have removed + return client->SetRes(CmdRes::kErrOther, "Other clients have removed"); + } + PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + INFO("Sent remove request to leader successfully"); + + // Not reply any message here, we will reply after the connection is established. + client->Clear(); + return; } - // (KKorpse)TODO: Redirect to leader if not leader. 
auto s = PRAFT.RemovePeer(client->argv_[2]); if (s.ok()) { client->SetRes(CmdRes::kOK); @@ -73,26 +118,32 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} -bool RaftClusterCmd::DoInitial(PClient* client) { return true; } +bool RaftClusterCmd::DoInitial(PClient* client) { + auto cmd = client->argv_[1]; + pstd::StringToUpper(cmd); + if (cmd != kInitCmd && cmd != kJoinCmd) { + client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); + return false; + } + return true; +} void RaftClusterCmd::DoCmd(PClient* client) { // parse arguments if (client->argv_.size() < 2) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } - auto cmd = client->argv_[1]; if (PRAFT.IsInitialized()) { return client->SetRes(CmdRes::kErrOther, "Already cluster member"); } + auto cmd = client->argv_[1]; pstd::StringToUpper(cmd); if (cmd == kInitCmd) { DoCmdInit(client); - } else if (cmd == kJoinCmd) { - DoCmdJoin(client); } else { - client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER supports INIT/JOIN only"); + DoCmdJoin(client); } } @@ -104,12 +155,12 @@ void RaftClusterCmd::DoCmdInit(PClient* client) { std::string cluster_id; if (client->argv_.size() == 3) { cluster_id = client->argv_[2]; - if (cluster_id.size() != RAFT_DBID_LEN) { + if (cluster_id.size() != RAFT_GROUPID_LEN) { return client->SetRes(CmdRes::kInvalidParameter, - "Cluster id must be " + std::to_string(RAFT_DBID_LEN) + " characters"); + "Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters"); } } else { - cluster_id = pstd::RandomHexChars(RAFT_DBID_LEN); + cluster_id = pstd::RandomHexChars(RAFT_GROUPID_LEN); } auto s = PRAFT.Init(cluster_id, false); if (!s.ok()) { @@ -130,6 +181,13 @@ static inline std::optional> GetIpAndPortFromEnd } void RaftClusterCmd::DoCmdJoin(PClient* client) { + // If the node has been initialized, it needs to close the previous 
initialization and rejoin the other group + if (PRAFT.IsInitialized()) { + return client->SetRes(CmdRes::kErrOther, + "A node that has been added to a cluster must be removed \ + from the old cluster before it can be added to the new cluster"); + } + if (client->argv_.size() < 3) { return client->SetRes(CmdRes::kWrongNum, client->CmdName()); } @@ -144,31 +202,21 @@ void RaftClusterCmd::DoCmdJoin(PClient* client) { return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); } - auto on_new_conn = [](TcpConnection* obj) { - if (g_pikiwidb) { - g_pikiwidb->OnNewConnection(obj); - } - }; - auto on_fail = [&](EventLoop* loop, const char* peer_ip, int port) { - PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port); - }; - - auto loop = EventLoop::Self(); auto ip_port = GetIpAndPortFromEndPoint(addr); if (!ip_port.has_value()) { return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr)); } auto& [peer_ip, port] = *ip_port; - // FIXME: The client here is not smart pointer, may cause undefined behavior. - // should use shared_ptr in DoCmd() rather than raw pointer. - auto ret = PRAFT.GetJoinCtx().Set(client, peer_ip, port); + + // Connect target + auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port); if (!ret) { // other clients have joined return client->SetRes(CmdRes::kErrOther, "Other clients have joined"); } - loop->Connect(peer_ip.c_str(), port, on_new_conn, on_fail); + PRAFT.GetClusterCmdCtx().ConnectTargetNode(); INFO("Sent join request to leader successfully"); + // Not reply any message here, we will reply after the connection is established. 
client->Clear(); } - -} // namespace pikiwidb \ No newline at end of file +} // namespace pikiwidb diff --git a/src/cmd_raft.h b/src/cmd_raft.h index 534c90576..b9df47e2c 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -23,7 +23,7 @@ namespace pikiwidb { * -MOVED : || * *2 * : - * : + * : * * RAFT.NODE REMOVE [id] * Remove an existing node from the cluster. @@ -54,7 +54,7 @@ class RaftNodeCmd : public BaseCmd { * Initializes a new Raft cluster. * is an optional 32 character string, if set, cluster will use it for the id * Reply: - * +OK [dbid] + * +OK [group_id] * * RAFT.CLUSTER JOIN [addr:port] * Join an existing cluster. diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc index 0b6558182..76516df5c 100644 --- a/src/pikiwidb.cc +++ b/src/pikiwidb.cc @@ -256,6 +256,7 @@ void PikiwiDB::Run() { void PikiwiDB::Stop() { pikiwidb::PRAFT.ShutDown(); pikiwidb::PRAFT.Join(); + pikiwidb::PRAFT.Clear(); slave_threads_.Exit(); worker_threads_.Exit(); } diff --git a/src/praft/praft.cc b/src/praft/praft.cc index fdaeda8f9..4eb1e385e 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -1,13 +1,10 @@ /* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. 
*/ -// -// praft.cc - #include "praft.h" #include @@ -19,10 +16,10 @@ #include "pstd/pstd_string.h" #include "binlog.pb.h" -#include "client.h" #include "config.h" #include "pikiwidb.h" #include "praft_service.h" +#include "replication.h" #include "store.h" #define ERROR_LOG_AND_STATUS(msg) \ @@ -33,6 +30,52 @@ namespace pikiwidb { +bool ClusterCmdContext::Set(ClusterCmdType cluster_cmd_type, PClient* client, std::string&& peer_ip, int port, + std::string&& peer_id) { + std::unique_lock lck(mtx_); + if (client_ != nullptr) { + return false; + } + assert(client); + cluster_cmd_type_ = cluster_cmd_type; + client_ = client; + peer_ip_ = std::move(peer_ip); + port_ = port; + peer_id_ = std::move(peer_id); + return true; +} + +void ClusterCmdContext::Clear() { + std::unique_lock lck(mtx_); + cluster_cmd_type_ = ClusterCmdType::kNone; + client_ = nullptr; + peer_ip_.clear(); + port_ = 0; + peer_id_.clear(); +} + +bool ClusterCmdContext::IsEmpty() { + std::unique_lock lck(mtx_); + return client_ == nullptr; +} + +void ClusterCmdContext::ConnectTargetNode() { + auto ip = PREPL.GetMasterAddr().GetIP(); + auto port = PREPL.GetMasterAddr().GetPort(); + if (ip == peer_ip_ && port == port_ && PREPL.GetMasterState() == kPReplStateConnected) { + PRAFT.SendNodeRequest(PREPL.GetMaster()); + return; + } + + // reconnect + auto fail_cb = [&](EventLoop*, const char* peer_ip, int port) { + PRAFT.OnClusterCmdConnectionFailed(EventLoop::Self(), peer_ip, port); + }; + PREPL.SetFailCallback(fail_cb); + PREPL.SetMasterState(kPReplStateNone); + PREPL.SetMasterAddr(peer_ip_.c_str(), port_); +} + PRaft& PRaft::Instance() { static PRaft store; return store; @@ -48,6 +91,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // Add your service into RPC server DummyServiceImpl service(&PRAFT); if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + server_.reset(); return ERROR_LOG_AND_STATUS("Failed to add service"); } // raft can share the same 
RPC server. Notice the second parameter, because @@ -55,6 +99,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // address of this server is impossible to get before the server starts. You // have to specify the address of the server. if (braft::add_service(server_.get(), port) != 0) { + server_.reset(); return ERROR_LOG_AND_STATUS("Failed to add raft service"); } @@ -64,18 +109,20 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // Notice the default options of server is used here. Check out details from // the doc of brpc if you would like change some options; if (server_->Start(port, nullptr) != 0) { + server_.reset(); return ERROR_LOG_AND_STATUS("Failed to start server"); } // It's ok to start PRaft; - assert(group_id.size() == RAFT_DBID_LEN); - this->dbid_ = group_id; + assert(group_id.size() == RAFT_GROUPID_LEN); + this->group_id_ = group_id; // FIXME: g_config.ip is default to 127.0.0.0, which may not work in cluster. 
raw_addr_ = g_config.ip + ":" + std::to_string(port); butil::ip_t ip; auto ret = butil::str2ip(g_config.ip.c_str(), &ip); if (ret != 0) { + server_.reset(); return ERROR_LOG_AND_STATUS("Failed to convert str_ip to butil::ip_t"); } butil::EndPoint addr(ip, port); @@ -95,6 +142,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { initial_conf = raw_addr_ + ":0,"; } if (node_options_.initial_conf.parse_from(initial_conf) != 0) { + server_.reset(); return ERROR_LOG_AND_STATUS("Failed to parse configuration"); } @@ -107,8 +155,9 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { node_options_.raft_meta_uri = prefix + "/raft_meta"; node_options_.snapshot_uri = prefix + "/snapshot"; // node_options_.disable_cli = FLAGS_disable_cli; - node_ = std::make_unique("pikiwidb", braft::PeerId(addr)); // group_id + node_ = std::make_unique(group_id, braft::PeerId(addr)); if (node_->init(node_options_) != 0) { + server_.reset(); node_.reset(); return ERROR_LOG_AND_STATUS("Failed to init raft node"); } @@ -124,7 +173,7 @@ bool PRaft::IsLeader() const { return node_->is_leader(); } -std::string PRaft::GetLeaderId() const { +std::string PRaft::GetLeaderID() const { if (!node_) { ERROR("Node is not initialized"); return "Failed to get leader id"; @@ -143,7 +192,7 @@ std::string PRaft::GetLeaderAddress() const { return addr.c_str(); } -std::string PRaft::GetNodeId() const { +std::string PRaft::GetNodeID() const { if (!node_) { ERROR("Node is not initialized"); return "Failed to get node id"; @@ -151,12 +200,24 @@ std::string PRaft::GetNodeId() const { return node_->node_id().to_string(); } -std::string PRaft::GetGroupId() const { +std::string PRaft::GetPeerID() const { + if (!node_) { + ERROR("Node is not initialized"); + return "Failed to get node id"; + } + + auto node_id = node_->node_id().to_string(); + auto pos = node_id.find(':'); + auto peer_id = node_id.substr(pos + 1, node_id.size()); + return peer_id; +} + +std::string 
PRaft::GetGroupID() const { if (!node_) { ERROR("Node is not initialized"); return "Failed to get cluster id"; } - return dbid_; + return group_id_; } braft::NodeStatus PRaft::GetNodeStatus() const { @@ -177,14 +238,30 @@ butil::Status PRaft::GetListPeers(std::vector* peers) { return node_->list_peers(peers); } +void PRaft::SendNodeRequest(PClient* client) { + assert(client); + + auto cluster_cmd_type = cluster_cmd_ctx_.GetClusterCmdType(); + switch (cluster_cmd_type) { + case ClusterCmdType::kJoin: + SendNodeInfoRequest(client, "DATA"); + break; + case ClusterCmdType::kRemove: + SendNodeRemoveRequest(client); + break; + default: + client->SetRes(CmdRes::kErrOther, "the command sent to the leader is incorrect"); + break; + } +} + // Gets the cluster id, which is used to initialize node -void PRaft::SendNodeInfoRequest(PClient* client) { +void PRaft::SendNodeInfoRequest(PClient* client, const std::string& info_type) { assert(client); - UnboundedBuffer req; - req.PushData("INFO raft", 9); - req.PushData("\r\n", 2); - client->SendPacket(req); + const std::string cmd_str = "INFO " + info_type + "\r\n"; + client->SendPacket(cmd_str); + client->Clear(); } void PRaft::SendNodeAddRequest(PClient* client) { @@ -201,80 +278,205 @@ void PRaft::SendNodeAddRequest(PClient* client) { req.PushData(raw_addr.data(), raw_addr.size()); req.PushData("\r\n", 2); client->SendPacket(req); + client->Clear(); +} + +void PRaft::SendNodeRemoveRequest(PClient* client) { + assert(client); + + UnboundedBuffer req; + req.PushData("RAFT.NODE REMOVE ", 17); + req.PushData(cluster_cmd_ctx_.GetPeerID().c_str(), cluster_cmd_ctx_.GetPeerID().size()); + req.PushData("\r\n", 2); + client->SendPacket(req); + client->Clear(); +} + +int PRaft::ProcessClusterCmdResponse(PClient* client, const char* start, int len) { + auto cluster_cmd_type = cluster_cmd_ctx_.GetClusterCmdType(); + int ret = 0; + switch (cluster_cmd_type) { + case ClusterCmdType::kJoin: + ret = PRAFT.ProcessClusterJoinCmdResponse(client, 
start, len); + break; + case ClusterCmdType::kRemove: + ret = PRAFT.ProcessClusterRemoveCmdResponse(client, start, len); + break; + default: + client->SetRes(CmdRes::kErrOther, "RAFT.CLUSTER response supports JOIN/REMOVE only"); + break; + } + + return ret; +} + +void PRaft::CheckRocksDBConfiguration(PClient* client, PClient* join_client, const std::string& reply) { + int databases_num = 0; + int rocksdb_num = 0; + std::string rockdb_version; + std::string line; + std::istringstream iss(reply); + + while (std::getline(iss, line)) { + std::string::size_type pos = line.find(':'); + if (pos != std::string::npos) { + std::string key = line.substr(0, pos); + std::string value = line.substr(pos + 1); + + if (key == DATABASES_NUM && pstd::String2int(value, &databases_num) == 0) { + join_client->SetRes(CmdRes::kErrOther, "Config of databases_num invalid"); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } else if (key == ROCKSDB_NUM && pstd::String2int(value, &rocksdb_num) == 0) { + join_client->SetRes(CmdRes::kErrOther, "Config of rocksdb_num invalid"); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } else if (key == ROCKSDB_VERSION) { + rockdb_version = pstd::StringTrimRight(value, "\r"); + } + } + } + + int current_databases_num = pikiwidb::g_config.databases; + int current_rocksdb_num = pikiwidb::g_config.db_instance_num; + std::string current_rocksdb_version = ROCKSDB_NAMESPACE::GetRocksVersionAsString(); + if (current_databases_num != databases_num || current_rocksdb_num != rocksdb_num || + current_rocksdb_version != rockdb_version) { + join_client->SetRes(CmdRes::kErrOther, "Config of databases_num, rocksdb_num or rocksdb_version mismatch"); + 
join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } else { + SendNodeInfoRequest(client, "RAFT"); + } +} + +void PRaft::LeaderRedirection(PClient* join_client, const std::string& reply) { + // Resolve the ip address of the leader + pstd::StringTrimLeft(reply, WRONG_LEADER); + pstd::StringTrim(reply); + braft::PeerId peerId; + peerId.parse(reply); + auto peer_ip = std::string(butil::ip2str(peerId.addr.ip).c_str()); + auto port = peerId.addr.port; + + // Reset the target of the connection + cluster_cmd_ctx_.Clear(); + auto ret = PRAFT.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, join_client, std::move(peer_ip), port); + if (!ret) { // other clients have joined + join_client->SetRes(CmdRes::kErrOther, "Other clients have joined"); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + return; + } + PRAFT.GetClusterCmdCtx().ConnectTargetNode(); + + // Not reply any message here, we will reply after the connection is established. 
+ join_client->Clear(); } -std::tuple PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) { +void PRaft::InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply) { + std::string prefix = RAFT_GROUP_ID; + std::string::size_type prefix_length = prefix.length(); + std::string::size_type group_id_start = reply.find(prefix); + group_id_start += prefix_length; // locate the start location of "raft_group_id" + std::string::size_type group_id_end = reply.find("\r\n", group_id_start); + if (group_id_end != std::string::npos) { + std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); + // initialize the slave node + auto s = PRAFT.Init(raft_group_id, true); + if (!s.ok()) { + join_client->SetRes(CmdRes::kErrOther, s.error_str()); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + return; + } + + PRAFT.SendNodeAddRequest(client); + } else { + ERROR("Joined Raft cluster fail, because of invalid raft_group_id"); + join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); + join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } +} + +int PRaft::ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len) { assert(start); - auto join_client = join_ctx_.GetClient(); + auto join_client = cluster_cmd_ctx_.GetClient(); if (!join_client) { WARN("No client when processing cluster join cmd response."); - return std::make_tuple(0, true); + return 0; } - bool is_disconnect = true; std::string reply(start, len); - if (reply.find("+OK") != std::string::npos) { - INFO("Joined Raft cluster, node id: {}, dbid: {}", PRAFT.GetNodeId(), PRAFT.dbid_); + if (reply.find(OK) != std::string::npos) { + 
INFO("Joined Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); join_client->SetRes(CmdRes::kOK); join_client->SendPacket(join_client->Message()); - is_disconnect = false; - } else if (reply.find("-ERR wrong leader") != std::string::npos) { - // Resolve the ip address of the leader - pstd::StringTrimLeft(reply, "-ERR wrong leader"); - pstd::StringTrim(reply); - braft::PeerId peerId; - peerId.parse(reply); - - // Establish a connection with the leader and send the add request - auto on_new_conn = [](TcpConnection* obj) { - if (g_pikiwidb) { - g_pikiwidb->OnNewConnection(obj); - } - }; - auto fail_cb = [&](EventLoop* loop, const char* peer_ip, int port) { - PRAFT.OnJoinCmdConnectionFailed(loop, peer_ip, port); - }; - - auto loop = EventLoop::Self(); - auto peer_ip = std::string(butil::ip2str(peerId.addr.ip).c_str()); - auto port = peerId.addr.port; - // FIXME: The client here is not smart pointer, may cause undefined behavior. - // should use shared_ptr in DoCmd() rather than raw pointer. - PRAFT.GetJoinCtx().Set(join_client, peer_ip, port); - loop->Connect(peer_ip.c_str(), port, on_new_conn, fail_cb); - - // Not reply any message here, we will reply after the connection is established. 
join_client->Clear(); - } else if (reply.find("raft_group_id") != std::string::npos) { - std::string prefix = "raft_group_id:"; - std::string::size_type prefix_length = prefix.length(); - std::string::size_type group_id_start = reply.find(prefix); - group_id_start += prefix_length; // 定位到raft_group_id的起始位置 - std::string::size_type group_id_end = reply.find("\r\n", group_id_start); - if (group_id_end != std::string::npos) { - std::string raft_group_id = reply.substr(group_id_start, group_id_end - group_id_start); - // initialize the slave node - auto s = PRAFT.Init(raft_group_id, true); - if (!s.ok()) { - join_client->SetRes(CmdRes::kErrOther, s.error_str()); - join_client->SendPacket(join_client->Message()); - return std::make_tuple(len, is_disconnect); - } - - PRAFT.SendNodeAddRequest(client); - is_disconnect = false; - } else { - ERROR("Joined Raft cluster fail, because of invalid raft_group_id"); - join_client->SetRes(CmdRes::kErrOther, "Invalid raft_group_id"); - join_client->SendPacket(join_client->Message()); - } + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } else if (reply.find(DATABASES_NUM) != std::string::npos) { + CheckRocksDBConfiguration(client, join_client, reply); + } else if (reply.find(WRONG_LEADER) != std::string::npos) { + LeaderRedirection(join_client, reply); + } else if (reply.find(RAFT_GROUP_ID) != std::string::npos) { + InitializeNodeBeforeAdd(client, join_client, reply); } else { - ERROR("Joined Raft cluster fail, str: {}", start); - join_client->SetRes(CmdRes::kErrOther, std::string(start, len)); + ERROR("Joined Raft cluster fail, str: {}", reply); + join_client->SetRes(CmdRes::kErrOther, reply); join_client->SendPacket(join_client->Message()); + join_client->Clear(); + // If the join fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + } + + return len; +} + +int PRaft::ProcessClusterRemoveCmdResponse(PClient* client, 
const char* start, int len) { + assert(start); + auto remove_client = cluster_cmd_ctx_.GetClient(); + if (!remove_client) { + WARN("No client when processing cluster remove cmd response."); + return 0; + } + + std::string reply(start, len); + if (reply.find(OK) != std::string::npos) { + INFO("Removed Raft cluster, node id: {}, group_id: {}", PRAFT.GetNodeID(), PRAFT.group_id_); + ShutDown(); + Join(); + Clear(); + + remove_client->SetRes(CmdRes::kOK); + remove_client->SendPacket(remove_client->Message()); + remove_client->Clear(); + } else if (reply.find(NOT_LEADER) != std::string::npos) { + auto remove_client = cluster_cmd_ctx_.GetClient(); + remove_client->Clear(); + remove_client->Reexecutecommand(); + } else { + ERROR("Removed Raft cluster fail, str: {}", reply); + remove_client->SetRes(CmdRes::kErrOther, reply); + remove_client->SendPacket(remove_client->Message()); + remove_client->Clear(); } - return std::make_tuple(len, is_disconnect); + // If the remove fails, clear clusterContext and set it again by using the join command + cluster_cmd_ctx_.Clear(); + + return len; } butil::Status PRaft::AddPeer(const std::string& peer) { @@ -312,13 +514,17 @@ butil::Status PRaft::RemovePeer(const std::string& peer) { return {0, "OK"}; } -void PRaft::OnJoinCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { - auto cli = join_ctx_.GetClient(); +void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { + auto cli = cluster_cmd_ctx_.GetClient(); if (cli) { - cli->SetRes(CmdRes::kErrOther, "ERR failed to connect to cluster for join, please check logs " + + cli->SetRes(CmdRes::kErrOther, "Failed to connect to cluster for join or remove, please check logs " + std::string(peer_ip) + ":" + std::to_string(port)); cli->SendPacket(cli->Message()); + cli->Clear(); } + cluster_cmd_ctx_.Clear(); + + PREPL.GetMasterAddr().Clear(); } // Shut this node and server down. 
@@ -361,6 +567,16 @@ void PRaft::AppendLog(const Binlog& log, std::promise&& promise node_->apply(task); } +void PRaft::Clear() { + if (node_) { + node_.reset(); + } + + if (server_) { + server_.reset(); + } +} + void PRaft::on_apply(braft::Iterator& iter) { // A batch of tasks are committed, which must be processed through for (; iter.valid(); iter.next()) { diff --git a/src/praft/praft.h b/src/praft/praft.h index d1217b597..05fbded9a 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -10,59 +10,64 @@ #include #include "braft/raft.h" +#include "brpc/server.h" #include "rocksdb/status.h" +#include "client.h" + namespace pikiwidb { -#define RAFT_DBID_LEN 32 +#define RAFT_GROUPID_LEN 32 + +#define OK "+OK" +#define DATABASES_NUM "databases_num" +#define ROCKSDB_NUM "rocksdb_num" +#define ROCKSDB_VERSION "rocksdb_version" +#define WRONG_LEADER "-ERR wrong leader" +#define RAFT_GROUP_ID "raft_group_id:" +#define NOT_LEADER "Not leader" #define PRAFT PRaft::Instance() -class PClient; class EventLoop; class Binlog; -class JoinCmdContext { +enum ClusterCmdType { + kNone, + kJoin, + kRemove, +}; + +class ClusterCmdContext { friend class PRaft; public: - JoinCmdContext() = default; - ~JoinCmdContext() = default; - - bool Set(PClient* client, const std::string& peer_ip, int port) { - std::unique_lock lck(mtx_); - if (client_ != nullptr) { - return false; - } - assert(client); - client_ = client; - peer_ip_ = peer_ip; - port_ = port; - return true; - } + ClusterCmdContext() = default; + ~ClusterCmdContext() = default; - void Clear() { - std::unique_lock lck(mtx_); - client_ = nullptr; - peer_ip_.clear(); - port_ = 0; - } + bool Set(ClusterCmdType cluster_cmd_type, PClient* client, std::string&& peer_ip, int port, + std::string&& peer_id = ""); + + void Clear(); // @todo the function seems useless - bool IsEmpty() { - std::unique_lock lck(mtx_); - return client_ == nullptr; - } + bool IsEmpty(); + ClusterCmdType GetClusterCmdType() { return cluster_cmd_type_; } PClient* 
GetClient() { return client_; } const std::string& GetPeerIp() { return peer_ip_; } int GetPort() { return port_; } + const std::string& GetPeerID() { return peer_id_; } + + void ConnectTargetNode(); private: + ClusterCmdType cluster_cmd_type_ = ClusterCmdType::kNone; std::mutex mtx_; PClient* client_ = nullptr; std::string peer_ip_; int port_ = 0; + std::string peer_id_; }; class PRaftWriteDoneClosure : public braft::Closure { @@ -98,21 +103,32 @@ class PRaft : public braft::StateMachine { void ShutDown(); void Join(); void AppendLog(const Binlog& log, std::promise&& promise); + void Clear(); //===--------------------------------------------------------------------===// - // ClusterJoin command + // Cluster command //===--------------------------------------------------------------------===// - JoinCmdContext& GetJoinCtx() { return join_ctx_; } - void SendNodeInfoRequest(PClient* client); + ClusterCmdContext& GetClusterCmdCtx() { return cluster_cmd_ctx_; } + void SendNodeRequest(PClient* client); + void SendNodeInfoRequest(PClient* client, const std::string& info_type); void SendNodeAddRequest(PClient* client); - std::tuple ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len); - void OnJoinCmdConnectionFailed(EventLoop*, const char* peer_ip, int port); + void SendNodeRemoveRequest(PClient* client); + + int ProcessClusterCmdResponse(PClient* client, const char* start, int len); + void CheckRocksDBConfiguration(PClient* client, PClient* join_client, const std::string& reply); + void LeaderRedirection(PClient* join_client, const std::string& reply); + void InitializeNodeBeforeAdd(PClient* client, PClient* join_client, const std::string& reply); + int ProcessClusterJoinCmdResponse(PClient* client, const char* start, int len); + int ProcessClusterRemoveCmdResponse(PClient* client, const char* start, int len); + + void OnClusterCmdConnectionFailed(EventLoop*, const char* peer_ip, int port); bool IsLeader() const; - std::string GetLeaderId() const; 
std::string GetLeaderAddress() const; - std::string GetNodeId() const; - std::string GetGroupId() const; + std::string GetLeaderID() const; + std::string GetNodeID() const; + std::string GetPeerID() const; + std::string GetGroupID() const; braft::NodeStatus GetNodeStatus() const; butil::Status GetListPeers(std::vector* peers); @@ -138,8 +154,8 @@ class PRaft : public braft::StateMachine { braft::NodeOptions node_options_; // options for raft node std::string raw_addr_; // ip:port of this node - JoinCmdContext join_ctx_; // context for cluster join command - std::string dbid_; // dbid of group, + ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command + std::string group_id_; // group id }; } // namespace pikiwidb diff --git a/src/praft/praft_service.h b/src/praft/praft_service.h index 8efc5f51a..e0a44d6a5 100644 --- a/src/praft/praft_service.h +++ b/src/praft/praft_service.h @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ + #pragma once #include "praft.pb.h" @@ -16,4 +23,4 @@ class DummyServiceImpl : public DummyService { PRaft* praft_; }; -} // namespace pikiwidb \ No newline at end of file +} // namespace pikiwidb diff --git a/src/replication.cc b/src/replication.cc index 8dcdac7ad..3a7a0c534 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -191,6 +191,7 @@ void PReplication::Cron() { g_pikiwidb->OnNewConnection(obj); } }; + auto fail_cb = [&](EventLoop*, const char* peer_ip, int port) { WARN("OnCallback: Connect master {}:{} failed", peer_ip, port); @@ -198,6 +199,11 @@ void PReplication::Cron() { if (!masterInfo_.downSince) { masterInfo_.downSince = ::time(nullptr); } + + if (on_fail_) { + on_fail_(EventLoop::Self(), peer_ip, port); + on_fail_ = nullptr; + } }; auto loop = EventLoop::Self(); @@ -207,20 +213,7 @@ void PReplication::Cron() { } break; case kPReplStateConnected: - if (!g_config.masterauth.empty()) { - if (auto master = master_.lock()) { - UnboundedBuffer req; - req.PushData("auth ", 5); - req.PushData(g_config.masterauth.data(), g_config.masterauth.size()); - req.PushData("\r\n", 2); - master->SendPacket(req); - INFO("send auth with password {}", g_config.masterauth); - - masterInfo_.state = kPReplStateWaitAuth; - break; - } - } - // fall through to next case. 
+      break;

     case kPReplStateWaitAuth: {
       auto master = master_.lock();
diff --git a/src/replication.h b/src/replication.h
index 11d8807f8..8201b610b 100644
--- a/src/replication.h
+++ b/src/replication.h
@@ -12,6 +12,7 @@
 #include 

 #include "common.h"
+#include "net/tcp_connection.h"
 #include "net/unbounded_buffer.h"
 #include "net/util.h"
 #include "pstd/memory_file.h"
@@ -126,12 +127,14 @@ class PReplication {
   void SendToSlaves(const std::vector& params);

   // slave side
+  void SetFailCallback(TcpConnectionFailCallback cb) { on_fail_ = std::move(cb); }
   void SaveTmpRdb(const char* data, std::size_t& len);
   void SetMaster(const std::shared_ptr& cli);
   void SetMasterState(PReplState s);
   void SetMasterAddr(const char* ip, unsigned short port);
   void SetRdbSize(std::size_t s);
   PReplState GetMasterState() const;
+  PClient* GetMaster() const { return master_.lock().get(); }  // NOTE(review): non-owning; may dangle - confirm
   SocketAddr GetMasterAddr() const;
   std::size_t GetRdbSize() const;

@@ -151,6 +154,9 @@ class PReplication {
   PMasterInfo masterInfo_;
   std::weak_ptr master_;
   pstd::OutputMemoryFile rdb_;
+
+  // Callback invoked when connecting to the master node fails
+  TcpConnectionFailCallback on_fail_ = nullptr;
 };

 }  // namespace pikiwidb
diff --git a/tests/consistency_test.go b/tests/consistency_test.go
index d01072426..9dd15c98e 100644
--- a/tests/consistency_test.go
+++ b/tests/consistency_test.go
@@ -1,9 +1,11 @@
 package pikiwidb_test

 import (
+	"bufio"
 	"context"
 	"log"
 	"strconv"
+	"strings"
 	"time"

 	. "github.com/onsi/ginkgo/v2"
@@ -139,7 +141,7 @@ var _ = Describe("Consistency", Ordered, func() {
 			"fa": "va",
 			"fc": "vc",
 		}))
-		time.Sleep(100 * time.Millisecond)
+		time.Sleep(10000 * time.Millisecond)
 		for _, f := range followers {
 			getall, err := f.HGetAll(ctx, testKey).Result()
 			Expect(err).NotTo(HaveOccurred())
@@ -149,4 +151,39 @@ var _ = Describe("Consistency", Ordered, func() {
 			}))
 		}
 	})
+
+	// For every follower that reports a raft_peer_id in "info raft", sends
+	// "raft.node remove <peer_id>" to that follower and expects the reply OK.
+	It("ThreeNodesClusterConstructionTest", func() {
+		for _, follower := range followers {
+			info, err := follower.Do(ctx, "info", "raft").Result()
+			Expect(err).NotTo(HaveOccurred())
+			infoStr, ok := info.(string)
+			Expect(ok).To(BeTrue(), "info raft should return a string reply")
+
+			var peerID string
+			var isMember bool
+			scanner := bufio.NewScanner(strings.NewReader(infoStr))
+			for scanner.Scan() {
+				line := scanner.Text()
+				if !strings.Contains(line, "raft_peer_id") {
+					continue
+				}
+				// Split on the first ':' only, so a value that itself contains
+				// ':' (presumably "ip:port:index" - TODO confirm) is kept whole.
+				if parts := strings.SplitN(line, ":", 2); len(parts) == 2 {
+					peerID = parts[1]
+					isMember = true
+					break
+				}
+			}
+			Expect(scanner.Err()).NotTo(HaveOccurred())
+
+			if isMember {
+				ret, err := follower.Do(ctx, "raft.node", "remove", peerID).Result()
+				Expect(err).NotTo(HaveOccurred())
+				Expect(ret).To(Equal(OK))
+			}
+		}
+	})
 })