Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add blpop cmd #373

Closed
wants to merge 9 commits into from
28 changes: 28 additions & 0 deletions src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,34 @@ BaseCmd* BaseCmdGroup::GetSubCmd(const std::string& cmdName) {
return subCmd->second.get();
}

void BaseCmd::ServeAndUnblockConns(const std::string& key) {
auto& key_to_conns = g_pikiwidb->GetMapFromKeyToConns();
auto it = key_to_conns.find(key);
if (it == key_to_conns.end()) {
// no client is waitting for this key
return;
}
auto& waitting_list = it->second;
std::vector<std::string> elements;
storage::Status s;

// traverse this list from head to tail(in the order of adding sequence) ,means "first blocked, first get served“
for (auto conn_blocked = waitting_list->begin(); conn_blocked != waitting_list->end();) {
PClient* client = *conn_blocked;
s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(key, 1, &elements);
if (s.ok()) {
client->AppendString(elements[0]);
} else if (s.IsNotFound()) {
// this key has no more elements to serve more blocked conn.
break;
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
client->WriteReply2Client();
conn_blocked = waitting_list->erase(conn_blocked); // remove this conn from current waiting list
}
}

bool BaseCmdGroup::DoInitial(PClient* client) {
client->SetSubCmdName(client->argv_[1]);
if (!subCmds_.contains(client->SubCmdName())) {
Expand Down
3 changes: 3 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ const std::string kCmdNameLPushx = "lpushx";
const std::string kCmdNameRPush = "rpush";
const std::string kCmdNameRPushx = "rpushx";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLRange = "lrange";
Expand Down Expand Up @@ -315,6 +316,8 @@ class BaseCmd : public std::enable_shared_from_this<BaseCmd> {

uint32_t GetCmdID() const;

void ServeAndUnblockConns(const std::string& key);

protected:
// Execute a specific command
virtual void DoCmd(PClient* client) = 0;
Expand Down
36 changes: 36 additions & 0 deletions src/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "cmd_list.h"
#include "pstd_string.h"
#include "store.h"
#include "pikiwidb.h"

namespace pikiwidb {
LPushCmd::LPushCmd(const std::string& name, int16_t arity)
Expand All @@ -28,6 +29,7 @@ void LPushCmd::DoCmd(PClient* client) {
} else {
client->SetRes(CmdRes::kSyntaxErr, "lpush cmd error");
}
ServeAndUnblockConns(client->Key());
}

LPushxCmd::LPushxCmd(const std::string& name, int16_t arity)
Expand Down Expand Up @@ -115,6 +117,40 @@ void RPushxCmd::DoCmd(PClient* client) {
}
}

BLPopCmd::BLPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

bool BLPopCmd::DoInitial(PClient* client) {
client->SetKey(client->argv_[1]);
return true;
}

void BLPopCmd::DoCmd(PClient* client) {
std::vector<std::string> element;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->LPop(client->Key(), 1, &element);
if (s.ok()) {
client->AppendString(element[0]);
return;
} else if (s.IsNotFound()){
BlockThisClientToWaitLRPush(element, client);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}

void BLPopCmd::BlockThisClientToWaitLRPush(std::vector<std::string>& keys, PClient* client) {
auto& key_to_conns_ = g_pikiwidb->GetMapFromKeyToConns();
std::string key = client->Key();
auto it = key_to_conns_.find(key);
if (it == key_to_conns_.end()) {
key_to_conns_.emplace(key, std::make_unique<std::list<PClient*>>());
it = key_to_conns_.find(key);
}
auto& wait_list_of_this_key = it->second;
wait_list_of_this_key->emplace_back(client);
}

LPopCmd::LPopCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategoryList) {}

Expand Down
13 changes: 13 additions & 0 deletions src/cmd_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ class RPushCmd : public BaseCmd {
void DoCmd(PClient* client) override;
};

class BLPopCmd : public BaseCmd {
public:
BLPopCmd(const std::string& name, int16_t arity);

protected:
bool DoInitial(PClient* client) override;

private:
void DoCmd(PClient* client) override;
void BlockThisClientToWaitLRPush(std::vector<std::string>& keys, PClient* client);

};

class RPopCmd : public BaseCmd {
public:
RPopCmd(const std::string& name, int16_t arity);
Expand Down
1 change: 1 addition & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ void CmdTableManager::InitCmdTable() {
ADD_COMMAND(LPushx, -3);
ADD_COMMAND(RPushx, -3);
ADD_COMMAND(LPop, 2);
ADD_COMMAND(BLPop, 2);
ADD_COMMAND(LIndex, 3);
ADD_COMMAND(LLen, 2);
ADD_COMMAND(RPoplpush, 3);
Expand Down
12 changes: 12 additions & 0 deletions src/pikiwidb.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class PikiwiDB final {

void PushWriteTask(const std::shared_ptr<pikiwidb::PClient>& client) { worker_threads_.PushWriteTask(client); }

std::unordered_map<std::string, std::unique_ptr<std::list<pikiwidb::PClient*>>>& GetMapFromKeyToConns() {
return key_to_blocked_conns_;
}

public:
PString cfg_file_;
uint16_t port_{0};
Expand All @@ -60,6 +64,14 @@ class PikiwiDB final {
pikiwidb::IOThreadPool slave_threads_;
pikiwidb::CmdThreadPool cmd_threads_;
// pikiwidb::CmdTableManager cmd_table_manager_;
/*
* Blpop/BRpop used
*/
/* key_to_blocked_conns_:
* mapping from "Blockkey"(eg. "<db0, list1>") to a list that stored the nodes of client-connections that
* were blocked by command blpop/brpop with key.
*/
std::unordered_map<std::string, std::unique_ptr<std::list<pikiwidb::PClient*>>> key_to_blocked_conns_;

uint32_t cmd_id_ = 0;
};
Expand Down
Loading