Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
rpc: minor refactoring on message_reader (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored May 16, 2019
1 parent 3989922 commit 270a42c
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 26 deletions.
12 changes: 0 additions & 12 deletions include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ typedef ::dsn::ref_ptr<rpc_read_stream> rpc_read_stream_ptr;
class rpc_write_stream : public binary_writer
{
public:
// for response
rpc_write_stream(message_ex *msg)
: _msg(msg), _last_write_next_committed(true), _last_write_next_total_size(0)
{
}

// for request
rpc_write_stream(task_code code,
int timeout_ms = 0,
int thread_hash = 0,
uint64_t partition_hash = 0)
: _msg(message_ex::create_request(code, timeout_ms, thread_hash, partition_hash)),
_last_write_next_committed(true),
_last_write_next_total_size(0)
{
}

// write buffer for rpc_write_stream is allocated from
// a per-thread pool, and it is expected that
// the per-thread pool cannot allocated two outstanding
Expand Down
31 changes: 17 additions & 14 deletions include/dsn/tool-api/message_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* message parser base prototype, to support different kinds
* of message headers (so as to interact among them)
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/utility/ports.h>
Expand All @@ -47,14 +37,16 @@
#include <vector>

namespace dsn {

// TODO(wutao1): call it read_buffer, and make it an utility
// Not-Thread-Safe.
class message_reader
{
public:
explicit message_reader(int buffer_block_size)
: _buffer_occupied(0), _buffer_block_size(buffer_block_size)
{
}
~message_reader() {}

// called before read to extend read buffer
DSN_API char *read_buffer_ptr(unsigned int read_next);
Expand All @@ -68,10 +60,20 @@ class message_reader
// discard read data
void truncate_read() { _buffer_occupied = 0; }

// mark the tailing `sz` of bytes are consumed and discardable.
void consume_buffer(size_t sz)
{
_buffer = _buffer.range(sz);
_buffer_occupied -= sz;
}

blob buffer() const { return _buffer.range(0, _buffer_occupied); }

public:
dsn::blob _buffer;
// TODO(wutao1): make them private members
blob _buffer;
unsigned int _buffer_occupied;
unsigned int _buffer_block_size;
const unsigned int _buffer_block_size;
};

class message_parser;
Expand Down Expand Up @@ -130,4 +132,5 @@ class message_parser : public ref_counter
get_header_type(const char *bytes); // buffer size >= sizeof(uint32_t)
DSN_API static std::string get_debug_string(const char *bytes);
};
}

} // namespace dsn
3 changes: 3 additions & 0 deletions src/core/core/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ char *message_reader::read_buffer_ptr(unsigned int read_next)
unsigned int sz =
(read_next + _buffer_occupied > _buffer_block_size ? read_next + _buffer_occupied
: _buffer_block_size);
// TODO(wutao1): make it a buffer queue like what sofa-pbrpc does
// (https://github.com/baidu/sofa-pbrpc/blob/master/src/sofa/pbrpc/buffer.h)
// to reduce memory copy.
_buffer.assign(dsn::utils::make_shared_array<char>(sz), 0, sz);
_buffer_occupied = 0;

Expand Down
99 changes: 99 additions & 0 deletions src/core/tests/message_reader_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/tool-api/message_parser.h>

namespace dsn {

class message_reader_test : public testing::Test
{
public:
void test_init()
{
message_reader reader(4096);
ASSERT_EQ(reader._buffer_block_size, 4096);
ASSERT_EQ(reader._buffer_occupied, 0);
ASSERT_EQ(reader._buffer.length(), 0);
}

void test_read_buffer()
{
message_reader reader(4096);

const char *p1 = reader.read_buffer_ptr(10);
ASSERT_EQ(reader._buffer_occupied, 0);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(10);
ASSERT_EQ(reader._buffer_occupied, 10);

const char *p2 = reader.read_buffer_ptr(10);
ASSERT_EQ(reader._buffer_occupied, 10);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(10);
ASSERT_EQ(reader._buffer_occupied, 20);
ASSERT_EQ(p2 - p1, 10); // p1, p2 reside on the same allocated memory buffer.

reader.read_buffer_ptr(4076);
ASSERT_EQ(reader._buffer_occupied, 20);
ASSERT_EQ(reader._buffer.length(), 4096);
reader.mark_read(4076);
ASSERT_EQ(reader._buffer_occupied, 4096);

// buffer capacity extends
p1 = reader.read_buffer_ptr(1);
ASSERT_EQ(reader._buffer_occupied, 4096);
ASSERT_EQ(reader._buffer.length(), 4097);
reader.mark_read(1);
ASSERT_EQ(reader._buffer_occupied, 4097);

// if buffer is not consumed in time,
// each read will cause one data copy
p2 = reader.read_buffer_ptr(3);
reader.mark_read(3);
ASSERT_EQ(reader._buffer.length(), 4100);
ASSERT_EQ(reader._buffer_occupied, 4100);
ASSERT_NE(p2 - p1, 3);
}

void test_read_data()
{
message_reader reader(4096);

std::string data = std::string("THFT") + std::string(44, '\0'); // 48 bytes
data[7] = data[9] = '\1';

char *buf = reader.read_buffer_ptr(data.length());
memcpy(buf, data.data(), data.size());
reader.mark_read(data.length());
ASSERT_EQ(reader.buffer().size(), data.length());
ASSERT_EQ(reader.buffer().to_string(), data);
}

void test_consume_buffer()
{
message_reader reader(5000);

reader.read_buffer_ptr(1000);
reader.mark_read(1000);
ASSERT_EQ(reader._buffer_occupied, 1000);
ASSERT_EQ(reader._buffer.length(), 5000);
ASSERT_EQ(reader.buffer().size(), 1000);

reader.consume_buffer(500);
ASSERT_EQ(reader._buffer.length(), 4500);
ASSERT_EQ(reader._buffer_occupied, 500);
}
};

TEST_F(message_reader_test, init) { test_init(); }

TEST_F(message_reader_test, read_buffer) { test_read_buffer(); }

TEST_F(message_reader_test, read_data) { test_read_data(); }

TEST_F(message_reader_test, consume_buffer) { test_consume_buffer(); }

} // namespace dsn

0 comments on commit 270a42c

Please sign in to comment.