diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index eae93ee0a49..b428dda1956 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -555,12 +555,140 @@ class CommandLMove : public Commander { bool dst_left_; }; +class CommandBLMove : public Commander, + private EvbufCallbackBase, + private EventCallbackBase { + public: + Status Parse(const std::vector &args) override { + auto arg_val = util::ToLower(args_[3]); + if (arg_val != "left" && arg_val != "right") { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + src_left_ = arg_val == "left"; + arg_val = util::ToLower(args_[4]); + if (arg_val != "left" && arg_val != "right") { + return {Status::RedisParseErr, errInvalidSyntax}; + } + dst_left_ = arg_val == "left"; + + auto parse_result = ParseFloat(args[args.size() - 1]); + if (!parse_result) { + return {Status::RedisParseErr, errTimeoutIsNotFloat}; + } + if (*parse_result < 0) { + return {Status::RedisParseErr, errTimeoutIsNegative}; + } + timeout_ = static_cast(*parse_result * 1000 * 1000); + + return Status::OK(); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + svr_ = svr; + conn_ = conn; + + redis::List list_db(svr->storage, conn->GetNamespace()); + std::string elem; + auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + if (!elem.empty()) { + *output = redis::BulkString(elem); + return Status::OK(); + } + + if (conn->IsInExec()) { + *output = redis::MultiLen(-1); + return Status::OK(); // no blocking in multi-exec + } + + svr_->BlockOnKey(args_[1], conn_); + auto bev = conn->GetBufferEvent(); + SetCB(bev); + + if (timeout_) { + timer_.reset(NewTimer(bufferevent_get_base(bev))); + int64_t timeout_second = timeout_ / 1000 / 1000; + int64_t timeout_microsecond = timeout_ % (1000 * 1000); + timeval tm = {timeout_second, static_cast(timeout_microsecond)}; + evtimer_add(timer_.get(), &tm); + } + + return {Status::BlockingCmd}; + } + + void OnWrite(bufferevent *bev) { + redis::List list_db(svr_->storage, conn_->GetNamespace()); + std::string elem; + auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem); + if (!s.ok() && !s.IsNotFound()) { + conn_->Reply(redis::Error("ERR " + s.ToString())); + return; + } + + if (elem.empty()) { + // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and + // connection B added a new element; then connection A was unblocked, but this element may be taken by + // another connection C. So we need to block connection A again and wait for the element being added + // by disabling the WRITE event. + bufferevent_disable(bev, EV_WRITE); + return; + } + + conn_->Reply(redis::BulkString(elem)); + + if (timer_) { + timer_.reset(); + } + + unblockOnSrc(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); + // We need to manually trigger the read event since we will stop processing commands + // in connection after the blocking command, so there may have some commands to be processed. + // Related issue: https://github.com/apache/kvrocks/issues/831 + bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); + } + + void OnEvent(bufferevent *bev, int16_t events) { + if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { + if (timer_ != nullptr) { + timer_.reset(); + } + unblockOnSrc(); + } + conn_->OnEvent(bev, events); + } + + void TimerCB(int, int16_t) { + conn_->Reply(redis::MultiLen(-1)); + timer_.reset(); + unblockOnSrc(); + auto bev = conn_->GetBufferEvent(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); + } + + private: + bool src_left_; + bool dst_left_; + int64_t timeout_ = 0; // microseconds + Server *svr_ = nullptr; + Connection *conn_ = nullptr; + UniqueEvent timer_; + + void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); } +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("blpop", -3, "write no-script", 1, -2, 1), MakeCmdAttr("brpop", -3, "write no-script", 1, -2, 1), MakeCmdAttr("lindex", 3, "read-only", 1, 1, 1), MakeCmdAttr("linsert", 5, "write", 1, 1, 1), MakeCmdAttr("llen", 2, "read-only", 1, 1, 1), MakeCmdAttr("lmove", 5, "write", 1, 2, 1), + MakeCmdAttr("blmove", 6, "write", 1, 2, 1), MakeCmdAttr("lpop", -2, "write", 1, 1, 1), // MakeCmdAttr("lpush", -3, "write", 1, 1, 1), MakeCmdAttr("lpushx", -3, "write", 1, 1, 1), diff --git a/tests/gocase/unit/type/list/list_test.go b/tests/gocase/unit/type/list/list_test.go index 94f069607c0..05199997ba1 100644 --- a/tests/gocase/unit/type/list/list_test.go +++ b/tests/gocase/unit/type/list/list_test.go @@ -885,4 +885,60 @@ func TestList(t *testing.T) { }) } } + + t.Run("Test BLMOVE on different keys", func(t *testing.T) { + require.NoError(t, rdb.Del(ctx, "list1{t}").Err()) + require.NoError(t, rdb.Del(ctx, "list2{t}").Err()) + require.NoError(t, rdb.RPush(ctx, "list1{t}", "1").Err()) + require.NoError(t, rdb.RPush(ctx, "list1{t}", "2").Err()) + require.NoError(t, rdb.RPush(ctx, "list1{t}", "3").Err()) + require.NoError(t, rdb.RPush(ctx, "list1{t}", "4").Err()) + require.NoError(t, rdb.RPush(ctx, "list1{t}", "5").Err()) + require.NoError(t, rdb.BLMove(ctx, "list1{t}", "list2{t}", "RIGHT", "LEFT", time.Millisecond*1000).Err()) + require.NoError(t, rdb.BLMove(ctx, "list1{t}", "list2{t}", "LEFT", "RIGHT", time.Millisecond*1000).Err()) + require.EqualValues(t, 3, rdb.LLen(ctx, "list1{t}").Val()) + require.EqualValues(t, 2, rdb.LLen(ctx, "list2{t}").Val()) + require.Equal(t, []string{"2", "3", "4"}, rdb.LRange(ctx, "list1{t}", 0, -1).Val()) + require.Equal(t, []string{"5", "1"}, rdb.LRange(ctx, "list2{t}", 0, -1).Val()) + }) + + for _, from := range []string{"LEFT", "RIGHT"} { + for _, to := range []string{"LEFT", "RIGHT"} { + t.Run(fmt.Sprintf("BLMOVE %s %s on the list node", from, to), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, "target_key{t}").Err()) + require.NoError(t, rdb.RPush(ctx, "target_key{t}", 1).Err()) + createList("list{t}", []string{"a", "b", "c", "d"}) + require.NoError(t, rd.WriteArgs("blmove", "list{t}", "target_key{t}", from, to, "1")) + r, err1 := rd.ReadLine() + require.Equal(t, "$1", r) + require.NoError(t, err1) + elem, err2 := rd.ReadLine() + require.NoError(t, err2) + if from == "RIGHT" { + require.Equal(t, elem, "d") + require.Equal(t, []string{"a", "b", "c"}, rdb.LRange(ctx, "list{t}", 0, -1).Val()) + } else { + require.Equal(t, elem, "a") + require.Equal(t, []string{"b", "c", "d"}, rdb.LRange(ctx, "list{t}", 0, -1).Val()) + } + if to == "RIGHT" { + require.Equal(t, elem, rdb.RPop(ctx, "target_key{t}").Val()) + } else { + require.Equal(t, elem, rdb.LPop(ctx, "target_key{t}").Val()) + } + }) + } + } + + t.Run("Test BLMOVE block behaviour", func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, "blist", "target").Err()) + require.NoError(t, rd.WriteArgs("blmove", "blist", "target", "left", "right", "0")) + require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo", "bar").Val()) + rd.MustRead(t, "$3") + require.Equal(t, "bar", rdb.LRange(ctx, "target", 0, -1).Val()[0]) + }) }