Skip to content

Commit

Permalink
Improved asio connection pooling implementation.
Browse files Browse the repository at this point in the history
Removed asio_connection inherits from enable_shared_from_this.
Replaced asio_connection_pool::m_parent_pool_cb with a self-referencing shared_ptr.
Added asio_shared_connection_pool::m_timer to periodically clear out expired connection pools.
  • Loading branch information
ras0219-msft committed Jul 5, 2016
1 parent 0b7fd42 commit eaacac4
Showing 1 changed file with 84 additions and 37 deletions.
121 changes: 84 additions & 37 deletions Release/src/http/client/http_client_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ enum class httpclient_errorcode_context
close
};

class asio_connection : public std::enable_shared_from_this<asio_connection>
class asio_connection
{
friend class asio_client;
public:
Expand Down Expand Up @@ -222,7 +222,7 @@ class asio_connection : public std::enable_shared_from_this<asio_connection>
bool m_keep_alive;
};

/// <summary>Implements a connection pool with adaptive connection removal and callback notification on idle</summary>
/// <summary>Implements a connection pool with adaptive connection removal</summary>
/// <remarks>
/// The timeout mechanism is based on the `uint64_t m_epoch` member. Every 30 seconds,
/// the lambda in `start_epoch_interval` fires, triggering the cleanup of any
Expand All @@ -247,20 +247,18 @@ class asio_connection : public std::enable_shared_from_this<asio_connection>
/// }
/// </code>
///
/// Additionally, idle notification is only triggered when two cleanup phases have
/// occurred with no calls to `acquire()` between them. This prevents a race
/// condition where the cleanup might occur when the pool happens to be
/// instantaneously empty but still under heavy load.
/// Additionally, when two cleanup phases have occurred with no calls to `release()`
/// between them, the internal self-reference is cleared. If there are no active
/// `http_client`s keeping the pool alive, this will cause the pool to expire upon
/// cleanup handler termination. Whenever a new call to `release()` arrives, the self
/// reference is re-applied to keep the pool alive.
/// </remarks>
class asio_connection_pool : public std::enable_shared_from_this<asio_connection_pool>
{
public:
asio_connection_pool(const std::function<void(asio_connection_pool*)>& m_cb = {})
: m_pool_epoch_timer(crossplat::threadpool::shared_instance().service()),
m_parent_pool_cb(m_cb)
{
start_epoch_interval(shared_from_this());
}
asio_connection_pool()
: m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
{}

std::shared_ptr<asio_connection> acquire()
{
Expand All @@ -283,48 +281,66 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
return;

std::lock_guard<std::mutex> lock(m_lock);
if (m_self_reference == nullptr)
{
auto sptr = this->shared_from_this();
m_self_reference = sptr;
start_epoch_interval(sptr);
}

m_epoch++;
m_connections.emplace_back(m_epoch, connection);
}

private:
// Note: must be called under m_lock
static void start_epoch_interval(const std::shared_ptr<asio_connection_pool>& pool) {
_ASSERTE(pool.get() != nullptr);
_ASSERTE(pool->m_self_reference != nullptr);

auto& self = *pool;
std::weak_ptr<asio_connection_pool> weak_pool = pool;
auto prev_epoch = pool->m_epoch;

self.m_prev_epoch = self.m_epoch;
pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30));
pool->m_pool_epoch_timer.async_wait([prev_epoch, weak_pool](const boost::system::error_code& ec) {
pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) {
if (ec)
return;

auto pool = weak_pool.lock();
if (!pool)
return;
auto& self = *pool;

std::lock_guard<std::mutex> lock(pool->m_lock);
if (prev_epoch == pool->m_epoch && pool->m_parent_pool_cb)
std::lock_guard<std::mutex> lock(self.m_lock);
_ASSERTE(self.m_self_reference != nullptr);
if (self.m_prev_epoch == self.m_epoch)
{
assert(pool->m_connections.empty());
pool->m_parent_pool_cb(pool.get());
assert(self.m_connections.empty());
self.m_self_reference = nullptr;
}
else
{
auto erase_end = std::find_if(pool->m_connections.begin(), pool->m_connections.end(), [prev_epoch](std::pair<uint64_t, std::shared_ptr<asio_connection>>& p)
auto prev_epoch = self.m_prev_epoch;
auto erase_end = std::find_if(self.m_connections.begin(), self.m_connections.end(),
[prev_epoch](std::pair<uint64_t, std::shared_ptr<asio_connection>>& p)
{
return p.first > prev_epoch;
});

pool->m_connections.erase(pool->m_connections.begin(), erase_end);
self.m_connections.erase(self.m_connections.begin(), erase_end);
start_epoch_interval(pool);
}
start_epoch_interval(pool);
});
}

std::mutex m_lock;
boost::asio::deadline_timer m_pool_epoch_timer;
std::deque<std::pair<uint64_t, std::shared_ptr<asio_connection>>> m_connections;
uint64_t m_epoch = 0;
uint64_t m_prev_epoch = 0;

const std::function<void(asio_connection_pool*)> m_parent_pool_cb;
std::shared_ptr<asio_connection_pool> m_self_reference;
};

class asio_shared_connection_pool : public std::enable_shared_from_this<asio_shared_connection_pool>
Expand All @@ -336,26 +352,27 @@ class asio_shared_connection_pool : public std::enable_shared_from_this<asio_sha

std::lock_guard<std::mutex> lock(m_lock);
auto it = m_pools.find(pool_key);
if (it == m_pools.end())
if (it != m_pools.end())
{
std::weak_ptr<asio_shared_connection_pool> weak_this = shared_from_this();
ret = std::make_shared<asio_connection_pool>([pool_key, weak_this](asio_connection_pool* inner_pool_ptr)
ret = it->second.lock();
if (ret == nullptr)
{
auto self = weak_this.lock();
if (!self)
return;

std::lock_guard<std::mutex> lock(self->m_lock);
auto it = self->m_pools.find(pool_key);
if (it != self->m_pools.end() && it->second.get() == inner_pool_ptr)
self->m_pools.erase(it);
});
m_pools.emplace(pool_key, ret);
// Previous pool expired
ret = std::make_shared<asio_connection_pool>();
it->second = ret;
}
}
else
{
ret = it->second;
if (m_pools.empty())
{
// If transitioning from empty to having a single element, restart the timer.
start_timer(shared_from_this());
}
ret = std::make_shared<asio_connection_pool>();
m_pools.emplace(pool_key, ret);
}

assert(ret != nullptr);
return ret;
}
Expand All @@ -367,9 +384,39 @@ class asio_shared_connection_pool : public std::enable_shared_from_this<asio_sha
return s_instance;
}

asio_shared_connection_pool() : m_timer(crossplat::threadpool::shared_instance().service()) {}

private:
static void start_timer(const std::shared_ptr<asio_shared_connection_pool>& self)
{
self->m_timer.expires_from_now(boost::posix_time::seconds(60));
std::weak_ptr<asio_shared_connection_pool> weak_this = self;
self->m_timer.async_wait([weak_this](const boost::system::error_code& ec)
{
if (ec)
return;
auto strong_this = weak_this.lock();
if (!strong_this)
return;

std::lock_guard<std::mutex> lock(strong_this->m_lock);
auto b = strong_this->m_pools.begin();
auto e = strong_this->m_pools.end();
for (; b != e;)
{
if (b->second.expired())
b = strong_this->m_pools.erase(b);
else
++b;
}
if (!strong_this->m_pools.empty())
start_timer(strong_this);
});
}

boost::asio::deadline_timer m_timer;
std::mutex m_lock;
std::unordered_map<std::string, std::shared_ptr<asio_connection_pool>> m_pools;
std::unordered_map<std::string, std::weak_ptr<asio_connection_pool>> m_pools;
};

class asio_client final : public _http_client_communicator
Expand Down

0 comments on commit eaacac4

Please sign in to comment.