Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

rpc: minor refactoring on message_reader #252

Merged
merged 2 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ typedef ::dsn::ref_ptr<rpc_read_stream> rpc_read_stream_ptr;
class rpc_write_stream : public binary_writer
{
public:
// for response
rpc_write_stream(message_ex *msg)
: _msg(msg), _last_write_next_committed(true), _last_write_next_total_size(0)
{
}

// for request
rpc_write_stream(task_code code,
int timeout_ms = 0,
int thread_hash = 0,
uint64_t partition_hash = 0)
: _msg(message_ex::create_request(code, timeout_ms, thread_hash, partition_hash)),
_last_write_next_committed(true),
_last_write_next_total_size(0)
{
}

// write buffer for rpc_write_stream is allocated from
// a per-thread pool, and it is expected that
// the per-thread pool cannot allocated two outstanding
Expand Down
31 changes: 17 additions & 14 deletions include/dsn/tool-api/message_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* message parser base prototype, to support different kinds
* of message headers (so as to interact among them)
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/utility/ports.h>
Expand All @@ -47,14 +37,16 @@
#include <vector>

namespace dsn {

// TODO(wutao1): call it read_buffer, and make it an utility
// Not-Thread-Safe.
class message_reader
{
public:
explicit message_reader(int buffer_block_size)
: _buffer_occupied(0), _buffer_block_size(buffer_block_size)
{
}
~message_reader() {}

// called before read to extend read buffer
DSN_API char *read_buffer_ptr(unsigned int read_next);
Expand All @@ -68,10 +60,20 @@ class message_reader
// discard read data
void truncate_read() { _buffer_occupied = 0; }

// mark the tailing `sz` of bytes are consumed and discardable.
void consume_buffer(size_t sz)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数什么时候用啊?

Copy link
Contributor Author

@neverchanje neverchanje May 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/XiaoMi/rdsn/blob/master/src/core/tools/common/dsn_message_parser.cpp#L80

你看这行代码,reader 读完数据之后把读完的 buffer 释放掉,怎么释放的?

reader->_buffer = buf.range(msg_sz);
reader->_buffer_occupied -= msg_sz;

这段代码就是 repeat your code 的典型,consume_buffer 这个函数就是做的这个事情,后面重构可以把上面的代码用 consume_buffer 代替

reader->consume_buffer(msg_sz);

我这边后面的 PR 会用到这个函数。

Copy link
Contributor

@hycdong hycdong May 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,好的,blob的range函数支持传negative的参数(https://github.com/XiaoMi/rdsn/blob/3989922cd3a99c7ce357af8de2d504bd6a5dd1af/include/dsn/utility/blob.h#L115)
不知道有没有更通用的写法,另外加下注释吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

注释已加

{
_buffer = _buffer.range(sz);
_buffer_occupied -= sz;
}

blob buffer() const { return _buffer.range(0, _buffer_occupied); }

public:
dsn::blob _buffer;
// TODO(wutao1): make them private members
blob _buffer;
unsigned int _buffer_occupied;
unsigned int _buffer_block_size;
const unsigned int _buffer_block_size;
};

class message_parser;
Expand Down Expand Up @@ -130,4 +132,5 @@ class message_parser : public ref_counter
get_header_type(const char *bytes); // buffer size >= sizeof(uint32_t)
DSN_API static std::string get_debug_string(const char *bytes);
};
}

} // namespace dsn
3 changes: 3 additions & 0 deletions src/core/core/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ char *message_reader::read_buffer_ptr(unsigned int read_next)
unsigned int sz =
(read_next + _buffer_occupied > _buffer_block_size ? read_next + _buffer_occupied
: _buffer_block_size);
// TODO(wutao1): make it a buffer queue like what sofa-pbrpc does
// (https://github.com/baidu/sofa-pbrpc/blob/master/src/sofa/pbrpc/buffer.h)
// to reduce memory copy.
_buffer.assign(dsn::utils::make_shared_array<char>(sz), 0, sz);
_buffer_occupied = 0;

Expand Down
99 changes: 99 additions & 0 deletions src/core/tests/message_reader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/tool-api/message_parser.h>

namespace dsn {

class message_reader_test : public testing::Test
{
public:
void test_init()
{
message_reader reader(4096);
ASSERT_EQ(reader._buffer_block_size, 4096);
ASSERT_EQ(reader._buffer_occupied, 0);
ASSERT_EQ(reader._buffer.length(), 0);
}

void test_read_buffer()
{
message_reader reader(4096);

const char *p1 = reader.read_buffer_ptr(10);
ASSERT_EQ(reader._buffer_occupied, 0);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(10);
ASSERT_EQ(reader._buffer_occupied, 10);

const char *p2 = reader.read_buffer_ptr(10);
ASSERT_EQ(reader._buffer_occupied, 10);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(10);
ASSERT_EQ(reader._buffer_occupied, 20);
ASSERT_EQ(p2 - p1, 10); // p1, p2 reside on the same allocated memory buffer.

reader.read_buffer_ptr(4076);
ASSERT_EQ(reader._buffer_occupied, 20);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(4076);
ASSERT_EQ(reader._buffer_occupied, 4096);

// buffer capacity extends
p1 = reader.read_buffer_ptr(1);
ASSERT_EQ(reader._buffer_occupied, 4096);
ASSERT_EQ(reader._buffer.length(), 4097);
reader.mark_read(1);
ASSERT_EQ(reader._buffer_occupied, 4097);

// if buffer is not consumed in time,
// each read will cause one data copy
p2 = reader.read_buffer_ptr(3);
reader.mark_read(3);
ASSERT_EQ(reader._buffer.length(), 4100);
ASSERT_EQ(reader._buffer_occupied, 4100);
ASSERT_NE(p2 - p1, 3);
}

void test_read_data()
{
message_reader reader(4096);

std::string data = std::string("THFT") + std::string(44, '\0'); // 48 bytes
data[7] = data[9] = '\1';

char *buf = reader.read_buffer_ptr(data.length());
memcpy(buf, data.data(), data.size());
reader.mark_read(data.length());
ASSERT_EQ(reader.buffer().size(), data.length());
ASSERT_EQ(reader.buffer().to_string(), data);
}

void test_consume_buffer()
{
message_reader reader(5000);

reader.read_buffer_ptr(1000);
reader.mark_read(1000);
ASSERT_EQ(reader._buffer_occupied, 1000);
ASSERT_EQ(reader._buffer.length(), 5000);
ASSERT_EQ(reader.buffer().size(), 1000);

reader.consume_buffer(500);
ASSERT_EQ(reader._buffer.length(), 4500);
ASSERT_EQ(reader._buffer_occupied, 500);
}
};

TEST_F(message_reader_test, init) { test_init(); }

TEST_F(message_reader_test, read_buffer) { test_read_buffer(); }

TEST_F(message_reader_test, read_data) { test_read_data(); }

TEST_F(message_reader_test, consume_buffer) { test_consume_buffer(); }

} // namespace dsn