diff --git a/Release/include/cpprest/details/http_server_asio.h b/Release/include/cpprest/details/http_server_asio.h index 95c1745083..e34435d09c 100644 --- a/Release/include/cpprest/details/http_server_asio.h +++ b/Release/include/cpprest/details/http_server_asio.h @@ -87,7 +87,10 @@ class connection if (is_https) { m_ssl_context = utility::details::make_unique(boost::asio::ssl::context::sslv23); - ssl_context_callback(*m_ssl_context); + if (ssl_context_callback) + { + ssl_context_callback(*m_ssl_context); + } m_ssl_stream = utility::details::make_unique>(*m_socket, *m_ssl_context); m_ssl_stream->async_handshake(boost::asio::ssl::stream_base::server, [this](const boost::system::error_code&) { this->start_request_response(); }); diff --git a/Release/include/cpprest/http_client.h b/Release/include/cpprest/http_client.h index 8f9e0f9f8a..e5d2f3677a 100644 --- a/Release/include/cpprest/http_client.h +++ b/Release/include/cpprest/http_client.h @@ -100,7 +100,6 @@ class http_client_config #endif , m_set_user_nativehandle_options([](native_handle)->void{}) #if !defined(_WIN32) && !defined(__cplusplus_winrt) - , m_ssl_context_callback([](boost::asio::ssl::context&)->void{}) , m_tlsext_sni_enabled(true) #endif #if defined(_WIN32) && !defined(__cplusplus_winrt) diff --git a/Release/include/pplx/threadpool.h b/Release/include/pplx/threadpool.h index f28c51ea31..5ec939ddf5 100644 --- a/Release/include/pplx/threadpool.h +++ b/Release/include/pplx/threadpool.h @@ -101,19 +101,31 @@ class threadpool return m_service; } - std::shared_ptr obtain_connection_pool(const std::string &base_uri, std::function()> connection_pool_generator) + template + std::shared_ptr obtain_connection_pool(const std::string &key, PoolGenerator pool_generator) { std::lock_guard lg(m_connection_pool_map_mutex); - auto &pool = m_connection_pool_map[base_uri]; + auto &pool = m_connection_pool_map[key]; if (!pool) { - pool = connection_pool_generator(); + pool = pool_generator(); } return pool; } + template + void release_connection_pool(const std::string &key, PoolReleaseHandler handler) + { + std::lock_guard lg(m_connection_pool_map_mutex); + + auto pool = m_connection_pool_map[key]; + handler(pool); + } + + void free_connection_pool(const boost::system::error_code &ec, const std::string &key); + private: struct _cancel_thread { }; diff --git a/Release/src/http/client/http_client_asio.cpp b/Release/src/http/client/http_client_asio.cpp index 0b1d30dce6..613b18de46 100644 --- a/Release/src/http/client/http_client_asio.cpp +++ b/Release/src/http/client/http_client_asio.cpp @@ -80,7 +80,7 @@ class asio_connection asio_connection(boost::asio::io_service& io_service, bool start_with_ssl, const std::function& ssl_context_callback) : m_socket(io_service), m_ssl_context_callback(ssl_context_callback), - m_pool_timer(io_service), + m_connection_timer(io_service), m_is_reused(false), m_keep_alive(true) { @@ -103,7 +103,10 @@ class asio_connection boost::asio::ssl::context ssl_context(boost::asio::ssl::context::sslv23); ssl_context.set_default_verify_paths(); ssl_context.set_options(boost::asio::ssl::context::default_workarounds); - m_ssl_context_callback(ssl_context); + if (m_ssl_context_callback) + { + m_ssl_context_callback(ssl_context); + } m_ssl_stream = utility::details::make_unique>(m_socket, ssl_context); } @@ -127,9 +130,9 @@ class asio_connection return error; } - void cancel_pool_timer() + void cancel_connection_timer() { - m_pool_timer.cancel(); + m_connection_timer.cancel(); } bool is_reused() const { return m_is_reused; } @@ -218,15 +221,15 @@ class asio_connection private: template - void start_pool_timer(int timeout_secs, const TimeoutHandler &handler) + void start_connection_timer(int timeout_secs, const TimeoutHandler &handler) { - m_pool_timer.expires_from_now(boost::posix_time::milliseconds(timeout_secs * 1000)); - m_pool_timer.async_wait(handler); + m_connection_timer.expires_from_now(boost::posix_time::milliseconds(timeout_secs * 1000)); + m_connection_timer.async_wait(handler); } void start_reuse() { - cancel_pool_timer(); + cancel_connection_timer(); m_is_reused = true; } @@ -239,7 +242,7 @@ class asio_connection std::function m_ssl_context_callback; - boost::asio::deadline_timer m_pool_timer; + boost::asio::deadline_timer m_connection_timer; bool m_is_reused; bool m_keep_alive; }; @@ -252,7 +255,10 @@ class asio_connection_pool m_io_service(io_service), m_timeout_secs(static_cast(idle_timeout.count())), m_start_with_ssl(start_with_ssl), - m_ssl_context_callback(ssl_context_callback) + m_ssl_context_callback(ssl_context_callback), + m_pool_timeout_secs(60), // Clean this connection pool 60 secs after the last asio_client release it. + m_pool_timer(io_service), + m_use_count(0) {} ~asio_connection_pool() @@ -261,10 +267,24 @@ class asio_connection_pool // Cancel the pool timer for all connections. for (auto& connection : m_connections) { - connection->cancel_pool_timer(); + connection->cancel_connection_timer(); } } + template + void start_pool_timer(const TimeoutHandler &handler) + { + //std::lock_guard lg(m_pool_timer_mutex); + m_pool_timer.expires_from_now(boost::posix_time::milliseconds(m_pool_timeout_secs * 1000)); + m_pool_timer.async_wait(handler); + } + + void cancel_pool_timer() + { + //std::lock_guard lg(m_pool_timer_mutex); + m_pool_timer.cancel(); + } + void release(const std::shared_ptr &connection) { if (connection->keep_alive() && (m_timeout_secs > 0)) @@ -274,7 +294,7 @@ class asio_connection_pool std::lock_guard lock(m_connections_mutex); // This will destroy and remove the connection from pool after the set timeout. // We use 'this' because async calls to timer handler only occur while the pool exists. - connection->start_pool_timer(m_timeout_secs, boost::bind(&asio_connection_pool::handle_pool_timer, this, boost::asio::placeholders::error, connection)); + connection->start_connection_timer(m_timeout_secs, boost::bind(&asio_connection_pool::handle_connection_timer, this, boost::asio::placeholders::error, connection)); m_connections.push_back(connection); } // Otherwise connection is not put to the pool and it will go out of scope. @@ -302,10 +322,15 @@ class asio_connection_pool } } + int &use_count() + { + return m_use_count; + } + private: // Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope. - void handle_pool_timer(const boost::system::error_code& ec, const std::weak_ptr &connection) + void handle_connection_timer(const boost::system::error_code& ec, const std::weak_ptr &connection) { if (!ec) { @@ -328,6 +353,12 @@ class asio_connection_pool std::function m_ssl_context_callback; std::vector > m_connections; std::mutex m_connections_mutex; + + const int m_pool_timeout_secs; + std::mutex m_pool_timer_mutex; + boost::asio::deadline_timer m_pool_timer; + //std::atomic m_use_count; + int m_use_count; }; @@ -348,32 +379,56 @@ class asio_client : public _http_client_communicator } else { - std::string host = base_uri().to_string(); + m_pool_key = base_uri().to_string(); auto &credentials = _http_client_communicator::client_config().credentials(); if (credentials.is_set()) { - host.append(credentials.username()); + m_pool_key.append(credentials.username()); } auto &proxy = _http_client_communicator::client_config().proxy(); if (proxy.is_specified()) { - host.append(proxy.address().to_string()); + m_pool_key.append(proxy.address().to_string()); if (proxy.credentials().is_set()) { - host.append(proxy.credentials().username()); + m_pool_key.append(proxy.credentials().username()); } } - m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(host, [this]() + m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(m_pool_key, [this]() { return std::make_shared(crossplat::threadpool::shared_instance().service(), base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(), std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds. - this->client_config().get_ssl_context_callback()); + nullptr); + }); + + if (m_pool->use_count() == 0) + { + m_pool->cancel_pool_timer(); + } + ++m_pool->use_count(); + } + } + + ~asio_client() + { + if (!m_pool_key.empty()) + { + crossplat::threadpool::shared_instance().release_connection_pool(m_pool_key, [this](std::shared_ptr pool) + { + if (pool) + { + --pool->use_count(); + if (pool->use_count() == 0) + { + pool->start_pool_timer(boost::bind(&crossplat::threadpool::free_connection_pool, &crossplat::threadpool::shared_instance(), boost::asio::placeholders::error, m_pool_key)); + } + } }); - } + } } void send_request(const std::shared_ptr &request_ctx) override; @@ -384,6 +439,7 @@ class asio_client : public _http_client_communicator std::shared_ptr m_pool; tcp::resolver m_resolver; + std::string m_pool_key; }; class asio_context : public request_context, public std::enable_shared_from_this @@ -509,7 +565,7 @@ class asio_context : public request_context, public std::enable_shared_from_this { m_context->report_error("Failed to send connect request to proxy.", err, httpclient_errorcode_context::writebody); } - } + } void handle_status_line(const boost::system::error_code& ec) { diff --git a/Release/src/pplx/threadpool.cpp b/Release/src/pplx/threadpool.cpp index 729dae85d9..16c1bfbce9 100644 --- a/Release/src/pplx/threadpool.cpp +++ b/Release/src/pplx/threadpool.cpp @@ -67,6 +67,15 @@ threadpool& threadpool::shared_instance() return s_shared; } +void threadpool::free_connection_pool(const boost::system::error_code &ec, const std::string &key) +{ + if (!ec) + { + std::lock_guard lg(m_connection_pool_map_mutex); + m_connection_pool_map.erase(key); + } +} + #endif }