Skip to content

Commit

Permalink
fix: Add queue size limit for websocket (#1701)
Browse files Browse the repository at this point in the history
For slow clients, we will disconnect with it if the message queue is too
long.

---------

Co-authored-by: Sergey Kuznetsov <[email protected]>
  • Loading branch information
cindyyan317 and kuznetsss authored Oct 25, 2024
1 parent f083c82 commit 1c82d37
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 96 deletions.
6 changes: 3 additions & 3 deletions docs/examples/config/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@
// For sequent policy request from one client connection will be processed one by one and the next one will not be read before
// the previous one is processed. For parallel policy Clio will take all requests and process them in parallel and
// send a reply for each request whenever it is ready.
"parallel_requests_limit": 10 // Optional parameter, used only if "processing_strategy" is "parallel".
It limits the number of requests for one client connection processed in parallel. Infinite if not specified.

"parallel_requests_limit": 10, // Optional parameter, used only if "processing_strategy" is "parallel". It limits the number of requests for one client connection processed in parallel. Infinite if not specified.
// Max number of responses to queue up before sent successfully. If a client's waiting queue is too long, the server will close the connection.
"ws_max_sending_queue_size": 1500
},
// Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet.
"graceful_period": 10.0,
Expand Down
10 changes: 8 additions & 2 deletions src/web/HttpSession.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
Expand All @@ -52,6 +53,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
public std::enable_shared_from_this<HttpSession<HandlerType>> {
boost::beast::tcp_stream stream_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::uint32_t maxWsSendingQueueSize_;

public:
/**
Expand All @@ -64,6 +66,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
* @param dosGuard The denial of service guard to use
* @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/
explicit HttpSession(
tcp::socket&& socket,
Expand All @@ -72,7 +75,8 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer buffer
boost::beast::flat_buffer buffer,
std::uint32_t maxWsSendingQueueSize
)
: impl::HttpBase<HttpSession, HandlerType>(
ip,
Expand All @@ -84,6 +88,7 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
)
, stream_(std::move(socket))
, tagFactory_(tagFactory)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{
}

Expand Down Expand Up @@ -128,7 +133,8 @@ class HttpSession : public impl::HttpBase<HttpSession, HandlerType>,
this->handler_,
std::move(this->buffer_),
std::move(this->req_),
ConnectionBase::isAdmin()
ConnectionBase::isAdmin(),
maxWsSendingQueueSize_
)
->run();
}
Expand Down
30 changes: 25 additions & 5 deletions src/web/PlainWsSession.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class PlainWsSession : public impl::WsBase<PlainWsSession, HandlerType> {
* @param dosGuard The denial of service guard to use
* @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer
* @param isAdmin Whether the connection has admin privileges
* @param isAdmin Whether the connection has admin privileges,
* @param maxSendingQueueSize The maximum size of the sending queue for websocket
*/
explicit PlainWsSession(
boost::asio::ip::tcp::socket&& socket,
Expand All @@ -71,9 +72,17 @@ class PlainWsSession : public impl::WsBase<PlainWsSession, HandlerType> {
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer,
bool isAdmin
bool isAdmin,
std::uint32_t maxSendingQueueSize
)
: impl::WsBase<PlainWsSession, HandlerType>(ip, tagFactory, dosGuard, handler, std::move(buffer))
: impl::WsBase<PlainWsSession, HandlerType>(
ip,
tagFactory,
dosGuard,
handler,
std::move(buffer),
maxSendingQueueSize
)
, ws_(std::move(socket))
{
ConnectionBase::isAdmin_ = isAdmin; // NOLINT(cppcoreguidelines-prefer-member-initializer)
Expand Down Expand Up @@ -107,6 +116,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
std::string ip_;
std::shared_ptr<HandlerType> const handler_;
bool isAdmin_;
std::uint32_t maxWsSendingQueueSize_;

public:
/**
Expand All @@ -120,6 +130,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
* @param buffer Buffer with initial data received from the peer. Ownership is transferred
* @param request The request. Ownership is transferred
* @param isAdmin Whether the connection has admin privileges
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/
WsUpgrader(
boost::beast::tcp_stream&& stream,
Expand All @@ -129,7 +140,8 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer,
http::request<http::string_body> request,
bool isAdmin
bool isAdmin,
std::uint32_t maxWsSendingQueueSize
)
: http_(std::move(stream))
, buffer_(std::move(buffer))
Expand All @@ -139,6 +151,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
, ip_(std::move(ip))
, handler_(handler)
, isAdmin_(isAdmin)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{
}

Expand Down Expand Up @@ -175,7 +188,14 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<HandlerType>>
boost::beast::get_lowest_layer(http_).expires_never();

std::make_shared<PlainWsSession<HandlerType>>(
http_.release_socket(), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_), isAdmin_
http_.release_socket(),
ip_,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_),
isAdmin_,
maxWsSendingQueueSize_
)
->run(std::move(req_));
}
Expand Down
40 changes: 34 additions & 6 deletions src/web/Server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <fmt/core.h>

#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -84,6 +85,7 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
std::shared_ptr<HandlerType> const handler_;
boost::beast::flat_buffer buffer_;
std::shared_ptr<impl::AdminVerificationStrategy> const adminVerification_;
std::uint32_t maxWsSendingQueueSize_;

public:
/**
Expand All @@ -95,21 +97,24 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
* @param dosGuard The denial of service guard to use
* @param handler The server handler to use
* @param adminVerification The admin verification strategy to use
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/
Detector(
tcp::socket&& socket,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ctx,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> handler,
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification,
std::uint32_t maxWsSendingQueueSize
)
: stream_(std::move(socket))
, ctx_(ctx)
, tagFactory_(std::cref(tagFactory))
, dosGuard_(dosGuard)
, handler_(std::move(handler))
, adminVerification_(std::move(adminVerification))
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{
}

Expand Down Expand Up @@ -167,14 +172,22 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSessionType,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_)
std::move(buffer_),
maxWsSendingQueueSize_
)
->run();
return;
}

std::make_shared<PlainSessionType<HandlerType>>(
stream_.release_socket(), ip, adminVerification_, tagFactory_, dosGuard_, handler_, std::move(buffer_)
stream_.release_socket(),
ip,
adminVerification_,
tagFactory_,
dosGuard_,
handler_,
std::move(buffer_),
maxWsSendingQueueSize_
)
->run();
}
Expand Down Expand Up @@ -204,6 +217,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
std::shared_ptr<HandlerType> handler_;
tcp::acceptor acceptor_;
std::shared_ptr<impl::AdminVerificationStrategy> adminVerification_;
std::uint32_t maxWsSendingQueueSize_;

public:
/**
Expand All @@ -216,6 +230,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
* @param dosGuard The denial of service guard to use
* @param handler The server handler to use
* @param adminPassword The optional password to verify admin role in requests
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/
Server(
boost::asio::io_context& ioc,
Expand All @@ -224,7 +239,8 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
util::TagDecoratorFactory tagFactory,
dosguard::DOSGuardInterface& dosGuard,
std::shared_ptr<HandlerType> handler,
std::optional<std::string> adminPassword
std::optional<std::string> adminPassword,
std::uint32_t maxWsSendingQueueSize
)
: ioc_(std::ref(ioc))
, ctx_(std::move(ctx))
Expand All @@ -233,6 +249,7 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
, handler_(std::move(handler))
, acceptor_(boost::asio::make_strand(ioc))
, adminVerification_(impl::make_AdminVerificationStrategy(std::move(adminPassword)))
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{
boost::beast::error_code ec;

Expand Down Expand Up @@ -286,7 +303,13 @@ class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslS
ctx_ ? std::optional<std::reference_wrapper<boost::asio::ssl::context>>{ctx_.value()} : std::nullopt;

std::make_shared<Detector<PlainSessionType, SslSessionType, HandlerType>>(
std::move(socket), ctxRef, std::cref(tagFactory_), dosGuard_, handler_, adminVerification_
std::move(socket),
ctxRef,
std::cref(tagFactory_),
dosGuard_,
handler_,
adminVerification_,
maxWsSendingQueueSize_
)
->run();
}
Expand Down Expand Up @@ -348,14 +371,19 @@ make_HttpServer(
throw std::logic_error("Admin config error, one method must be specified to authorize admin.");
}

// If the transactions number is 200 per ledger, A client which subscribes everything will send 400+ feeds for
// each ledger. we allow user delay 3 ledgers by default
auto const maxWsSendingQueueSize = serverConfig.valueOr("ws_max_sending_queue_size", 1500);

auto server = std::make_shared<HttpServer<HandlerType>>(
ioc,
std::move(expectedSslContext).value(),
boost::asio::ip::tcp::endpoint{address, port},
util::TagDecoratorFactory(config),
dosGuard,
handler,
std::move(adminPassword)
std::move(adminPassword),
maxWsSendingQueueSize
);

server->run();
Expand Down
10 changes: 8 additions & 2 deletions src/web/SslHttpSession.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
Expand All @@ -59,6 +60,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
public std::enable_shared_from_this<SslHttpSession<HandlerType>> {
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::uint32_t maxWsSendingQueueSize_;

public:
/**
Expand All @@ -72,6 +74,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
* @param dosGuard The denial of service guard to use
* @param handler The server handler to use
* @param buffer Buffer with initial data received from the peer
* @param maxWsSendingQueueSize The maximum size of the sending queue for websocket
*/
explicit SslHttpSession(
tcp::socket&& socket,
Expand All @@ -81,7 +84,8 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard,
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer buffer
boost::beast::flat_buffer buffer,
std::uint32_t maxWsSendingQueueSize
)
: impl::HttpBase<SslHttpSession, HandlerType>(
ip,
Expand All @@ -93,6 +97,7 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
)
, stream_(std::move(socket), ctx)
, tagFactory_(tagFactory)
, maxWsSendingQueueSize_(maxWsSendingQueueSize)
{
}

Expand Down Expand Up @@ -173,7 +178,8 @@ class SslHttpSession : public impl::HttpBase<SslHttpSession, HandlerType>,
this->handler_,
std::move(this->buffer_),
std::move(this->req_),
ConnectionBase::isAdmin()
ConnectionBase::isAdmin(),
maxWsSendingQueueSize_
)
->run();
}
Expand Down
Loading

0 comments on commit 1c82d37

Please sign in to comment.