From 6e23e697813541d19e5b2dbcd7b6ec0895c53dbb Mon Sep 17 00:00:00 2001 From: Han Zhu Date: Thu, 12 May 2016 05:58:48 +0000 Subject: [PATCH] add host based connection pool map --- Release/include/pplx/threadpool.h | 20 +++++++++ Release/src/http/client/http_client_asio.cpp | 45 ++++++++++++++------ 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/Release/include/pplx/threadpool.h b/Release/include/pplx/threadpool.h index c778040737..f28c51ea31 100644 --- a/Release/include/pplx/threadpool.h +++ b/Release/include/pplx/threadpool.h @@ -42,6 +42,10 @@ #include "pplx/pplx.h" #endif +namespace web { namespace http { namespace client { namespace details { + class asio_connection_pool; +}}}} + namespace crossplat { #if (defined(ANDROID) || defined(__ANDROID__)) @@ -97,6 +101,19 @@ class threadpool return m_service; } + std::shared_ptr obtain_connection_pool(const std::string &base_uri, std::function()> connection_pool_generator) + { + std::lock_guard lg(m_connection_pool_map_mutex); + + auto &pool = m_connection_pool_map[base_uri]; + if (!pool) + { + pool = connection_pool_generator(); + } + + return pool; + } + private: struct _cancel_thread { }; @@ -156,6 +173,9 @@ class threadpool std::vector m_threads; boost::asio::io_service m_service; boost::asio::io_service::work m_work; + + std::mutex m_connection_pool_map_mutex; + std::map> m_connection_pool_map; }; } diff --git a/Release/src/http/client/http_client_asio.cpp b/Release/src/http/client/http_client_asio.cpp index 72eeb0d8df..f9b1747c52 100644 --- a/Release/src/http/client/http_client_asio.cpp +++ b/Release/src/http/client/http_client_asio.cpp @@ -325,24 +325,45 @@ class asio_connection_pool boost::asio::io_service& m_io_service; const int m_timeout_secs; const bool m_start_with_ssl; - const std::function& m_ssl_context_callback; + std::function m_ssl_context_callback; std::vector > m_connections; std::mutex m_connections_mutex; }; - class asio_client : public _http_client_communicator { public: asio_client(http::uri address, http_client_config client_config) : _http_client_communicator(std::move(address), std::move(client_config)) - , m_pool(crossplat::threadpool::shared_instance().service(), - base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(), - std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds. - this->client_config().get_ssl_context_callback()) , m_resolver(crossplat::threadpool::shared_instance().service()) - {} + { + std::string host = base_uri().to_string(); + + auto &credentials = _http_client_communicator::client_config().credentials(); + if (credentials.is_set()) + { + host.append(credentials.username()); + } + + auto &proxy = _http_client_communicator::client_config().proxy(); + if (proxy.is_specified()) + { + host.append(proxy.address().to_string()); + if (proxy.credentials().is_set()) + { + host.append(proxy.credentials().username()); + } + } + + m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(host, [this]() + { + return std::make_shared(crossplat::threadpool::shared_instance().service(), + base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(), + std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds. + this->client_config().get_ssl_context_callback()); + }); + } void send_request(const std::shared_ptr &request_ctx) override; @@ -350,7 +371,7 @@ class asio_client : public _http_client_communicator virtual pplx::task propagate(http_request request) override; - asio_connection_pool m_pool; + std::shared_ptr m_pool; tcp::resolver m_resolver; }; @@ -375,13 +396,13 @@ class asio_context : public request_context, public std::enable_shared_from_this { m_timer.stop(); // Release connection back to the pool. If connection was not closed, it will be put to the pool for reuse. - std::static_pointer_cast(m_http_client)->m_pool.release(m_connection); + std::static_pointer_cast(m_http_client)->m_pool->release(m_connection); } static std::shared_ptr create_request_context(std::shared_ptr<_http_client_communicator> &client, http_request &request) { auto client_cast(std::static_pointer_cast(client)); - auto connection(client_cast->m_pool.obtain()); + auto connection(client_cast->m_pool->obtain()); auto ctx = std::make_shared(client, request, connection); ctx->m_timer.set_ctx(std::weak_ptr(ctx)); return ctx; @@ -458,7 +479,7 @@ class asio_context : public request_context, public std::enable_shared_from_this m_context->m_timer.reset(); //// Replace the connection. This causes old connection object to go out of scope. auto client = std::static_pointer_cast(m_context->m_http_client); - m_context->m_connection = client->m_pool.obtain(); + m_context->m_connection = client->m_pool->obtain(); auto endpoint = *endpoints; m_context->m_connection->async_connect(endpoint, boost::bind(&ssl_proxy_tunnel::handle_tcp_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints)); @@ -811,7 +832,7 @@ class asio_context : public request_context, public std::enable_shared_from_this { // Replace the connection. This causes old connection object to go out of scope. auto client = std::static_pointer_cast(m_http_client); - m_connection = client->m_pool.obtain(); + m_connection = client->m_pool->obtain(); auto endpoint = *endpoints; m_connection->async_connect(endpoint, boost::bind(&asio_context::handle_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints));