Skip to content

Commit

Permalink
Merge pull request #7076 from Icinga/bugfix/eventqueue-leak
Browse files Browse the repository at this point in the history
/v1/events: terminate on disconnect
  • Loading branch information
Michael Friedrich authored Apr 5, 2019
2 parents 3c63356 + 2e4e2e1 commit b1042c3
Show file tree
Hide file tree
Showing 36 changed files with 111 additions and 58 deletions.
2 changes: 1 addition & 1 deletion lib/remote/actionshandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bool ActionsHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/actionshandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ActionsHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configfileshandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ bool ConfigFilesHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configfileshandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigFilesHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configpackageshandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bool ConfigPackagesHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configpackageshandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigPackagesHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configstageshandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bool ConfigStagesHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/configstageshandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConfigStagesHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/consolehandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ bool ConsoleHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/consolehandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ConsoleHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;

static std::vector<String> GetAutocompletionSuggestions(const String& word, ScriptFrame& frame);
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/createobjecthandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ bool CreateObjectHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/createobjecthandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CreateObjectHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
2 changes: 1 addition & 1 deletion lib/remote/deleteobjecthandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ bool DeleteObjectHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/deleteobjecthandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class DeleteObjectHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
31 changes: 21 additions & 10 deletions lib/remote/eventqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "base/io-engine.hpp"
#include "base/singleton.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/asio/spawn.hpp>

using namespace icinga;
Expand Down Expand Up @@ -102,19 +103,29 @@ Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
}
}

Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc)
Dictionary::Ptr EventQueue::WaitForEvent(void *client, boost::asio::yield_context yc, double timeout)
{
double deadline = -1.0;

for (;;) {
{
boost::mutex::scoped_lock lock(m_Mutex);

auto it = m_Events.find(client);
ASSERT(it != m_Events.end());

if (!it->second.empty()) {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
boost::mutex::scoped_try_lock lock(m_Mutex);

if (lock.owns_lock()) {
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());

if (it->second.empty()) {
if (deadline == -1.0) {
deadline = Utility::GetTime() + timeout;
} else if (Utility::GetTime() >= deadline) {
return nullptr;
}
} else {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/remote/eventqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class EventQueue final : public Object
void SetFilter(std::unique_ptr<Expression> filter);

Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc);
Dictionary::Ptr WaitForEvent(void *client, boost::asio::yield_context yc, double timeout = 5);

static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);
Expand Down
27 changes: 18 additions & 9 deletions lib/remote/eventshandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool EventsHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace asio = boost::asio;
Expand Down Expand Up @@ -88,7 +88,7 @@ bool EventsHandler::HandleRequest(
EventQueue::UnregisterIfUnused(queueName, queue);
});

hasStartedStreaming = true;
server.StartStreaming();

response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
Expand All @@ -101,19 +101,28 @@ bool EventsHandler::HandleRequest(
}

asio::const_buffer newLine ("\n", 1);
AsioConditionVariable dontLockOwnStrand (stream.get_io_service(), true);

for (;;) {
String body = JsonEncode(queue->WaitForEvent(&request, yc));
auto event (queue->WaitForEvent(&request, yc));

boost::algorithm::replace_all(body, "\n", "");
if (event) {
String body = JsonEncode(event);

asio::const_buffer payload (body.CStr(), body.GetLength());
boost::algorithm::replace_all(body, "\n", "");

IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);
asio::const_buffer payload (body.CStr(), body.GetLength());

asio::async_write(stream, payload, yc);
asio::async_write(stream, newLine, yc);
stream.async_flush(yc);
IoBoundWorkSlot dontLockTheIoThreadWhileWriting (yc);

asio::async_write(stream, payload, yc);
asio::async_write(stream, newLine, yc);
stream.async_flush(yc);
} else if (server.Disconnected()) {
return true;
} else {
dontLockOwnStrand.Wait(yc);
}
}
}

2 changes: 1 addition & 1 deletion lib/remote/eventshandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class EventsHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
4 changes: 2 additions & 2 deletions lib/remote/httphandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void HttpHandler::ProcessRequest(
boost::beast::http::request<boost::beast::http::string_body>& request,
boost::beast::http::response<boost::beast::http::string_body>& response,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
Dictionary::Ptr node = m_UrlTree;
Expand Down Expand Up @@ -99,7 +99,7 @@ void HttpHandler::ProcessRequest(

bool processed = false;
for (const HttpHandler::Ptr& handler : handlers) {
if (handler->HandleRequest(stream, user, request, url, response, params, yc, hasStartedStreaming)) {
if (handler->HandleRequest(stream, user, request, url, response, params, yc, server)) {
processed = true;
break;
}
Expand Down
5 changes: 3 additions & 2 deletions lib/remote/httphandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "remote/i2-remote.hpp"
#include "remote/url.hpp"
#include "remote/httpresponse.hpp"
#include "remote/httpserverconnection.hpp"
#include "remote/apiuser.hpp"
#include "base/registry.hpp"
#include "base/tlsstream.hpp"
Expand Down Expand Up @@ -34,7 +35,7 @@ class HttpHandler : public Object
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) = 0;

static void Register(const Url::Ptr& url, const HttpHandler::Ptr& handler);
Expand All @@ -44,7 +45,7 @@ class HttpHandler : public Object
boost::beast::http::request<boost::beast::http::string_body>& request,
boost::beast::http::response<boost::beast::http::string_body>& response,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
);

private:
Expand Down
38 changes: 33 additions & 5 deletions lib/remote/httpserverconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using namespace icinga;
auto const l_ServerHeader ("Icinga/" + Application::GetAppVersion());

HttpServerConnection::HttpServerConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream)
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_io_service()), m_ShuttingDown(false)
: m_Stream(stream), m_Seen(Utility::GetTime()), m_IoStrand(stream->get_io_service()), m_ShuttingDown(false), m_HasStartedStreaming(false)
{
if (authenticated) {
m_ApiUser = ApiUser::GetByClientCN(identity);
Expand Down Expand Up @@ -91,6 +91,34 @@ void HttpServerConnection::Disconnect()
});
}

void HttpServerConnection::StartStreaming()
{
namespace asio = boost::asio;

m_HasStartedStreaming = true;

HttpServerConnection::Ptr keepAlive (this);

asio::spawn(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
char buf[128];
asio::mutable_buffer readBuf (buf, 128);
boost::system::error_code ec;

do {
m_Stream->async_read_some(readBuf, yc[ec]);
} while (!ec);

Disconnect();
}
});
}

bool HttpServerConnection::Disconnected()
{
return m_ShuttingDown;
}

static inline
bool EnsureValidHeaders(
AsioTlsStream& stream,
Expand Down Expand Up @@ -375,17 +403,17 @@ bool ProcessRequest(
boost::beast::http::request<boost::beast::http::string_body>& request,
ApiUser::Ptr& authenticatedUser,
boost::beast::http::response<boost::beast::http::string_body>& response,
HttpServerConnection& server,
bool& hasStartedStreaming,
boost::asio::yield_context& yc
)
{
namespace http = boost::beast::http;

bool hasStartedStreaming = false;

try {
CpuBoundWork handlingRequest (yc);

HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, hasStartedStreaming);
HttpHandler::ProcessRequest(stream, authenticatedUser, request, response, yc, server);
} catch (const std::exception& ex) {
if (hasStartedStreaming) {
return false;
Expand Down Expand Up @@ -481,7 +509,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)

m_Seen = std::numeric_limits<decltype(m_Seen)>::max();

if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, yc)) {
if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, yc)) {
break;
}

Expand Down
4 changes: 4 additions & 0 deletions lib/remote/httpserverconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class HttpServerConnection final : public Object

void Start();
void Disconnect();
void StartStreaming();

bool Disconnected();

private:
ApiUser::Ptr m_ApiUser;
Expand All @@ -35,6 +38,7 @@ class HttpServerConnection final : public Object
String m_PeerAddress;
boost::asio::io_service::strand m_IoStrand;
bool m_ShuttingDown;
bool m_HasStartedStreaming;

void ProcessMessages(boost::asio::yield_context yc);
void CheckLiveness(boost::asio::yield_context yc);
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/infohandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bool InfoHandler::HandleRequest(
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
)
{
namespace http = boost::beast::http;
Expand Down
2 changes: 1 addition & 1 deletion lib/remote/infohandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class InfoHandler final : public HttpHandler
boost::beast::http::response<boost::beast::http::string_body>& response,
const Dictionary::Ptr& params,
boost::asio::yield_context& yc,
bool& hasStartedStreaming
HttpServerConnection& server
) override;
};

Expand Down
Loading

0 comments on commit b1042c3

Please sign in to comment.