From 458d4272136f9ce12a154b723e85e3e4127a37b5 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Thu, 16 May 2019 17:51:32 +0800 Subject: [PATCH] rpc: minor refactoring on message_reader (#252) --- include/dsn/cpp/rpc_stream.h | 12 ---- include/dsn/tool-api/message_parser.h | 31 ++++---- src/core/core/message_parser.cpp | 3 + src/core/tests/message_reader_test.cpp | 99 ++++++++++++++++++++++++++ 4 files changed, 119 insertions(+), 26 deletions(-) create mode 100644 src/core/tests/message_reader_test.cpp diff --git a/include/dsn/cpp/rpc_stream.h b/include/dsn/cpp/rpc_stream.h index 181e5e1094..e7480acf25 100644 --- a/include/dsn/cpp/rpc_stream.h +++ b/include/dsn/cpp/rpc_stream.h @@ -73,23 +73,11 @@ typedef ::dsn::ref_ptr rpc_read_stream_ptr; class rpc_write_stream : public binary_writer { public: - // for response rpc_write_stream(message_ex *msg) : _msg(msg), _last_write_next_committed(true), _last_write_next_total_size(0) { } - // for request - rpc_write_stream(task_code code, - int timeout_ms = 0, - int thread_hash = 0, - uint64_t partition_hash = 0) - : _msg(message_ex::create_request(code, timeout_ms, thread_hash, partition_hash)), - _last_write_next_committed(true), - _last_write_next_total_size(0) - { - } - // write buffer for rpc_write_stream is allocated from // a per-thread pool, and it is expected that // the per-thread pool cannot allocated two outstanding diff --git a/include/dsn/tool-api/message_parser.h b/include/dsn/tool-api/message_parser.h index 137b7b53e4..1c5bf97f5a 100644 --- a/include/dsn/tool-api/message_parser.h +++ b/include/dsn/tool-api/message_parser.h @@ -24,16 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * message parser base prototype, to support different kinds - * of message headers (so as to interact among them) - * - * Revision history: - * Mar., 2015, @imzhenyu (Zhenyu Guo), first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include @@ -47,6 +37,9 @@ #include namespace dsn { + +// TODO(wutao1): call it read_buffer, and make it an utility +// Not-Thread-Safe. class message_reader { public: @@ -54,7 +47,6 @@ class message_reader : _buffer_occupied(0), _buffer_block_size(buffer_block_size) { } - ~message_reader() {} // called before read to extend read buffer DSN_API char *read_buffer_ptr(unsigned int read_next); @@ -68,10 +60,20 @@ class message_reader // discard read data void truncate_read() { _buffer_occupied = 0; } + // mark the tailing `sz` of bytes are consumed and discardable. + void consume_buffer(size_t sz) + { + _buffer = _buffer.range(sz); + _buffer_occupied -= sz; + } + + blob buffer() const { return _buffer.range(0, _buffer_occupied); } + public: - dsn::blob _buffer; + // TODO(wutao1): make them private members + blob _buffer; unsigned int _buffer_occupied; - unsigned int _buffer_block_size; + const unsigned int _buffer_block_size; }; class message_parser; @@ -130,4 +132,5 @@ class message_parser : public ref_counter get_header_type(const char *bytes); // buffer size >= sizeof(uint32_t) DSN_API static std::string get_debug_string(const char *bytes); }; -} + +} // namespace dsn diff --git a/src/core/core/message_parser.cpp b/src/core/core/message_parser.cpp index ded903c79a..7ab91417f2 100644 --- a/src/core/core/message_parser.cpp +++ b/src/core/core/message_parser.cpp @@ -144,6 +144,9 @@ char *message_reader::read_buffer_ptr(unsigned int read_next) unsigned int sz = (read_next + _buffer_occupied > _buffer_block_size ? read_next + _buffer_occupied : _buffer_block_size); + // TODO(wutao1): make it a buffer queue like what sofa-pbrpc does + // (https://github.com/baidu/sofa-pbrpc/blob/master/src/sofa/pbrpc/buffer.h) + // to reduce memory copy. _buffer.assign(dsn::utils::make_shared_array(sz), 0, sz); _buffer_occupied = 0; diff --git a/src/core/tests/message_reader_test.cpp b/src/core/tests/message_reader_test.cpp new file mode 100644 index 0000000000..53fcb29c31 --- /dev/null +++ b/src/core/tests/message_reader_test.cpp @@ -0,0 +1,99 @@ +// Copyright (c) 2019, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include + +#include + +namespace dsn { + +class message_reader_test : public testing::Test +{ +public: + void test_init() + { + message_reader reader(4096); + ASSERT_EQ(reader._buffer_block_size, 4096); + ASSERT_EQ(reader._buffer_occupied, 0); + ASSERT_EQ(reader._buffer.length(), 0); + } + + void test_read_buffer() + { + message_reader reader(4096); + + const char *p1 = reader.read_buffer_ptr(10); + ASSERT_EQ(reader._buffer_occupied, 0); + ASSERT_EQ(reader._buffer.length(), 4096); + reader.mark_read(10); + ASSERT_EQ(reader._buffer_occupied, 10); + + const char *p2 = reader.read_buffer_ptr(10); + ASSERT_EQ(reader._buffer_occupied, 10); + ASSERT_EQ(reader._buffer.length(), 4096); + reader.mark_read(10); + ASSERT_EQ(reader._buffer_occupied, 20); + ASSERT_EQ(p2 - p1, 10); // p1, p2 reside on the same allocated memory buffer. + + reader.read_buffer_ptr(4076); + ASSERT_EQ(reader._buffer_occupied, 20); + ASSERT_EQ(reader._buffer.length(), 4096); + reader.mark_read(4076); + ASSERT_EQ(reader._buffer_occupied, 4096); + + // buffer capacity extends + p1 = reader.read_buffer_ptr(1); + ASSERT_EQ(reader._buffer_occupied, 4096); + ASSERT_EQ(reader._buffer.length(), 4097); + reader.mark_read(1); + ASSERT_EQ(reader._buffer_occupied, 4097); + + // if buffer is not consumed in time, + // each read will cause one data copy + p2 = reader.read_buffer_ptr(3); + reader.mark_read(3); + ASSERT_EQ(reader._buffer.length(), 4100); + ASSERT_EQ(reader._buffer_occupied, 4100); + ASSERT_NE(p2 - p1, 3); + } + + void test_read_data() + { + message_reader reader(4096); + + std::string data = std::string("THFT") + std::string(44, '\0'); // 48 bytes + data[7] = data[9] = '\1'; + + char *buf = reader.read_buffer_ptr(data.length()); + memcpy(buf, data.data(), data.size()); + reader.mark_read(data.length()); + ASSERT_EQ(reader.buffer().size(), data.length()); + ASSERT_EQ(reader.buffer().to_string(), data); + } + + void test_consume_buffer() + { + message_reader reader(5000); + + reader.read_buffer_ptr(1000); + reader.mark_read(1000); + ASSERT_EQ(reader._buffer_occupied, 1000); + ASSERT_EQ(reader._buffer.length(), 5000); + ASSERT_EQ(reader.buffer().size(), 1000); + + reader.consume_buffer(500); + ASSERT_EQ(reader._buffer.length(), 4500); + ASSERT_EQ(reader._buffer_occupied, 500); + } +}; + +TEST_F(message_reader_test, init) { test_init(); } + +TEST_F(message_reader_test, read_buffer) { test_read_buffer(); } + +TEST_F(message_reader_test, read_data) { test_read_data(); } + +TEST_F(message_reader_test, consume_buffer) { test_consume_buffer(); } + +} // namespace dsn