diff --git a/.gitignore b/.gitignore index 0d90593cb9..36df177d51 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .kdev4/ .zk_install/ .idea +.vscode/ gcov_report/ bin/Linux/thrift diff --git a/src/core/tools/common/thrift_message_parser.cpp b/src/core/tools/common/thrift_message_parser.cpp index f8bc4c408f..bb12131ade 100644 --- a/src/core/tools/common/thrift_message_parser.cpp +++ b/src/core/tools/common/thrift_message_parser.cpp @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * Jun. 2016, Zuoyan Qin, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include "thrift_message_parser.h" #include #include @@ -40,7 +31,97 @@ #include namespace dsn { -void thrift_message_parser::reset() { _header_parsed = false; } + +// // +// Request Parsing // +// // + +void thrift_message_parser::read_thrift_header(const char *buffer, + /*out*/ thrift_message_header &header) +{ + header.hdr_type = *(uint32_t *)(buffer); + buffer += sizeof(int32_t); + header.hdr_version = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.hdr_length = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.hdr_crc32 = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.body_length = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.body_crc32 = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.app_id = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.partition_index = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.client_timeout = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.client_thread_hash = be32toh(*(int32_t *)(buffer)); + buffer += sizeof(int32_t); + header.client_partition_hash = be64toh(*(int64_t *)(buffer)); +} + +bool thrift_message_parser::check_thrift_header(const thrift_message_header &header) +{ + if (header.hdr_type != THRIFT_HDR_SIG) { + derror("hdr_type should be %s, but %s", + message_parser::get_debug_string("THFT").c_str(), + message_parser::get_debug_string((const char *)&header.hdr_type).c_str()); + return false; + } + if (header.hdr_version != 0) { + derror("hdr_version should be 0, but %u", header.hdr_version); + return false; + } + if (header.hdr_length != sizeof(thrift_message_header)) { + derror("hdr_length should be %u, but %u", sizeof(thrift_message_header), header.hdr_length); + return false; + } + return true; +} + +dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_header &thrift_header, + dsn::blob &message_data) +{ + dsn::blob body_data = message_data.range(thrift_header.hdr_length); + dsn::message_ex *msg = message_ex::create_receive_message_with_standalone_header(body_data); + dsn::message_header *dsn_hdr = msg->header; + + dsn::rpc_read_stream stream(msg); + ::dsn::binary_reader_transport binary_transport(stream); + boost::shared_ptr<::dsn::binary_reader_transport> trans_ptr( + &binary_transport, [](::dsn::binary_reader_transport *) {}); + ::apache::thrift::protocol::TBinaryProtocol iprot(trans_ptr); + + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + int32_t seqid; + iprot.readMessageBegin(fname, mtype, seqid); + dinfo("rpc name: %s, type: %d, seqid: %d", fname.c_str(), mtype, seqid); + + dsn_hdr->hdr_type = THRIFT_HDR_SIG; + dsn_hdr->hdr_length = sizeof(message_header); + dsn_hdr->body_length = thrift_header.body_length; + dsn_hdr->hdr_crc32 = dsn_hdr->body_crc32 = CRC_INVALID; + + dsn_hdr->id = seqid; + strncpy(dsn_hdr->rpc_name, fname.c_str(), sizeof(dsn_hdr->rpc_name) - 1); + dsn_hdr->rpc_name[sizeof(dsn_hdr->rpc_name) - 1] = '\0'; + dsn_hdr->gpid.set_app_id(thrift_header.app_id); + dsn_hdr->gpid.set_partition_index(thrift_header.partition_index); + dsn_hdr->client.timeout_ms = thrift_header.client_timeout; + dsn_hdr->client.thread_hash = thrift_header.client_thread_hash; + dsn_hdr->client.partition_hash = thrift_header.client_partition_hash; + + if (mtype == ::apache::thrift::protocol::T_CALL || + mtype == ::apache::thrift::protocol::T_ONEWAY) + dsn_hdr->context.u.is_request = 1; + dassert(dsn_hdr->context.u.is_request == 1, "only support receive request"); + dsn_hdr->context.u.serialize_format = DSF_THRIFT_BINARY; // always serialize in thrift binary + + return msg; +} message_ex *thrift_message_parser::get_message_on_receive(message_reader *reader, /*out*/ int &read_next) @@ -93,6 +174,12 @@ message_ex *thrift_message_parser::get_message_on_receive(message_reader *reader } } +void thrift_message_parser::reset() { _header_parsed = false; } + +// // +// Response Encoding // +// // + void thrift_message_parser::prepare_on_send(message_ex *msg) { auto &header = msg->header; @@ -191,90 +278,4 @@ int thrift_message_parser::get_buffers_on_send(message_ex *msg, /*out*/ send_buf return i; } -void thrift_message_parser::read_thrift_header(const char *buffer, - /*out*/ thrift_message_header &header) -{ - header.hdr_type = *(uint32_t *)(buffer); - buffer += sizeof(int32_t); - header.hdr_version = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.hdr_length = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.hdr_crc32 = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.body_length = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.body_crc32 = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.app_id = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.partition_index = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_timeout = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_thread_hash = be32toh(*(int32_t *)(buffer)); - buffer += sizeof(int32_t); - header.client_partition_hash = be64toh(*(int64_t *)(buffer)); -} - -bool thrift_message_parser::check_thrift_header(const thrift_message_header &header) -{ - if (header.hdr_type != THRIFT_HDR_SIG) { - derror("hdr_type should be %s, but %s", - message_parser::get_debug_string("THFT").c_str(), - message_parser::get_debug_string((const char *)&header.hdr_type).c_str()); - return false; - } - if (header.hdr_version != 0) { - derror("hdr_version should be 0, but %u", header.hdr_version); - return false; - } - if (header.hdr_length != sizeof(thrift_message_header)) { - derror("hdr_length should be %u, but %u", sizeof(thrift_message_header), header.hdr_length); - return false; - } - return true; -} - -dsn::message_ex *thrift_message_parser::parse_message(const thrift_message_header &thrift_header, - dsn::blob &message_data) -{ - dsn::blob body_data = message_data.range(thrift_header.hdr_length); - dsn::message_ex *msg = message_ex::create_receive_message_with_standalone_header(body_data); - dsn::message_header *dsn_hdr = msg->header; - - dsn::rpc_read_stream stream(msg); - ::dsn::binary_reader_transport binary_transport(stream); - boost::shared_ptr<::dsn::binary_reader_transport> trans_ptr( - &binary_transport, [](::dsn::binary_reader_transport *) {}); - ::apache::thrift::protocol::TBinaryProtocol iprot(trans_ptr); - - std::string fname; - ::apache::thrift::protocol::TMessageType mtype; - int32_t seqid; - iprot.readMessageBegin(fname, mtype, seqid); - dinfo("rpc name: %s, type: %d, seqid: %d", fname.c_str(), mtype, seqid); - - dsn_hdr->hdr_type = THRIFT_HDR_SIG; - dsn_hdr->hdr_length = sizeof(message_header); - dsn_hdr->body_length = thrift_header.body_length; - dsn_hdr->hdr_crc32 = dsn_hdr->body_crc32 = CRC_INVALID; - - dsn_hdr->id = seqid; - strncpy(dsn_hdr->rpc_name, fname.c_str(), sizeof(dsn_hdr->rpc_name) - 1); - dsn_hdr->rpc_name[sizeof(dsn_hdr->rpc_name) - 1] = '\0'; - dsn_hdr->gpid.set_app_id(thrift_header.app_id); - dsn_hdr->gpid.set_partition_index(thrift_header.partition_index); - dsn_hdr->client.timeout_ms = thrift_header.client_timeout; - dsn_hdr->client.thread_hash = thrift_header.client_thread_hash; - dsn_hdr->client.partition_hash = thrift_header.client_partition_hash; - - if (mtype == ::apache::thrift::protocol::T_CALL || - mtype == ::apache::thrift::protocol::T_ONEWAY) - dsn_hdr->context.u.is_request = 1; - dassert(dsn_hdr->context.u.is_request == 1, "only support receive request"); - dsn_hdr->context.u.serialize_format = DSF_THRIFT_BINARY; // always serialize in thrift binary - - return msg; -} -} +} // namespace dsn diff --git a/src/core/tools/common/thrift_message_parser.h b/src/core/tools/common/thrift_message_parser.h index 7cf6e9582a..18d84ddc44 100644 --- a/src/core/tools/common/thrift_message_parser.h +++ b/src/core/tools/common/thrift_message_parser.h @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* -* Description: -* message parser for thrift request -* -* Revision history: -* Jun. 2016, Zuoyan Qin, first version -* xxxx-xx-xx, author, fix bug about xxx -*/ - #pragma once #include @@ -57,28 +48,31 @@ struct thrift_message_header //------------- sizeof(thrift_message_header) = 48 ----------// }; -// response format: -// -// - #define THRIFT_HDR_SIG (*(uint32_t *)"THFT") DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_THRIFT) -class thrift_message_parser : public message_parser +// Parses request sent in rDSN thrift protocol, which is +// mainly used by our Java/GoLang/NodeJs/Python clients, +// and encodes response to them. +class thrift_message_parser final : public message_parser { public: thrift_message_parser() : _header_parsed(false) {} - virtual ~thrift_message_parser() {} - virtual void reset() override; + ~thrift_message_parser() {} + + void reset() override; - virtual message_ex *get_message_on_receive(message_reader *reader, - /*out*/ int &read_next) override; + message_ex *get_message_on_receive(message_reader *reader, + /*out*/ int &read_next) override; - virtual void prepare_on_send(message_ex *msg) override; + // response format: + // + // + void prepare_on_send(message_ex *msg) override; - virtual int get_buffers_on_send(message_ex *msg, /*out*/ send_buf *buffers) override; + int get_buffers_on_send(message_ex *msg, /*out*/ send_buf *buffers) override; public: static void read_thrift_header(const char *buffer, /*out*/ thrift_message_header &header); @@ -91,4 +85,5 @@ class thrift_message_parser : public message_parser thrift_message_header _thrift_header; bool _header_parsed; }; -} + +} // namespace dsn