From 40eccaf80d492e8261b7bef907343712ae8d2880 Mon Sep 17 00:00:00 2001 From: iphydf Date: Wed, 29 Dec 2021 02:18:00 +0000 Subject: [PATCH] feat: Add back the `tox_loop` implementation for low latency. Copied code from https://github.com/TokTok/c-toxcore/pull/335. --- .circleci/cmake-tsan | 1 + .circleci/config.yml | 3 + CMakeLists.txt | 6 + auto_tests/CMakeLists.txt | 1 + auto_tests/tox_loop_test.c | 128 +++++++++++++++ other/analysis/gen-file.sh | 2 +- other/analysis/run-clang-tidy | 3 + other/analysis/variants.sh | 4 +- toxcore/BUILD.bazel | 2 + toxcore/LAN_discovery.c | 3 +- toxcore/TCP_client.c | 58 +++++++ toxcore/TCP_client.h | 18 ++ toxcore/TCP_connection.c | 31 ++++ toxcore/TCP_connection.h | 14 ++ toxcore/network.c | 53 ++++++ toxcore/network.h | 14 ++ toxcore/tox.c | 300 ++++++++++++++++++++++++++++++++++ toxcore/tox.h | 65 ++++++++ toxcore/tox_struct.h | 10 ++ 19 files changed, 713 insertions(+), 3 deletions(-) create mode 100644 auto_tests/tox_loop_test.c diff --git a/.circleci/cmake-tsan b/.circleci/cmake-tsan index ab98e0de536..9fabb5ab971 100755 --- a/.circleci/cmake-tsan +++ b/.circleci/cmake-tsan @@ -22,6 +22,7 @@ cmake -B_build -H. -GNinja \ -DSTRICT_ABI=ON \ -DTEST_TIMEOUT_SECONDS=120 \ -DUSE_IPV6=OFF \ + -DUSE_LIBEV=ON \ -DAUTOTEST=ON cd _build diff --git a/.circleci/config.yml b/.circleci/config.yml index 9fd1c82266e..7e10dd8dcc9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -68,6 +68,7 @@ jobs: cmake git libconfig-dev + libev-dev libgmock-dev libgtest-dev libopus-dev @@ -120,6 +121,8 @@ jobs: - run: other/analysis/check_logger_levels - run: other/analysis/run-clang - run: other/analysis/run-gcc + - run: other/analysis/run-cppcheck + - run: other/analysis/run-clang-analyze clang-analyze: working_directory: ~/work diff --git a/CMakeLists.txt b/CMakeLists.txt index 49bd2f95e08..3672ed6c51b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -192,6 +192,12 @@ endif() # We don't transfer floats over the network, so we disable this functionality. add_definitions(-DCMP_NO_FLOAT=1) +# TODO(iphydf): Check whether this is actually true. +option(USE_LIBEV "Whether to use libev for tox_loop" OFF) +if(USE_LIBEV) + add_definitions(-DHAVE_LIBEV=1) +endif() + ################################################################################ # # :: Tox Core Library diff --git a/auto_tests/CMakeLists.txt b/auto_tests/CMakeLists.txt index d891ba122b0..c4587fc7d71 100644 --- a/auto_tests/CMakeLists.txt +++ b/auto_tests/CMakeLists.txt @@ -79,6 +79,7 @@ auto_test(set_name) auto_test(set_status_message) auto_test(tox_dispatch) auto_test(tox_events) +auto_test(tox_loop) auto_test(tox_many) auto_test(tox_many_tcp) auto_test(tox_strncasecmp) diff --git a/auto_tests/tox_loop_test.c b/auto_tests/tox_loop_test.c new file mode 100644 index 00000000000..8e0b0fd2ea0 --- /dev/null +++ b/auto_tests/tox_loop_test.c @@ -0,0 +1,128 @@ +#include +#include +#include + +#include "../toxcore/tox.h" + +#include "check_compat.h" +#include "../testing/misc_tools.h" + +/* The CI containers respond poorly to ::1 as a localhost address + * You're encouraged to -D FORCE_TESTS_IPV6 on a local test */ +#ifdef TOX_LOCALHOST +#undef TOX_LOCALHOST +#endif +#ifdef FORCE_TESTS_IPV6 +#define TOX_LOCALHOST "::1" +#else +#define TOX_LOCALHOST "127.0.0.1" +#endif + +#ifdef TCP_RELAY_PORT +#undef TCP_RELAY_PORT +#endif +#define TCP_RELAY_PORT 33431 + +typedef struct Loop_Test { + int start_count; + int stop_count; + pthread_mutex_t mutex; + Tox *tox; +} Loop_Test; + +static void tox_loop_cb_start(Tox *tox, void *data) +{ + Loop_Test *userdata = (Loop_Test *)data; + pthread_mutex_lock(&userdata->mutex); + ++userdata->start_count; +} + +static void tox_loop_cb_stop(Tox *tox, void *data) +{ + Loop_Test *userdata = (Loop_Test *)data; + ++userdata->stop_count; + pthread_mutex_unlock(&userdata->mutex); +} + +static void *tox_loop_worker(void *data) +{ + Loop_Test *userdata = (Loop_Test *)data; + Tox_Err_Loop err; + tox_loop(userdata->tox, userdata, &err); + ck_assert_msg(err == TOX_ERR_LOOP_OK, "tox_loop error: %d", err); + return nullptr; +} + +static void test_tox_loop(void) +{ + pthread_t worker, worker_tcp; + Tox_Err_Options_New err_opts; + struct Tox_Options *opts = tox_options_new(&err_opts); + ck_assert_msg(err_opts == TOX_ERR_OPTIONS_NEW_OK, "tox_options_new: %d\n", err_opts); + tox_options_set_experimental_thread_safety(opts, true); + + Loop_Test *userdata = (Loop_Test *)calloc(1, sizeof(Loop_Test)); + ck_assert(userdata != nullptr); + uint8_t dpk[TOX_PUBLIC_KEY_SIZE]; + + userdata->start_count = 0; + userdata->stop_count = 0; + pthread_mutex_init(&userdata->mutex, nullptr); + + tox_options_set_tcp_port(opts, TCP_RELAY_PORT); + Tox_Err_New err_new; + userdata->tox = tox_new(opts, &err_new); + ck_assert_msg(err_new == TOX_ERR_NEW_OK, "tox_new: %d\n", err_new); + tox_callback_loop_begin(userdata->tox, tox_loop_cb_start); + tox_callback_loop_end(userdata->tox, tox_loop_cb_stop); + pthread_create(&worker, nullptr, tox_loop_worker, userdata); + + tox_self_get_dht_id(userdata->tox, dpk); + + tox_options_default(opts); + tox_options_set_experimental_thread_safety(opts, true); + Loop_Test userdata_tcp; + userdata_tcp.start_count = 0; + userdata_tcp.stop_count = 0; + pthread_mutex_init(&userdata_tcp.mutex, nullptr); + userdata_tcp.tox = tox_new(opts, &err_new); + ck_assert_msg(err_new == TOX_ERR_NEW_OK, "tox_new: %d\n", err_new); + tox_callback_loop_begin(userdata_tcp.tox, tox_loop_cb_start); + tox_callback_loop_end(userdata_tcp.tox, tox_loop_cb_stop); + pthread_create(&worker_tcp, nullptr, tox_loop_worker, &userdata_tcp); + + pthread_mutex_lock(&userdata_tcp.mutex); + Tox_Err_Bootstrap error; + ck_assert_msg(tox_add_tcp_relay(userdata_tcp.tox, TOX_LOCALHOST, TCP_RELAY_PORT, dpk, &error), "Add relay error, %i", + error); + ck_assert_msg(tox_bootstrap(userdata_tcp.tox, TOX_LOCALHOST, 33445, dpk, &error), "Bootstrap error, %i", error); + pthread_mutex_unlock(&userdata_tcp.mutex); + + c_sleep(1000); + + tox_loop_stop(userdata->tox); + void *retval = nullptr; + pthread_join(worker, &retval); + ck_assert_msg((uintptr_t)retval == 0, "tox_loop didn't return 0"); + + tox_kill(userdata->tox); + ck_assert_msg(userdata->start_count == userdata->stop_count, "start and stop must match (start = %d, stop = %d)", + userdata->start_count, userdata->stop_count); + + tox_loop_stop(userdata_tcp.tox); + pthread_join(worker_tcp, &retval); + ck_assert_msg((uintptr_t)retval == 0, "tox_loop didn't return 0"); + + tox_kill(userdata_tcp.tox); + ck_assert_msg(userdata_tcp.start_count == userdata_tcp.stop_count, "start and stop must match (start = %d, stop = %d)", + userdata_tcp.start_count, userdata_tcp.stop_count); + + tox_options_free(opts); + free(userdata); +} + +int main(int argc, char *argv[]) +{ + test_tox_loop(); + return 0; +} diff --git a/other/analysis/gen-file.sh b/other/analysis/gen-file.sh index 10edd0b5315..075066f6ea5 100644 --- a/other/analysis/gen-file.sh +++ b/other/analysis/gen-file.sh @@ -15,7 +15,7 @@ CPPFLAGS+=("-Itoxav") CPPFLAGS+=("-Itoxencryptsave") CPPFLAGS+=("-Ithird_party/cmp") -LDFLAGS=("-lopus" "-lsodium" "-lvpx" "-lpthread" "-lconfig" "-lgmock" "-lgtest") +LDFLAGS=("-lopus" "-lsodium" "-lvpx" "-lpthread" "-lconfig" "-lgmock" "-lgtest" "-lev") LDFLAGS+=("-fuse-ld=gold") LDFLAGS+=("-Wl,--detect-odr-violations") LDFLAGS+=("-Wl,--warn-common") diff --git a/other/analysis/run-clang-tidy b/other/analysis/run-clang-tidy index bcdb78f5b84..cbba2059943 100755 --- a/other/analysis/run-clang-tidy +++ b/other/analysis/run-clang-tidy @@ -79,6 +79,9 @@ CHECKS="$CHECKS,-clang-diagnostic-tautological-pointer-compare" # [unreadVariable] CHECKS="$CHECKS,-cppcoreguidelines-init-variables" +# Used by libev. +CHECKS="$CHECKS,-hicpp-no-assembler" + # Short variable names are used quite a lot, and we don't consider them a # readability issue. CHECKS="$CHECKS,-readability-identifier-length" diff --git a/other/analysis/variants.sh b/other/analysis/variants.sh index 6a8241bae95..7e04a450a57 100644 --- a/other/analysis/variants.sh +++ b/other/analysis/variants.sh @@ -1,3 +1,5 @@ #!/bin/bash -run +run "$@" +#run -DVANILLA_NACL -I/usr/include/sodium "$@" +run -DHAVE_LIBEV "$@" diff --git a/toxcore/BUILD.bazel b/toxcore/BUILD.bazel index 5ff521a4ed6..8d6692f6979 100644 --- a/toxcore/BUILD.bazel +++ b/toxcore/BUILD.bazel @@ -289,6 +289,7 @@ cc_library( name = "network", srcs = ["network.c"], hdrs = ["network.h"], + defines = ["HAVE_LIBEV"], visibility = [ "//c-toxcore/auto_tests:__pkg__", "//c-toxcore/other:__pkg__", @@ -305,6 +306,7 @@ cc_library( ":mem", ":mono_time", ":util", + "@ev", "@libsodium", "@psocket", "@pthread", diff --git a/toxcore/LAN_discovery.c b/toxcore/LAN_discovery.c index aead9759116..372b9d732e0 100644 --- a/toxcore/LAN_discovery.c +++ b/toxcore/LAN_discovery.c @@ -204,7 +204,8 @@ static Broadcast_Info *fetch_broadcast_info(const Network *ns) #endif /* platforms */ -/** @brief Send packet to all IPv4 broadcast addresses +/** + * @brief Send packet to all IPv4 broadcast addresses * * @retval true if sent to at least one broadcast target. * @retval false on failure to find any valid broadcast target. diff --git a/toxcore/TCP_client.c b/toxcore/TCP_client.c index 2b8c6e448f5..40501c3b277 100644 --- a/toxcore/TCP_client.c +++ b/toxcore/TCP_client.c @@ -12,6 +12,11 @@ #include #include +#ifdef HAVE_LIBEV +#include +#endif + +#include "DHT.h" #include "TCP_common.h" #include "attributes.h" #include "ccompat.h" @@ -30,9 +35,19 @@ typedef struct TCP_Client_Conn { uint32_t number; } TCP_Client_Conn; +#ifdef HAVE_LIBEV +typedef struct TCP_Client_Socket_Listener { + ev_io listener; + struct ev_loop *dispatcher; +} TCP_Client_Socket_Listener; +#endif + struct TCP_Client_Connection { TCP_Connection con; TCP_Client_Status status; +#ifdef HAVE_LIBEV + TCP_Client_Socket_Listener sock_listener; +#endif uint8_t self_public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* our public key */ uint8_t public_key[CRYPTO_PUBLIC_KEY_SIZE]; /* public key of the server */ IP_Port ip_port; /* The ip and port of the server */ @@ -85,6 +100,44 @@ TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con) { return con->status; } + +#ifdef HAVE_LIBEV +non_null() +static bool tcp_con_ev_is_active(TCP_Client_Connection *con) +{ + return ev_is_active(&con->sock_listener.listener) + || ev_is_pending(&con->sock_listener.listener); +} + +void tcp_con_ev_listen(TCP_Client_Connection *con, struct ev_loop *dispatcher, tcp_con_ev_listen_cb *callback, + void *data) +{ + if (tcp_con_ev_is_active(con)) { + return; + } + + con->sock_listener.dispatcher = dispatcher; + con->sock_listener.listener.data = data; + + ev_io_init(&con->sock_listener.listener, callback, con->con.sock.sock, EV_READ); + ev_io_start(dispatcher, &con->sock_listener.listener); +} + +void tcp_con_ev_stop(TCP_Client_Connection *con) +{ + if (!tcp_con_ev_is_active(con)) { + return; + } + + ev_io_stop(con->sock_listener.dispatcher, &con->sock_listener.listener); +} +#else +Socket tcp_con_sock(const TCP_Client_Connection *con) +{ + return con->con.sock; +} +#endif + void *tcp_con_custom_object(const TCP_Client_Connection *con) { return con->custom_object; @@ -1024,6 +1077,11 @@ void kill_tcp_connection(TCP_Client_Connection *tcp_connection) wipe_priority_list(tcp_connection->con.mem, tcp_connection->con.priority_queue_start); kill_sock(tcp_connection->con.ns, tcp_connection->con.sock); + +#ifdef HAVE_LIBEV + ev_io_stop(tcp_connection->sock_listener.dispatcher, &tcp_connection->sock_listener.listener); +#endif + crypto_memzero(tcp_connection, sizeof(TCP_Client_Connection)); mem_delete(mem, tcp_connection); } diff --git a/toxcore/TCP_client.h b/toxcore/TCP_client.h index ea2654b953b..89f26bbcc81 100644 --- a/toxcore/TCP_client.h +++ b/toxcore/TCP_client.h @@ -9,6 +9,10 @@ #ifndef C_TOXCORE_TOXCORE_TCP_CLIENT_H #define C_TOXCORE_TOXCORE_TCP_CLIENT_H +#ifdef HAVE_LIBEV +#include +#endif + #include "attributes.h" #include "crypto_core.h" #include "forwarding.h" @@ -50,6 +54,20 @@ IP_Port tcp_con_ip_port(const TCP_Client_Connection *con); non_null() TCP_Client_Status tcp_con_status(const TCP_Client_Connection *con); +// TODO(iphydf): This is exactly the same as in network.h. It should be factored +// out and probably abstracted away from ev.h. +#ifdef HAVE_LIBEV +typedef void tcp_con_ev_listen_cb(struct ev_loop *dispatcher, ev_io *sock_listener, int events); +non_null() +void tcp_con_ev_listen(TCP_Client_Connection *con, struct ev_loop *dispatcher, tcp_con_ev_listen_cb *callback, + void *data); +non_null() +void tcp_con_ev_stop(TCP_Client_Connection *con); +#else +non_null() +Socket tcp_con_sock(const TCP_Client_Connection *con); +#endif + non_null() void *tcp_con_custom_object(const TCP_Client_Connection *con); non_null() diff --git a/toxcore/TCP_connection.c b/toxcore/TCP_connection.c index c7161a92894..4cc61e9d865 100644 --- a/toxcore/TCP_connection.c +++ b/toxcore/TCP_connection.c @@ -71,6 +71,37 @@ uint32_t tcp_connections_count(const TCP_Connections *tcp_c) return tcp_c->tcp_connections_length; } +/** + * Return number of elements of TCP connection array. + * + * @param tcp_c struct containing TCP_con array. + * + * @return number of elements of TCP connection array. + */ +uint32_t tcp_connections_length(const TCP_Connections *tcp_c) +{ + return tcp_c->tcp_connections_length; +} + + +/** + * Return TCP connection stored at "idx" position. + * + * @param tcp_c struct containing TCP_con array. + * @param idx index of TCP connection to return (values from 0 to `tcp_connections_length() - 1`). + * + * @return TCP connection stored at "idx" position, or NULL if errors occurred. + */ +const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx) +{ + if (idx >= tcp_c->tcp_connections_length) { + return nullptr; + } + + return &tcp_c->tcp_connections[idx]; +} + + /** @brief Set the size of the array to num. * * @retval -1 if mem_vrealloc fails. diff --git a/toxcore/TCP_connection.h b/toxcore/TCP_connection.h index 2d35919f10e..42cdde404a6 100644 --- a/toxcore/TCP_connection.h +++ b/toxcore/TCP_connection.h @@ -83,6 +83,20 @@ typedef struct TCP_Connections TCP_Connections; non_null() const uint8_t *tcp_connections_public_key(const TCP_Connections *tcp_c); +non_null() +uint32_t tcp_connections_length(const TCP_Connections *tcp_c); + +/** + * Return TCP connection stored at "idx" position. + * + * @param tcp_c struct containing TCP_con array. + * @param idx index of TCP connection to return (values from 0 to `tcp_connections_length() - 1`). + * + * @return TCP connection stored at "idx" position, or NULL if errors occurred. + */ +non_null() +const TCP_con *tcp_connections_connection_at(const TCP_Connections *tcp_c, uint32_t idx); + non_null() uint32_t tcp_connections_count(const TCP_Connections *tcp_c); diff --git a/toxcore/network.c b/toxcore/network.c index c33c1cdac75..475f3cb3c95 100644 --- a/toxcore/network.c +++ b/toxcore/network.c @@ -81,6 +81,10 @@ #include #include +#ifdef HAVE_LIBEV +#include +#endif + #include "attributes.h" #include "bin_pack.h" #include "ccompat.h" @@ -904,6 +908,13 @@ typedef struct Packet_Handler { void *object; } Packet_Handler; +#ifdef HAVE_LIBEV +typedef struct Networking_Socket_Listener { + ev_io listener; + struct ev_loop *dispatcher; +} Networking_Socket_Listener; +#endif + struct Networking_Core { const Logger *log; const Memory *mem; @@ -914,6 +925,9 @@ struct Networking_Core { uint16_t port; /* Our UDP socket. */ Socket sock; +#ifdef HAVE_LIBEV + Networking_Socket_Listener sock_listener; +#endif }; Family net_family(const Networking_Core *net) @@ -926,6 +940,41 @@ uint16_t net_port(const Networking_Core *net) return net->port; } +Socket net_sock(const Networking_Core *net) +{ + return net->sock; +} + +#ifdef HAVE_LIBEV +non_null() +static bool net_ev_is_active(Networking_Core *net) +{ + return ev_is_active(&net->sock_listener.listener) || ev_is_pending(&net->sock_listener.listener); +} + +void net_ev_listen(Networking_Core *net, struct ev_loop *dispatcher, net_ev_listen_cb *callback, void *data) +{ + if (net_ev_is_active(net)) { + return; + } + + net->sock_listener.dispatcher = dispatcher; + net->sock_listener.listener.data = data; + + ev_io_init(&net->sock_listener.listener, callback, net->sock.sock, EV_READ); + ev_io_start(dispatcher, &net->sock_listener.listener); +} + +void net_ev_stop(Networking_Core *net) +{ + if (!net_ev_is_active(net)) { + return; + } + + ev_io_stop(net->sock_listener.dispatcher, &net->sock_listener.listener); +} +#endif + /* Basic network functions: */ @@ -1386,6 +1435,10 @@ void kill_networking(Networking_Core *net) kill_sock(net->ns, net->sock); } +#ifdef HAVE_LIBEV + net_ev_stop(net); +#endif + mem_delete(net->mem, net); } diff --git a/toxcore/network.h b/toxcore/network.h index 06857b89179..9c3442712ae 100644 --- a/toxcore/network.h +++ b/toxcore/network.h @@ -13,6 +13,10 @@ #include // size_t #include // uint*_t +#ifdef HAVE_LIBEV +#include +#endif + #include "attributes.h" #include "bin_pack.h" #include "logger.h" @@ -424,6 +428,16 @@ non_null() Family net_family(const Networking_Core *net); non_null() uint16_t net_port(const Networking_Core *net); +non_null() +Socket net_sock(const Networking_Core *net); + +#ifdef HAVE_LIBEV +typedef void net_ev_listen_cb(struct ev_loop *dispatcher, ev_io *sock_listener, int events); +non_null() +void net_ev_listen(Networking_Core *net, struct ev_loop *dispatcher, net_ev_listen_cb *callback, void *data); +non_null() +void net_ev_stop(Networking_Core *net); +#endif /** Close the socket. */ non_null() diff --git a/toxcore/tox.c b/toxcore/tox.c index 7bc121ea185..64c9cd35c9a 100644 --- a/toxcore/tox.c +++ b/toxcore/tox.c @@ -15,6 +15,18 @@ #include #include +#include + +#ifdef HAVE_LIBEV +#include +#else +#if defined (WIN32) || defined(_WIN32) || defined(__WIN32__) +#include +#else +#include +#endif // WIN32 || _WIN32 || __WIN32__ +#endif // !HAVE_LIBEV + #include "DHT.h" #include "Messenger.h" #include "TCP_client.h" @@ -867,6 +879,18 @@ Tox *tox_new(const struct Tox_Options *options, Tox_Err_New *error) m_options.proxy_info.ip_port.port = net_htons(tox_options_get_proxy_port(opts)); } +#ifdef HAVE_LIBEV + tox->dispatcher = ev_loop_new(0); + + if (tox->dispatcher == nullptr) { + SET_ERROR_PARAMETER(error, TOX_ERR_NEW_MALLOC); + tox_options_free(default_options); + mem_delete(sys->mem, tox); + return nullptr; + } + +#endif // HAVE_LIBEV + tox->mono_time = mono_time_new(tox->sys.mem, sys->mono_time_callback, sys->mono_time_user_data); if (tox->mono_time == nullptr) { @@ -1025,11 +1049,16 @@ void tox_kill(Tox *tox) return; } + tox_loop_stop(tox); + tox_lock(tox); LOGGER_ASSERT(tox->m->log, tox->m->msi_packet == nullptr, "Attempted to kill tox while toxav is still alive"); kill_groupchats(tox->m->conferences_object); kill_messenger(tox->m); mono_time_free(tox->sys.mem, tox->mono_time); +#ifdef HAVE_LIBEV + ev_loop_destroy(tox->dispatcher); +#endif tox_unlock(tox); if (tox->mutex != nullptr) { @@ -1264,6 +1293,277 @@ void tox_iterate(Tox *tox, void *user_data) tox_unlock(tox); } +void tox_callback_loop_begin(Tox *tox, tox_loop_begin_cb *callback) +{ + assert(tox != nullptr); + tox->loop_begin_callback = callback; +} + +void tox_callback_loop_end(Tox *tox, tox_loop_end_cb *callback) +{ + assert(tox != nullptr); + tox->loop_end_callback = callback; +} + +#ifdef HAVE_LIBEV + +non_null() +static void tox_stop_loop_async(struct ev_loop *dispatcher, ev_async *listener, int events) +{ + if (dispatcher == nullptr || listener == nullptr) { + return; + } + + const struct Tox_Userdata *tox_data = (const struct Tox_Userdata *)listener->data; + + const Messenger *m = tox_data->tox->m; + + // Stop TCP listeners. + const uint32_t len = tcp_connections_length(nc_get_tcp_c(m->net_crypto)); + + for (uint32_t i = 0; i < len; ++i) { + const TCP_con *conn = tcp_connections_connection_at(nc_get_tcp_c(m->net_crypto), i); + + assert(conn != nullptr); + tcp_con_ev_stop(conn->connection); + } + + // Stop UDP listener. + net_ev_stop(m->net); + + ev_async_stop(dispatcher, listener); + + ev_break(dispatcher, EVBREAK_ALL); +} + +non_null() +static void tox_do_iterate(struct ev_loop *dispatcher, ev_io *sock_listener, int events) +{ + if (dispatcher == nullptr || sock_listener == nullptr) { + return; + } + + struct Tox_Userdata *tox_data = (struct Tox_Userdata *)sock_listener->data; + + const Messenger *m = tox_data->tox->m; + + if (tox_data->tox->loop_begin_callback) { + tox_data->tox->loop_begin_callback(tox_data->tox, tox_data->user_data); + } + + tox_iterate(tox_data->tox, tox_data->user_data); + + // Start TCP listeners. + const uint32_t len = tcp_connections_length(nc_get_tcp_c(m->net_crypto)); + + for (uint32_t i = 0; i < len; ++i) { + const TCP_con *conn = tcp_connections_connection_at(nc_get_tcp_c(m->net_crypto), i); + + assert(conn != nullptr); + tcp_con_ev_listen(conn->connection, dispatcher, tox_do_iterate, tox_data); + } + + // Start UDP listener. + net_ev_listen(m->net, dispatcher, tox_do_iterate, tox_data); + + if (tox_data->tox->loop_end_callback) { + tox_data->tox->loop_end_callback(tox_data->tox, tox_data->user_data); + } +} + +bool tox_loop(Tox *tox, void *user_data, Tox_Err_Loop *error) +{ + assert(tox != nullptr); + + struct Tox_Userdata tox_data = { tox, user_data }; + + ev_async_init(&tox->stop_loop, tox_stop_loop_async); + tox->stop_loop.data = &tox_data; + ev_async_start(tox->dispatcher, &tox->stop_loop); + + ev_io stub_listener; + ev_init(&stub_listener, tox_do_iterate); + stub_listener.data = &tox_data; + tox_do_iterate(tox->dispatcher, &stub_listener, 0); + + if (ev_run(tox->dispatcher, 0) != 0) { + SET_ERROR_PARAMETER(error, TOX_ERR_LOOP_BREAK); + return false; + } + + SET_ERROR_PARAMETER(error, TOX_ERR_LOOP_OK); + return true; +} + +void tox_loop_stop(Tox *tox) +{ + assert(tox != nullptr); + tox_lock(tox); + ev_async_send(tox->dispatcher, &tox->stop_loop); + tox_unlock(tox); +} + +#else // !HAVE_LIBEV + +non_null() +static bool realloc_sockets(const Memory *mem, Socket **sockets_ptr, uint32_t *sockets_num, uint32_t fd_count) +{ + if (*sockets_num == fd_count) { + // No need to resize. + return true; + } + + Socket *new_sockets = (Socket *)mem_vrealloc(mem, *sockets_ptr, fd_count, sizeof(Socket)); + + if (new_sockets == nullptr) { + return false; + } + + *sockets_ptr = new_sockets; + *sockets_num = fd_count; + + return true; +} + +/** + * Gathers a list of every network file descriptor on which we expect + * I/O activity (the UDP socket and all TCP sockets). + * + * @param sockets_ptr a pointer to an array (the pointed array can be NULL). + * @param sockets_num the number of current known sockets (will be updated by the funciton). + * + * @return false on allocation error, true on success + */ +non_null() +static bool tox_loop_get_fds(const Messenger *m, Socket **sockets_ptr, uint32_t *sockets_num) +{ + assert(m != nullptr); + assert(sockets_ptr != nullptr); + assert(sockets_num != nullptr); + + const TCP_Connections *tcp_c = nc_get_tcp_c(m->net_crypto); + + const uint32_t tcp_count = tcp_connections_length(tcp_c); + const uint32_t fd_count = tcp_count + 1; // tcp_count TCP sockets + 1 UDP socket + + if (!realloc_sockets(m->mem, sockets_ptr, sockets_num, fd_count)) { + return false; + } + + Socket *sockets = *sockets_ptr; + + // Add the TCP sockets. + for (uint32_t i = 0; i < tcp_count; ++i) { + const TCP_con *conn = tcp_connections_connection_at(tcp_c, i); + + assert(conn != nullptr); + sockets[i] = tcp_con_sock(conn->connection); + } + + // Add the one UDP socket. + sockets[fd_count - 1] = net_sock(m->net); + + return true; +} + +non_null() +static bool locked_get(const Tox *tox, const bool *value) +{ + tox_lock(tox); + const bool res = *value; + tox_unlock(tox); + return res; +} + +non_null() +static void locked_set(const Tox *tox, bool *value, bool new_value) +{ + tox_lock(tox); + *value = new_value; + tox_unlock(tox); +} + +non_null() +static bool tox_loop_select(Tox *tox, Socket *fdlist, uint32_t fdcount) +{ + fd_set readable; + FD_ZERO(&readable); + + Socket maxfd = {0}; + + for (uint32_t i = 0; i < fdcount; ++i) { + if (fdlist[i].sock == 0) { + continue; + } + + FD_SET(fdlist[i].sock, &readable); + + if (fdlist[i].sock > maxfd.sock) { + maxfd = fdlist[i]; + } + } + + struct timeval timeout; + + // TODO(cleverca22): use a longer timeout. + timeout.tv_sec = 0; + + timeout.tv_usec = (suseconds_t)(tox_iteration_interval(tox) * 1000 * 2); + + return select(maxfd.sock, &readable, nullptr, nullptr, &timeout) >= 0 || errno == EBADF; +} + +bool tox_loop(Tox *tox, void *user_data, Tox_Err_Loop *error) +{ + assert(tox != nullptr); + + Messenger *m = tox->m; + + uint32_t fdcount = 0; + Socket *fdlist = nullptr; + + locked_set(tox, &tox->loop_run, true); + + while (locked_get(tox, &tox->loop_run)) { + if (tox->loop_begin_callback != nullptr) { + tox->loop_begin_callback(tox, user_data); + } + + tox_iterate(tox, user_data); + + // TODO(cleverca22): should we call loop_end_callback() on error? + if (tox->loop_end_callback != nullptr) { + tox->loop_end_callback(tox, user_data); + } + + if (!tox_loop_get_fds(m, &fdlist, &fdcount)) { + SET_ERROR_PARAMETER(error, TOX_ERR_LOOP_GET_FDS); + mem_delete(m->mem, fdlist); + return false; + } + + if (!tox_loop_select(tox, fdlist, fdcount)) { + SET_ERROR_PARAMETER(error, TOX_ERR_LOOP_SELECT); + mem_delete(m->mem, fdlist); + return false; + } + } + + SET_ERROR_PARAMETER(error, TOX_ERR_LOOP_OK); + + mem_delete(m->mem, fdlist); + + return true; +} + +void tox_loop_stop(Tox *tox) +{ + assert(tox != nullptr); + locked_set(tox, &tox->loop_run, false); +} + +#endif // !HAVE_LIBEV + void tox_self_get_address(const Tox *tox, uint8_t address[TOX_ADDRESS_SIZE]) { assert(tox != nullptr); diff --git a/toxcore/tox.h b/toxcore/tox.h index 371368d011d..1cd57abc24b 100644 --- a/toxcore/tox.h +++ b/toxcore/tox.h @@ -1061,6 +1061,71 @@ uint32_t tox_iteration_interval(const Tox *tox); */ void tox_iterate(Tox *tox, void *user_data); + +/** + * Error codes for `tox_loop()`. + */ +typedef enum Tox_Err_Loop { + + /** + * The function returned successfully. + */ + TOX_ERR_LOOP_OK, + + /** + * Failed running events dispatcher. + */ + TOX_ERR_LOOP_BREAK, + + /** + * Failed running `select()`. + */ + TOX_ERR_LOOP_SELECT, + + /** + * Failed getting sockets file descriptors. + */ + TOX_ERR_LOOP_GET_FDS, + +} Tox_Err_Loop; + + +/** + * Run `tox_iterate()` any time a packet arrives, returns after `tox_loop_stop()` or `tox_kill()`. + */ +bool tox_loop(Tox *tox, void *user_data, Tox_Err_Loop *error); + +/** + * Tell `tox_loop()` to return. + */ +void tox_loop_stop(Tox *tox); + +/** + * No extra parameters. + */ +typedef void tox_loop_begin_cb(Tox *tox, void *user_data); + + +/** + * Set the callback for the `loop_begin` event. Pass NULL to unset. + * + * This callback is invoked when `tox_loop()` calls into `tox_iterate()`, the client can lock a mutex here. + */ +void tox_callback_loop_begin(Tox *tox, tox_loop_begin_cb *callback); + +/** + * No extra parameters. + */ +typedef void tox_loop_end_cb(Tox *tox, void *user_data); + + +/** + * Set the callback for the `loop_end` event. Pass NULL to unset. + * + * This callback is invoked when `tox_loop()` is finished with `tox_iterate()`, the client can unlock the mutex here. + */ +void tox_callback_loop_end(Tox *tox, tox_loop_end_cb *callback); + /** @} */ /** @{ diff --git a/toxcore/tox_struct.h b/toxcore/tox_struct.h index bd42fcce8d6..6e6af78ee15 100644 --- a/toxcore/tox_struct.h +++ b/toxcore/tox_struct.h @@ -66,6 +66,16 @@ struct Tox { tox_group_join_fail_cb *group_join_fail_callback; tox_group_moderation_cb *group_moderation_callback; + tox_loop_begin_cb *loop_begin_callback; + tox_loop_end_cb *loop_end_callback; + +#ifdef HAVE_LIBEV + struct ev_loop *dispatcher; + ev_async stop_loop; +#else + bool loop_run; +#endif + void *toxav_object; // workaround to store a ToxAV object (setter and getter functions are available) };