Skip to content

Commit

Permalink
Introduce IoEngine::SpawnCoroutine wrapping asio::spawn and Boost exc…
Browse files Browse the repository at this point in the history
…eptions

This is required to

- catch all exceptions and wrap them into Boost exceptions. They
are the only ones allowed with Boost.Coroutine.
- set a dedicated coroutine stack size for Windows.

refs #7431
  • Loading branch information
Michael Friedrich committed Sep 9, 2019
1 parent 0dbbba4 commit 1f8bebf
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 13 deletions.
78 changes: 78 additions & 0 deletions lib/base/io-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <memory>
#include <thread>
#include <vector>
#include <stdexcept>
#include <boost/exception/all.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
Expand Down Expand Up @@ -77,6 +79,82 @@ class IoEngine

boost::asio::io_context& GetIoContext();

/*
* Custom exceptions thrown in a Boost.Coroutine may cause stack corruption.
* Ensure that these are wrapped correctly.
*
* Inspired by https://github.com/niekbouman/commelec-api/blob/master/commelec-api/coroutine-exception.hpp
* Source: http://boost.2283326.n4.nabble.com/coroutine-only-std-exceptions-are-caught-from-coroutines-td4683671.html
*/
static inline boost::exception_ptr convertExceptionPtr(std::exception_ptr ex) {
try {
throw boost::enable_current_exception(ex);
} catch (...) {
return boost::current_exception();
}
}

static inline void rethrowBoostExceptionPointer() {
std::exception_ptr sep;
sep = std::current_exception();
boost::exception_ptr bep = convertExceptionPtr(sep);
boost::rethrow_exception(bep);
}

static inline size_t GetCoroutineStackSize() {
#ifdef _WIN32
// Increase the stack size for Windows coroutines to prevent exception corruption.
// Rationale: Low cost Windows agent only & https://github.com/Icinga/icinga2/issues/7431
return 8 * 1024 * 1024;
#else /* _WIN32 */
return boost::coroutines::stack_allocator::traits_type::default_size(); // Default 64 KB
#endif /* _WIN32 */
}

/* With dedicated strand in *Connection classes. */
template <typename Handler, typename Function>
static void SpawnCoroutine(Handler h, Function f) {

boost::asio::spawn(std::forward<Handler>(h),
[f](boost::asio::yield_context yc) {

try {
f(yc);
} catch (const boost::coroutines::detail::forced_unwind &) {
// Required for proper stack unwinding when coroutines are destroyed.
// https://github.com/boostorg/coroutine/issues/39
throw;
} catch (...) {
// Handle uncaught exceptions outside of the coroutine.
rethrowBoostExceptionPointer();
}
},
boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
);
}

/* Without strand in the IO executor's context. */
template <typename Function>
static void SpawnCoroutine(boost::asio::io_context& io, Function f) {

boost::asio::spawn(io,
[f](boost::asio::yield_context yc) {

try {
f(yc);
} catch (const boost::coroutines::detail::forced_unwind &) {
// Required for proper stack unwinding when coroutines are destroyed.
// https://github.com/boostorg/coroutine/issues/39
throw;
} catch (...) {
// Handle uncaught exceptions outside of the coroutine.
rethrowBoostExceptionPointer();
}
},
boost::coroutines::attributes(GetCoroutineStackSize()) // Set a pre-defined stack size.
);
}

private:
IoEngine();

Expand Down
8 changes: 4 additions & 4 deletions lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ bool ApiListener::AddListener(const String& node, const String& service)
Log(LogInformation, "ApiListener")
<< "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";

asio::spawn(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });

UpdateStatusFile(localEndpoint);

Expand All @@ -435,7 +435,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const std

server->async_accept(sslConn->lowest_layer(), yc);

asio::spawn(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Cannot accept new connection: " << ex.what();
Expand All @@ -462,7 +462,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)

auto& io (IoEngine::Get().GetIoContext());

asio::spawn(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
String host = endpoint->GetHost();
String port = endpoint->GetPort();

Expand Down Expand Up @@ -664,7 +664,7 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const

endpoint->AddClient(aclient);

asio::spawn(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
CpuBoundWork syncClient (yc);

SyncClient(aclient, endpoint, needSync);
Expand Down
8 changes: 4 additions & 4 deletions lib/remote/httpserverconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ void HttpServerConnection::Start()

HttpServerConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { ProcessMessages(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
}

void HttpServerConnection::Disconnect()
Expand All @@ -73,7 +73,7 @@ void HttpServerConnection::Disconnect()

HttpServerConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;

Expand Down Expand Up @@ -117,7 +117,7 @@ void HttpServerConnection::StartStreaming()

HttpServerConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
char buf[128];
asio::mutable_buffer readBuf (buf, 128);
Expand Down
10 changes: 5 additions & 5 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ void JsonRpcConnection::Start()

JsonRpcConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); });
}

void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
Expand Down Expand Up @@ -193,7 +193,7 @@ void JsonRpcConnection::Disconnect()

JsonRpcConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;

Expand Down

0 comments on commit 1f8bebf

Please sign in to comment.