Skip to content

Commit

Permalink
Add the support of BLMOVE (#1592)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 authored Jul 16, 2023
1 parent 31bcd8a commit fe73c35
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 0 deletions.
128 changes: 128 additions & 0 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,12 +555,140 @@ class CommandLMove : public Commander {
bool dst_left_;
};

class CommandBLMove : public Commander,
private EvbufCallbackBase<CommandBLMove, false>,
private EventCallbackBase<CommandBLMove> {
public:
Status Parse(const std::vector<std::string> &args) override {
auto arg_val = util::ToLower(args_[3]);
if (arg_val != "left" && arg_val != "right") {
return {Status::RedisParseErr, errInvalidSyntax};
}

src_left_ = arg_val == "left";
arg_val = util::ToLower(args_[4]);
if (arg_val != "left" && arg_val != "right") {
return {Status::RedisParseErr, errInvalidSyntax};
}
dst_left_ = arg_val == "left";

auto parse_result = ParseFloat(args[args.size() - 1]);
if (!parse_result) {
return {Status::RedisParseErr, errTimeoutIsNotFloat};
}
if (*parse_result < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}
timeout_ = static_cast<int64_t>(*parse_result * 1000 * 1000);

return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;

redis::List list_db(svr->storage, conn->GetNamespace());
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
if (!elem.empty()) {
*output = redis::BulkString(elem);
return Status::OK();
}

if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // no blocking in multi-exec
}

svr_->BlockOnKey(args_[1], conn_);
auto bev = conn->GetBufferEvent();
SetCB(bev);

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int64_t timeout_second = timeout_ / 1000 / 1000;
int64_t timeout_microsecond = timeout_ % (1000 * 1000);
timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
evtimer_add(timer_.get(), &tm);
}

return {Status::BlockingCmd};
}

void OnWrite(bufferevent *bev) {
redis::List list_db(svr_->storage, conn_->GetNamespace());
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}

if (elem.empty()) {
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

conn_->Reply(redis::BulkString(elem));

if (timer_) {
timer_.reset();
}

unblockOnSrc();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unblockOnSrc();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t) {
conn_->Reply(redis::MultiLen(-1));
timer_.reset();
unblockOnSrc();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

private:
bool src_left_;
bool dst_left_;
int64_t timeout_ = 0; // microseconds
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", -3, "write no-script", 1, -2, 1),
MakeCmdAttr<CommandBRPop>("brpop", -3, "write no-script", 1, -2, 1),
MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandLInsert>("linsert", 5, "write", 1, 1, 1),
MakeCmdAttr<CommandLLen>("llen", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandLMove>("lmove", 5, "write", 1, 2, 1),
MakeCmdAttr<CommandBLMove>("blmove", 6, "write", 1, 2, 1),
MakeCmdAttr<CommandLPop>("lpop", -2, "write", 1, 1, 1), //
MakeCmdAttr<CommandLPush>("lpush", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandLPushX>("lpushx", -3, "write", 1, 1, 1),
Expand Down
56 changes: 56 additions & 0 deletions tests/gocase/unit/type/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,4 +885,60 @@ func TestList(t *testing.T) {
})
}
}

t.Run("Test BLMOVE on different keys", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "list1{t}").Err())
require.NoError(t, rdb.Del(ctx, "list2{t}").Err())
require.NoError(t, rdb.RPush(ctx, "list1{t}", "1").Err())
require.NoError(t, rdb.RPush(ctx, "list1{t}", "2").Err())
require.NoError(t, rdb.RPush(ctx, "list1{t}", "3").Err())
require.NoError(t, rdb.RPush(ctx, "list1{t}", "4").Err())
require.NoError(t, rdb.RPush(ctx, "list1{t}", "5").Err())
require.NoError(t, rdb.BLMove(ctx, "list1{t}", "list2{t}", "RIGHT", "LEFT", time.Millisecond*1000).Err())
require.NoError(t, rdb.BLMove(ctx, "list1{t}", "list2{t}", "LEFT", "RIGHT", time.Millisecond*1000).Err())
require.EqualValues(t, 3, rdb.LLen(ctx, "list1{t}").Val())
require.EqualValues(t, 2, rdb.LLen(ctx, "list2{t}").Val())
require.Equal(t, []string{"2", "3", "4"}, rdb.LRange(ctx, "list1{t}", 0, -1).Val())
require.Equal(t, []string{"5", "1"}, rdb.LRange(ctx, "list2{t}", 0, -1).Val())
})

for _, from := range []string{"LEFT", "RIGHT"} {
for _, to := range []string{"LEFT", "RIGHT"} {
t.Run(fmt.Sprintf("BLMOVE %s %s on the list node", from, to), func(t *testing.T) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "target_key{t}").Err())
require.NoError(t, rdb.RPush(ctx, "target_key{t}", 1).Err())
createList("list{t}", []string{"a", "b", "c", "d"})
require.NoError(t, rd.WriteArgs("blmove", "list{t}", "target_key{t}", from, to, "1"))
r, err1 := rd.ReadLine()
require.Equal(t, "$1", r)
require.NoError(t, err1)
elem, err2 := rd.ReadLine()
require.NoError(t, err2)
if from == "RIGHT" {
require.Equal(t, elem, "d")
require.Equal(t, []string{"a", "b", "c"}, rdb.LRange(ctx, "list{t}", 0, -1).Val())
} else {
require.Equal(t, elem, "a")
require.Equal(t, []string{"b", "c", "d"}, rdb.LRange(ctx, "list{t}", 0, -1).Val())
}
if to == "RIGHT" {
require.Equal(t, elem, rdb.RPop(ctx, "target_key{t}").Val())
} else {
require.Equal(t, elem, rdb.LPop(ctx, "target_key{t}").Val())
}
})
}
}

t.Run("Test BLMOVE block behaviour", func(t *testing.T) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist", "target").Err())
require.NoError(t, rd.WriteArgs("blmove", "blist", "target", "left", "right", "0"))
require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo", "bar").Val())
rd.MustRead(t, "$3")
require.Equal(t, "bar", rdb.LRange(ctx, "target", 0, -1).Val()[0])
})
}

0 comments on commit fe73c35

Please sign in to comment.