Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the support of RESP3 map type #2028

Merged
merged 5 commits into from
Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/commands/cmd_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ struct CommandFunction : Commander {
with_code = true;
}

return lua::FunctionList(srv, libname, with_code, output);
return lua::FunctionList(srv, conn, libname, with_code, output);
} else if (parser.EatEqICase("listfunc")) {
std::string funcname;
if (parser.EatEqICase("funcname")) {
funcname = GET_OR_RET(parser.TakeStr());
}

return lua::FunctionListFunc(srv, funcname, output);
return lua::FunctionListFunc(srv, conn, funcname, output);
} else if (parser.EatEqICase("listlib")) {
auto libname = GET_OR_RET(parser.TakeStr().Prefixed("expect a library name"));

return lua::FunctionListLib(srv, libname, output);
return lua::FunctionListLib(srv, conn, libname, output);
} else if (parser.EatEqICase("delete")) {
auto libname = GET_OR_RET(parser.TakeStr());
if (!lua::FunctionIsLibExist(conn, libname)) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class CommandHGetAll : public Commander {
kv_pairs.emplace_back(p.field);
kv_pairs.emplace_back(p.value);
}
*output = conn->MultiBulkString(kv_pairs, false);
*output = conn->MapOfBulkStrings(kv_pairs);

return Status::OK();
}
Expand Down
13 changes: 11 additions & 2 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class CommandConfig : public Commander {
} else if (args_.size() == 3 && sub_command == "get") {
std::vector<std::string> values;
config->Get(args_[2], &values);
*output = conn->MultiBulkString(values);
*output = conn->MapOfBulkStrings(values);
} else if (args_.size() == 4 && sub_command == "set") {
Status s = config->Set(srv, args_[2], args_[3]);
if (!s.IsOK()) {
Expand Down Expand Up @@ -617,6 +617,12 @@ class CommandDebug : public Commander {
for (int i = 0; i < 3; i++) {
*output += redis::Integer(i);
}
} else if (protocol_type_ == "map") {
*output = conn->SizeOfMap(3);
for (int i = 0; i < 3; i++) {
*output += redis::Integer(i);
*output += conn->Bool(i == 1);
}
} else if (protocol_type_ == "true") {
*output = conn->Bool(true);
} else if (protocol_type_ == "false") {
Expand Down Expand Up @@ -783,7 +789,10 @@ class CommandHello final : public Commander {
} else {
output_list.push_back(redis::BulkString("standalone"));
}
*output = redis::Array(output_list);
*output = conn->SizeOfMap(output_list.size() / 2);
for (const auto &item : output_list) {
*output += item;
}
return Status::OK();
}
};
Expand Down
12 changes: 6 additions & 6 deletions src/commands/cmd_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class CommandSMembers : public Commander {
return {Status::RedisExecErr, s.ToString()};
}

*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
return Status::OK();
}
};
Expand Down Expand Up @@ -171,7 +171,7 @@ class CommandSPop : public Commander {
}

if (with_count_) {
*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
} else {
if (members.size() > 0) {
*output = redis::BulkString(members.front());
Expand Down Expand Up @@ -211,7 +211,7 @@ class CommandSRandMember : public Commander {
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
return Status::OK();
}

Expand Down Expand Up @@ -249,7 +249,7 @@ class CommandSDiff : public Commander {
return {Status::RedisExecErr, s.ToString()};
}

*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
return Status::OK();
}
};
Expand All @@ -269,7 +269,7 @@ class CommandSUnion : public Commander {
return {Status::RedisExecErr, s.ToString()};
}

*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
return Status::OK();
}
};
Expand All @@ -289,7 +289,7 @@ class CommandSInter : public Commander {
return {Status::RedisExecErr, s.ToString()};
}

*output = conn->ArrayOfSet(members);
*output = conn->SetOfBulkStrings(members);
return Status::OK();
}
};
Expand Down
8 changes: 4 additions & 4 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ class CommandXInfo : public Commander {
}

if (!full_) {
output->append(redis::MultiLen(14));
output->append(conn->SizeOfMap(7));
} else {
output->append(redis::MultiLen(12));
output->append(conn->SizeOfMap(6));
}
output->append(redis::BulkString("length"));
output->append(redis::Integer(info.size));
Expand Down Expand Up @@ -503,7 +503,7 @@ class CommandXInfo : public Commander {

output->append(redis::MultiLen(result_vector.size()));
for (auto const &it : result_vector) {
output->append(redis::MultiLen(12));
output->append(conn->SizeOfMap(6));
output->append(redis::BulkString("name"));
output->append(redis::BulkString(it.first));
output->append(redis::BulkString("consumers"));
Expand Down Expand Up @@ -545,7 +545,7 @@ class CommandXInfo : public Commander {
output->append(redis::MultiLen(result_vector.size()));
auto now = util::GetTimeStampMS();
for (auto const &it : result_vector) {
output->append(redis::MultiLen(8));
output->append(conn->SizeOfMap(4));
output->append(redis::BulkString("name"));
output->append(redis::BulkString(it.first));
output->append(redis::BulkString("pending"));
Expand Down
11 changes: 10 additions & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ std::string Connection::MultiBulkString(const std::vector<std::string> &values,
return result;
}

std::string Connection::ArrayOfSet(const std::vector<std::string> &elems) const {
std::string Connection::SetOfBulkStrings(const std::vector<std::string> &elems) const {
std::string result;
result += SizeOfSet(elems.size());
for (const auto &elem : elems) {
Expand All @@ -172,6 +172,15 @@ std::string Connection::ArrayOfSet(const std::vector<std::string> &elems) const
return result;
}

std::string Connection::MapOfBulkStrings(const std::vector<std::string> &elems) const {
std::string result;
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
result += SizeOfMap(elems.size() / 2);
for (const auto &elem : elems) {
result += BulkString(elem);
}
return result;
}

void Connection::SendFile(int fd) {
// NOTE: we don't need to close the fd, the libevent will do that
auto output = bufferevent_get_output(bev_);
Expand Down
7 changes: 6 additions & 1 deletion src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string SizeOfSet(T len) const {
return protocol_version_ == RESP::v3 ? "~" + std::to_string(len) + CRLF : MultiLen(len);
}
std::string ArrayOfSet(const std::vector<std::string> &elems) const;
std::string SetOfBulkStrings(const std::vector<std::string> &elems) const;
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string SizeOfMap(T len) const {
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
return protocol_version_ == RESP::v3 ? "%" + std::to_string(len) + CRLF : MultiLen(len * 2);
}
std::string MapOfBulkStrings(const std::vector<std::string> &elems) const;

using UnsubscribeCallback = std::function<void(std::string, int)>;
void SubscribeChannel(const std::string &channel);
Expand Down
39 changes: 21 additions & 18 deletions src/storage/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ Status FunctionCall(redis::Connection *conn, const std::string &name, const std:
}

// list all library names and their code (enabled via `with_code`)
Status FunctionList(Server *srv, const std::string &libname, bool with_code, std::string *output) {
Status FunctionList(Server *srv, const redis::Connection *conn, const std::string &libname, bool with_code,
std::string *output) {
std::string start_key = engine::kLuaLibCodePrefix + libname;
std::string end_key = start_key;
end_key.back()++;
Expand All @@ -445,12 +446,13 @@ Status FunctionList(Server *srv, const std::string &libname, bool with_code, std
result.emplace_back(lib.ToString(), iter->value().ToString());
}

output->append(redis::MultiLen(result.size() * (with_code ? 4 : 2)));
output->append(redis::MultiLen(result.size()));
for (const auto &[lib, code] : result) {
output->append(redis::SimpleString("library_name"));
output->append(redis::SimpleString(lib));
output->append(conn->SizeOfMap(with_code ? 2 : 1));
output->append(redis::BulkString("library_name"));
output->append(redis::BulkString(lib));
if (with_code) {
output->append(redis::SimpleString("library_code"));
output->append(redis::BulkString("library_code"));
output->append(redis::BulkString(code));
}
}
Expand All @@ -460,7 +462,7 @@ Status FunctionList(Server *srv, const std::string &libname, bool with_code, std

// extension to Redis Function
// list all function names and their corresponding library names
Status FunctionListFunc(Server *srv, const std::string &funcname, std::string *output) {
Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::string &funcname, std::string *output) {
std::string start_key = engine::kLuaFuncLibPrefix + funcname;
std::string end_key = start_key;
end_key.back()++;
Expand All @@ -480,12 +482,13 @@ Status FunctionListFunc(Server *srv, const std::string &funcname, std::string *o
result.emplace_back(func.ToString(), iter->value().ToString());
}

output->append(redis::MultiLen(result.size() * 4));
output->append(redis::MultiLen(result.size()));
for (const auto &[func, lib] : result) {
output->append(redis::SimpleString("function_name"));
output->append(redis::SimpleString(func));
output->append(redis::SimpleString("from_library"));
output->append(redis::SimpleString(lib));
output->append(conn->SizeOfMap(2));
output->append(redis::BulkString("function_name"));
output->append(redis::BulkString(func));
output->append(redis::BulkString("from_library"));
output->append(redis::BulkString(lib));
}

return Status::OK();
Expand All @@ -495,7 +498,7 @@ Status FunctionListFunc(Server *srv, const std::string &funcname, std::string *o
// list detailed informantion of a specific library
// NOTE: it is required to load the library to lua runtime before listing (calling this function)
// i.e. it will output nothing if the library is only in storage but not loaded
Status FunctionListLib(Server *srv, const std::string &libname, std::string *output) {
Status FunctionListLib(Server *srv, const redis::Connection *conn, const std::string &libname, std::string *output) {
auto lua = srv->Lua();

lua_getglobal(lua, REDIS_FUNCTION_LIBRARIES);
Expand All @@ -511,11 +514,11 @@ Status FunctionListLib(Server *srv, const std::string &libname, std::string *out
return {Status::NotOK, "The library is not found or not loaded from storage"};
}

output->append(redis::MultiLen(6));
output->append(redis::SimpleString("library_name"));
output->append(redis::SimpleString(libname));
output->append(redis::SimpleString("engine"));
output->append(redis::SimpleString("lua"));
output->append(conn->SizeOfMap(3));
output->append(redis::BulkString("library_name"));
output->append(redis::BulkString(libname));
output->append(redis::BulkString("engine"));
output->append(redis::BulkString("lua"));

auto count = lua_objlen(lua, -1);
output->append(redis::SimpleString("functions"));
Expand All @@ -524,7 +527,7 @@ Status FunctionListLib(Server *srv, const std::string &libname, std::string *out
for (size_t i = 1; i <= count; ++i) {
lua_rawgeti(lua, -1, static_cast<int>(i));
auto func = lua_tostring(lua, -1);
output->append(redis::SimpleString(func));
output->append(redis::BulkString(func));
lua_pop(lua, 1);
}

Expand Down
7 changes: 4 additions & 3 deletions src/storage/scripting.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ Status FunctionLoad(redis::Connection *conn, const std::string &script, bool nee
std::string *lib_name, bool read_only = false);
Status FunctionCall(redis::Connection *conn, const std::string &name, const std::vector<std::string> &keys,
const std::vector<std::string> &argv, std::string *output, bool read_only = false);
Status FunctionList(Server *srv, const std::string &libname, bool with_code, std::string *output);
Status FunctionListFunc(Server *srv, const std::string &funcname, std::string *output);
Status FunctionListLib(Server *srv, const std::string &libname, std::string *output);
Status FunctionList(Server *srv, const redis::Connection *conn, const std::string &libname, bool with_code,
std::string *output);
Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::string &funcname, std::string *output);
Status FunctionListLib(Server *srv, const redis::Connection *conn, const std::string &libname, std::string *output);
Status FunctionDelete(Server *srv, const std::string &name);
bool FunctionIsLibExist(redis::Connection *conn, const std::string &libname, bool need_check_storage = true,
bool read_only = false);
Expand Down
13 changes: 13 additions & 0 deletions tests/gocase/unit/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ func TestConfigSetCompression(t *testing.T) {
require.ErrorContains(t, rdb.ConfigSet(ctx, configKey, "unsupported").Err(), "invalid enum option")
}

func TestConfigGetRESP3(t *testing.T) {
srv := util.StartServer(t, map[string]string{
"resp3-enabled": "yes",
})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
val := rdb.ConfigGet(ctx, "resp3-enabled").Val()
require.EqualValues(t, "yes", val["resp3-enabled"])
}

func TestStartWithoutConfigurationFile(t *testing.T) {
srv := util.StartServerWithCLIOptions(t, false, map[string]string{}, []string{})
defer srv.Close()
Expand Down
2 changes: 2 additions & 0 deletions tests/gocase/unit/debug/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestDebugProtocolV2(t *testing.T) {
"integer": int64(12345),
"array": []interface{}{int64(0), int64(1), int64(2)},
"set": []interface{}{int64(0), int64(1), int64(2)},
"map": []interface{}{int64(0), int64(0), int64(1), int64(1), int64(2), int64(0)},
"true": int64(1),
"false": int64(0),
}
Expand Down Expand Up @@ -87,6 +88,7 @@ func TestDebugProtocolV3(t *testing.T) {
"integer": int64(12345),
"array": []interface{}{int64(0), int64(1), int64(2)},
"set": []interface{}{int64(0), int64(1), int64(2)},
"map": map[interface{}]interface{}{int64(0): false, int64(1): true, int64(2): false},
"true": true,
"false": false,
}
Expand Down
13 changes: 7 additions & 6 deletions tests/gocase/unit/hello/hello_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ func TestEnableRESP3(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

r := rdb.Do(ctx, "HELLO", "2")
rList := r.Val().([]interface{})
r, err := rdb.Do(ctx, "HELLO", "2").Result()
require.NoError(t, err)
rList := r.([]interface{})
require.EqualValues(t, rList[2], "proto")
require.EqualValues(t, rList[3], 2)

r = rdb.Do(ctx, "HELLO", "3")
rList = r.Val().([]interface{})
require.EqualValues(t, rList[2], "proto")
require.EqualValues(t, rList[3], 3)
r, err = rdb.Do(ctx, "HELLO", "3").Result()
require.NoError(t, err)
rMap := r.(map[interface{}]interface{})
require.EqualValues(t, rMap["proto"], 3)
}

func TestHelloWithAuth(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion tests/gocase/unit/protocol/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func TestProtocolRESP2(t *testing.T) {
"integer": {":12345"},
"array": {"*3", ":0", ":1", ":2"},
"set": {"*3", ":0", ":1", ":2"},
"map": {"*6", ":0", ":0", ":1", ":1", ":2", ":0"},
"true": {":1"},
"false": {":0"},
"null": {"$-1"},
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestProtocolRESP3(t *testing.T) {

t.Run("debug protocol string", func(t *testing.T) {
require.NoError(t, c.WriteArgs("HELLO", "3"))
values := []string{"*6", "$6", "server", "$5", "redis", "$5", "proto", ":3", "$4", "mode", "$10", "standalone"}
values := []string{"%3", "$6", "server", "$5", "redis", "$5", "proto", ":3", "$4", "mode", "$10", "standalone"}
for _, line := range values {
c.MustRead(t, line)
}
Expand All @@ -208,6 +209,7 @@ func TestProtocolRESP3(t *testing.T) {
"integer": {":12345"},
"array": {"*3", ":0", ":1", ":2"},
"set": {"~3", ":0", ":1", ":2"},
"map": {"%3", ":0", "#f", ":1", "#t", ":2", "#f"},
"true": {"#t"},
"false": {"#f"},
"null": {"_"},
Expand Down
Loading