From f386dc12c7ed5416356b17b6ed4f20042b8e5b83 Mon Sep 17 00:00:00 2001 From: liucf3995 Date: Mon, 29 Jan 2024 16:30:35 +0800 Subject: [PATCH] BugFix: fix the issue where the client-side socket fd could not be recycled in a timely manner when using the fiber connection pool, causing the number of CLOSE_WAIT connections to increase. --- .../iomodel/reactor/fiber/fiber_connection.cc | 6 ++++++ .../iomodel/reactor/fiber/fiber_connection.h | 6 ++++++ .../reactor/fiber/fiber_tcp_connection.cc | 20 ++++++++++++++----- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc index d070c631..d076139c 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc @@ -352,6 +352,12 @@ void FiberConnection::QueueCleanupCallbackCheck() { read_mostly_.seldomly_used->error_events.load(std::memory_order_relaxed) == 0) { // Consider queue a call to `OnCleanup()` then. if (!read_mostly_.seldomly_used->cleanup_queued.exchange(true, std::memory_order_release)) { + { + std::scoped_lock _(mutex_); + // Set the connection-unavailable flag early so that in-flight read/write tasks back off sooner, + // reducing lock contention with the Cleanup callback. + conn_unavailable_ = true; + } // No need to take a reference to us, `OnCleanup()` has not been called. GetReactor()->SubmitTask([this] { // The load below acts as a fence (paired with `exchange` above). 
(But diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.h b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.h index 15c477da..76e60f1f 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.h +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.h @@ -118,6 +118,12 @@ class alignas(hardware_destructive_interference_size) FiberConnection : public C std::atomic restart_read_count_{0}, restart_write_count_{0}; + // Mutex that serializes connection-close cleanup with connection send/receive, keeping connection cleanup and network + data transmission/reception thread-safe. + std::mutex mutex_; + // Connection unavailability status flag. + bool conn_unavailable_{false}; + private: void SuppressReadAndClearReadEvent(); void SuppressAndClearWriteEvent(); diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc index 9d79c237..7f3a467f 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_tcp_connection.cc @@ -31,10 +31,6 @@ FiberTcpConnection::FiberTcpConnection(Reactor* reactor, const Socket& socket) } FiberTcpConnection::~FiberTcpConnection() { - // Requirements: destroy IO-handler before close socket. - GetIoHandler()->Destroy(); - socket_.Close(); - TRPC_LOG_DEBUG("~FiberTcpConnection fd:" << socket_.GetFd() << ", conn_id:" << this->GetConnId()); TRPC_ASSERT(!socket_.IsValid()); } @@ -116,7 +112,15 @@ int FiberTcpConnection::Send(IoMessage&& msg) { } constexpr auto kMaximumBytesPerCall = 1048576; + // External calls to Send may race with the socket being closed concurrently, so a lock is taken here for + protection. + std::unique_lock lock(mutex_); + // If the connection is unavailable, return an error directly. 
+ if (conn_unavailable_) { + return -1; + } auto flush_status = FlushWritingBuffer(kMaximumBytesPerCall); + lock.unlock(); if (TRPC_LIKELY(flush_status == FlushStatus::kFlushed)) { return 0; } else if (flush_status == FlushStatus::kSystemBufferSaturated || flush_status == FlushStatus::kQuotaExceeded) { @@ -370,7 +374,13 @@ void FiberTcpConnection::OnCleanup(CleanupReason reason) { writing_buffers_.Stop(); - // For multi-threads-safety, move "socket_.Close()" to ~FiberTcpConnection(); + { + std::scoped_lock _(mutex_); + conn_unavailable_ = true; + // Requirements: destroy IO-handler before close socket. + GetIoHandler()->Destroy(); + socket_.Close(); + } } IoHandler::HandshakeStatus FiberTcpConnection::DoHandshake(bool from_on_readable) {