Skip to content

Commit

Permalink
Keeps track of the number of connected clients (#447)
Browse files Browse the repository at this point in the history
* Keeps track of the number of connected clients
in the GridConnect TCP hub.

* fix whitespace

* Fix typo
  • Loading branch information
balazsracz authored Oct 17, 2020
1 parent 7f32dd0 commit 172a4c3
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
14 changes: 12 additions & 2 deletions src/openlcb/SimpleStack.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,15 @@ public:
{
/// @TODO (balazs.racz) make this more efficient by rendering to string
/// only once for all connections.
/// @TODO (balazs.racz) do not leak this.
new GcTcpHub(can_hub(), port);
gcHubServer_.reset(new GcTcpHub(can_hub(), port));
}

/// Retrieve the instance of the GridConnect Hub server, which was started
/// with start_tcp_hub_server().
/// @return the TCP hub server, or nullptr if no server was ever started.
GcTcpHub *get_tcp_hub_server()
{
return gcHubServer_.get();
}

/// Connects to a CAN hub using TCP with the gridconnect protocol.
Expand Down Expand Up @@ -473,6 +480,9 @@ private:
/// the CAN interface to function. Will be called exactly once by the
/// constructor of the base class.
std::unique_ptr<PhysicalIf> create_if(const openlcb::NodeID node_id);

/// Holds the ownership of the TCP hub server (if one was created).
std::unique_ptr<GcTcpHub> gcHubServer_;
};

class SimpleTcpStackBase : public SimpleStackBase
Expand Down
21 changes: 17 additions & 4 deletions src/utils/GcTcpHub.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,30 @@
#include "nmranet_config.h"
#include "utils/GridConnectHub.hxx"

void GcTcpHub::OnNewConnection(int fd)
void GcTcpHub::on_new_connection(int fd)
{
const bool use_select =
(config_gridconnect_tcp_use_select() == CONSTANT_TRUE);
create_gc_port_for_can_hub(canHub_, fd, nullptr, use_select);
{
AtomicHolder h(this);
numClients_++;
}
create_gc_port_for_can_hub(canHub_, fd, this, use_select);
}

void GcTcpHub::notify()
{
AtomicHolder h(this);
if (numClients_)
{
numClients_--;
}
}

GcTcpHub::GcTcpHub(CanHubFlow *can_hub, int port)
: canHub_(can_hub)
, tcpListener_(port, std::bind(&GcTcpHub::OnNewConnection, this,
std::placeholders::_1))
, tcpListener_(port,
std::bind(&GcTcpHub::on_new_connection, this, std::placeholders::_1))
{
}

Expand Down
7 changes: 7 additions & 0 deletions src/utils/GcTcpHub.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected:
fprintf(stderr, "waiting for exiting.\r");
usleep(100000);
}
EXPECT_EQ(0U, tcpHub_.get_num_clients());
}

struct Client
Expand Down Expand Up @@ -154,6 +155,7 @@ protected:

TEST_F(GcTcpHubTest, CreateDestroy)
{
EXPECT_EQ(0u, tcpHub_.get_num_clients());
}

TEST_F(GcTcpHubTest, TwoClientsPingPong)
Expand All @@ -165,6 +167,9 @@ TEST_F(GcTcpHubTest, TwoClientsPingPong)
writeline(b.fd_, ":S001N01;");
EXPECT_EQ(":S001N01;", readline(a.fd_, ';'));
EXPECT_EQ(3U, can_hub0.size());

EXPECT_EQ(2u, tcpHub_.get_num_clients());

// Test writing outwards.
send_packet(":S002N0102;");
EXPECT_EQ(":S002N0102;", readline(a.fd_, ';'));
Expand All @@ -177,13 +182,15 @@ TEST_F(GcTcpHubTest, ClientCloseExpect)
unsigned can_hub_size = can_hub0.size();
LOG(INFO, "can hub: %p ", &can_hub0);
EXPECT_EQ(1U, can_hub_size);
EXPECT_EQ(0U, tcpHub_.get_num_clients());
{
Client a;
Client b;
expect_packet(":S001N01;");
writeline(b.fd_, ":S001N01;");
EXPECT_EQ(":S001N01;", readline(a.fd_, ';'));
EXPECT_EQ(can_hub_size + 2, can_hub0.size());
EXPECT_EQ(2U, tcpHub_.get_num_clients());
wait();
}
// Test writing outwards.
Expand Down
16 changes: 14 additions & 2 deletions src/utils/GcTcpHub.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ExecutorBase;
* format. Any new incoming connection will be wired into the same virtual CAN
* hub. All packets will be forwarded to every participant, without
* loopback. */
class GcTcpHub
class GcTcpHub : private Notifiable, private Atomic
{
public:
/// Constructor.
Expand All @@ -60,16 +60,28 @@ public:
return tcpListener_.is_started();
}

/// @return currently connected client count.
unsigned get_num_clients()
{
return numClients_;
}

private:
/// Callback when a new connection arrives.
///
/// @param fd filedes of the freshly established incoming connection.
///
void OnNewConnection(int fd);
void on_new_connection(int fd);

/// Error callback from the gridconnect socket. This is invoked when a
/// client disconnects.
void notify() override;

/// @param can_hub Which CAN-hub should we attach the TCP gridconnect hub
/// onto.
CanHubFlow *canHub_;
/// How many clients are connected right now.
unsigned numClients_ {0};
/// Helper object representing the listening on the socket.
SocketListener tcpListener_;
};
Expand Down

0 comments on commit 172a4c3

Please sign in to comment.