Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add host based connection pool map #156

Merged
merged 8 commits into from
Jul 27, 2016
32 changes: 0 additions & 32 deletions Release/include/pplx/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@
#include "pplx/pplx.h"
#endif

namespace web { namespace http { namespace client { namespace details {
class asio_connection_pool;
}}}}

namespace crossplat {

#if (defined(ANDROID) || defined(__ANDROID__))
Expand Down Expand Up @@ -101,31 +97,6 @@ class threadpool
return m_service;
}

template<typename PoolGenerator>
std::shared_ptr<web::http::client::details::asio_connection_pool> obtain_connection_pool(const std::string &key, PoolGenerator pool_generator)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);

auto &pool = m_connection_pool_map[key];
if (!pool)
{
pool = pool_generator();
}

return pool;
}

template<typename PoolReleaseHandler>
void release_connection_pool(const std::string &key, PoolReleaseHandler handler)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);

auto pool = m_connection_pool_map[key];
handler(pool);
}

void free_connection_pool(const boost::system::error_code &ec, const std::string &key);

private:
struct _cancel_thread { };

Expand Down Expand Up @@ -185,9 +156,6 @@ class threadpool
std::vector<pthread_t> m_threads;
boost::asio::io_service m_service;
boost::asio::io_service::work m_work;

std::mutex m_connection_pool_map_mutex;
std::map<std::string, std::shared_ptr<web::http::client::details::asio_connection_pool>> m_connection_pool_map;
};

}
109 changes: 72 additions & 37 deletions Release/src/http/client/http_client_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,28 @@ enum class httpclient_errorcode_context
};

class asio_connection_pool;

class asio_connection_pool_map
{
public:
static asio_connection_pool_map &instance();

std::shared_ptr<asio_connection_pool> obtain_connection_pool(const std::string &pool_key, bool start_with_ssl, std::chrono::seconds idle_timeout);

void release_connection_pool(const std::string &pool_key);

void free_connection_pool(const boost::system::error_code &ec, const std::string &pool_key);

private:
asio_connection_pool_map()
: m_threadpool(crossplat::threadpool::shared_instance())
{}

crossplat::threadpool &m_threadpool;
std::mutex m_connection_pool_map_mutex;
std::map<std::string, std::shared_ptr<asio_connection_pool>> m_connection_pool_map;
};

class asio_connection
{
friend class asio_connection_pool;
Expand Down Expand Up @@ -257,8 +279,7 @@ class asio_connection_pool
m_start_with_ssl(start_with_ssl),
m_ssl_context_callback(ssl_context_callback),
m_pool_timeout_secs(60), // Clean this connection pool 60 secs after the last asio_client release it.
m_pool_timer(io_service),
m_use_count(0)
m_pool_timer(io_service)
{}

~asio_connection_pool()
Expand Down Expand Up @@ -320,11 +341,6 @@ class asio_connection_pool
}
}

int &use_count()
{
return m_use_count;
}

private:

// Using weak_ptr here ensures bind() to this handler will not prevent the connection object from going out of scope.
Expand Down Expand Up @@ -354,9 +370,49 @@ class asio_connection_pool

const int m_pool_timeout_secs;
boost::asio::deadline_timer m_pool_timer;
int m_use_count;
};

asio_connection_pool_map &asio_connection_pool_map::instance()
{
static asio_connection_pool_map s_instance;
return s_instance;
}

std::shared_ptr<asio_connection_pool> asio_connection_pool_map::obtain_connection_pool(const std::string &pool_key, bool start_with_ssl, std::chrono::seconds idle_timeout)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
auto &pool = m_connection_pool_map[pool_key];
if (!pool)
{
pool = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(), start_with_ssl, idle_timeout, nullptr);
}

pool->cancel_pool_timer();
return pool;
}

void asio_connection_pool_map::release_connection_pool(const std::string &pool_key)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
auto &pool = m_connection_pool_map[pool_key];
if (pool)
{
pool->start_pool_timer(boost::bind(&asio_connection_pool_map::free_connection_pool, this, boost::asio::placeholders::error, pool_key));
}
}

void asio_connection_pool_map::free_connection_pool(const boost::system::error_code &ec, const std::string &pool_key)
{
if (!ec)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
auto &pool = m_connection_pool_map[pool_key];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will allocate a new pool shared_ptr if there is not a map entry. It would probably be better to use .find(pool_key) to avoid allocating a new map entry just to erase it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

if (pool && pool.use_count() == 1)
{
m_connection_pool_map.erase(pool_key);
}
}
}

class asio_client : public _http_client_communicator
{
Expand All @@ -365,13 +421,14 @@ class asio_client : public _http_client_communicator
: _http_client_communicator(std::move(address), std::move(client_config))
, m_resolver(crossplat::threadpool::shared_instance().service())
{
if (this->client_config().get_ssl_context_callback())
bool start_with_ssl = base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified();
const std::chrono::seconds idle_timeout(30); // Unused sockets are kept in pool for 30 seconds.
auto ssl_context_callback = this->client_config().get_ssl_context_callback();

if (ssl_context_callback)
{
// The pool is not added to the map because there is no better approaches to compare callback functors.
m_pool = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
this->client_config().get_ssl_context_callback());
m_pool = std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(), start_with_ssl, idle_timeout, ssl_context_callback);
}
else
{
Expand All @@ -393,37 +450,15 @@ class asio_client : public _http_client_communicator
}
}

m_pool = crossplat::threadpool::shared_instance().obtain_connection_pool(m_pool_key, [this]()
{
return std::make_shared<asio_connection_pool>(crossplat::threadpool::shared_instance().service(),
base_uri().scheme() == "https" && !_http_client_communicator::client_config().proxy().is_specified(),
std::chrono::seconds(30), // Unused sockets are kept in pool for 30 seconds.
nullptr);
});

if (m_pool->use_count() == 0)
{
m_pool->cancel_pool_timer();
}
++m_pool->use_count();
m_pool = asio_connection_pool_map::instance().obtain_connection_pool(m_pool_key, start_with_ssl, idle_timeout);
}
}

~asio_client()
{
if (!m_pool_key.empty())
{
crossplat::threadpool::shared_instance().release_connection_pool(m_pool_key, [this](std::shared_ptr<web::http::client::details::asio_connection_pool> pool)
{
if (pool)
{
--pool->use_count();
if (pool->use_count() == 0)
{
pool->start_pool_timer(boost::bind(&crossplat::threadpool::free_connection_pool, &crossplat::threadpool::shared_instance(), boost::asio::placeholders::error, m_pool_key));
}
}
});
asio_connection_pool_map::instance().release_connection_pool(m_pool_key);
}
}

Expand Down
9 changes: 0 additions & 9 deletions Release/src/pplx/threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@ threadpool& threadpool::shared_instance()
return s_shared;
}

void threadpool::free_connection_pool(const boost::system::error_code &ec, const std::string &key)
{
if (!ec)
{
std::lock_guard<std::mutex> lg(m_connection_pool_map_mutex);
m_connection_pool_map.erase(key);
}
}

#endif

}
Expand Down