Skip to content

Commit

Permalink
feat: Client connection registers a queue to it and the client uses a…
Browse files Browse the repository at this point in the history
… network connection allowing to receive the ConnectionMessage
  • Loading branch information
Knoblauchpilze committed Jan 24, 2024
1 parent 479523a commit 2481d67
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 63 deletions.
1 change: 1 addition & 0 deletions src/bsgo/queues/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ target_sources (bsgalone_core_lib PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/AbstractMessageListener.cc
${CMAKE_CURRENT_SOURCE_DIR}/AbstractMessageQueue.cc
${CMAKE_CURRENT_SOURCE_DIR}/MessageQueue.cc
${CMAKE_CURRENT_SOURCE_DIR}/NetworkMessageQueue.cc
${CMAKE_CURRENT_SOURCE_DIR}/SynchronizedMessageQueue.cc
)
77 changes: 77 additions & 0 deletions src/bsgo/queues/NetworkMessageQueue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@

#include "NetworkMessageQueue.hh"
#include "MessageParser.hh"
#include "SynchronizedMessageQueue.hh"

namespace bsgo {

NetworkMessageQueue::NetworkMessageQueue(IMessageQueuePtr localQueue)
: IMessageQueue()
, utils::CoreObject("message")
, m_localQueue(std::make_unique<SynchronizedMessageQueue>(std::move(localQueue)))
{
addModule("queue");
setService("network");
}

void NetworkMessageQueue::registerToConnection(net::Connection &connection)
{
connection.setDataHandler(
[this](const net::ConnectionId connectionId, const std::deque<char> &data) {
return onDataReceived(connectionId, data);
});
}

void NetworkMessageQueue::pushMessage(IMessagePtr message)
{
m_localQueue->pushMessage(std::move(message));
}

void NetworkMessageQueue::addListener(IMessageListener *listener)
{
m_localQueue->addListener(listener);
}

bool NetworkMessageQueue::empty()
{
return m_localQueue->empty();
}

void NetworkMessageQueue::processMessages(const std::optional<int> &amount)
{
m_localQueue->processMessages(amount);
}

auto NetworkMessageQueue::onDataReceived(const net::ConnectionId /*connectionId*/,
const std::deque<char> &data) -> int
{
bool processedSomeBytes{true};
auto processedBytes{0};
std::vector<IMessagePtr> messages{};

std::deque<char> workingData(data);
MessageParser parser{};

while (processedSomeBytes)
{
auto result = parser.tryParseMessage(workingData);
if (result.message)
{
messages.emplace_back(std::move(*result.message));
}

processedSomeBytes = (result.bytesProcessed > 0);
processedBytes += result.bytesProcessed;

workingData.erase(workingData.begin(), workingData.begin() + result.bytesProcessed);
}

for (auto &message : messages)
{
m_localQueue->pushMessage(std::move(message));
}

return processedBytes;
}

} // namespace bsgo
34 changes: 34 additions & 0 deletions src/bsgo/queues/NetworkMessageQueue.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

#pragma once

#include "Connection.hh"
#include "IMessageQueue.hh"
#include <core_utils/CoreObject.hh>
#include <deque>
#include <memory>

namespace bsgo {

class NetworkMessageQueue : public IMessageQueue, public utils::CoreObject
{
public:
NetworkMessageQueue(IMessageQueuePtr localQueue);
~NetworkMessageQueue() override = default;

void registerToConnection(net::Connection &connection);

void pushMessage(IMessagePtr message) override;
void addListener(IMessageListener *listener) override;
bool empty() override;

void processMessages(const std::optional<int> &amount = {}) override;

private:
IMessageQueuePtr m_localQueue{};

auto onDataReceived(const net::ConnectionId connectionId, const std::deque<char> &data) -> int;
};

using NetworkMessageQueuePtr = std::unique_ptr<NetworkMessageQueue>;

} // namespace bsgo
7 changes: 3 additions & 4 deletions src/client/lib/game/Game.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "IRenderer.hh"
#include "IUiHandler.hh"
#include "NetworkSystem.hh"
#include "SynchronizedMessageQueue.hh"

#include "GameScreenInputHandler.hh"
#include "GameScreenRenderer.hh"
Expand Down Expand Up @@ -246,10 +245,10 @@ void Game::initialize()
m_networkSystem = networkSystem.get();

auto queue = std::make_unique<bsgo::MessageQueue>();
auto localQueue = std::make_unique<bsgo::SynchronizedMessageQueue>(std::move(queue));

auto connection = std::make_unique<ClientConnection>(*m_networkContext);
m_messageQueue = std::make_unique<ClientMessageQueue>(std::move(localQueue),
auto localQueue = connection->connectMessageQueue(std::move(queue));

m_messageQueue = std::make_unique<ClientMessageQueue>(std::move(localQueue),
std::move(connection));

m_coordinator = std::make_shared<bsgo::Coordinator>(std::move(networkSystem),
Expand Down
9 changes: 9 additions & 0 deletions src/client/lib/network/ClientConnection.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

#include "ClientConnection.hh"
#include "NetworkMessageQueue.hh"

namespace pge {
constexpr auto DEFAULT_SERVER_URL = "127.0.0.1";
Expand All @@ -23,4 +24,12 @@ void ClientConnection::sendMessage(const bsgo::IMessage &message)
m_connection->send(message);
}

auto ClientConnection::connectMessageQueue(bsgo::IMessageQueuePtr messageQueue)
-> bsgo::IMessageQueuePtr
{
auto queue = std::make_unique<bsgo::NetworkMessageQueue>(std::move(messageQueue));
queue->registerToConnection(*m_connection);
return queue;
}

} // namespace pge
2 changes: 2 additions & 0 deletions src/client/lib/network/ClientConnection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "Context.hh"
#include "IMessage.hh"
#include "IMessageQueue.hh"
#include <core_utils/CoreObject.hh>
#include <memory>

Expand All @@ -17,6 +18,7 @@ class ClientConnection : public utils::CoreObject
void setDataHandler(const net::DataReceivedHandler &handler);

void sendMessage(const bsgo::IMessage &message);
auto connectMessageQueue(bsgo::IMessageQueuePtr messageQueue) -> bsgo::IMessageQueuePtr;

private:
net::ConnectionShPtr m_connection{};
Expand Down
80 changes: 27 additions & 53 deletions src/server/lib/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "ConnectionMessage.hh"
#include "ConsumerUtils.hh"
#include "MessageQueue.hh"
#include "NetworkMessageQueue.hh"
#include "SynchronizedMessageQueue.hh"
#include <core_utils/TimeUtils.hh>

Expand Down Expand Up @@ -34,7 +35,15 @@ void Server::requestStop()
}

namespace {
auto createSynchronizedMessageQueue() -> IMessageQueuePtr
auto createInputMessageQueue() -> NetworkMessageQueuePtr
{
auto messageQueue = std::make_unique<MessageQueue>();
auto asyncQueue = std::make_unique<AsyncMessageQueue>(std::move(messageQueue));

return std::make_unique<NetworkMessageQueue>(std::move(asyncQueue));
}

auto createOutputMessageQueue() -> IMessageQueuePtr
{
auto messageQueue = std::make_unique<MessageQueue>();
return std::make_unique<SynchronizedMessageQueue>(std::move(messageQueue));
Expand All @@ -45,8 +54,8 @@ void Server::initialize()
{
const auto repositories = m_dataSource.repositories();

m_inputMessagesQueue = std::make_unique<AsyncMessageQueue>(createSynchronizedMessageQueue());
m_outputMessagesQueue = createSynchronizedMessageQueue();
m_inputMessagesQueue = createInputMessageQueue();
m_outputMessagesQueue = createOutputMessageQueue();

m_coordinator = std::make_shared<bsgo::Coordinator>(m_inputMessagesQueue.get());
m_services = createServices(repositories, m_coordinator);
Expand All @@ -61,17 +70,18 @@ void Server::initialize()

void Server::setup(const int port)
{
const net::ServerConfig
config{.acceptor =
[this](const net::Connection &connection) { return onConnectionReceived(connection); },
.disconnectHandler =
[this](const net::ConnectionId connectionId) { return onConnectionLost(connectionId); },
.connectionReadyHandler =
[this](net::Connection &connection) { onConnectionReady(connection); },
.connectionDataHandler =
[this](const net::ConnectionId connectionId, const std::deque<char> &data) {
return onDataReceived(connectionId, data);
}};
const net::ServerConfig config{.acceptor =
[this](const net::Connection &connection) {
return onConnectionReceived(connection);
},
.disconnectHandler =
[this](const net::ConnectionId connectionId) {
return onConnectionLost(connectionId);
},
.connectionReadyHandler =
[this](net::Connection &connection) {
onConnectionReady(connection);
}};

m_tcpServer = std::make_unique<net::TcpServer>(m_context, port, config);

Expand Down Expand Up @@ -110,47 +120,11 @@ void Server::onConnectionLost(const net::ConnectionId connectionId)
void Server::onConnectionReady(net::Connection &connection)
{
const auto clientId = m_clientManager.registerConnection(connection.id());
const ConnectionMessage message(clientId);
connection.send(message);
}

auto Server::onDataReceived(const net::ConnectionId /*connectionId*/, const std::deque<char> &data)
-> int
{
bool processedSomeBytes{true};
auto processedBytes{0};
std::vector<IMessagePtr> messages{};

std::deque<char> workingData(data);

while (processedSomeBytes)
{
auto result = m_messageParser.tryParseMessage(workingData);
if (result.message)
{
messages.emplace_back(std::move(*result.message));
}

processedSomeBytes = (result.bytesProcessed > 0);
processedBytes += result.bytesProcessed;
m_inputMessagesQueue->registerToConnection(connection);

workingData.erase(workingData.begin(), workingData.begin() + result.bytesProcessed);
}

if (!messages.empty())
{
handleReceivedMessages(std::move(messages));
}

return processedBytes;
}

void Server::handleReceivedMessages(std::vector<IMessagePtr> &&messages)
{
for (auto &message : messages)
{
m_inputMessagesQueue->pushMessage(std::move(message));
}
const ConnectionMessage message(clientId);
connection.send(message);
}

} // namespace bsgo
8 changes: 2 additions & 6 deletions src/server/lib/Server.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "Context.hh"
#include "DataSource.hh"
#include "IMessageQueue.hh"
#include "MessageParser.hh"
#include "NetworkMessageQueue.hh"
#include "NetworkSystem.hh"
#include "Services.hh"
#include "TcpServer.hh"
Expand All @@ -30,8 +30,7 @@ class Server : public utils::CoreObject

ClientManager m_clientManager{};

MessageParser m_messageParser{};
IMessageQueuePtr m_inputMessagesQueue{};
NetworkMessageQueuePtr m_inputMessagesQueue{};
IMessageQueuePtr m_outputMessagesQueue{};

DataSource m_dataSource{};
Expand All @@ -48,8 +47,5 @@ class Server : public utils::CoreObject
bool onConnectionReceived(const net::Connection &connection) const;
void onConnectionLost(const net::ConnectionId connectionId);
void onConnectionReady(net::Connection &connection);
auto onDataReceived(const net::ConnectionId connectionId, const std::deque<char> &data) -> int;

void handleReceivedMessages(std::vector<IMessagePtr> &&messages);
};
} // namespace bsgo

0 comments on commit 2481d67

Please sign in to comment.