Skip to content

Commit

Permalink
Revert global connection pool, but leave improved individual pools. P…
Browse files Browse the repository at this point in the history
…ools no longer need to self-reference and are destroyed with the owning http_client.
  • Loading branch information
ras0219-msft committed Aug 2, 2016
1 parent 21eda51 commit 6cd6392
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 155 deletions.
6 changes: 3 additions & 3 deletions Release/src/http/client/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ const uri & _http_client_communicator::base_uri() const
return m_uri;
}

_http_client_communicator::_http_client_communicator(http::uri address, http_client_config client_config)
_http_client_communicator::_http_client_communicator(http::uri&& address, http_client_config&& client_config)
: m_uri(std::move(address)), m_client_config(std::move(client_config)), m_opened(false), m_scheduled(0)
{
}
Expand Down Expand Up @@ -370,12 +370,12 @@ http_client::http_client(const uri &base_uri, const http_client_config &client_c
uribuilder.set_scheme(_XPLATSTR("http"));
uri uriWithScheme = uribuilder.to_uri();
verify_uri(uriWithScheme);
final_pipeline_stage = details::create_platform_final_pipeline_stage(uriWithScheme, client_config);
final_pipeline_stage = details::create_platform_final_pipeline_stage(std::move(uriWithScheme), http_client_config(client_config));
}
else
{
verify_uri(base_uri);
final_pipeline_stage = details::create_platform_final_pipeline_stage(base_uri, client_config);
final_pipeline_stage = details::create_platform_final_pipeline_stage(uri(base_uri), http_client_config(client_config));
}

m_pipeline = std::make_shared<http_pipeline>(std::move(final_pipeline_stage));
Expand Down
172 changes: 28 additions & 144 deletions Release/src/http/client/http_client_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ class asio_connection
///
/// During the cleanup phase, connections are removed starting with the oldest. This
/// ensures that if a high intensity workload is followed by a low intensity workload,
/// the connection pool will correctly adapt to the current workload. Specifically,
/// the following code will eventually result in a maximum of one pooled connection
/// regardless of the initial number of pooled connections:
/// the connection pool will correctly adapt to the low intensity workload.
///
/// Specifically, the following code will eventually result in a maximum of one pooled
/// connection regardless of the initial number of pooled connections:
/// <code>
/// while(1)
/// {
Expand All @@ -246,18 +247,11 @@ class asio_connection
/// pool.release(conn);
/// }
/// </code>
///
/// Additionally, when two cleanup phases have occurred with no calls to `release()`
/// between them, the internal self-reference is cleared. If there are no active
/// `http_client`s keeping the pool alive, this will cause the pool to expire upon
/// cleanup handler termination. Whenever a new call to `release()` arrives, the self
/// reference is re-applied to keep the pool alive.
/// </remarks>
class asio_connection_pool : public std::enable_shared_from_this<asio_connection_pool>
{
public:
asio_connection_pool()
: m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
asio_connection_pool() : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
{}

std::shared_ptr<asio_connection> acquire()
Expand All @@ -281,29 +275,29 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
return;

std::lock_guard<std::mutex> lock(m_lock);
if (m_self_reference == nullptr)
if (!is_timer_running)
{
auto sptr = this->shared_from_this();
m_self_reference = sptr;
start_epoch_interval(sptr);
start_epoch_interval(shared_from_this());
is_timer_running = true;
}

m_epoch++;
m_connections.emplace_back(m_epoch, connection);
m_connections.emplace_back(m_epoch, std::move(connection));
}

private:
// Note: must be called under m_lock
static void start_epoch_interval(const std::shared_ptr<asio_connection_pool>& pool) {
static void start_epoch_interval(const std::shared_ptr<asio_connection_pool>& pool)
{
_ASSERTE(pool.get() != nullptr);
_ASSERTE(pool->m_self_reference != nullptr);

auto& self = *pool;
std::weak_ptr<asio_connection_pool> weak_pool = pool;

self.m_prev_epoch = self.m_epoch;
pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30));
pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) {
pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec)
{
if (ec)
return;

Expand All @@ -313,11 +307,11 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
auto& self = *pool;

std::lock_guard<std::mutex> lock(self.m_lock);
_ASSERTE(self.m_self_reference != nullptr);
if (self.m_prev_epoch == self.m_epoch)
{
self.m_connections.clear();
self.m_self_reference = nullptr;
self.is_timer_running = false;
return;
}
else
{
Expand All @@ -335,109 +329,23 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
}

std::mutex m_lock;
boost::asio::deadline_timer m_pool_epoch_timer;
std::deque<std::pair<uint64_t, std::shared_ptr<asio_connection>>> m_connections;

uint64_t m_epoch = 0;
uint64_t m_prev_epoch = 0;

std::shared_ptr<asio_connection_pool> m_self_reference;
};

class asio_shared_connection_pool : public std::enable_shared_from_this<asio_shared_connection_pool>
{
public:
std::shared_ptr<asio_connection_pool> obtain(const std::string &pool_key)
{
std::shared_ptr<asio_connection_pool> ret;

std::lock_guard<std::mutex> lock(m_lock);
auto it = m_pools.find(pool_key);
if (it != m_pools.end())
{
ret = it->second.lock();
if (ret == nullptr)
{
// Previous pool expired
ret = std::make_shared<asio_connection_pool>();
it->second = ret;
}
}
else
{
if (m_pools.empty())
{
// If transitioning from empty to having a single element, restart the timer.
start_timer(shared_from_this());
}
ret = std::make_shared<asio_connection_pool>();
m_pools.emplace(pool_key, ret);
}

assert(ret != nullptr);
return ret;
}

static std::shared_ptr<asio_shared_connection_pool>& shared_instance()
{
static std::shared_ptr<asio_shared_connection_pool> s_instance = std::make_shared<asio_shared_connection_pool>();

return s_instance;
}

asio_shared_connection_pool() : m_timer(crossplat::threadpool::shared_instance().service()) {}

private:
static void start_timer(const std::shared_ptr<asio_shared_connection_pool>& self)
{
self->m_timer.expires_from_now(boost::posix_time::seconds(60));
std::weak_ptr<asio_shared_connection_pool> weak_this = self;
self->m_timer.async_wait([weak_this](const boost::system::error_code& ec)
{
if (ec)
return;
auto strong_this = weak_this.lock();
if (!strong_this)
return;

std::lock_guard<std::mutex> lock(strong_this->m_lock);
auto b = strong_this->m_pools.begin();
auto e = strong_this->m_pools.end();
for (; b != e;)
{
if (b->second.expired())
b = strong_this->m_pools.erase(b);
else
++b;
}
if (!strong_this->m_pools.empty())
start_timer(strong_this);
});
}

boost::asio::deadline_timer m_timer;
std::mutex m_lock;
std::unordered_map<std::string, std::weak_ptr<asio_connection_pool>> m_pools;
bool is_timer_running = false;
boost::asio::deadline_timer m_pool_epoch_timer;
};

class asio_client final : public _http_client_communicator
{
public:
asio_client(http::uri address, http_client_config client_config)
: _http_client_communicator(std::move(address), std::move(client_config))
, m_resolver(crossplat::threadpool::shared_instance().service())
{
m_start_with_ssl = base_uri().scheme() == "https" && !this->client_config().proxy().is_specified();

if (this->client_config().get_ssl_context_callback())
{
// We will use a private connection pool because there is no better approaches to compare callback functors.
m_pool = std::make_shared<asio_connection_pool>();
}
else
{
m_pool = asio_shared_connection_pool::shared_instance()->obtain(get_pool_key());
}
}
asio_client(http::uri&& address, http_client_config&& client_config)
: _http_client_communicator(std::move(address), std::move(client_config))
, m_resolver(crossplat::threadpool::shared_instance().service())
, m_pool(std::make_shared<asio_connection_pool>())
, m_start_with_ssl(base_uri().scheme() == "https" && !this->client_config().proxy().is_specified())
{}

void send_request(const std::shared_ptr<request_context> &request_ctx) override;

Expand All @@ -464,35 +372,11 @@ class asio_client final : public _http_client_communicator

virtual pplx::task<http_response> propagate(http_request request) override;

private:
std::string get_pool_key() const
{
auto pool_key = base_uri().to_string();

auto &credentials = _http_client_communicator::client_config().credentials();
if (credentials.is_set())
{
pool_key.append(credentials.username());
}

auto &proxy = _http_client_communicator::client_config().proxy();
if (proxy.is_specified())
{
pool_key.append(proxy.address().to_string());
if (proxy.credentials().is_set())
{
pool_key.append(proxy.credentials().username());
}
}

return pool_key;
}

std::shared_ptr<asio_connection_pool> m_pool;
public:
tcp::resolver m_resolver;
private:
bool m_start_with_ssl;
const std::shared_ptr<asio_connection_pool> m_pool;
const bool m_start_with_ssl;
};

class asio_context : public request_context, public std::enable_shared_from_this<asio_context>
Expand Down Expand Up @@ -1612,9 +1496,9 @@ class asio_context : public request_context, public std::enable_shared_from_this
};


std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
{
return std::make_shared<asio_client>(base_uri, client_config);
return std::make_shared<asio_client>(std::move(base_uri), std::move(client_config));
}

void asio_client::send_request(const std::shared_ptr<request_context> &request_ctx)
Expand Down
6 changes: 3 additions & 3 deletions Release/src/http/client/http_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class _http_client_communicator : public http_pipeline_stage
{
public:

virtual ~_http_client_communicator() {}
virtual ~_http_client_communicator() override = default;

// Asynchronously send a HTTP request and process the response.
void async_send_request(const std::shared_ptr<request_context> &request);
Expand All @@ -112,7 +112,7 @@ class _http_client_communicator : public http_pipeline_stage
const uri & base_uri() const;

protected:
_http_client_communicator(http::uri address, http_client_config client_config);
_http_client_communicator(http::uri&& address, http_client_config&& client_config);

// Method to open client.
virtual unsigned long open() = 0;
Expand Down Expand Up @@ -146,6 +146,6 @@ class _http_client_communicator : public http_pipeline_stage
/// <summary>
/// Factory function implemented by the separate platforms to construct their subclasses of _http_client_communicator
/// </summary>
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config);
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config);

}}}}
4 changes: 2 additions & 2 deletions Release/src/http/client/http_client_winhttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1294,9 +1294,9 @@ class winhttp_client : public _http_client_communicator
bool m_secure;
};

std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
{
return std::make_shared<details::winhttp_client>(std::move(base_uri), client_config);
return std::make_shared<details::winhttp_client>(std::move(base_uri), std::move(client_config));
}

}}}}
6 changes: 3 additions & 3 deletions Release/src/http/client/http_client_winrt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class IResponseStream
class winrt_client : public _http_client_communicator
{
public:
winrt_client(http::uri address, http_client_config client_config)
winrt_client(http::uri&& address, http_client_config&& client_config)
: _http_client_communicator(std::move(address), std::move(client_config)) { }

winrt_client(const winrt_client&) = delete;
Expand Down Expand Up @@ -560,9 +560,9 @@ class winrt_client : public _http_client_communicator
}
};

std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
{
return std::make_shared<details::winrt_client>(std::move(base_uri), client_config);
return std::make_shared<details::winrt_client>(std::move(base_uri), std::move(client_config));
}

}}}}

0 comments on commit 6cd6392

Please sign in to comment.