Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis_proxy: when parse message error, don't throw exception #104

Merged
merged 1 commit into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ void proxy_stub::on_rpc_request(dsn_message_t request)

// release in proxy_session
dsn_msg_add_ref(request);
/*
tasking::enqueue(LPC_RPC_CALL_RAW_SCATTER, nullptr,
std::bind(&proxy_session::on_recv_request, ps.get(), ps, request),
ps->hash());
*/
ps->on_recv_request(ps, request);
}

Expand Down
2 changes: 1 addition & 1 deletion src/redis_protocol/proxy_lib/proxy_layer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class proxy_session : public std::enable_shared_from_this<proxy_session>, public
proxy_stub *stub;

private:
// when get message from raw parser, request & response of "dsn_message_t" are not in couple
// when get message from raw parser, request & response of "dsn_message_t" are not in couple.
// we need to backup one request to create a response struct.
dsn_message_t backup_one_request;
// the client address for which this session served
Expand Down
107 changes: 55 additions & 52 deletions src/redis_protocol/proxy_lib/redis_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <boost/lexical_cast.hpp>
#include <dsn/utility/string_conv.h>

#include <rrdb/rrdb.client.h>
#include <pegasus/error.h>
#include <rocksdb/status.h>
Expand Down Expand Up @@ -127,15 +128,16 @@ char redis_parser::peek()
return current_buffer[current_cursor];
}

void redis_parser::eat(char c)
bool redis_parser::eat(char c)
{
prepare_current_buffer();
if (current_buffer[current_cursor] == c) {
++current_cursor;
--total_length;
return true;
} else {
derror("expect token: %c, got %c", c, current_buffer[current_cursor]);
throw std::invalid_argument("");
return false;
}
}

Expand All @@ -155,22 +157,22 @@ void redis_parser::eat_all(char *dest, size_t length)
}
}

void redis_parser::end_array_size()
bool redis_parser::end_array_size()
{
redis_request &current_array = current_msg->request;
try {
current_array.length = boost::lexical_cast<int>(current_size);
current_size.clear();
if (current_array.length <= 0) {
derror("array size should be positive in redis request, but %d", current_array.length);
throw std::invalid_argument("");
}
current_array.buffers.reserve(current_array.length);
status = start_bulk_string;
} catch (const boost::bad_lexical_cast &c) {
derror("invalid size string \"%s\"", current_size.c_str());
throw std::invalid_argument("");
}

int32_t l;
bool result = dsn::buf2int32(dsn::string_view(current_size.c_str(), current_size.length()), l);
dverify_logged(result, LOG_LEVEL_ERROR, "invalid size string \"%s\"", current_size.c_str());

current_array.length = l;
current_size.clear();
dverify_logged(
l > 0, LOG_LEVEL_ERROR, "array size should be positive in redis request, but got %d", l);

current_array.buffers.reserve(current_array.length);
status = start_bulk_string;
return true;
}

void redis_parser::append_current_bulk_string()
Expand All @@ -187,23 +189,23 @@ void redis_parser::append_current_bulk_string()
}
}

void redis_parser::end_bulk_string_size()
bool redis_parser::end_bulk_string_size()
{
try {
current_str.length = boost::lexical_cast<int>(current_size);
current_size.clear();
if (-1 == current_str.length) {
append_current_bulk_string();
} else if (current_str.length >= 0) {
status = start_bulk_string_data;
} else {
derror("invalid bulk string length: %d", current_str.length);
throw std::invalid_argument("");
}
} catch (const boost::bad_lexical_cast &c) {
derror("invalid size string \"%s\"", current_size.c_str());
throw std::invalid_argument("");
int32_t l;
bool result = dsn::buf2int32(dsn::string_view(current_size.c_str(), current_size.length()), l);
dverify_logged(result, LOG_LEVEL_ERROR, "invalid size string \"%s\"", current_size.c_str());

current_str.length = l;
current_size.clear();
if (-1 == current_str.length) {
append_current_bulk_string();
} else if (current_str.length >= 0) {
status = start_bulk_string_data;
} else {
derror("invalid bulk string length: %d", current_str.length);
return false;
}
return true;
}

void redis_parser::append_message(dsn_message_t msg)
Expand All @@ -214,35 +216,37 @@ void redis_parser::append_message(dsn_message_t msg)
}

// refererence: http://redis.io/topics/protocol
void redis_parser::parse_stream()
bool redis_parser::parse_stream()
{
char t;
while (total_length > 0) {
switch (status) {
case start_array:
eat('*');
dverify(eat('*'));
status = in_array_size;
break;
case in_array_size:
case in_bulk_string_size:
t = peek();
if (t == CR) {
if (total_length > 1) {
eat(CR);
eat(LF);
if (in_array_size == status)
end_array_size();
else
end_bulk_string_size();
} else
return;
dverify(eat(CR));
dverify(eat(LF));
if (in_array_size == status) {
dverify(end_array_size());
} else {
dverify(end_bulk_string_size());
}
} else {
return true;
}
} else {
current_size.push_back(t);
eat(t);
dverify(eat(t));
}
break;
case start_bulk_string:
eat('$');
dverify(eat('$'));
status = in_bulk_string_size;
break;
case start_bulk_string_data:
Expand All @@ -255,28 +259,27 @@ void redis_parser::parse_stream()
eat_all(str_data.get(), current_str.length);
current_str.data.assign(std::move(str_data), 0, current_str.length);
}
eat(CR);
eat(LF);
dverify(eat(CR));
dverify(eat(LF));
append_current_bulk_string();
} else
return;
return true;
break;
default:
break;
}
}
return true;
}

bool redis_parser::parse(dsn_message_t msg)
{
append_message(msg);
try {
parse_stream();
return true;
} catch (...) {
bool ans = parse_stream();
if (!ans) {
reset();
return false;
}
return ans;
}

void redis_parser::on_remove_session(std::shared_ptr<proxy_session> _this)
Expand Down
8 changes: 4 additions & 4 deletions src/redis_protocol/proxy_lib/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ class redis_parser : public proxy_session
void append_message(dsn_message_t msg);
void prepare_current_buffer();
char peek();
void eat(char c);
bool eat(char c);
void eat_all(char *dest, size_t length);
void reset();

// function for parser
void end_array_size();
void end_bulk_string_size();
bool end_array_size();
bool end_bulk_string_size();
void append_current_bulk_string();
void parse_stream();
bool parse_stream();

// function for rrdb operation
#define DECLARE_REDIS_HANDLER(function_name) \
Expand Down