Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: add check_and_mutate() interface and implementation #161

Merged
merged 30 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 44 files
+1 −4 CMakeLists.txt
+7 −0 bin/dsn.cmake
+4 −0 include/dsn/dist/replication/meta_service_app.h
+4 −0 include/dsn/dist/replication/replication_service_app.h
+88 −0 include/dsn/tool-api/http_server.h
+3 −0 include/dsn/tool-api/rpc_address.h
+15 −1 include/dsn/tool-api/rpc_message.h
+3 −0 include/dsn/utility/strings.h
+6 −1 run.sh
+8 −0 scripts/linux/build.sh
+1 −1 src/apps/skv/CMakeLists.txt
+1 −0 src/core/CMakeLists.txt
+16 −0 src/core/core/rpc_address.cpp
+0 −16 src/core/core/rpc_message.cpp
+28 −0 src/core/core/strings.cpp
+1 −1 src/core/tests/CMakeLists.txt
+4 −4 src/core/tests/fail_point_test.cpp
+43 −0 src/core/tests/http_server_test.cpp
+1 −0 src/core/tools/CMakeLists.txt
+0 −444 src/core/tools/common/http_message_parser.cpp
+0 −3 src/core/tools/common/providers.common.cpp
+9 −0 src/core/tools/http/CMakeLists.txt
+181 −0 src/core/tools/http/http_message_parser.cpp
+22 −34 src/core/tools/http/http_message_parser.h
+0 −0 src/core/tools/http/http_parser.c
+0 −0 src/core/tools/http/http_parser.h
+131 −0 src/core/tools/http/http_server.cpp
+31 −0 src/core/tools/http/root_http_service.h
+256 −149 src/dist/block_service/local/local_service.cpp
+7 −0 src/dist/block_service/local/local_service.h
+1 −1 src/dist/nfs/test/CMakeLists.txt
+1 −1 src/dist/replication/common/replication_common.cpp
+1 −0 src/dist/replication/lib/replica.cpp
+4 −0 src/dist/replication/lib/replica_context.cpp
+7 −2 src/dist/replication/lib/replica_context.h
+10 −7 src/dist/replication/lib/replica_learn.cpp
+1 −0 src/dist/replication/lib/replica_stub.h
+4 −1 src/dist/replication/lib/replication_service_app.cpp
+3 −1 src/dist/replication/meta_server/meta_service_app.cpp
+1 −1 src/dist/replication/test/meta_test/balancer_simulator/CMakeLists.txt
+1 −1 src/dist/replication/test/meta_test/unit_test/CMakeLists.txt
+1 −1 src/dist/replication/test/replica_test/unit_test/CMakeLists.txt
+1 −5 src/dist/replication/test/simple_kv/CMakeLists.txt
+1 −1 src/tests/dsn/CMakeLists.txt
3 changes: 3 additions & 0 deletions src/base/pegasus_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_respon
using check_and_set_rpc =
dsn::rpc_holder<dsn::apps::check_and_set_request, dsn::apps::check_and_set_response>;

using check_and_mutate_rpc =
dsn::rpc_holder<dsn::apps::check_and_mutate_request, dsn::apps::check_and_mutate_response>;

} // namespace pegasus
1,225 changes: 942 additions & 283 deletions src/base/rrdb_types.cpp

Large diffs are not rendered by default.

129 changes: 127 additions & 2 deletions src/client_lib/pegasus_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,14 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
return;
}

if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_set_results(), internal_info());
return;
}

::dsn::apps::check_and_set_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
Expand Down Expand Up @@ -938,6 +946,123 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
partition_hash);
}

int pegasus_client_impl::check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
check_and_mutate_results &results,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, check_and_mutate_results &&_results, internal_info &&_info) {
ret = _err;
results = std::move(_results);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_check_and_mutate(hash_key,
check_sort_key,
check_type,
check_operand,
mutations,
options,
std::move(callback),
timeout_milliseconds);
op_completed.wait();
return ret;
}

void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
async_check_and_mutate_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那这里也要改了,建议给这个改动加个单测

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么要改?

return;
}

if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}
if (mutations.is_empty()) {
derror("invalid mutations: mutations should not be empty.");
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}

::dsn::apps::check_and_mutate_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
req.check_type = (dsn::apps::cas_check_type::type)check_type;
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size());
mutations.get_mutations(req.mutate_list);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_mutations名字是不是不大合适?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得还可以,java中也是这么写的。毕竟也是const函数,不修改原值。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_mutations这里考虑到的两点是
1.get时才计算set_expire_ts_seconds
2.用户可能会重复使用同一个mutations对象(例如重试check_and_mutate等场景)
所以选择不修改原值,每次get都会拷贝。
如果不必要这么做,加一个move方法,直接移动?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得现在这样就可以

req.return_check_value = options.return_check_value;

::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
{
if (user_callback == nullptr) {
return;
}
check_and_mutate_results results;
internal_info info;
::dsn::apps::check_and_mutate_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
results.mutate_succeed = true;
} else if (response.error == 13) { // kTryAgain
results.mutate_succeed = false;
response.error = 0;
} else {
results.mutate_succeed = false;
}
if (response.check_value_returned) {
results.check_value_returned = true;
if (response.check_value_exist) {
results.check_value_exist = true;
results.check_value.assign(response.check_value.data(),
response.check_value.length());
}
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(results), std::move(info));
};
_client->check_and_mutate(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
0,
partition_hash);
}

int pegasus_client_impl::ttl(const std::string &hash_key,
const std::string &sort_key,
int &ttl_seconds,
Expand Down Expand Up @@ -1171,5 +1296,5 @@ const char *pegasus_client_impl::get_error_string(int error_code) const
{
return (rocskdb_error == 0) ? 0 : ROCSKDB_ERROR_START - rocskdb_error;
}
}
} // namespace
} // namespace client
} // namespace pegasus
23 changes: 21 additions & 2 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,25 @@ class pegasus_client_impl : public pegasus_client
async_check_and_set_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override;

virtual int check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
check_and_mutate_results &results,
int timeout_milliseconds = 5000,
internal_info *info = nullptr) override;

virtual void async_check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
async_check_and_mutate_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override;

virtual int ttl(const std::string &hashkey,
const std::string &sortkey,
int &ttl_seconds,
Expand Down Expand Up @@ -300,5 +319,5 @@ class pegasus_client_impl : public pegasus_client
///
static std::unordered_map<int, int> _server_error_to_client;
};
}
} // namespace
} // namespace client
} // namespace pegasus
39 changes: 39 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ enum cas_check_type
CT_VALUE_INT_GREATER // int compare: value > operand
}

enum mutate_operation
{
MO_PUT,
MO_DELETE
}

struct update_request
{
1:dsn.blob key;
Expand Down Expand Up @@ -181,6 +187,38 @@ struct check_and_set_response
8:string server;
}

struct mutate
{
1:mutate_operation operation;
2:dsn.blob sort_key;
3:dsn.blob value; // set null if operation is MO_DELETE
4:i32 set_expire_ts_seconds; // set 0 if operation is MO_DELETE
}

struct check_and_mutate_request
{
1:dsn.blob hash_key;
2:dsn.blob check_sort_key;
3:cas_check_type check_type;
4:dsn.blob check_operand;
5:list<mutate> mutate_list;
6:bool return_check_value;
}

struct check_and_mutate_response
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个可以直接使用 check_and_set_response,不用新增struct了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉这个留着好。以后更有可能的是用check_and_mutate把check_and_set给实现了,到时候还是得改。
甚至我们可以更激进一点,趁着check_and_set还没有大规模使用,这个版本就赶紧把check_and_set给换成check_and_mutate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XiaoMi/rdsn#149 这个修复提交后,就可以移除 check_and_set,并发布新版本1.11.0 。这样java client 1.10.0在访问server 1.11.0,check_and_set操作不存在,也能返回handle not found的错误码,用户很容易就知道是什么问题,然后升级到java client 1.11.0就可以了。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

和伟杰讨论下,还是按照现在的方案,同时保留check_and_set和check_and_mutate两个接口,以保持各个版本间的兼容性。不过在pegasus_write_service的实现里面,这两个函数的实现上绝大部分逻辑都是一样的,尽量将重复的代码合并。

{
1:i32 error; // return kTryAgain if check not passed.
// return kInvalidArgument if check type is int compare and
// check_operand/check_value is not integer or out of range.
2:bool check_value_returned;
3:bool check_value_exist; // used only if check_value_returned is true
4:dsn.blob check_value; // used only if check_value_returned and check_value_exist is true
5:i32 app_id;
6:i32 partition_index;
7:i64 decree;
8:string server;
}

struct get_scanner_request
{
1:dsn.blob start_key;
Expand Down Expand Up @@ -218,6 +256,7 @@ service rrdb
multi_remove_response multi_remove(1:multi_remove_request request);
incr_response incr(1:incr_request request);
check_and_set_response check_and_set(1:check_and_set_request request);
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request);
read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request);
count_response sortkey_count(1:dsn.blob hash_key);
Expand Down
Loading