-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: add check_and_mutate() interface and implementation #161
Conversation
src/idl/rrdb.thrift
Outdated
@@ -181,6 +187,38 @@ struct check_and_set_response | |||
8:string server; | |||
} | |||
|
|||
struct mutate | |||
{ | |||
1:i32 operation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么不直接用 mutate_operation 类型?
src/idl/rrdb.thrift
Outdated
@@ -40,6 +40,12 @@ enum cas_check_type | |||
CT_VALUE_INT_GREATER // int compare: value > operand | |||
} | |||
|
|||
enum mutate_operation | |||
{ | |||
PUT, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
枚举值加上 MO_ 前缀比较好
6:bool return_check_value; | ||
} | ||
|
||
struct check_and_mutate_response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个可以直接使用 check_and_set_response,不用新增struct了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感觉这个留着好。以后更有可能的是用check_and_mutate把check_and_set给实现了,到时候还是得改。
甚至我们可以更激进一点,趁着check_and_set还没有大规模使用,这个版本就赶紧把check_and_set给换成check_and_mutate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在 XiaoMi/rdsn#149 这个修复提交后,就可以移除 check_and_set,并发布新版本1.11.0 。这样java client 1.10.0在访问server 1.11.0,check_and_set操作不存在,也能返回handle not found的错误码,用户很容易就知道是什么问题,然后升级到java client 1.11.0就可以了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
和伟杰讨论下,还是按照现在的方案,同时保留check_and_set和check_and_mutate两个接口,以保持各个版本间的兼容性。不过在pegasus_write_service的实现里面,这两个函数的实现上绝大部分逻辑都是一样的,尽量将重复的代码合并。
src/idl/rrdb.thrift
Outdated
{ | ||
1:i32 operation; | ||
2:dsn.blob sort_key; | ||
3:dsn.blob value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
value和set_expire_ts_seconds可以都改成optional的,这样传delete操作能稍微省点儿空间。加个注释说一下这两个字段对于delete没用
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我倒觉得没有必要改成optional,这点空间损耗基本可以忽略。我看其他所有struct也都没有加optional。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没加optional,是考虑到,如果加optional,构造函数就没有“被设为optional的参数”,会使得PUT操作初始化有些奇怪。
6:bool return_check_value; | ||
} | ||
|
||
struct check_and_mutate_response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
感觉这个留着好。以后更有可能的是用check_and_mutate把check_and_set给实现了,到时候还是得改。
甚至我们可以更激进一点,趁着check_and_set还没有大规模使用,这个版本就赶紧把check_and_set给换成check_and_mutate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
去掉check_and_set操作吧
6:bool return_check_value; | ||
} | ||
|
||
struct check_and_mutate_response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在 XiaoMi/rdsn#149 这个修复提交后,就可以移除 check_and_set,并发布新版本1.11.0 。这样java client 1.10.0在访问server 1.11.0,check_and_set操作不存在,也能返回handle not found的错误码,用户很容易就知道是什么问题,然后升级到java client 1.11.0就可以了。
src/server/info_collector.cpp
Outdated
} | ||
read_qps[read_qps.size() - 1] = all.get_qps + all.multi_get_qps + all.scan_qps; | ||
write_qps[read_qps.size() - 1] = all.put_qps + all.multi_put_qps + all.remove_qps + | ||
all.multi_remove_qps + all.incr_qps + | ||
all.check_and_set_qps; | ||
all.check_and_set_qps+all.check_and_mutate_qps; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+号两边没有空格,这是没用clang-format?
@@ -544,7 +657,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base | |||
|
|||
// for setting update_response.error after committed. | |||
std::vector<dsn::apps::update_response *> _update_responses; | |||
}; | |||
}; // namespace server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个注释有问题吧?
补充一些ut吧。 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
还有这几个问题:
- thrift自动生成的代码,格式还是与原来保持一致吧,不然会发现diff太多了;
- check_and_set和check_and_mutate实现合并公共代码后,逻辑变成很奇怪。还是不要合并了,分开写吧,代码重复就重复点。
src/include/pegasus/client.h
Outdated
mu.sort_key.assign(sort_key.data(), 0, sort_key.length()); | ||
// mu.value = NULL; //TODO HW unchecked | ||
mu.set_expire_ts_seconds = 0; | ||
mu_list.emplace_back(mu); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mu_list.emplace_back(std::move(mu));
src/include/pegasus/client.h
Outdated
mu.sort_key.assign(sort_key.data(), 0, sort_key.length()); | ||
mu.value.assign(value.data(), 0, value.length()); | ||
mu.set_expire_ts_seconds = 0; | ||
mu_list.emplace_back(mu); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mu_list.emplace_back(std::move(mu));
src/include/pegasus/client.h
Outdated
mu.set_expire_ts_seconds = 0; | ||
mu_list.emplace_back(mu); | ||
} | ||
const std::vector<::dsn::apps::mutate> get_mutations() const |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里是不是避免拷贝比较好:
void swap_out_mutations(std::vector<::dsn::apps::mutate> &mutations)
{
int current_time = ::pegasus::utils::epoch_now();
for (auto &pair : ttl_list) {
mu_list[pair.first].set_expire_ts_seconds += (pair.second + current_time);
}
mutations.swap(mu_list);
}
src/include/pegasus/client.h
Outdated
int timeout_milliseconds = 5000, | ||
internal_info *info = nullptr) = 0; | ||
// TODO HW 和java-client | ||
/// Mutations保持一致,shell如何使用mutate是第二步。不知道mutations该不该使用apps::的类,先这么写,如果需要分离再搞一个类就好了 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不要用中文注释。另外这个注释我也没太看明白。
@@ -1015,6 +1015,10 @@ void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key, | |||
req.check_type = (dsn::apps::cas_check_type::type)check_type; | |||
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size()); | |||
req.mutate_list = mutations.get_mutations(); | |||
|
|||
for(auto &mu: req.mutate_list){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里为什么要打debug日志?
@@ -527,6 +537,93 @@ class pegasus_write_service::impl : public dsn::replication::replica_base | |||
return false; | |||
} | |||
|
|||
// for check_and_mutate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
此处增加 private:
将以下函数设为私有。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在private作用域里,346行以下都是
@@ -527,6 +537,93 @@ class pegasus_write_service::impl : public dsn::replication::replica_base | |||
return false; | |||
} | |||
|
|||
// for check_and_mutate | |||
|
|||
int check_phase(const std::string &func_name, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const char* func_name,
bool &is_arg_invalid) | ||
{ | ||
if (!is_check_type_supported(check_type)) { | ||
derror_replica("invalid argument for " + func_name + ": decree = {}, error = {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
derror_replica("invalid argument for {}: decree = {}, error = check type {} not supported", func_name, secree, check_type);
auto mutate_list = mutations.get_mutations(); | ||
for (auto &mu : mutate_list) { | ||
req.mutate_list.push_back(mu); | ||
ddebug("mu %d %s %s %d", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
对于线上不希望打印的日志,用dinfo。因为info是最低的级别,debug线上会打印。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
travis测试不过才加的临时打印,是打算去掉的
@@ -3417,6 +3589,7 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) | |||
sum.multi_remove_qps += row.multi_remove_qps; | |||
sum.incr_qps += row.incr_qps; | |||
sum.check_and_set_qps += row.check_and_set_qps; | |||
sum.check_and_mutate_qps += row.check_and_mutate_qps; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
上面3575行CAS后面增加: << std::setw(w) << std::right << "CAM"
check_and_set和check_and_mutate实现合并公共代码确实勉强,中间函数参数太多。原来的逻辑,有两处我想修改下。
二是
|
src/shell/main.cpp
Outdated
"[bytes_less|bytes_less_or_equal|bytes_equal|bytes_greater_or_equal|bytes_greater] " | ||
"[int_less|int_less_or_equal|int_equal|int_greater_or_equal|int_greater] " | ||
"[-o|--check_operand str] " | ||
"[-m|--mutations] " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里难道不是必须要设置mutations吗?那么-m
选项存在的意义是什么?
src/shell/commands.h
Outdated
@@ -1454,6 +1454,175 @@ inline bool check_and_set(command_executor *e, shell_context *sc, arguments args | |||
return true; | |||
} | |||
|
|||
inline void load_mutations(pegasus::pegasus_client::mutations &mutations) | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
load mutations时,没有考虑这些情况:
- 如何输入不可见字符:其他地方是调用unescape_str()函数
- 如何处理双引号: set "a b" "c d"
src/shell/commands.h
Outdated
if (op == "set") { | ||
fprintf(stderr, | ||
"LOAD: set sortkey %s, value %s, ttl %d\n", | ||
sort_key.c_str(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pegasus::utils::c_escape_string(sort_key, sc->escape_all).c_str()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
顺便再更新下rdsn
src/shell/commands.h
Outdated
fprintf(stderr, | ||
"LOAD: set sortkey %s, value %s, ttl %d\n", | ||
sort_key.c_str(), | ||
value.c_str(), | ||
pegasus::utils::c_escape_string(unescape_str(args[1]), sc->escape_all).c_str(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"LOAD: set sortkey "%s", value "%s", ttl %d\n"
src/shell/commands.h
Outdated
case 1: | ||
mutations.del(unescape_str(args[1])); | ||
fprintf(stderr, | ||
"LOAD: del sortkey %s\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"LOAD: del sortkey "%s"\n"
@@ -399,12 +413,6 @@ class pegasus_write_service::impl : public dsn::replication::replica_base | |||
decree, key, m.value, static_cast<uint32_t>(m.set_expire_ts_seconds)); | |||
} else if (m.operation == ::dsn::apps::mutate_operation::MO_DELETE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里只有两种情况,所以:
if (m.operation == ::dsn::apps::mutate_operation::MO_PUT) {
} else {
dassert(m.operation == ::dsn::apps::mutate_operation::MO_DELETE, "m.operation = %d", m.operation);
....
}
"mutations have invalid operations"); | ||
// we should write empty record to update rocksdb's last flushed decree | ||
return empty_put(decree); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这么写更好点?在日志中打印更详细的信息
for (int i = 0; i < update.mutate_list.size(); ++i) {
auto& mu = update.mutate_list[i];
if (mu.operation != ::dsn::apps::mutate_operation::MO_PUT &&
mu.operation != ::dsn::apps::mutate_operation::MO_DELETE) {
derror_replica("invalid argument for check_and_mutate: decree = {}, error = mutation[{}] uses invalid operation {}",
decree, i, mu.operation);
resp.error = rocksdb::Status::kInvalidArgument;
// we should write empty record to update rocksdb's last flushed decree
return empty_put(decree);
}
}
src/include/pegasus/client.h
Outdated
struct mutations | ||
{ | ||
private: | ||
std::vector<::dsn::apps::mutate> mu_list; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private 为什么不用前缀下划线?_mu_list
我用shell测试check_and_mutate,感觉结果乱掉了:
|
fprintf(stderr, | ||
"check_operand: \"%s\"\n", | ||
pegasus::utils::c_escape_string(check_operand).c_str()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在这里把mutations打印出来,譬如:
mutation[0].type: SET
mutation[0].sort_key: "x"
mutation[0].value: "y"
mutation[0].ttl_seconds: 0
mutation[1].type: DEL
mutation[1].sort_key: "z"
if (mutations.is_empty()) { | ||
derror("invalid mutations: mutations should not be empty."); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不是PERR_INVALID_HASH_KEY
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应当是PERR_INVALID_ARGUMENT
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size()); | ||
req.check_type = (dsn::apps::cas_check_type::type)check_type; | ||
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size()); | ||
mutations.get_mutations(req.mutate_list); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_mutations名字是不是不大合适?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得还可以,java中也是这么写的。毕竟也是const函数,不修改原值。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
get_mutations这里考虑到的两点是
1.get时才计算set_expire_ts_seconds
2.用户可能会重复使用同一个mutations对象(例如重试check_and_mutate等场景)
所以选择不修改原值,每次get都会拷贝。
如果不必要这么做,加一个move方法,直接移动?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得现在这样就可以
auto &mu = update.mutate_list[i]; | ||
if (mu.operation != ::dsn::apps::mutate_operation::MO_PUT && | ||
mu.operation != ::dsn::apps::mutate_operation::MO_DELETE) { | ||
derror_replica("invalid argument for check_and_mutate: decree = {}, error = " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里的format确认是可以这么写的吧?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应当没问题
"check_and_mutate", | ||
"atomically check and mutate", | ||
"<hash_key> " | ||
"[-c|--check_sort_key str] " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
必填参数用<xxx>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里主要是比较特殊,因为check_type的候选项太多了,一行放不下。所以我上面在check_and_set就按照这种写法来写了。所以这里这么写我觉得也可以的。
resp.error = check_status.code(); | ||
return resp.error; | ||
} | ||
dassert(check_status.ok() || check_status.IsNotFound(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
新加的代码统一都用dassert_xxx的函数吧
src/shell/commands.h
Outdated
stderr, | ||
" mutation[%d].value: \"%s\"\n", | ||
i, | ||
pegasus::utils::c_escape_string(copy_of_mutations[i].value.to_string()).c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没有打印ttl_seconds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ttl这个时候算的值和传入pegasus_client之后再get的值不一样,感觉会造成误解,所以我没有打印
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没关系,因为每次get出来的值都不一样。我们这里打印出来,只是告诉用户传的参数是什么。
src/shell/commands.h
Outdated
case 0: | ||
ttl = 0; | ||
if (arg_count == 4) { | ||
ttl = std::stoi(args[3]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里不能用std::stoi,因为如果输入非法参数,会抛异常,然后shell就core掉。你可以输入set a b c
试试。
建议改为:
if (arg_count == 4) {
- ttl = std::stoi(args[3]);
+ if (!dsn::buf2int32(args[3], ttl)) {
+ fprintf(stderr,
+ "ERROR: parse \"%s\" as ttl failed, "
+ "print \"ok\" to finish loading\n",
+ args[3]);
+ break;
+ }
+ if (ttl <= 0) {
+ fprintf(stderr,
+ "ERROR: invalid ttl %s, "
+ "print \"ok\" to finish loading\n",
+ args[3]);
+ break;
+ }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ttl设为0,不应该算作invalid。我打算改为
ttl = 0;
if (arg_count == 4) {
char *end_ptr = nullptr;
long l_ttl = strtol(args[3], &end_ptr, 10);
if (end_ptr == args[3]) {
fprintf(stderr,
"ERROR: parse \"%s\" as ttl failed, "
"print \"ok\" to finish loading\n",
args[3]);
break;
}
if (l_ttl > INT_MAX || l_ttl < 0) {
fprintf(stderr,
"ERROR: invalid ttl %s, "
"print \"ok\" to finish loading\n",
args[3]);
break;
}
ttl = l_ttl;
}
这样设计有一个情况是
"111c"这样的,会截掉数字后不合法的字符,ttl会等于111, 是否可行?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不可行。"111c"就不应当处理为111,而是视为非法。
这里就不要自己写了,直接用封装好的函数buf2int32吧,自己写容易考虑不全。你看上面的set_value()函数,里面也有处理ttl的地方。
为了保持和set_value()的一致,用户也不允许传0;如果不设置ttl,那就不传。
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d", | ||
(int)hash_key.size()); | ||
if (callback != nullptr) | ||
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那这里也要改了,建议给这个改动加个单测
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里为什么要改?
src/shell/commands.h
Outdated
if (end_ptr == args[3]) { | ||
fprintf(stderr, | ||
"ERROR: parse \"%s\" as ttl failed, " | ||
"print \"ok\" to finish loading\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print "ok" to finish loading, print "abort" to abort this command\n
src/shell/commands.h
Outdated
if (l_ttl > INT_MAX || l_ttl < 0) { | ||
fprintf(stderr, | ||
"ERROR: invalid ttl %s, " | ||
"print \"ok\" to finish loading\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
print "ok" to finish loading, print "abort" to abort this command\n
src/shell/commands.h
Outdated
case 0: | ||
ttl = 0; | ||
if (arg_count == 4) { | ||
ttl = std::stoi(args[3]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不可行。"111c"就不应当处理为111,而是视为非法。
这里就不要自己写了,直接用封装好的函数buf2int32吧,自己写容易考虑不全。你看上面的set_value()函数,里面也有处理ttl的地方。
为了保持和set_value()的一致,用户也不允许传0;如果不设置ttl,那就不传。
server: add check_and_mutate() interface and implementation (apache#161)
Former-commit-id: bd5573b269ebe3f88e1409b5871b83dcc1e1e6de [formerly 6476639] Former-commit-id: 05a2d0d6d9576352686585153a35a8ca550dea17
refer to #159