Skip to content

Commit

Permalink
feat: Add back the tox_loop implementation for low latency.
Browse files Browse the repository at this point in the history
Copied code from #335.
  • Loading branch information
iphydf committed Dec 29, 2021
1 parent eb4dc43 commit 6bde99f
Show file tree
Hide file tree
Showing 12 changed files with 591 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ auto_test(send_message)
auto_test(set_name)
auto_test(set_status_message)
auto_test(skeleton)
auto_test(tox_loop)
auto_test(tox_many)
auto_test(tox_many_tcp)
auto_test(tox_one)
Expand Down
104 changes: 104 additions & 0 deletions auto_tests/tox_loop_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#include <pthread.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include "../toxcore/tox.h"

#include "check_compat.h"

#define TCP_RELAY_PORT 33448
/* The Travis-CI container responds poorly to ::1 as a localhost address
* You're encouraged to -D FORCE_TESTS_IPV6 on a local test */
#ifdef FORCE_TESTS_IPV6
#define TOX_LOCALHOST "::1"
#else
#define TOX_LOCALHOST "127.0.0.1"
#endif

typedef struct {
int start_count, stop_count;
pthread_mutex_t mutex;
Tox *tox;
} loop_test;

static void tox_loop_cb_start(Tox *tox, void *user_data)
{
loop_test *userdata = (loop_test *) user_data;
pthread_mutex_lock(&userdata->mutex);
userdata->start_count++;
}

static void tox_loop_cb_stop(Tox *tox, void *user_data)
{
loop_test *userdata = (loop_test *) user_data;
userdata->stop_count++;
pthread_mutex_unlock(&userdata->mutex);
}

static void *tox_loop_worker(void *data)
{
loop_test *userdata = (loop_test *) data;
tox_loop(userdata->tox, data, NULL);
return NULL;
}

static void test_tox_loop(void)
{
pthread_t worker, worker_tcp;
struct Tox_Options *opts = tox_options_new(NULL);
loop_test userdata;
uint8_t dpk[TOX_PUBLIC_KEY_SIZE];
int retval;

userdata.start_count = 0;
userdata.stop_count = 0;
pthread_mutex_init(&userdata.mutex, NULL);

tox_options_set_tcp_port(opts, TCP_RELAY_PORT);
userdata.tox = tox_new(opts, NULL);
tox_callback_loop_begin(userdata.tox, tox_loop_cb_start);
tox_callback_loop_end(userdata.tox, tox_loop_cb_stop);
pthread_create(&worker, NULL, tox_loop_worker, &userdata);

tox_self_get_dht_id(userdata.tox, dpk);

tox_options_default(opts);
loop_test userdata_tcp;
userdata_tcp.start_count = 0;
userdata_tcp.stop_count = 0;
pthread_mutex_init(&userdata_tcp.mutex, NULL);
userdata_tcp.tox = tox_new(opts, NULL);
tox_callback_loop_begin(userdata_tcp.tox, tox_loop_cb_start);
tox_callback_loop_end(userdata_tcp.tox, tox_loop_cb_stop);
pthread_create(&worker_tcp, NULL, tox_loop_worker, &userdata_tcp);

pthread_mutex_lock(&userdata_tcp.mutex);
TOX_ERR_BOOTSTRAP error;
ck_assert_msg(tox_add_tcp_relay(userdata_tcp.tox, TOX_LOCALHOST, TCP_RELAY_PORT, dpk, &error), "Add relay error, %i",
error);
ck_assert_msg(tox_bootstrap(userdata_tcp.tox, TOX_LOCALHOST, 33445, dpk, &error), "Bootstrap error, %i", error);
pthread_mutex_unlock(&userdata_tcp.mutex);

sleep(10);

tox_loop_stop(userdata.tox);
pthread_join(worker, (void **)(void *)&retval);
ck_assert_msg(retval == 0, "tox_loop didn't return 0");

tox_kill(userdata.tox);
ck_assert_msg(userdata.start_count == userdata.stop_count, "start and stop must match");

tox_loop_stop(userdata_tcp.tox);
pthread_join(worker_tcp, (void **)(void *)&retval);
ck_assert_msg(retval == 0, "tox_loop didn't return 0");

tox_kill(userdata_tcp.tox);
ck_assert_msg(userdata_tcp.start_count == userdata_tcp.stop_count, "start and stop must match");
}

int main(int argc, char *argv[])
{
test_tox_loop();
return 0;
}
20 changes: 20 additions & 0 deletions toxcore/Messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,22 @@ Messenger *new_messenger(Mono_Time *mono_time, Messenger_Options *options, unsig
return nullptr;
}


#ifdef HAVE_LIBEV
m->dispatcher = ev_loop_new(0);
#else
m->loop_run = false;
#endif // HAVE_LIBEV

#if defined(HAVE_LIBEV)

if (!m->dispatcher) {
free(m);
return nullptr;
}

#endif // HAVE_LIBEV

m->mono_time = mono_time;

m->fr = friendreq_new();
Expand Down Expand Up @@ -2044,6 +2060,10 @@ void kill_messenger(Messenger *m)
clear_receipts(m, i);
}

#ifdef HAVE_LIBEV
ev_loop_destroy(m->dispatcher);
#endif

logger_kill(m->log);
free(m->friendlist);
friendreq_kill(m->fr);
Expand Down
11 changes: 11 additions & 0 deletions toxcore/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#include "net_crypto.h"
#include "state.h"

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#define MAX_NAME_LENGTH 128
/* TODO(irungentoo): this must depend on other variable. */
#define MAX_STATUSMESSAGE_LENGTH 1007
Expand Down Expand Up @@ -291,6 +295,13 @@ struct Messenger {
m_self_connection_status_cb *core_connection_change;
unsigned int last_connection_status;

#ifdef HAVE_LIBEV
struct ev_loop *dispatcher;
ev_async stop_loop;
#else
bool loop_run;
#endif

Messenger_Options options;
};

Expand Down
24 changes: 24 additions & 0 deletions toxcore/TCP_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#include "mono_time.h"
#include "util.h"

Expand All @@ -22,9 +26,19 @@ typedef struct TCP_Client_Conn {
uint32_t number;
} TCP_Client_Conn;

#ifdef HAVE_LIBEV
typedef struct TCP_Client_Socket_Listener {
ev_io listener;
struct ev_loop *dispatcher;
} TCP_Client_Socket_Listener;
#endif

struct TCP_Client_Connection {
TCP_Client_Status status;
Socket sock;
#ifdef HAVE_LIBEV
TCP_Client_Socket_Listener sock_listener;
#endif
uint8_t self_public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* our public key */
uint8_t public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* public key of the server */
IP_Port ip_port; /* The ip and port of the server */
Expand Down Expand Up @@ -79,6 +93,11 @@ IP_Port tcp_con_ip_port(const TCP_Client_Connection *con)
return con->ip_port;
}

Socket tcp_con_sock(const TCP_Client_Connection *con)
{
return con->sock;
}

TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con)
{
return con->status;
Expand Down Expand Up @@ -1052,6 +1071,11 @@ void kill_TCP_connection(TCP_Client_Connection *tcp_connection)

wipe_priority_list(tcp_connection->priority_queue_start);
kill_sock(tcp_connection->sock);

#ifdef HAVE_LIBEV
ev_io_stop(tcp_connection->sock_listener.dispatcher, &tcp_connection->sock_listener.listener);
#endif

crypto_memzero(tcp_connection, sizeof(TCP_Client_Connection));
free(tcp_connection);
}
1 change: 1 addition & 0 deletions toxcore/TCP_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ typedef struct TCP_Client_Connection TCP_Client_Connection;

const uint8_t *tcp_con_public_key(const TCP_Client_Connection *con);
IP_Port tcp_con_ip_port(const TCP_Client_Connection *con);
Socket tcp_con_sock(const TCP_Client_Connection *con);
TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con);

void *tcp_con_custom_object(const TCP_Client_Connection *con);
Expand Down
32 changes: 32 additions & 0 deletions toxcore/TCP_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,38 @@ uint32_t tcp_connections_count(const TCP_Connections *tcp_c)
return tcp_c->tcp_connections_length;
}


/**
* Return number of elements of TCP connection array.
*
* @param tcp_c struct containing TCP_con array
*
* @return number of elements of TCP connection array
*/
uint32_t tcp_connections_length(const TCP_Connections *tcp_c)
{
return tcp_c->tcp_connections_length;
}


/**
* Return TCP connection stored at "idx" position.
*
* @param tcp_c struct containing TCP_con array
* @param idx index of TCP connection to return (values from 0 to tcp_connections_length() - 1)
*
* @return TCP connection stored at "idx" position, or NULL if errors occurred
*/
const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx)
{
if (idx >= tcp_c->tcp_connections_length) {
return nullptr;
}

return &tcp_c->tcp_connections[idx];
}


/** Set the size of the array to num.
*
* return -1 if realloc fails.
Expand Down
4 changes: 4 additions & 0 deletions toxcore/TCP_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ typedef struct TCP_Connections TCP_Connections;

const uint8_t *tcp_connections_public_key(const TCP_Connections *tcp_c);

uint32_t tcp_connections_length(const TCP_Connections *tcp_c);

const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx);

uint32_t tcp_connections_count(const TCP_Connections *tcp_c);

/** Returns the number of connected TCP relays */
Expand Down
24 changes: 24 additions & 0 deletions toxcore/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
#include <stdlib.h>
#include <string.h>

#ifdef HAVE_LIBEV
#include <ev.h>
#endif

#ifndef VANILLA_NACL
// Used for sodium_init()
#include <sodium.h>
Expand Down Expand Up @@ -482,6 +486,13 @@ typedef struct Packet_Handler {
void *object;
} Packet_Handler;

#ifdef HAVE_LIBEV
typedef struct Networking_Socket_Listener {
ev_io listener;
struct ev_loop *dispatcher;
} Networking_Socket_Listener;
#endif

struct Networking_Core {
const Logger *log;
Packet_Handler packethandlers[256];
Expand All @@ -490,6 +501,9 @@ struct Networking_Core {
uint16_t port;
/* Our UDP socket. */
Socket sock;
#ifdef HAVE_LIBEV
Networking_Socket_Listener sock_listener;
#endif
};

Family net_family(const Networking_Core *net)
Expand All @@ -502,6 +516,11 @@ uint16_t net_port(const Networking_Core *net)
return net->port;
}

Socket net_sock(const Networking_Core *net)
{
return net->sock;
}

/* Basic network functions:
*/

Expand Down Expand Up @@ -1003,6 +1022,11 @@ void kill_networking(Networking_Core *net)
kill_sock(net->sock);
}


#ifdef HAVE_LIBEV
ev_io_stop(net->sock_listener.dispatcher, &net->sock_listener.listener);
#endif

free(net);
}

Expand Down
1 change: 1 addition & 0 deletions toxcore/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ typedef struct Networking_Core Networking_Core;

Family net_family(const Networking_Core *net);
uint16_t net_port(const Networking_Core *net);
Socket net_sock(const Networking_Core *net);

/** Run this before creating sockets.
*
Expand Down
Loading

0 comments on commit 6bde99f

Please sign in to comment.