Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
rpc: minor refactoring on thrift_messsage_parser (#251)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and neverchanje committed Jul 19, 2019
1 parent be96e07 commit aa1c353
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 118 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.kdev4/
.zk_install/
.idea
.vscode/

gcov_report/
bin/Linux/thrift
Expand Down
195 changes: 98 additions & 97 deletions src/core/tools/common/thrift_message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,104 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* Jun. 2016, Zuoyan Qin, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "thrift_message_parser.h"
#include <dsn/service_api_c.h>
#include <dsn/cpp/serialization_helper/thrift_helper.h>
#include <dsn/utility/ports.h>
#include <dsn/utility/crc.h>

namespace dsn {
void thrift_message_parser::reset() { _header_parsed = false; }

// //
// Request Parsing //
// //

void thrift_message_parser::read_thrift_header(const char *buffer,
/*out*/ thrift_message_header &header)
{
header.hdr_type = *(uint32_t *)(buffer);
buffer += sizeof(int32_t);
header.hdr_version = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.hdr_length = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.hdr_crc32 = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.body_length = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.body_crc32 = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.app_id = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.partition_index = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_timeout = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_thread_hash = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_partition_hash = be64toh(*(int64_t *)(buffer));
}

bool thrift_message_parser::check_thrift_header(const thrift_message_header &header)
{
if (header.hdr_type != THRIFT_HDR_SIG) {
derror("hdr_type should be %s, but %s",
message_parser::get_debug_string("THFT").c_str(),
message_parser::get_debug_string((const char *)&header.hdr_type).c_str());
return false;
}
if (header.hdr_version != 0) {
derror("hdr_version should be 0, but %u", header.hdr_version);
return false;
}
if (header.hdr_length != sizeof(thrift_message_header)) {
derror("hdr_length should be %u, but %u", sizeof(thrift_message_header), header.hdr_length);
return false;
}
return true;
}

dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_header &thrift_header,
dsn::blob &message_data)
{
dsn::blob body_data = message_data.range(thrift_header.hdr_length);
dsn::message_ex *msg = message_ex::create_receive_message_with_standalone_header(body_data);
dsn::message_header *dsn_hdr = msg->header;

dsn::rpc_read_stream stream(msg);
::dsn::binary_reader_transport binary_transport(stream);
boost::shared_ptr<::dsn::binary_reader_transport> trans_ptr(
&binary_transport, [](::dsn::binary_reader_transport *) {});
::apache::thrift::protocol::TBinaryProtocol iprot(trans_ptr);

std::string fname;
::apache::thrift::protocol::TMessageType mtype;
int32_t seqid;
iprot.readMessageBegin(fname, mtype, seqid);
dinfo("rpc name: %s, type: %d, seqid: %d", fname.c_str(), mtype, seqid);

dsn_hdr->hdr_type = THRIFT_HDR_SIG;
dsn_hdr->hdr_length = sizeof(message_header);
dsn_hdr->body_length = thrift_header.body_length;
dsn_hdr->hdr_crc32 = dsn_hdr->body_crc32 = CRC_INVALID;

dsn_hdr->id = seqid;
strncpy(dsn_hdr->rpc_name, fname.c_str(), sizeof(dsn_hdr->rpc_name) - 1);
dsn_hdr->rpc_name[sizeof(dsn_hdr->rpc_name) - 1] = '\0';
dsn_hdr->gpid.set_app_id(thrift_header.app_id);
dsn_hdr->gpid.set_partition_index(thrift_header.partition_index);
dsn_hdr->client.timeout_ms = thrift_header.client_timeout;
dsn_hdr->client.thread_hash = thrift_header.client_thread_hash;
dsn_hdr->client.partition_hash = thrift_header.client_partition_hash;

if (mtype == ::apache::thrift::protocol::T_CALL ||
mtype == ::apache::thrift::protocol::T_ONEWAY)
dsn_hdr->context.u.is_request = 1;
dassert(dsn_hdr->context.u.is_request == 1, "only support receive request");
dsn_hdr->context.u.serialize_format = DSF_THRIFT_BINARY; // always serialize in thrift binary

return msg;
}

message_ex *thrift_message_parser::get_message_on_receive(message_reader *reader,
/*out*/ int &read_next)
Expand Down Expand Up @@ -93,6 +174,12 @@ message_ex *thrift_message_parser::get_message_on_receive(message_reader *reader
}
}

void thrift_message_parser::reset() { _header_parsed = false; }

// //
// Response Encoding //
// //

void thrift_message_parser::prepare_on_send(message_ex *msg)
{
auto &header = msg->header;
Expand Down Expand Up @@ -191,90 +278,4 @@ int thrift_message_parser::get_buffers_on_send(message_ex *msg, /*out*/ send_buf
return i;
}

void thrift_message_parser::read_thrift_header(const char *buffer,
/*out*/ thrift_message_header &header)
{
header.hdr_type = *(uint32_t *)(buffer);
buffer += sizeof(int32_t);
header.hdr_version = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.hdr_length = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.hdr_crc32 = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.body_length = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.body_crc32 = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.app_id = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.partition_index = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_timeout = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_thread_hash = be32toh(*(int32_t *)(buffer));
buffer += sizeof(int32_t);
header.client_partition_hash = be64toh(*(int64_t *)(buffer));
}

bool thrift_message_parser::check_thrift_header(const thrift_message_header &header)
{
if (header.hdr_type != THRIFT_HDR_SIG) {
derror("hdr_type should be %s, but %s",
message_parser::get_debug_string("THFT").c_str(),
message_parser::get_debug_string((const char *)&header.hdr_type).c_str());
return false;
}
if (header.hdr_version != 0) {
derror("hdr_version should be 0, but %u", header.hdr_version);
return false;
}
if (header.hdr_length != sizeof(thrift_message_header)) {
derror("hdr_length should be %u, but %u", sizeof(thrift_message_header), header.hdr_length);
return false;
}
return true;
}

dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_header &thrift_header,
dsn::blob &message_data)
{
dsn::blob body_data = message_data.range(thrift_header.hdr_length);
dsn::message_ex *msg = message_ex::create_receive_message_with_standalone_header(body_data);
dsn::message_header *dsn_hdr = msg->header;

dsn::rpc_read_stream stream(msg);
::dsn::binary_reader_transport binary_transport(stream);
boost::shared_ptr<::dsn::binary_reader_transport> trans_ptr(
&binary_transport, [](::dsn::binary_reader_transport *) {});
::apache::thrift::protocol::TBinaryProtocol iprot(trans_ptr);

std::string fname;
::apache::thrift::protocol::TMessageType mtype;
int32_t seqid;
iprot.readMessageBegin(fname, mtype, seqid);
dinfo("rpc name: %s, type: %d, seqid: %d", fname.c_str(), mtype, seqid);

dsn_hdr->hdr_type = THRIFT_HDR_SIG;
dsn_hdr->hdr_length = sizeof(message_header);
dsn_hdr->body_length = thrift_header.body_length;
dsn_hdr->hdr_crc32 = dsn_hdr->body_crc32 = CRC_INVALID;

dsn_hdr->id = seqid;
strncpy(dsn_hdr->rpc_name, fname.c_str(), sizeof(dsn_hdr->rpc_name) - 1);
dsn_hdr->rpc_name[sizeof(dsn_hdr->rpc_name) - 1] = '\0';
dsn_hdr->gpid.set_app_id(thrift_header.app_id);
dsn_hdr->gpid.set_partition_index(thrift_header.partition_index);
dsn_hdr->client.timeout_ms = thrift_header.client_timeout;
dsn_hdr->client.thread_hash = thrift_header.client_thread_hash;
dsn_hdr->client.partition_hash = thrift_header.client_partition_hash;

if (mtype == ::apache::thrift::protocol::T_CALL ||
mtype == ::apache::thrift::protocol::T_ONEWAY)
dsn_hdr->context.u.is_request = 1;
dassert(dsn_hdr->context.u.is_request == 1, "only support receive request");
dsn_hdr->context.u.serialize_format = DSF_THRIFT_BINARY; // always serialize in thrift binary

return msg;
}
}
} // namespace dsn
37 changes: 16 additions & 21 deletions src/core/tools/common/thrift_message_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* message parser for thrift request
*
* Revision history:
* Jun. 2016, Zuoyan Qin, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool-api/message_parser.h>
Expand All @@ -57,28 +48,31 @@ struct thrift_message_header
//------------- sizeof(thrift_message_header) = 48 ----------//
};

// response format:
// <total_len(int32)> <thrift_string> <thrift_message_begin> <body_data(bytes)>
// <thrift_message_end>

#define THRIFT_HDR_SIG (*(uint32_t *)"THFT")

DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_THRIFT)

class thrift_message_parser : public message_parser
// Parses request sent in rDSN thrift protocol, which is
// mainly used by our Java/GoLang/NodeJs/Python clients,
// and encodes response to them.
class thrift_message_parser final : public message_parser
{
public:
thrift_message_parser() : _header_parsed(false) {}
virtual ~thrift_message_parser() {}

virtual void reset() override;
~thrift_message_parser() {}

void reset() override;

virtual message_ex *get_message_on_receive(message_reader *reader,
/*out*/ int &read_next) override;
message_ex *get_message_on_receive(message_reader *reader,
/*out*/ int &read_next) override;

virtual void prepare_on_send(message_ex *msg) override;
// response format:
// <total_len(int32)> <thrift_string> <thrift_message_begin> <body_data(bytes)>
// <thrift_message_end>
void prepare_on_send(message_ex *msg) override;

virtual int get_buffers_on_send(message_ex *msg, /*out*/ send_buf *buffers) override;
int get_buffers_on_send(message_ex *msg, /*out*/ send_buf *buffers) override;

public:
static void read_thrift_header(const char *buffer, /*out*/ thrift_message_header &header);
Expand All @@ -91,4 +85,5 @@ class thrift_message_parser : public message_parser
thrift_message_header _thrift_header;
bool _header_parsed;
};
}

} // namespace dsn

0 comments on commit aa1c353

Please sign in to comment.