Skip to content

Commit

Permalink
buffer: separate the BufferFragement release and drain tracker (envoy…
Browse files Browse the repository at this point in the history
…proxy#28770)

Fixes envoyproxy#28760

Signed-off-by: He Jie Xu <[email protected]>
  • Loading branch information
soulxu authored Nov 8, 2023
1 parent 03b57c9 commit c124a78
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 4 deletions.
21 changes: 19 additions & 2 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Slice {
: capacity_(fragment.size()), storage_(nullptr),
base_(static_cast<uint8_t*>(const_cast<void*>(fragment.data()))),
reservable_(fragment.size()) {
addDrainTracker([&fragment]() { fragment.done(); });
releasor_ = [&fragment]() { fragment.done(); };
}

Slice(Slice&& rhs) noexcept {
Expand All @@ -101,6 +101,7 @@ class Slice {
reservable_ = rhs.reservable_;
drain_trackers_ = std::move(rhs.drain_trackers_);
account_ = std::move(rhs.account_);
releasor_.swap(rhs.releasor_);

rhs.capacity_ = 0;
rhs.base_ = nullptr;
Expand All @@ -119,6 +120,11 @@ class Slice {
reservable_ = rhs.reservable_;
drain_trackers_ = std::move(rhs.drain_trackers_);
account_ = std::move(rhs.account_);
if (releasor_) {
releasor_();
}
releasor_ = rhs.releasor_;
rhs.releasor_ = nullptr;

rhs.capacity_ = 0;
rhs.base_ = nullptr;
Expand All @@ -129,7 +135,12 @@ class Slice {
return *this;
}

~Slice() { callAndClearDrainTrackersAndCharges(); }
~Slice() {
callAndClearDrainTrackersAndCharges();
if (releasor_) {
releasor_();
}
}

/**
* @return true if the data in the slice is mutable
Expand Down Expand Up @@ -307,6 +318,9 @@ class Slice {
void transferDrainTrackersTo(Slice& destination) {
destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_);
ASSERT(drain_trackers_.empty());
// The releasor needn't to be transferred, and actually if there is releasor, this
// slice can't coalesce. Then there won't be a chance to calling this method.
ASSERT(releasor_ == nullptr);
}

/**
Expand Down Expand Up @@ -397,6 +411,9 @@ class Slice {
/** Account associated with this slice. This may be null. When
* coalescing with another slice, we do not transfer over their account. */
BufferMemoryAccountSharedPtr account_;

/** The releasor for the BufferFragment */
std::function<void()> releasor_;
};

class OwnedImpl;
Expand Down
42 changes: 40 additions & 2 deletions test/common/buffer/owned_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,44 @@ TEST_F(OwnedImplTest, AddBufferFragmentWithCleanup) {
EXPECT_TRUE(release_callback_called_);
}

TEST_F(OwnedImplTest, MoveBufferFragment) {
Buffer::OwnedImpl buffer1;
testing::MockFunction<void(const void*, size_t, const BufferFragmentImpl*)>
release_callback_tracker;
std::string frag_input("a");
BufferFragmentImpl frag(frag_input.c_str(), frag_input.size(),
release_callback_tracker.AsStdFunction());
buffer1.addBufferFragment(frag);

Buffer::OwnedImpl buffer2;
buffer2.move(buffer1);

EXPECT_EQ(0, buffer1.length());
EXPECT_EQ(1, buffer2.length());

EXPECT_CALL(release_callback_tracker, Call(_, _, _));
buffer2.drain(buffer2.length());
}

TEST_F(OwnedImplTest, MoveBufferFragmentWithReleaseDrainTracker) {
Buffer::OwnedImpl buffer1;
testing::MockFunction<void(const void*, size_t, const BufferFragmentImpl*)>
release_callback_tracker;
std::string frag_input("a");
BufferFragmentImpl frag(frag_input.c_str(), frag_input.size(),
release_callback_tracker.AsStdFunction());
buffer1.addBufferFragment(frag);

Buffer::OwnedImpl buffer2;
buffer2.move(buffer1, true);

EXPECT_EQ(0, buffer1.length());
EXPECT_EQ(1, buffer2.length());

EXPECT_CALL(release_callback_tracker, Call(_, _, _));
buffer2.drain(buffer2.length());
}

TEST_F(OwnedImplTest, AddEmptyFragment) {
char input[] = "hello world";
BufferFragmentImpl frag1(input, 11, [](const void*, size_t, const BufferFragmentImpl*) {});
Expand Down Expand Up @@ -667,10 +705,10 @@ TEST_F(OwnedImplTest, LinearizeDrainTracking) {
testing::MockFunction<void()> done_tracker;
EXPECT_CALL(tracker1, Call());
EXPECT_CALL(drain_tracker, Call(3 * LargeChunk + 108 * SmallChunk, 16384));
EXPECT_CALL(release_callback_tracker, Call(_, _, _));
EXPECT_CALL(tracker2, Call());
EXPECT_CALL(release_callback_tracker2, Call(_, _, _));
EXPECT_CALL(release_callback_tracker, Call(_, _, _));
EXPECT_CALL(tracker3, Call());
EXPECT_CALL(release_callback_tracker2, Call(_, _, _));
EXPECT_CALL(drain_tracker, Call(2 * LargeChunk + 107 * SmallChunk, 16384));
EXPECT_CALL(drain_tracker, Call(LargeChunk + 106 * SmallChunk, 16384));
EXPECT_CALL(tracker4, Call());
Expand Down
20 changes: 20 additions & 0 deletions test/extensions/io_socket/user_space/io_handle_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,26 @@ TEST_F(IoHandleImplTest, ShutDownOptionsNotSupported) {
ASSERT_DEBUG_DEATH(io_handle_peer_->shutdown(ENVOY_SHUT_RDWR), "");
}

// This test is ensure the memory created by BufferFragment won't be released
// after the write.
TEST_F(IoHandleImplTest, WriteBufferFragement) {
Buffer::OwnedImpl buf("a");
bool released = false;
auto buf_frag = Buffer::OwnedBufferFragmentImpl::create(
std::string(255, 'b'), [&released](const Buffer::OwnedBufferFragmentImpl* fragment) {
released = true;
delete fragment;
});
buf.addBufferFragment(*buf_frag.release());

auto result = io_handle_->write(buf);
EXPECT_FALSE(released);
EXPECT_EQ(0, buf.length());
io_handle_peer_->read(buf, absl::nullopt);
buf.drain(buf.length());
EXPECT_TRUE(released);
}

TEST_F(IoHandleImplTest, WriteByMove) {
Buffer::OwnedImpl buf("0123456789");
auto result = io_handle_peer_->write(buf);
Expand Down

0 comments on commit c124a78

Please sign in to comment.