Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add host based connection pool map #156

Merged
merged 8 commits into from
Jul 27, 2016
20 changes: 20 additions & 0 deletions Release/include/pplx/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
#include "pplx/pplx.h"
#endif

namespace web { namespace http { namespace client { namespace details {
class asio_connection_pool;
}}}}

namespace crossplat {

#if (defined(ANDROID) || defined(__ANDROID__))
Expand Down Expand Up @@ -97,6 +101,19 @@ class threadpool
return m_service;
}

std::shared_ptr<web::http::client::details::asio_connection_pool> obtain_connection_pool(const std::string &base_uri, std::function<std::shared_ptr<web::http::client::details::asio_connection_pool>()> connection_pool_generator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation for this function needs to go into Release\src\pplx\threadpool.cpp file instead of the header.

Since we're only using this function with a single callback, it seems simpler to me if the single callback was inlined into this function. Instead of taking the base_uri, this function could just take the http_client_config by reference and determine whether to make a new pool or to reuse a cached pool based on the base_uri() and ssl_context_callback.

{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);

auto &pool = m_connection_pool_map[base_uri];
if (!pool)
{
pool = connection_pool_generator();
}

return pool;
}

private:
struct _cancel_thread { };

Expand Down Expand Up @@ -156,6 +173,9 @@ class threadpool
std::vector<pthread_t> m_threads;
boost::asio::io_service m_service;
boost::asio::io_service::work m_work;

std::mutex m_connection_pool_map_mutex;
std::map<std::string, std::shared_ptr<web::http::client::details::asio_connection_pool>> m_connection_pool_map;
Copy link
Contributor

@ras0219-msft ras0219-msft May 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this map ever get cleared?

One option would be to use a std::weak_ptr here instead of a std::shared_ptr so the pool will automatically clear once the last http_client using it closes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map is get cleared when the program exits. Although the connection pool will remain until the process exists, all actual connections in it have a default 30 seconds timer. The timer is started after the connection being released, and the underlying socket is closed once the timer expires.

Using std::weak_ptr here will make the pool cleared right after the last http_client using it. If another http_client which is going to use the same pool is created after that, all connections have to be re-started.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see one more public function added which clears the entire cache. I'm concerned about applications that act more like scanners or crawlers; these applications would basically have a memory leak for every host they connect to.

While finer-grained control of the cache will eventually be needed, I want to ensure we have a workaround available until we can capture some solid cases where the optimal control design becomes clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A pool timer is added to the asio_connection_pool. The pool is going to be cleared 60 seconds after the last http_client using it releases it.

};

}
56 changes: 44 additions & 12 deletions Release/src/http/client/http_client_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,32 +325,64 @@ class asio_connection_pool
boost::asio::io_service& m_io_service;
const int m_timeout_secs;
const bool m_start_with_ssl;
const std::function<void(boost::asio::ssl::context&)>& m_ssl_context_callback;
std::function<void(boost::asio::ssl::context&)> m_ssl_context_callback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to prevent a dangling reference, because the lifetime of asio_connection_pool can be longer than the function object m_ssl_context_callback references. That function object comes from http_client_config when creating a http_client object.

std::vector<std::shared_ptr<asio_connection> > m_connections;
std::mutex m_connections_mutex;
};



class asio_client : public _http_client_communicator
{
public:
asio_client(http::uri address, http_client_config client_config)
: _http_client_communicator(std::move(address), std::move(client_config))
, m_pool(crossplat::threadpool::shared_instance().service(),
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
this->client_config().get_ssl_context_callback())
, m_resolver(crossplat::threadpool::shared_instance().service())
{}
{
if (this->client_config().get_ssl_context_callback())
{
// The pool is not added to the map because there is no better approaches to compare callback functors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that if I make multiple clients with the same host but different SSL context callbacks, the second client's callback will get dropped.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I would make the pool not added to the map if ssl_context_callback is used.

m_pool = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
this->client_config().get_ssl_context_callback());
}
else
{
std::string host = base_uri().to_string();

auto &credentials = _http_client_communicator::client_config().credentials();
if (credentials.is_set())
{
host.append(credentials.username());
}

auto &proxy = _http_client_communicator::client_config().proxy();
if (proxy.is_specified())
{
host.append(proxy.address().to_string());
if (proxy.credentials().is_set())
{
host.append(proxy.credentials().username());
}
}

m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(host, [this]()
{
return std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
this->client_config().get_ssl_context_callback());
Copy link
Contributor

@ras0219-msft ras0219-msft Jun 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] This branch of the if already has guaranteed there is no ssl_context_callback, so passing in nullptr here is probably more clear.

});
}
}

void send_request(const std::shared_ptr<request_context> &request_ctx) override;

unsigned long open() override { return 0; }

virtual pplx::task<http_response> propagate(http_request request) override;

asio_connection_pool m_pool;
std::shared_ptr<asio_connection_pool> m_pool;
tcp::resolver m_resolver;
};

Expand All @@ -375,13 +407,13 @@ class asio_context : public request_context, public std::enable_shared_from_this
{
m_timer.stop();
// Release connection back to the pool. If connection was not closed, it will be put to the pool for reuse.
std::static_pointer_cast<asio_client>(m_http_client)->m_pool.release(m_connection);
std::static_pointer_cast<asio_client>(m_http_client)->m_pool->release(m_connection);
}

static std::shared_ptr<request_context> create_request_context(std::shared_ptr<_http_client_communicator> &client, http_request &request)
{
auto client_cast(std::static_pointer_cast<asio_client>(client));
auto connection(client_cast->m_pool.obtain());
auto connection(client_cast->m_pool->obtain());
auto ctx = std::make_shared<asio_context>(client, request, connection);
ctx->m_timer.set_ctx(std::weak_ptr<asio_context>(ctx));
return ctx;
Expand Down Expand Up @@ -458,7 +490,7 @@ class asio_context : public request_context, public std::enable_shared_from_this
m_context->m_timer.reset();
//// Replace the connection. This causes old connection object to go out of scope.
auto client = std::static_pointer_cast<asio_client>(m_context->m_http_client);
m_context->m_connection = client->m_pool.obtain();
m_context->m_connection = client->m_pool->obtain();

auto endpoint = *endpoints;
m_context->m_connection->async_connect(endpoint, boost::bind(&ssl_proxy_tunnel::handle_tcp_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints));
Expand Down Expand Up @@ -811,7 +843,7 @@ class asio_context : public request_context, public std::enable_shared_from_this
{
// Replace the connection. This causes old connection object to go out of scope.
auto client = std::static_pointer_cast<asio_client>(m_http_client);
m_connection = client->m_pool.obtain();
m_connection = client->m_pool->obtain();

auto endpoint = *endpoints;
m_connection->async_connect(endpoint, boost::bind(&asio_context::handle_connect, shared_from_this(), boost::asio::placeholders::error, ++endpoints));
Expand Down