diff --git a/rdsn b/rdsn index 044ce72409..44d3295974 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 044ce7240901832b58b96ed624b7d5ca4a0af451 +Subproject commit 44d32959746301d1289bf2110f1c68cc81c266a1 diff --git a/src/redis_protocol/proxy_lib/proxy_layer.cpp b/src/redis_protocol/proxy_lib/proxy_layer.cpp index c585b44a33..9edd954ad2 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.cpp +++ b/src/redis_protocol/proxy_lib/proxy_layer.cpp @@ -41,11 +41,6 @@ void proxy_stub::on_rpc_request(dsn_message_t request) // release in proxy_session dsn_msg_add_ref(request); - /* - tasking::enqueue(LPC_RPC_CALL_RAW_SCATTER, nullptr, - std::bind(&proxy_session::on_recv_request, ps.get(), ps, request), - ps->hash()); - */ ps->on_recv_request(ps, request); } diff --git a/src/redis_protocol/proxy_lib/proxy_layer.h b/src/redis_protocol/proxy_lib/proxy_layer.h index 62fcdf477d..e45b34eca1 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.h +++ b/src/redis_protocol/proxy_lib/proxy_layer.h @@ -41,7 +41,7 @@ class proxy_session : public std::enable_shared_from_this, public proxy_stub *stub; private: - // when get message from raw parser, request & response of "dsn_message_t" are not in couple + // when get message from raw parser, request & response of "dsn_message_t" are not in couple. // we need to backup one request to create a response struct. dsn_message_t backup_one_request; // the client address for which this session served diff --git a/src/redis_protocol/proxy_lib/redis_parser.cpp b/src/redis_protocol/proxy_lib/redis_parser.cpp index 1dba82238a..48f2039ff7 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.cpp +++ b/src/redis_protocol/proxy_lib/redis_parser.cpp @@ -2,7 +2,8 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. -#include +#include + #include #include #include @@ -127,15 +128,16 @@ char redis_parser::peek() return current_buffer[current_cursor]; } -void redis_parser::eat(char c) +bool redis_parser::eat(char c) { prepare_current_buffer(); if (current_buffer[current_cursor] == c) { ++current_cursor; --total_length; + return true; } else { derror("expect token: %c, got %c", c, current_buffer[current_cursor]); - throw std::invalid_argument(""); + return false; } } @@ -155,22 +157,22 @@ void redis_parser::eat_all(char *dest, size_t length) } } -void redis_parser::end_array_size() +bool redis_parser::end_array_size() { redis_request ¤t_array = current_msg->request; - try { - current_array.length = boost::lexical_cast(current_size); - current_size.clear(); - if (current_array.length <= 0) { - derror("array size should be positive in redis request, but %d", current_array.length); - throw std::invalid_argument(""); - } - current_array.buffers.reserve(current_array.length); - status = start_bulk_string; - } catch (const boost::bad_lexical_cast &c) { - derror("invalid size string \"%s\"", current_size.c_str()); - throw std::invalid_argument(""); - } + + int32_t l; + bool result = dsn::buf2int32(dsn::string_view(current_size.c_str(), current_size.length()), l); + dverify_logged(result, LOG_LEVEL_ERROR, "invalid size string \"%s\"", current_size.c_str()); + + current_array.length = l; + current_size.clear(); + dverify_logged( + l > 0, LOG_LEVEL_ERROR, "array size should be positive in redis request, but got %d", l); + + current_array.buffers.reserve(current_array.length); + status = start_bulk_string; + return true; } void redis_parser::append_current_bulk_string() @@ -187,23 +189,23 @@ void redis_parser::append_current_bulk_string() } } -void redis_parser::end_bulk_string_size() +bool redis_parser::end_bulk_string_size() { - try { - current_str.length = boost::lexical_cast(current_size); - current_size.clear(); - if (-1 == current_str.length) { - append_current_bulk_string(); - } else if (current_str.length >= 0) { - status = start_bulk_string_data; - } else { - derror("invalid bulk string length: %d", current_str.length); - throw std::invalid_argument(""); - } - } catch (const boost::bad_lexical_cast &c) { - derror("invalid size string \"%s\"", current_size.c_str()); - throw std::invalid_argument(""); + int32_t l; + bool result = dsn::buf2int32(dsn::string_view(current_size.c_str(), current_size.length()), l); + dverify_logged(result, LOG_LEVEL_ERROR, "invalid size string \"%s\"", current_size.c_str()); + + current_str.length = l; + current_size.clear(); + if (-1 == current_str.length) { + append_current_bulk_string(); + } else if (current_str.length >= 0) { + status = start_bulk_string_data; + } else { + derror("invalid bulk string length: %d", current_str.length); + return false; } + return true; } void redis_parser::append_message(dsn_message_t msg) @@ -214,13 +216,13 @@ void redis_parser::append_message(dsn_message_t msg) } // refererence: http://redis.io/topics/protocol -void redis_parser::parse_stream() +bool redis_parser::parse_stream() { char t; while (total_length > 0) { switch (status) { case start_array: - eat('*'); + dverify(eat('*')); status = in_array_size; break; case in_array_size: @@ -228,21 +230,23 @@ void redis_parser::parse_stream() t = peek(); if (t == CR) { if (total_length > 1) { - eat(CR); - eat(LF); - if (in_array_size == status) - end_array_size(); - else - end_bulk_string_size(); - } else - return; + dverify(eat(CR)); + dverify(eat(LF)); + if (in_array_size == status) { + dverify(end_array_size()); + } else { + dverify(end_bulk_string_size()); + } + } else { + return true; + } } else { current_size.push_back(t); - eat(t); + dverify(eat(t)); } break; case start_bulk_string: - eat('$'); + dverify(eat('$')); status = in_bulk_string_size; break; case start_bulk_string_data: @@ -255,28 +259,27 @@ void redis_parser::parse_stream() eat_all(str_data.get(), current_str.length); current_str.data.assign(std::move(str_data), 0, current_str.length); } - eat(CR); - eat(LF); + dverify(eat(CR)); + dverify(eat(LF)); append_current_bulk_string(); } else - return; + return true; break; default: break; } } + return true; } bool redis_parser::parse(dsn_message_t msg) { append_message(msg); - try { - parse_stream(); - return true; - } catch (...) { + bool ans = parse_stream(); + if (!ans) { reset(); - return false; } + return ans; } void redis_parser::on_remove_session(std::shared_ptr _this) diff --git a/src/redis_protocol/proxy_lib/redis_parser.h b/src/redis_protocol/proxy_lib/redis_parser.h index b9ee712487..f3d05c1e9b 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.h +++ b/src/redis_protocol/proxy_lib/redis_parser.h @@ -95,15 +95,15 @@ class redis_parser : public proxy_session void append_message(dsn_message_t msg); void prepare_current_buffer(); char peek(); - void eat(char c); + bool eat(char c); void eat_all(char *dest, size_t length); void reset(); // function for parser - void end_array_size(); - void end_bulk_string_size(); + bool end_array_size(); + bool end_bulk_string_size(); void append_current_bulk_string(); - void parse_stream(); + bool parse_stream(); // function for rrdb operation #define DECLARE_REDIS_HANDLER(function_name) \