diff --git a/Release/src/http/client/http_client_asio.cpp b/Release/src/http/client/http_client_asio.cpp index 49376dc882..560eac68d0 100644 --- a/Release/src/http/client/http_client_asio.cpp +++ b/Release/src/http/client/http_client_asio.cpp @@ -71,7 +71,7 @@ enum class httpclient_errorcode_context close }; -class asio_connection : public std::enable_shared_from_this +class asio_connection { friend class asio_client; public: @@ -222,7 +222,7 @@ class asio_connection : public std::enable_shared_from_this bool m_keep_alive; }; -/// Implements a connection pool with adaptive connection removal and callback notification on idle +/// Implements a connection pool with adaptive connection removal /// /// The timeout mechanism is based on the `uint64_t m_epoch` member. Every 30 seconds, /// the lambda in `start_epoch_interval` fires, triggering the cleanup of any @@ -247,20 +247,18 @@ class asio_connection : public std::enable_shared_from_this /// } /// /// -/// Additionally, idle notification is only triggered when two cleanup phases have -/// occurred with no calls to `acquire()` between them. This prevents a race -/// condition where the cleanup might occur when the pool happens to be -/// instantaneously empty but still under heavy load. +/// Additionally, when two cleanup phases have occurred with no calls to `release()` +/// between them, the internal self-reference is cleared. If there are no active +/// `http_client`s keeping the pool alive, this will cause the pool to expire upon +/// cleanup handler termination. Whenever a new call to `release()` arrives, the self +/// reference is re-applied to keep the pool alive. /// class asio_connection_pool : public std::enable_shared_from_this { public: - asio_connection_pool(const std::function& m_cb = {}) - : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()), - m_parent_pool_cb(m_cb) - { - start_epoch_interval(shared_from_this()); - } + asio_connection_pool() + : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()) + {} std::shared_ptr acquire() { @@ -283,39 +281,56 @@ class asio_connection_pool : public std::enable_shared_from_this lock(m_lock); + if (m_self_reference == nullptr) + { + auto sptr = this->shared_from_this(); + m_self_reference = sptr; + start_epoch_interval(sptr); + } + m_epoch++; m_connections.emplace_back(m_epoch, connection); } private: + // Note: must be called under m_lock static void start_epoch_interval(const std::shared_ptr& pool) { + _ASSERTE(pool.get() != nullptr); + _ASSERTE(pool->m_self_reference != nullptr); + + auto& self = *pool; std::weak_ptr weak_pool = pool; - auto prev_epoch = pool->m_epoch; + + self.m_prev_epoch = self.m_epoch; pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30)); - pool->m_pool_epoch_timer.async_wait([prev_epoch, weak_pool](const boost::system::error_code& ec) { + pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) { if (ec) return; auto pool = weak_pool.lock(); if (!pool) return; + auto& self = *pool; - std::lock_guard lock(pool->m_lock); - if (prev_epoch == pool->m_epoch && pool->m_parent_pool_cb) + std::lock_guard lock(self.m_lock); + _ASSERTE(self.m_self_reference != nullptr); + if (self.m_prev_epoch == self.m_epoch) { - assert(pool->m_connections.empty()); - pool->m_parent_pool_cb(pool.get()); + assert(self.m_connections.empty()); + self.m_self_reference = nullptr; } else { - auto erase_end = std::find_if(pool->m_connections.begin(), pool->m_connections.end(), [prev_epoch](std::pair>& p) + auto prev_epoch = self.m_prev_epoch; + auto erase_end = std::find_if(self.m_connections.begin(), self.m_connections.end(), + [prev_epoch](std::pair>& p) { return p.first > prev_epoch; }); - pool->m_connections.erase(pool->m_connections.begin(), erase_end); + self.m_connections.erase(self.m_connections.begin(), erase_end); + start_epoch_interval(pool); } - start_epoch_interval(pool); }); } @@ -323,8 +338,9 @@ class asio_connection_pool : public std::enable_shared_from_this>> m_connections; uint64_t m_epoch = 0; + uint64_t m_prev_epoch = 0; - const std::function m_parent_pool_cb; + std::shared_ptr m_self_reference; }; class asio_shared_connection_pool : public std::enable_shared_from_this @@ -336,26 +352,27 @@ class asio_shared_connection_pool : public std::enable_shared_from_this lock(m_lock); auto it = m_pools.find(pool_key); - if (it == m_pools.end()) + if (it != m_pools.end()) { - std::weak_ptr weak_this = shared_from_this(); - ret = std::make_shared([pool_key, weak_this](asio_connection_pool* inner_pool_ptr) + ret = it->second.lock(); + if (ret == nullptr) { - auto self = weak_this.lock(); - if (!self) - return; - - std::lock_guard lock(self->m_lock); - auto it = self->m_pools.find(pool_key); - if (it != self->m_pools.end() && it->second.get() == inner_pool_ptr) - self->m_pools.erase(it); - }); - m_pools.emplace(pool_key, ret); + // Previous pool expired + ret = std::make_shared(); + it->second = ret; + } } else { - ret = it->second; + if (m_pools.empty()) + { + // If transitioning from empty to having a single element, restart the timer. + start_timer(shared_from_this()); + } + ret = std::make_shared(); + m_pools.emplace(pool_key, ret); } + assert(ret != nullptr); return ret; } @@ -367,9 +384,39 @@ class asio_shared_connection_pool : public std::enable_shared_from_this& self) + { + self->m_timer.expires_from_now(boost::posix_time::seconds(60)); + std::weak_ptr weak_this = self; + self->m_timer.async_wait([weak_this](const boost::system::error_code& ec) + { + if (ec) + return; + auto strong_this = weak_this.lock(); + if (!strong_this) + return; + + std::lock_guard lock(strong_this->m_lock); + auto b = strong_this->m_pools.begin(); + auto e = strong_this->m_pools.end(); + for (; b != e;) + { + if (b->second.expired()) + b = strong_this->m_pools.erase(b); + else + ++b; + } + if (!strong_this->m_pools.empty()) + start_timer(strong_this); + }); + } + + boost::asio::deadline_timer m_timer; std::mutex m_lock; - std::unordered_map> m_pools; + std::unordered_map> m_pools; }; class asio_client final : public _http_client_communicator