Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: fix the issue of the client-side socket fd cannot be recycled… #108

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ void FiberConnection::QueueCleanupCallbackCheck() {
read_mostly_.seldomly_used->error_events.load(std::memory_order_relaxed) == 0) {
// Consider queue a call to `OnCleanup()` then.
if (!read_mostly_.seldomly_used->cleanup_queued.exchange(true, std::memory_order_release)) {
{
std::scoped_lock<std::mutex> _(mutex_);
// Set the connection unavailability status flag in advance to make read/write tasks retreat in advance,
// reducing the conflict of Cleanup callback locks.
conn_unavailable_ = true;
}
// No need to take a reference to us, `OnCleanup()` has not been called.
GetReactor()->SubmitTask([this] {
// The load below acts as a fence (paired with `exchange` above). (But
Expand Down
6 changes: 6 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ class alignas(hardware_destructive_interference_size) FiberConnection : public C

std::atomic<std::size_t> restart_read_count_{0}, restart_write_count_{0};

// Connection closing cleanup and connection sending/receiving mutex lock, to protect connection cleanup and network
// data transmission/reception thread safety.
std::mutex mutex_;
// Connection unavailability status flag.
bool conn_unavailable_{false};

private:
void SuppressReadAndClearReadEvent();
void SuppressAndClearWriteEvent();
Expand Down
20 changes: 15 additions & 5 deletions trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ FiberTcpConnection::FiberTcpConnection(Reactor* reactor, const Socket& socket)
}

FiberTcpConnection::~FiberTcpConnection() {
// Requirements: destroy IO-handler before close socket.
GetIoHandler()->Destroy();
socket_.Close();

TRPC_LOG_DEBUG("~FiberTcpConnection fd:" << socket_.GetFd() << ", conn_id:" << this->GetConnId());
TRPC_ASSERT(!socket_.IsValid());
}
Expand Down Expand Up @@ -116,7 +112,15 @@ int FiberTcpConnection::Send(IoMessage&& msg) {
}

constexpr auto kMaximumBytesPerCall = 1048576;
// External calls to the Send method may conflict with Socket closing concurrently, so a lock is added here for
// protection.
std::unique_lock<std::mutex> lock(mutex_);
// If the connection is unavailable, return an error directly.
if (conn_unavailable_) {
return -1;
}
auto flush_status = FlushWritingBuffer(kMaximumBytesPerCall);
lock.unlock();
if (TRPC_LIKELY(flush_status == FlushStatus::kFlushed)) {
return 0;
} else if (flush_status == FlushStatus::kSystemBufferSaturated || flush_status == FlushStatus::kQuotaExceeded) {
Expand Down Expand Up @@ -370,7 +374,13 @@ void FiberTcpConnection::OnCleanup(CleanupReason reason) {

writing_buffers_.Stop();

// For multi-threads-safety, move "socket_.Close()" to ~FiberTcpConnection();
{
std::scoped_lock<std::mutex> _(mutex_);
conn_unavailable_ = true;
// Requirements: destroy IO-handler before close socket.
GetIoHandler()->Destroy();
socket_.Close();
}
}

IoHandler::HandshakeStatus FiberTcpConnection::DoHandshake(bool from_on_readable) {
Expand Down
Loading