diff --git a/Release/src/http/client/http_client_asio.cpp b/Release/src/http/client/http_client_asio.cpp index b7231831cc..49376dc882 100644 --- a/Release/src/http/client/http_client_asio.cpp +++ b/Release/src/http/client/http_client_asio.cpp @@ -71,30 +71,15 @@ enum class httpclient_errorcode_context close }; -class asio_connection_pool; -class asio_connection +class asio_connection : public std::enable_shared_from_this { - friend class asio_connection_pool; friend class asio_client; public: - asio_connection( - boost::asio::io_service& io_service, - const std::string &pool_key, - bool start_with_ssl, - const std::function& ssl_context_callback) - : m_socket(io_service), - m_ssl_context_callback(ssl_context_callback), - m_pool_timer(io_service), + asio_connection(boost::asio::io_service& io_service) + : m_socket(io_service), m_is_reused(false), - m_keep_alive(true), - m_pool_key(pool_key), - m_epoch(0) - { - if (start_with_ssl) - { - upgrade_to_ssl(); - } - } + m_keep_alive(true) + {} ~asio_connection() { @@ -102,16 +87,16 @@ class asio_connection } // This simply instantiates the internal state to support ssl. It does not perform the handshake. - void upgrade_to_ssl() + void upgrade_to_ssl(const std::function& ssl_context_callback) { std::lock_guard lock(m_socket_lock); assert(!is_ssl()); boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23); ssl_context.set_default_verify_paths(); ssl_context.set_options(boost::asio::ssl::context::default_workarounds); - if (m_ssl_context_callback) + if (ssl_context_callback) { - m_ssl_context_callback(ssl_context); + ssl_context_callback(ssl_context); } m_ssl_stream = utility::details::make_unique>(m_socket, ssl_context); } @@ -136,17 +121,10 @@ class asio_connection return error; } - void cancel_pool_timer() - { - m_pool_timer.cancel(); - } - bool is_reused() const { return m_is_reused; } void set_keep_alive(bool keep_alive) { m_keep_alive = keep_alive; } bool keep_alive() const { return m_keep_alive; } bool is_ssl() const { return m_ssl_stream ? true : false; } - const std::string &pool_key() const { return m_pool_key; } - uint32_t epoch() const { return m_epoch; } template void async_connect(const Iterator &begin, const Handler &handler) @@ -227,21 +205,12 @@ class asio_connection } } -private: - template - void start_pool_timer(int timeout_secs, const TimeoutHandler &handler) - { - m_pool_timer.expires_from_now(boost::posix_time::milliseconds(timeout_secs * 1000)); - m_pool_timer.async_wait(handler); - } - void start_reuse() { - cancel_pool_timer(); m_is_reused = true; - m_epoch++; } +private: // Guards concurrent access to socket/ssl::stream. This is necessary // because timeouts and cancellation can touch the socket at the same time // as normal message processing. @@ -249,229 +218,177 @@ class asio_connection tcp::socket m_socket; std::unique_ptr > m_ssl_stream; - std::function m_ssl_context_callback; - - boost::asio::deadline_timer m_pool_timer; bool m_is_reused; bool m_keep_alive; - std::string m_pool_key; - uint32_t m_epoch; }; -class asio_shared_connection_pool +/// Implements a connection pool with adaptive connection removal and callback notification on idle +/// +/// The timeout mechanism is based on the `uint64_t m_epoch` member. Every 30 seconds, +/// the lambda in `start_epoch_interval` fires, triggering the cleanup of any +/// connections that have resided in the pool since the last cleanup phase's epoch. +/// +/// This works because the `m_connections` member functions is used in LIFO order. +/// LIFO usage guarantees that the elements remain sorted based on epoch number, +/// since the highest epoch is always removed and on insertion the next monotonically +/// increasing epoch is used. +/// +/// During the cleanup phase, connections are removed starting with the oldest. This +/// ensures that if a high intensity workload is followed by a low intensity workload, +/// the connection pool will correctly adapt to the current workload. Specifically, +/// the following code will eventually result in a maximum of one pooled connection +/// regardless of the initial number of pooled connections: +/// +/// while(1) +/// { +/// auto conn = pool.acquire(); +/// if (!conn) conn = new_conn(); +/// pool.release(conn); +/// } +/// +/// +/// Additionally, idle notification is only triggered when two cleanup phases have +/// occurred with no calls to `acquire()` between them. This prevents a race +/// condition where the cleanup might occur when the pool happens to be +/// instantaneously empty but still under heavy load. +/// +class asio_connection_pool : public std::enable_shared_from_this { public: - asio_shared_connection_pool(boost::asio::io_service& io_service) : - m_io_service(io_service) - {} - - ~asio_shared_connection_pool() + asio_connection_pool(const std::function& m_cb = {}) + : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()), + m_parent_pool_cb(m_cb) { - std::lock_guard lock(m_connections_mutex); - // Cancel the pool timer for all connections. - for (auto& connection : m_connections) - { - connection.second->cancel_pool_timer(); - } + start_epoch_interval(shared_from_this()); } - void release(const std::shared_ptr& connection) + std::shared_ptr acquire() { - if (connection->keep_alive() && (m_timeout_secs > 0)) - { - connection->cancel(); + std::lock_guard lock(m_lock); - std::lock_guard lock(m_connections_mutex); - auto it = m_connections.insert(std::make_pair(connection->pool_key(), connection)); + if (m_connections.empty()) + return nullptr; - // This will destroy and remove the connection from pool after the set timeout. - // We use 'this' because async calls to timer handler only occur while the pool exists. - auto connection_weak = std::weak_ptr(connection); - auto epoch = connection->epoch(); - connection->start_pool_timer(m_timeout_secs, [this, connection_weak, epoch](const boost::error_code& ec) { - this->free_shared_connection(ec, connection_weak, epoch); - }); - } + auto conn = std::move(m_connections.back().second); + m_connections.pop_back(); + conn->start_reuse(); + return conn; } -private: - void free_connection(const boost::system::error_code& ec, const std::weak_ptr &connection, unsigned int epoch) + void release(const std::shared_ptr& connection) { - auto connection_shared = connection.lock(); - if (!connection_shared) - return; + connection->cancel(); - std::lock_guard lock(m_connections_mutex); - auto it = m_connections.find(connection_shared); - if (it == m_connections.end()) - // The connection was acquired while this callback was firing + if (!connection->keep_alive()) return; - // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing. - // Every acquisition increments the epoch. - if (epoch != (*it)->epoch()) - m_connections.erase(it); - } -}; - -class asio_connection_pool -{ -public: - - asio_connection_pool(boost::asio::io_service& io_service) : - m_io_service(io_service) - {} - - ~asio_connection_pool() - { - std::lock_guard lock(m_connections_mutex); - // Cancel the pool timer for all connections. - for (auto& connection : m_connections) - { - connection->cancel_pool_timer(); - } + std::lock_guard lock(m_lock); + m_epoch++; + m_connections.emplace_back(m_epoch, connection); } - void release(const std::shared_ptr &connection) - { - connection->cancel(); +private: + static void start_epoch_interval(const std::shared_ptr& pool) { + std::weak_ptr weak_pool = pool; + auto prev_epoch = pool->m_epoch; + pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30)); + pool->m_pool_epoch_timer.async_wait([prev_epoch, weak_pool](const boost::system::error_code& ec) { + if (ec) + return; - if (connection->keep_alive()) - { - std::lock_guard lock(m_connections_mutex); - // This will destroy and remove the connection from pool after the set timeout. - // We use 'this' because async calls to timer handler only occur while the pool exists. - connection->start_pool_timer(s_timeout_secs.count(), boost::bind(&asio_connection_pool::free_connection, this, boost::asio::placeholders::error, pair.first, std::weak_ptr(connection), connection->nonce())); - m_connections.push_back(connection); - } - // Otherwise connection is not put to the pool and it will go out of scope. - } + auto pool = weak_pool.lock(); + if (!pool) + return; - std::shared_ptr obtain(const std::string &pool_key, bool start_with_ssl, const std::function& ssl_context_callback) - { - if (m_is_shared) - { - std::unique_lock lock(m_connections_mutex); - auto it = m_shared_connections.find(pool_key); - if (it == m_shared_connections.end()) + std::lock_guard lock(pool->m_lock); + if (prev_epoch == pool->m_epoch && pool->m_parent_pool_cb) { - lock.unlock(); - - // No connections in pool => create a new connection instance. - return std::make_shared(m_io_service, pool_key, start_with_ssl, ssl_context_callback); + assert(pool->m_connections.empty()); + pool->m_parent_pool_cb(pool.get()); } else { - // Reuse connection from pool. - auto connection = it->second; - m_shared_connections.erase(it); - connection->start_reuse(); - lock.unlock(); - - return connection; - } - } - else - { - std::unique_lock lock(m_connections_mutex); - if (m_connections.empty()) - { - lock.unlock(); + auto erase_end = std::find_if(pool->m_connections.begin(), pool->m_connections.end(), [prev_epoch](std::pair>& p) + { + return p.first > prev_epoch; + }); - // No connections in pool => create a new connection instance. - return std::make_shared(m_io_service, start_with_ssl, ssl_context_callback); + pool->m_connections.erase(pool->m_connections.begin(), erase_end); } - else - { - // Reuse connection from pool. - auto it = m_connections.begin(); - auto connection = *it; - m_connections.erase(it); - connection->start_reuse(); - lock.unlock(); - - return connection; - } - } + start_epoch_interval(pool); + }); } - static std::shared_ptr shared_instance(); + std::mutex m_lock; + boost::asio::deadline_timer m_pool_epoch_timer; + std::deque>> m_connections; + uint64_t m_epoch = 0; -private: + const std::function m_parent_pool_cb; +}; - // Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope. - void free_shared_connection(const boost::system::error_code& ec, std::multimap>::iterator it, const std::weak_ptr &connection, const std::string &nonce) +class asio_shared_connection_pool : public std::enable_shared_from_this +{ +public: + std::shared_ptr obtain(const std::string &pool_key) { - if (!ec) + std::shared_ptr ret; + + std::lock_guard lock(m_lock); + auto it = m_pools.find(pool_key); + if (it == m_pools.end()) { - auto connection_shared = connection.lock(); - // Compare nonce here to ensure the iterator is valid, the connection not been reused. - if (connection_shared && (connection_shared->nonce() == nonce)) + std::weak_ptr weak_this = shared_from_this(); + ret = std::make_shared([pool_key, weak_this](asio_connection_pool* inner_pool_ptr) { - std::lock_guard lock(m_connections_mutex); - m_shared_connections.erase(it); - } - } - } + auto self = weak_this.lock(); + if (!self) + return; - // Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope. - void free_connection(const boost::system::error_code& ec, const std::weak_ptr &connection, uint32_t epoch) - { - if (!ec) + std::lock_guard lock(self->m_lock); + auto it = self->m_pools.find(pool_key); + if (it != self->m_pools.end() && it->second.get() == inner_pool_ptr) + self->m_pools.erase(it); + }); + m_pools.emplace(pool_key, ret); + } + else { - auto connection_shared = connection.lock(); - if (!connection_shared) - return; - - std::lock_guard lock(m_connections_mutex); - auto it = m_connections.find(connection_shared); - if (it == m_connections.end()) - // The connection was acquired while this callback was firing - return; - - // The epoch is used to ensure the connection was not quickly acquired and released while this callback was firing. - // Every acquisition increments the epoch. - if (epoch != (*it)->epoch()) - m_connections.erase(it); + ret = it->second; } + assert(ret != nullptr); + return ret; } - boost::asio::io_service& m_io_service; + static std::shared_ptr& shared_instance() + { + static std::shared_ptr s_instance = std::make_shared(); - std::mutex m_connections_mutex; - std::set> m_connections; + return s_instance; + } - static const std::chrono::seconds s_timeout_secs = 30; +private: + std::mutex m_lock; + std::unordered_map> m_pools; }; -std::shared_ptr asio_connection_pool::shared_instance() -{ - const std::chrono::seconds idle_timeout(30); // Unused sockets are kept in pool for 30 seconds. - static std::shared_ptr s_instance = std::make_shared(crossplat::threadpool::shared_instance().service(), idle_timeout, true); - - return s_instance; -} - -class asio_client : public _http_client_communicator +class asio_client final : public _http_client_communicator { public: asio_client(http::uri address, http_client_config client_config) : _http_client_communicator(std::move(address), std::move(client_config)) , m_resolver(crossplat::threadpool::shared_instance().service()) { - m_start_with_ssl = base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(); - m_ssl_context_callback = this->client_config().get_ssl_context_callback(); - const std::chrono::seconds idle_timeout(30); // Unused sockets are kept in pool for 30 seconds. + m_start_with_ssl = base_uri().scheme() == "https" && !this->client_config().proxy().is_specified(); - if (m_ssl_context_callback) + if (this->client_config().get_ssl_context_callback()) { // We will use a private connection pool because there is no better approaches to compare callback functors. - m_pool = std::make_shared(crossplat::threadpool::shared_instance().service(), idle_timeout, false); + m_pool = std::make_shared(); } else { - init_pool_key(); - m_pool = asio_connection_pool::shared_instance(); + m_pool = asio_shared_connection_pool::shared_instance()->obtain(get_pool_key()); } } @@ -479,44 +396,57 @@ class asio_client : public _http_client_communicator unsigned long open() override { return 0; } - virtual pplx::task propagate(http_request request) override; - - void init_pool_key(); + void release_connection(std::shared_ptr& conn) + { + m_pool->release(conn); + } + std::shared_ptr obtain_connection() + { + std::shared_ptr conn = m_pool->acquire(); - std::shared_ptr obtain_connection(); + if (conn == nullptr) + { + // Pool was empty. Create a new connection + conn = std::make_shared(crossplat::threadpool::shared_instance().service()); + if (m_start_with_ssl) + conn->upgrade_to_ssl(this->client_config().get_ssl_context_callback()); + } - std::shared_ptr m_pool; - tcp::resolver m_resolver; - bool m_start_with_ssl; - std::function m_ssl_context_callback; - std::string m_pool_key; -}; + return conn; + } -void asio_client::init_pool_key() -{ - m_pool_key = base_uri().to_string(); + virtual pplx::task propagate(http_request request) override; - auto &credentials = _http_client_communicator::client_config().credentials(); - if (credentials.is_set()) +private: + std::string get_pool_key() const { - m_pool_key.append(credentials.username()); - } + auto pool_key = base_uri().to_string(); - auto &proxy = _http_client_communicator::client_config().proxy(); - if (proxy.is_specified()) - { - m_pool_key.append(proxy.address().to_string()); - if (proxy.credentials().is_set()) + auto &credentials = _http_client_communicator::client_config().credentials(); + if (credentials.is_set()) { - m_pool_key.append(proxy.credentials().username()); + pool_key.append(credentials.username()); } + + auto &proxy = _http_client_communicator::client_config().proxy(); + if (proxy.is_specified()) + { + pool_key.append(proxy.address().to_string()); + if (proxy.credentials().is_set()) + { + pool_key.append(proxy.credentials().username()); + } + } + + return pool_key; } -} -std::shared_ptr asio_client::obtain_connection() -{ - return m_pool->obtain(m_pool_key, m_start_with_ssl, m_ssl_context_callback); -} + std::shared_ptr m_pool; +public: + tcp::resolver m_resolver; +private: + bool m_start_with_ssl; +}; class asio_context : public request_context, public std::enable_shared_from_this { @@ -539,7 +469,7 @@ class asio_context : public request_context, public std::enable_shared_from_this { m_timer.stop(); // Release connection back to the pool. If connection was not closed, it will be put to the pool for reuse. - std::static_pointer_cast(m_http_client)->m_pool->release(m_connection); + std::static_pointer_cast(m_http_client)->release_connection(m_connection); } static std::shared_ptr create_request_context(std::shared_ptr<_http_client_communicator> &client, http_request &request) @@ -668,9 +598,9 @@ class asio_context : public request_context, public std::enable_shared_from_this m_context->report_error(err_ss.str(), ec, httpclient_errorcode_context::readheader); return; } - - m_context->m_connection->upgrade_to_ssl(); - + + m_context->m_connection->upgrade_to_ssl(m_context->m_http_client->client_config().get_ssl_context_callback()); + m_ssl_tunnel_established(m_context); } else