From d1893a8c9eaa71b143bf4d140751fc2daa9a5dc5 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 16 Jan 2025 16:34:55 +0100 Subject: [PATCH 01/12] feat: support multiple juice_mux_listen callbacks Supports multiple callbacks for unhandled STUN requests. Stores multiple registries, one per local listening port in a `conn_registry_t **registries;` field on `conn_mode_entry_t`. The list of registries grows in the same way as the list of agents in `conn_mode_entry_t`. Adds a `port` property to `conn_registry_t` which is the port the registry is bound to, and an `index` property which is the index of the registry in the containing `registries` list in `conn_mode_entry_t`. --- CMakeLists.txt | 4 + src/agent.h | 2 + src/conn.c | 143 ++++++++++++++++++++++++++++----- src/conn.h | 2 + test/main.c | 29 +++++++ test/stun-unhandled-multiple.c | 83 +++++++++++++++++++ test/stun-unhandled-no-host.c | 87 ++++++++++++++++++++ test/stun-unhandled-unhandle.c | 104 ++++++++++++++++++++++++ test/stun-unhandled.c | 74 +++++++++++++++++ 9 files changed, 509 insertions(+), 19 deletions(-) create mode 100644 test/stun-unhandled-multiple.c create mode 100644 test/stun-unhandled-no-host.c create mode 100644 test/stun-unhandled-unhandle.c create mode 100644 test/stun-unhandled.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 25c71078..e2131c7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,6 +68,10 @@ set(TESTS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/test/main.c ${CMAKE_CURRENT_SOURCE_DIR}/test/crc32.c ${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c ${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c ${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c ${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.c diff --git a/src/agent.h b/src/agent.h index 2bf029ed..c72e7542 100644 --- a/src/agent.h +++ b/src/agent.h @@ -149,6 +149,8 @@ struct juice_agent { thread_t resolver_thread; bool resolver_thread_started; + + conn_registry_t *registry; }; juice_agent_t *agent_create(const juice_config_t *config); diff --git a/src/conn.c b/src/conn.c index 4894c906..c81b4c9c 100644 --- a/src/conn.c +++ b/src/conn.c @@ -15,6 +15,7 @@ #include #include +#include #define INITIAL_REGISTRY_SIZE 16 @@ -33,7 +34,9 @@ typedef struct conn_mode_entry { int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size); mutex_t mutex; - conn_registry_t *registry; + conn_registry_t **registries; + int registries_size; + int registries_count; } conn_mode_entry_t; #define MODE_ENTRIES_SIZE 3 @@ -41,12 +44,12 @@ typedef struct conn_mode_entry { static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = { {conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup, conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs, - MUTEX_INITIALIZER, NULL}, + MUTEX_INITIALIZER, NULL, 0, 0}, {conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup, conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs, - MUTEX_INITIALIZER, NULL}, + MUTEX_INITIALIZER, NULL, 0, 0}, {NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock, - conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}}; + conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL, 0, 0}}; static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) { juice_concurrency_mode_t mode = agent->config.concurrency_mode; @@ -54,9 +57,91 @@ static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) { return mode_entries + (int)mode; } +// accepts a host name and a port and returns a token that can be used to +// distinguish one mux request handler from another. E.g. +// +// '127.0.0.1', 8080 -> '127.0.0.1:8080' +// '::', 8080 -> '[::]:8080' +// NULL, 8080 -> 'any:8080' +// '', 8080 -> 'any:8080' +static char *get_address (const char *bind_address, uint16_t port) { + if (!bind_address || strcmp(bind_address, "") == 0) { + bind_address = "any"; + } + + // search for '.' in bind_address, treat as IPv4 if found + char *result = strchr(bind_address, '.'); + int index = (int)(result - bind_address); + int maxAddrSize = 48; // ip6 is 39 chars + [ + ] + : + 5 for the port + \0 + char *address = (char*) calloc(1, maxAddrSize * sizeof(char)); + + // ip6 + char *format = "[%s]:%d"; + + if (index > -1) { + // ip4 + format = "%s:%d"; + } + + sprintf(address, format, bind_address, port); + + return address; +} + +static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, const char *bind_address, uint16_t port) { + char *address = get_address(bind_address, port); + + for (int i = 0; i < entry->registries_size; i++) { + if (!entry->registries[i]) { + continue; + } + + if (strcmp(entry->registries[i]->address, address) == 0) { + return entry->registries[i]; + } + } + + return NULL; +} + +static int add_registry(conn_mode_entry_t *entry, conn_registry_t *registry) { + int i = 0; + while (i < entry->registries_size && entry->registries[i]) + ++i; + + if (i == entry->registries_size) { + int new_size = entry->registries_size * 2; + + if (new_size == 0) { + new_size = 1; + } + + JLOG_DEBUG("Reallocating registries array, new_size=%d", new_size); + assert(new_size > 0); + + conn_registry_t **new_registries = + realloc(entry->registries, new_size * sizeof(conn_registry_t *)); + if (!new_registries) { + JLOG_FATAL("Memory reallocation failed for registries array"); + return -1; + } + + entry->registries = new_registries; + entry->registries_size = new_size; + memset(entry->registries + i, 0, (new_size - i) * sizeof(conn_registry_t *)); + } + + entry->registries[i] = registry; + registry->registry_index = i; + ++entry->registries_count; + + return 0; +} + static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) { // entry must be locked - conn_registry_t *registry = entry->registry; + conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin); + if (!registry) { if (!entry->registry_init_func) return 0; @@ -83,16 +168,25 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi mutex_init(®istry->mutex, MUTEX_RECURSIVE); mutex_lock(®istry->mutex); + registry->address = get_address(config->bind_address, config->port_begin); + if (entry->registry_init_func(registry, config)) { JLOG_FATAL("Registry initialization failed"); mutex_unlock(®istry->mutex); free(registry->agents); + free(registry->address); free(registry); return -1; } - entry->registry = registry; - + if (add_registry(entry, registry)) { + JLOG_FATAL("Adding registry to entry failed"); + mutex_unlock(®istry->mutex); + free(registry->agents); + free(registry->address); + free(registry); + return -1; + } } else { mutex_lock(®istry->mutex); } @@ -101,9 +195,8 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi return 0; } -static void release_registry(conn_mode_entry_t *entry) { +static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) { // entry must be locked - conn_registry_t *registry = entry->registry; if (!registry) return; @@ -116,9 +209,19 @@ static void release_registry(conn_mode_entry_t *entry) { if (entry->registry_cleanup_func) entry->registry_cleanup_func(registry); + if (registry->registry_index > -1) { + int i = registry->registry_index; + assert(entry->registries[i] == registry); + entry->registries[i] = NULL; + registry->registry_index = -1; + } + + assert(entry->registries_count > 0); + --entry->registries_count; + free(registry->agents); + free(registry->address); free(registry); - entry->registry = NULL; return; } @@ -136,7 +239,8 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { return -1; } - conn_registry_t *registry = entry->registry; + conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin); + agent->registry = registry; JLOG_DEBUG("Creating connection"); if (registry) { @@ -164,7 +268,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { } if (get_mode_entry(agent)->init_func(agent, registry, config)) { - release_registry(entry); // unlocks the registry + release_registry(entry, registry); // unlocks the registry mutex_unlock(&entry->mutex); return -1; } @@ -194,7 +298,7 @@ void conn_destroy(juice_agent_t *agent) { mutex_lock(&entry->mutex); JLOG_DEBUG("Destroying connection"); - conn_registry_t *registry = entry->registry; + conn_registry_t *registry = agent->registry; if (registry) { mutex_lock(®istry->mutex); @@ -205,12 +309,13 @@ void conn_destroy(juice_agent_t *agent) { assert(registry->agents[i] == agent); registry->agents[i] = NULL; agent->conn_index = -1; + agent->registry = NULL; } assert(registry->agents_count > 0); --registry->agents_count; - release_registry(entry); // unlocks the registry + release_registry(entry, registry); // unlocks the registry } else { entry->cleanup_func(agent); @@ -264,7 +369,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) { mutex_lock(&entry->mutex); - conn_registry_t *registry = entry->registry; + conn_registry_t *registry = get_port_registry(entry, bind_address, local_port); if (!registry) { mutex_unlock(&entry->mutex); return -1; @@ -276,7 +381,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) { registry->mux_incoming_user_ptr = NULL; conn_mux_interrupt_registry(registry); - release_registry(entry); + release_registry(entry, registry); mutex_unlock(&entry->mutex); @@ -296,9 +401,9 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco mutex_lock(&entry->mutex); - if (entry->registry) { + if (get_port_registry(entry, bind_address, local_port)) { mutex_unlock(&entry->mutex); - JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection."); + JLOG_DEBUG("juice_mux_listen there is already a listener for this host/port combination."); return -1; } @@ -307,7 +412,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco return -1; } - conn_registry_t *registry = entry->registry; + conn_registry_t *registry = get_port_registry(entry, bind_address, local_port); if(!registry) { mutex_unlock(&entry->mutex); return -1; diff --git a/src/conn.h b/src/conn.h index 5986c1c3..c179ff2b 100644 --- a/src/conn.h +++ b/src/conn.h @@ -25,6 +25,8 @@ typedef struct juice_agent juice_agent_t; // See include/juice/juice.h for implemented concurrency modes typedef struct conn_registry { + int registry_index; + char *address; void *impl; mutex_t mutex; juice_agent_t **agents; diff --git a/test/main.c b/test/main.c index 35e97f02..923082fc 100644 --- a/test/main.c +++ b/test/main.c @@ -22,6 +22,10 @@ int test_turn(void); int test_conflict(void); int test_bind(void); int test_ufrag(void); +int test_stun_unhandled(void); +int test_stun_unhandled_multiple(void); +int test_stun_unhandled_no_host(void); +int test_stun_unhandled_unhandle(void); #ifndef NO_SERVER int test_server(void); @@ -104,6 +108,31 @@ int main(int argc, char **argv) { return -1; } +#ifndef _WIN32 + // windows fails to read STUN message from listen socket: + // udp.c:196: Ignoring ECONNRESET returned by recvfrom + printf("\nRunning unhandled STUN message test...\n"); + if (test_stun_unhandled()) { + fprintf(stderr, "Unhandled STUN message test failed\n"); + return -1; + } + printf("\nRunning mutiple handler unhandled STUN message test...\n"); + if (test_stun_unhandled_multiple()) { + fprintf(stderr, "Mutiple handler unhandled STUN message test failed\n"); + return -1; + } + printf("\nRunning unhandled, unhandled STUN message test...\n"); + if (test_stun_unhandled_unhandle()) { + fprintf(stderr, "Unhandled, unhandled STUN message test failed\n"); + return -1; + } + printf("\nRunning no host unhandled STUN message test...\n"); + if (test_stun_unhandled_no_host()) { + fprintf(stderr, "Mutiple no host unhandled STUN message test failed\n"); + return -1; + } +#endif + #ifndef NO_SERVER printf("\nRunning server test...\n"); if (test_server()) { diff --git a/test/stun-unhandled-multiple.c b/test/stun-unhandled-multiple.c new file mode 100644 index 00000000..91c0bcf1 --- /dev/null +++ b/test/stun-unhandled-multiple.c @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2022 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "juice/juice.h" + +#include +#include +#include +#include + +#ifdef _WIN32 +#include +static void sleep(unsigned int secs) { Sleep(secs * 1000); } +#else +#include // for sleep +#endif + +static juice_agent_t *localAgent; +static juice_agent_t *remoteAgent; +static bool success1; +static bool success2; + +void stun_unhandled_multiple_callback1 (const juice_mux_binding_request_t *info, void *user_ptr) { + success1 = true; +} + +void stun_unhandled_multiple_callback2 (const juice_mux_binding_request_t *info, void *user_ptr) { + success2 = true; +} + +int test_stun_unhandled_multiple() { + juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); + + uint16_t port = 60002; + + // Generate local description + char * localSdp = "a=ice-ufrag:G4DJ\n\ +a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60002 typ host\n\ +a=candidate:2 1 UDP 2130706431 127.0.0.1 60003 typ host\n\ +a=end-of-candidates\n\ +a=ice-options:ice2\n\ +"; + + // Set up callbacks + juice_mux_listen("127.0.0.1", port, &stun_unhandled_multiple_callback1, NULL); + juice_mux_listen("127.0.0.1", port + 1, &stun_unhandled_multiple_callback2, NULL); + + // Create remote agent + juice_config_t remoteConfig; + memset(&remoteConfig, 0, sizeof(remoteConfig)); + remoteConfig.concurrency_mode = JUICE_CONCURRENCY_MODE_MUX; + remoteAgent = juice_create(&remoteConfig); + + // Remote agent: Receive description from local agent + juice_set_remote_description(remoteAgent, localSdp); + + // Remote agent: Gather candidates (and send them to local agent) + juice_gather_candidates(remoteAgent); + sleep(2); + + // -- Should have invoked both callbacks with STUN bind info -- + + // Destroy remote agent + juice_destroy(remoteAgent); + + // Unhandle mux listeners + juice_mux_listen("127.0.0.1", port, NULL, NULL); + juice_mux_listen("127.0.0.1", port + 1, NULL, NULL); + + if (success1 && success2) { + printf("Success\n"); + return 0; + } else { + printf("Failure\n"); + return -1; + } +} diff --git a/test/stun-unhandled-no-host.c b/test/stun-unhandled-no-host.c new file mode 100644 index 00000000..de723d6a --- /dev/null +++ b/test/stun-unhandled-no-host.c @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2022 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "juice/juice.h" + +#include +#include +#include +#include + +#ifdef _WIN32 +#include +static void sleep(unsigned int secs) { Sleep(secs * 1000); } +#else +#include // for sleep +#endif + +static juice_agent_t *localAgent; +static juice_agent_t *remoteAgent; +static bool callback1Invoked; +static bool callback2Invoked; + +void stun_unhandled_no_host_callback1 (const juice_mux_binding_request_t *info, void *user_ptr) { + callback1Invoked = true; +} + +void stun_unhandled_no_host_callback2 (const juice_mux_binding_request_t *info, void *user_ptr) { + callback2Invoked = true; +} + +int test_stun_unhandled_no_host() { + juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); + + uint16_t port = 60001; + + // Generate local description + char * localSdp = "a=ice-ufrag:G4DJ\n\ +a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60001 typ host\n\ +a=candidate:2 1 UDP 2130706431 192.168.1.45 60001 typ host\n\ +a=end-of-candidates\n\ +a=ice-options:ice2\n\ +"; + + // Set up callbacks + juice_mux_listen(NULL, port, &stun_unhandled_no_host_callback1, NULL); + + if (juice_mux_listen("", port, &stun_unhandled_no_host_callback2, NULL) == 0) { + printf("Accepted two listeners for the same host/port combination\n"); + printf("Failure\n"); + return -1; + } + + // Create remote agent + juice_config_t remoteConfig; + memset(&remoteConfig, 0, sizeof(remoteConfig)); + remoteConfig.concurrency_mode = JUICE_CONCURRENCY_MODE_MUX; + remoteAgent = juice_create(&remoteConfig); + + // Remote agent: Receive description from local agent + juice_set_remote_description(remoteAgent, localSdp); + + // Remote agent: Gather candidates (and send them to local agent) + juice_gather_candidates(remoteAgent); + sleep(2); + + // -- Should have invoked both callbacks with STUN bind info -- + + // Destroy remote agent + juice_destroy(remoteAgent); + + // Unhandle mux listener + juice_mux_listen(NULL, port, NULL, NULL); + + if (callback1Invoked && !callback2Invoked) { + printf("Success\n"); + return 0; + } else { + printf("Failure\n"); + return -1; + } +} diff --git a/test/stun-unhandled-unhandle.c b/test/stun-unhandled-unhandle.c new file mode 100644 index 00000000..d0be45c7 --- /dev/null +++ b/test/stun-unhandled-unhandle.c @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2022 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "juice/juice.h" + +#include +#include +#include +#include + +#ifdef _WIN32 +#include +static void sleep(unsigned int secs) { Sleep(secs * 1000); } +#else +#include // for sleep +#endif + +static juice_agent_t *remoteAgent1; +static juice_agent_t *remoteAgent2; + +static bool success; +static bool unhandled; +static bool invokedAfterUnhandle; + +void stun_unhandled_unhandle_callback (const juice_mux_binding_request_t *info, void *user_ptr) { + if (unhandled) { + invokedAfterUnhandle = true; + } else { + success = true; + } +} + +int test_stun_unhandled_unhandle() { + juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); + + uint16_t port = 60004; + + // Generate local description + char * localSdp = "a=ice-ufrag:G4DJ\n\ +a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60004 typ host\n\ +a=end-of-candidates\n\ +a=ice-options:ice2\n\ +"; + + // Set up callback + juice_mux_listen("127.0.0.1", port, &stun_unhandled_unhandle_callback, NULL); + + // Create remote agent + juice_config_t remoteConfig; + memset(&remoteConfig, 0, sizeof(remoteConfig)); + remoteConfig.concurrency_mode = JUICE_CONCURRENCY_MODE_MUX; + remoteAgent1 = juice_create(&remoteConfig); + + // Remote agent: Receive description from local agent + juice_set_remote_description(remoteAgent1, localSdp); + + // Remote agent: Gather candidates (and send them to local agent) + juice_gather_candidates(remoteAgent1); + sleep(2); + + // -- Should have received unhandled STUN packet(s) -- + + // Destroy remote agent + juice_destroy(remoteAgent1); + + // Remove callback + juice_mux_listen("127.0.0.1", port, NULL, NULL); + unhandled = true; + + // Create another remote agent + juice_config_t remoteConfig2; + memset(&remoteConfig2, 0, sizeof(remoteConfig)); + remoteConfig2.concurrency_mode = JUICE_CONCURRENCY_MODE_MUX; + remoteAgent2 = juice_create(&remoteConfig2); + + // Remote agent: Receive description from local agent + juice_set_remote_description(remoteAgent2, localSdp); + + // Remote agent: Gather candidates (and send them to local agent) + juice_gather_candidates(remoteAgent2); + sleep(2); + + // -- Should only have invoked callback with STUN bind info before unhandle -- + + // Destroy remote agent + juice_destroy(remoteAgent2); + + // Unhandle mux listener + juice_mux_listen("127.0.0.1", port, NULL, NULL); + + if (success && !invokedAfterUnhandle) { + printf("Success\n"); + return 0; + } else { + printf("Failure\n"); + return -1; + } +} diff --git a/test/stun-unhandled.c b/test/stun-unhandled.c new file mode 100644 index 00000000..74d652ea --- /dev/null +++ b/test/stun-unhandled.c @@ -0,0 +1,74 @@ +/** + * Copyright (c) 2022 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +#include "juice/juice.h" + +#include +#include +#include +#include + +#ifdef _WIN32 +#include +static void sleep(unsigned int secs) { Sleep(secs * 1000); } +#else +#include // for sleep +#endif + +static juice_agent_t *remoteAgent; +static bool success; + +void stun_unhandled_callback (const juice_mux_binding_request_t *info, void *user_ptr) { + success = true; +} + +int test_stun_unhandled() { + juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); + + uint16_t port = 60001; + + // Generate local description + char * localSdp = "a=ice-ufrag:G4DJ\n\ +a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60001 typ host\n\ +a=end-of-candidates\n\ +a=ice-options:ice2\n\ +"; + + // Set up callback + juice_mux_listen("127.0.0.1", 60001, &stun_unhandled_callback, NULL); + + // Create remote agent + juice_config_t remoteConfig; + memset(&remoteConfig, 0, sizeof(remoteConfig)); + remoteConfig.concurrency_mode = JUICE_CONCURRENCY_MODE_MUX; + remoteAgent = juice_create(&remoteConfig); + + // Remote agent: Receive description from local agent + juice_set_remote_description(remoteAgent, localSdp); + + // Remote agent: Gather candidates (and send them to local agent) + juice_gather_candidates(remoteAgent); + sleep(2); + + // -- Should have received unhandled STUN packet(s) -- + + // Destroy remote agent + juice_destroy(remoteAgent); + + // Unhandle mux listener + juice_mux_listen("127.0.0.1", port, NULL, NULL); + + if (success) { + printf("Success\n"); + return 0; + } else { + printf("Failure\n"); + return -1; + } +} From a4eba085cc80e39cbcef2f228c4d5f178b43a925 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 21 Jan 2025 11:21:12 +0100 Subject: [PATCH 02/12] chore: compare ports only --- src/conn.c | 52 ++++++++-------------------------------------------- src/conn.h | 2 +- 2 files changed, 9 insertions(+), 45 deletions(-) diff --git a/src/conn.c b/src/conn.c index c81b4c9c..3dbdcedb 100644 --- a/src/conn.c +++ b/src/conn.c @@ -57,46 +57,13 @@ static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) { return mode_entries + (int)mode; } -// accepts a host name and a port and returns a token that can be used to -// distinguish one mux request handler from another. E.g. -// -// '127.0.0.1', 8080 -> '127.0.0.1:8080' -// '::', 8080 -> '[::]:8080' -// NULL, 8080 -> 'any:8080' -// '', 8080 -> 'any:8080' -static char *get_address (const char *bind_address, uint16_t port) { - if (!bind_address || strcmp(bind_address, "") == 0) { - bind_address = "any"; - } - - // search for '.' in bind_address, treat as IPv4 if found - char *result = strchr(bind_address, '.'); - int index = (int)(result - bind_address); - int maxAddrSize = 48; // ip6 is 39 chars + [ + ] + : + 5 for the port + \0 - char *address = (char*) calloc(1, maxAddrSize * sizeof(char)); - - // ip6 - char *format = "[%s]:%d"; - - if (index > -1) { - // ip4 - format = "%s:%d"; - } - - sprintf(address, format, bind_address, port); - - return address; -} - -static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, const char *bind_address, uint16_t port) { - char *address = get_address(bind_address, port); - +static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, uint16_t port) { for (int i = 0; i < entry->registries_size; i++) { if (!entry->registries[i]) { continue; } - if (strcmp(entry->registries[i]->address, address) == 0) { + if (entry->registries[i]->port == port) { return entry->registries[i]; } } @@ -140,7 +107,7 @@ static int add_registry(conn_mode_entry_t *entry, conn_registry_t *registry) { static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) { // entry must be locked - conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin); + conn_registry_t *registry = get_port_registry(entry, config->port_begin); if (!registry) { if (!entry->registry_init_func) @@ -168,13 +135,12 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi mutex_init(®istry->mutex, MUTEX_RECURSIVE); mutex_lock(®istry->mutex); - registry->address = get_address(config->bind_address, config->port_begin); + registry->port = config->port_begin; if (entry->registry_init_func(registry, config)) { JLOG_FATAL("Registry initialization failed"); mutex_unlock(®istry->mutex); free(registry->agents); - free(registry->address); free(registry); return -1; } @@ -183,7 +149,6 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi JLOG_FATAL("Adding registry to entry failed"); mutex_unlock(®istry->mutex); free(registry->agents); - free(registry->address); free(registry); return -1; } @@ -220,7 +185,6 @@ static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry --entry->registries_count; free(registry->agents); - free(registry->address); free(registry); return; } @@ -239,7 +203,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { return -1; } - conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin); + conn_registry_t *registry = get_port_registry(entry, config->port_begin); agent->registry = registry; JLOG_DEBUG("Creating connection"); @@ -369,7 +333,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) { mutex_lock(&entry->mutex); - conn_registry_t *registry = get_port_registry(entry, bind_address, local_port); + conn_registry_t *registry = get_port_registry(entry, local_port); if (!registry) { mutex_unlock(&entry->mutex); return -1; @@ -401,7 +365,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco mutex_lock(&entry->mutex); - if (get_port_registry(entry, bind_address, local_port)) { + if (get_port_registry(entry, local_port)) { mutex_unlock(&entry->mutex); JLOG_DEBUG("juice_mux_listen there is already a listener for this host/port combination."); return -1; @@ -412,7 +376,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco return -1; } - conn_registry_t *registry = get_port_registry(entry, bind_address, local_port); + conn_registry_t *registry = get_port_registry(entry, local_port); if(!registry) { mutex_unlock(&entry->mutex); return -1; diff --git a/src/conn.h b/src/conn.h index c179ff2b..c85160b5 100644 --- a/src/conn.h +++ b/src/conn.h @@ -26,7 +26,7 @@ typedef struct juice_agent juice_agent_t; typedef struct conn_registry { int registry_index; - char *address; + uint16_t port; void *impl; mutex_t mutex; juice_agent_t **agents; From 628379b5b567794768126b060d4f75e1c073b0c1 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 21 Jan 2025 11:21:35 +0100 Subject: [PATCH 03/12] chore: fix whitespace --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e2131c7e..63ab15bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,7 +70,7 @@ set(TESTS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c - ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c + ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c ${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c ${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c ${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c From df031e9133c0bff4608d25cd231f49ce2bb7ac0e Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 22 Jan 2025 17:04:05 +0100 Subject: [PATCH 04/12] chore: move mux operations to conn_mux --- src/conn.c | 191 +++++++++------------------------ src/conn.h | 26 ++++- src/conn_mux.c | 135 ++++++++++++++++++++++- src/conn_mux.h | 3 + src/conn_poll.c | 6 ++ src/conn_poll.h | 1 + src/conn_thread.c | 7 ++ src/conn_thread.h | 1 + test/stun-unhandled-multiple.c | 32 ++++-- test/stun-unhandled-no-host.c | 12 ++- test/stun-unhandled-unhandle.c | 44 ++++++-- test/stun-unhandled.c | 24 +++-- 12 files changed, 305 insertions(+), 177 deletions(-) diff --git a/src/conn.c b/src/conn.c index 3dbdcedb..80d42b53 100644 --- a/src/conn.c +++ b/src/conn.c @@ -19,95 +19,37 @@ #define INITIAL_REGISTRY_SIZE 16 -typedef struct conn_mode_entry { - int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config); - void (*registry_cleanup_func)(conn_registry_t *registry); - - int (*init_func)(juice_agent_t *agent, struct conn_registry *registry, - udp_socket_config_t *config); - void (*cleanup_func)(juice_agent_t *agent); - void (*lock_func)(juice_agent_t *agent); - void (*unlock_func)(juice_agent_t *agent); - int (*interrupt_func)(juice_agent_t *agent); - int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, - int ds); - int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size); - - mutex_t mutex; - conn_registry_t **registries; - int registries_size; - int registries_count; -} conn_mode_entry_t; - #define MODE_ENTRIES_SIZE 3 static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = { {conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup, conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs, - MUTEX_INITIALIZER, NULL, 0, 0}, + NULL, conn_poll_get_registry, NULL, MUTEX_INITIALIZER, NULL}, {conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup, conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs, - MUTEX_INITIALIZER, NULL, 0, 0}, - {NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock, - conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL, 0, 0}}; + conn_mux_listen, conn_mux_get_registry, conn_mux_can_release_registry, MUTEX_INITIALIZER, NULL}, + {NULL, NULL, conn_thread_init, conn_thread_cleanup, + conn_thread_lock, conn_thread_unlock, conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, + NULL, conn_thread_get_registry, NULL, MUTEX_INITIALIZER, NULL} +}; -static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) { - juice_concurrency_mode_t mode = agent->config.concurrency_mode; - assert(mode >= 0 && mode < MODE_ENTRIES_SIZE); - return mode_entries + (int)mode; -} +#define MODE_ENTRIES_SIZE 3 -static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, uint16_t port) { - for (int i = 0; i < entry->registries_size; i++) { - if (!entry->registries[i]) { - continue; - } +static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE]; - if (entry->registries[i]->port == port) { - return entry->registries[i]; - } - } - - return NULL; +conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode) { + assert(mode >= 0 && mode < MODE_ENTRIES_SIZE); + return mode_entries + (int)mode; } -static int add_registry(conn_mode_entry_t *entry, conn_registry_t *registry) { - int i = 0; - while (i < entry->registries_size && entry->registries[i]) - ++i; - - if (i == entry->registries_size) { - int new_size = entry->registries_size * 2; - - if (new_size == 0) { - new_size = 1; - } - - JLOG_DEBUG("Reallocating registries array, new_size=%d", new_size); - assert(new_size > 0); - - conn_registry_t **new_registries = - realloc(entry->registries, new_size * sizeof(conn_registry_t *)); - if (!new_registries) { - JLOG_FATAL("Memory reallocation failed for registries array"); - return -1; - } - - entry->registries = new_registries; - entry->registries_size = new_size; - memset(entry->registries + i, 0, (new_size - i) * sizeof(conn_registry_t *)); - } - - entry->registries[i] = registry; - registry->registry_index = i; - ++entry->registries_count; - - return 0; +static conn_mode_entry_t *get_agent_mode_entry(juice_agent_t *agent) { + juice_concurrency_mode_t mode = agent->config.concurrency_mode; + return conn_get_mode_entry(mode); } static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) { // entry must be locked - conn_registry_t *registry = get_port_registry(entry, config->port_begin); + conn_registry_t *registry = entry->get_registry_func(config); if (!registry) { if (!entry->registry_init_func) @@ -135,8 +77,6 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi mutex_init(®istry->mutex, MUTEX_RECURSIVE); mutex_lock(®istry->mutex); - registry->port = config->port_begin; - if (entry->registry_init_func(registry, config)) { JLOG_FATAL("Registry initialization failed"); mutex_unlock(®istry->mutex); @@ -145,13 +85,7 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi return -1; } - if (add_registry(entry, registry)) { - JLOG_FATAL("Adding registry to entry failed"); - mutex_unlock(®istry->mutex); - free(registry->agents); - free(registry); - return -1; - } + entry->registry = registry; } else { mutex_lock(®istry->mutex); } @@ -166,24 +100,16 @@ static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry return; // registry must be locked + bool canRelease = entry->can_release_registry_func ? entry->can_release_registry_func(registry) : true; - if (registry->agents_count == 0 && registry->cb_mux_incoming == NULL) { + if (registry->agents_count == 0 && canRelease) { JLOG_DEBUG("No connection left, destroying connections registry"); mutex_unlock(®istry->mutex); if (entry->registry_cleanup_func) entry->registry_cleanup_func(registry); - if (registry->registry_index > -1) { - int i = registry->registry_index; - assert(entry->registries[i] == registry); - entry->registries[i] = NULL; - registry->registry_index = -1; - } - - assert(entry->registries_count > 0); - --entry->registries_count; - + entry->registry = NULL; free(registry->agents); free(registry); return; @@ -196,14 +122,14 @@ static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry } int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { - conn_mode_entry_t *entry = get_mode_entry(agent); + conn_mode_entry_t *entry = get_agent_mode_entry(agent); mutex_lock(&entry->mutex); - if(acquire_registry(entry, config)) { // locks the registry if created + if (acquire_registry(entry, config)) { // locks the registry if created mutex_unlock(&entry->mutex); return -1; } - conn_registry_t *registry = get_port_registry(entry, config->port_begin); + conn_registry_t *registry = entry->get_registry_func(config); agent->registry = registry; JLOG_DEBUG("Creating connection"); @@ -231,7 +157,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { memset(registry->agents + i, 0, (new_size - i) * sizeof(juice_agent_t *)); } - if (get_mode_entry(agent)->init_func(agent, registry, config)) { + if (get_agent_mode_entry(agent)->init_func(agent, registry, config)) { release_registry(entry, registry); // unlocks the registry mutex_unlock(&entry->mutex); return -1; @@ -244,7 +170,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { mutex_unlock(®istry->mutex); } else { - if (get_mode_entry(agent)->init_func(agent, NULL, config)) { + if (get_agent_mode_entry(agent)->init_func(agent, NULL, config)) { mutex_unlock(&entry->mutex); return -1; } @@ -258,7 +184,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { } void conn_destroy(juice_agent_t *agent) { - conn_mode_entry_t *entry = get_mode_entry(agent); + conn_mode_entry_t *entry = get_agent_mode_entry(agent); mutex_lock(&entry->mutex); JLOG_DEBUG("Destroying connection"); @@ -273,12 +199,12 @@ void conn_destroy(juice_agent_t *agent) { assert(registry->agents[i] == agent); registry->agents[i] = NULL; agent->conn_index = -1; - agent->registry = NULL; } assert(registry->agents_count > 0); --registry->agents_count; + agent->registry = NULL; release_registry(entry, registry); // unlocks the registry } else { @@ -293,21 +219,21 @@ void conn_lock(juice_agent_t *agent) { if (!agent->conn_impl) return; - get_mode_entry(agent)->lock_func(agent); + get_agent_mode_entry(agent)->lock_func(agent); } void conn_unlock(juice_agent_t *agent) { if (!agent->conn_impl) return; - get_mode_entry(agent)->unlock_func(agent); + get_agent_mode_entry(agent)->unlock_func(agent); } int conn_interrupt(juice_agent_t *agent) { if (!agent->conn_impl) return -1; - return get_mode_entry(agent)->interrupt_func(agent); + return get_agent_mode_entry(agent)->interrupt_func(agent); } int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, @@ -315,75 +241,58 @@ int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, if (!agent->conn_impl) return -1; - return get_mode_entry(agent)->send_func(agent, dst, data, size, ds); + return get_agent_mode_entry(agent)->send_func(agent, dst, data, size, ds); } int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) { if (!agent->conn_impl) return -1; - return get_mode_entry(agent)->get_addrs_func(agent, records, size); + return get_agent_mode_entry(agent)->get_addrs_func(agent, records, size); } -static int juice_mux_stop_listen(const char *bind_address, int local_port) { - (void)bind_address; - (void)local_port; - +int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) +{ conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX]; - mutex_lock(&entry->mutex); - - conn_registry_t *registry = get_port_registry(entry, local_port); - if (!registry) { - mutex_unlock(&entry->mutex); + if (!entry->mux_listen_func) { + JLOG_DEBUG("juice_mux_listen mux_listen_func is not implemented"); return -1; } - mutex_lock(®istry->mutex); - - registry->cb_mux_incoming = NULL; - registry->mux_incoming_user_ptr = NULL; - conn_mux_interrupt_registry(registry); - - release_registry(entry, registry); - - mutex_unlock(&entry->mutex); - - return 0; -} - -int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) -{ - if (!cb) - return juice_mux_stop_listen(bind_address, local_port); + if (!entry->get_registry_func) { + JLOG_DEBUG("juice_mux_listen get_registry_func is not implemented"); + return -1; + } - conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX]; + mutex_lock(&entry->mutex); udp_socket_config_t config; config.bind_address = bind_address; config.port_begin = config.port_end = local_port; - mutex_lock(&entry->mutex); - - if (get_port_registry(entry, local_port)) { + // locks the registry, creating it first if required + if(acquire_registry(entry, &config)) { + JLOG_DEBUG("juice_mux_listen acquiring registry failed"); mutex_unlock(&entry->mutex); - JLOG_DEBUG("juice_mux_listen there is already a listener for this host/port combination."); return -1; } - if(acquire_registry(entry, &config)) { // locks the registry if created + conn_registry_t *registry = entry->get_registry_func(&config); + if (!registry) { + JLOG_DEBUG("juice_mux_listen registry not found after creating it"); mutex_unlock(&entry->mutex); return -1; } - conn_registry_t *registry = get_port_registry(entry, local_port); - if(!registry) { + if (entry->mux_listen_func(registry, cb, user_ptr)) { + mutex_unlock(®istry->mutex); mutex_unlock(&entry->mutex); + JLOG_DEBUG("juice_mux_listen failed to call mux_listen_func for %s:%d", bind_address, local_port); return -1; } - registry->cb_mux_incoming = cb; - registry->mux_incoming_user_ptr = user_ptr; + JLOG_DEBUG("try unlock registry\n"); mutex_unlock(®istry->mutex); mutex_unlock(&entry->mutex); diff --git a/src/conn.h b/src/conn.h index c85160b5..f7bac5a2 100644 --- a/src/conn.h +++ b/src/conn.h @@ -25,17 +25,35 @@ typedef struct juice_agent juice_agent_t; // See include/juice/juice.h for implemented concurrency modes typedef struct conn_registry { - int registry_index; - uint16_t port; void *impl; mutex_t mutex; juice_agent_t **agents; int agents_size; int agents_count; - juice_cb_mux_incoming_t cb_mux_incoming; - void *mux_incoming_user_ptr; } conn_registry_t; +typedef struct conn_mode_entry { + int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config); + void (*registry_cleanup_func)(conn_registry_t *registry); + + int (*init_func)(juice_agent_t *agent, struct conn_registry *registry, + udp_socket_config_t *config); + void (*cleanup_func)(juice_agent_t *agent); + void (*lock_func)(juice_agent_t *agent); + void (*unlock_func)(juice_agent_t *agent); + int (*interrupt_func)(juice_agent_t *agent); + int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, + int ds); + int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size); + int (*mux_listen_func)(conn_registry_t *registry, juice_cb_mux_incoming_t cb, void *user_ptr); + conn_registry_t *(*get_registry_func)(udp_socket_config_t *config); + bool (*can_release_registry_func)(conn_registry_t *registry); + + mutex_t mutex; + conn_registry_t *registry; +} conn_mode_entry_t; + +conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode); int conn_create(juice_agent_t *agent, udp_socket_config_t *config); void conn_destroy(juice_agent_t *agent); void conn_lock(juice_agent_t *agent); diff --git a/src/conn_mux.c b/src/conn_mux.c index e6c6774b..3ad43697 100644 --- a/src/conn_mux.c +++ b/src/conn_mux.c @@ -33,6 +33,8 @@ typedef struct map_entry { } map_entry_t; typedef struct registry_impl { + int conn_mux_registries_index; + uint16_t port; thread_t thread; socket_t sock; mutex_t send_mutex; @@ -40,6 +42,8 @@ typedef struct registry_impl { map_entry_t *map; int map_size; int map_count; + juice_cb_mux_incoming_t cb_mux_incoming; + void *mux_incoming_user_ptr; } registry_impl_t; typedef struct conn_impl { @@ -48,6 +52,63 @@ typedef struct conn_impl { bool finished; } conn_impl_t; +static conn_registry_t **conn_mux_registries; +static int conn_mux_registries_size; +static int conn_mux_registries_count; + +conn_registry_t *conn_mux_get_registry(udp_socket_config_t *config) { + for (int i = 0; i < conn_mux_registries_size; i++) { + if (!conn_mux_registries[i]) { + continue; + } + + conn_registry_t *registry = conn_mux_registries[i]; + registry_impl_t *impl = registry->impl; + + if (impl->port == config->port_begin) { + return registry; + } + } + + return NULL; +} + +static int conn_mux_add_registry(conn_registry_t *registry) { + int i = 0; + while (i < conn_mux_registries_size && conn_mux_registries[i]) + ++i; + + if (i == conn_mux_registries_size) { + int new_size = conn_mux_registries_size * 2; + + if (new_size == 0) { + new_size = 1; + } + + JLOG_DEBUG("Reallocating registries array, new_size=%d", new_size); + assert(new_size > 0); + + conn_registry_t **new_registries = + realloc(conn_mux_registries, new_size * sizeof(conn_registry_t *)); + if (!new_registries) { + JLOG_FATAL("Memory reallocation failed for registries array"); + return -1; + } + + conn_mux_registries = new_registries; + conn_mux_registries_size = new_size; + memset(conn_mux_registries + i, 0, (new_size - i) * sizeof(conn_registry_t *)); + } + + conn_mux_registries[i] = registry; + ++conn_mux_registries_count; + + registry_impl_t *impl = registry->impl; + impl->conn_mux_registries_index = i; + + return 0; +} + static bool is_ready(const juice_agent_t *agent) { if (!agent) return false; @@ -181,6 +242,7 @@ int conn_mux_registry_init(conn_registry_t *registry, udp_socket_config_t *confi } registry_impl->map_size = INITIAL_MAP_SIZE; registry_impl->map_count = 0; + registry_impl->port = config->port_begin; registry_impl->sock = udp_create_socket(config); if (registry_impl->sock == INVALID_SOCKET) { @@ -200,6 +262,13 @@ int conn_mux_registry_init(conn_registry_t *registry, udp_socket_config_t *confi goto error; } + if (conn_mux_add_registry(registry)) { + JLOG_FATAL("Could not add registry"); + free(registry_impl->map); + free(registry_impl); + return -1; + } + return 0; error: @@ -217,6 +286,16 @@ void conn_mux_registry_cleanup(conn_registry_t *registry) { JLOG_VERBOSE("Waiting for connections thread"); thread_join(registry_impl->thread, NULL); + if (registry_impl->conn_mux_registries_index > -1) { + int i = registry_impl->conn_mux_registries_index; + assert(conn_mux_registries[i] == registry); + conn_mux_registries[i] = NULL; + registry_impl->conn_mux_registries_index = -1; + } + + assert(conn_mux_registries_count > 0); + --conn_mux_registries_count; + mutex_destroy(®istry_impl->send_mutex); closesocket(registry_impl->sock); free(registry_impl->map); @@ -243,7 +322,8 @@ int conn_mux_prepare(conn_registry_t *registry, struct pollfd *pfd, timestamp_t } int count = registry->agents_count; - if (registry->cb_mux_incoming) + registry_impl_t *impl = registry->impl; + if (impl->cb_mux_incoming) ++count; mutex_unlock(®istry->mutex); return count; @@ -297,7 +377,9 @@ static juice_agent_t *lookup_agent(conn_registry_t *registry, char *buf, size_t } } - if (registry->cb_mux_incoming) { + registry_impl_t *impl = registry->impl; + + if (impl->cb_mux_incoming) { JLOG_DEBUG("Found STUN request with unknown ICE ufrag"); char host[ADDR_MAX_NUMERICHOST_LEN]; if (getnameinfo((const struct sockaddr *)&src->addr, src->len, host, ADDR_MAX_NUMERICHOST_LEN, NULL, 0, NI_NUMERICHOST)) { @@ -312,7 +394,7 @@ static juice_agent_t *lookup_agent(conn_registry_t *registry, char *buf, size_t incoming_info.address = host; incoming_info.port = addr_get_port((struct sockaddr *)src); - registry->cb_mux_incoming(&incoming_info, registry->mux_incoming_user_ptr); + impl->cb_mux_incoming(&incoming_info, impl->mux_incoming_user_ptr); return NULL; } @@ -566,3 +648,50 @@ int conn_mux_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size return udp_get_addrs(registry_impl->sock, records, size); } + +int conn_mux_stop_listen(conn_registry_t *registry) { + registry_impl_t *registry_impl = registry->impl; + if (!registry_impl) { + JLOG_VERBOSE("conn_mux_stop_listen No registry impl found"); + return -1; + } + + JLOG_VERBOSE("conn_mux_stop_listen Removing mux handler callback"); + registry_impl->cb_mux_incoming = NULL; + registry_impl->mux_incoming_user_ptr = NULL; + + return conn_mux_interrupt_registry(registry); +} + +int conn_mux_listen(conn_registry_t *registry, juice_cb_mux_incoming_t cb, void *user_ptr) { + if (!cb) { + return conn_mux_stop_listen(registry); + } + + registry_impl_t *registry_impl = registry->impl; + if (!registry_impl) { + JLOG_VERBOSE("conn_mux_listen No registry impl found"); + return -1; + } + + if (registry_impl->cb_mux_incoming) { + JLOG_VERBOSE("conn_mux_listen Callback already registered\n"); + return -1; + } + + registry_impl->cb_mux_incoming = cb; + registry_impl->mux_incoming_user_ptr = user_ptr; + + return 0; +} + +bool conn_mux_can_release_registry(conn_registry_t *registry) { + registry_impl_t *registry_impl = registry->impl; + + if (!registry_impl) { + JLOG_VERBOSE("conn_mux_can_release_registry No registry impl found"); + return true; + } + + return registry_impl->cb_mux_incoming == NULL; +} diff --git a/src/conn_mux.h b/src/conn_mux.h index 68b51b67..bfd65010 100644 --- a/src/conn_mux.h +++ b/src/conn_mux.h @@ -29,5 +29,8 @@ int conn_mux_interrupt(juice_agent_t *agent); int conn_mux_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, int ds); int conn_mux_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); +int conn_mux_listen(conn_registry_t *registry, juice_cb_mux_incoming_t cb, void *user_ptr); +conn_registry_t *conn_mux_get_registry(udp_socket_config_t *config); +bool conn_mux_can_release_registry(conn_registry_t *registry); #endif diff --git a/src/conn_poll.c b/src/conn_poll.c index a8462c00..a2d80081 100644 --- a/src/conn_poll.c +++ b/src/conn_poll.c @@ -56,6 +56,12 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } +conn_registry_t *conn_poll_get_registry([[maybe_unused]] udp_socket_config_t *config) { + conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_POLL); + + return entry->registry; +} + int conn_poll_registry_init(conn_registry_t *registry, udp_socket_config_t *config) { (void)config; registry_impl_t *registry_impl = calloc(1, sizeof(registry_impl_t)); diff --git a/src/conn_poll.h b/src/conn_poll.h index b3a162a8..5f672a98 100644 --- a/src/conn_poll.h +++ b/src/conn_poll.h @@ -28,5 +28,6 @@ int conn_poll_interrupt(juice_agent_t *agent); int conn_poll_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, int ds); int conn_poll_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); +conn_registry_t *conn_poll_get_registry(udp_socket_config_t *config); #endif diff --git a/src/conn_thread.c b/src/conn_thread.c index 1b054580..81552a10 100644 --- a/src/conn_thread.c +++ b/src/conn_thread.c @@ -12,6 +12,7 @@ #include "socket.h" #include "thread.h" #include "udp.h" +#include "conn.h" #include #include @@ -40,6 +41,12 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } +conn_registry_t *conn_thread_get_registry([[maybe_unused]] udp_socket_config_t *config) { + conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_THREAD); + + return entry->registry; +} + int conn_thread_prepare(juice_agent_t *agent, struct pollfd *pfd, timestamp_t *next_timestamp) { conn_impl_t *conn_impl = agent->conn_impl; mutex_lock(&conn_impl->mutex); diff --git a/src/conn_thread.h b/src/conn_thread.h index ceb23a4c..35c7b04d 100644 --- a/src/conn_thread.h +++ b/src/conn_thread.h @@ -28,5 +28,6 @@ int conn_thread_interrupt(juice_agent_t *agent); int conn_thread_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, int ds); int conn_thread_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); +conn_registry_t *conn_thread_get_registry(udp_socket_config_t *config); #endif diff --git a/test/stun-unhandled-multiple.c b/test/stun-unhandled-multiple.c index 91c0bcf1..e66a1660 100644 --- a/test/stun-unhandled-multiple.c +++ b/test/stun-unhandled-multiple.c @@ -36,20 +36,29 @@ void stun_unhandled_multiple_callback2 (const juice_mux_binding_request_t *info, int test_stun_unhandled_multiple() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60002; + uint16_t port = 60000; // Generate local description char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60002 typ host\n\ -a=candidate:2 1 UDP 2130706431 127.0.0.1 60003 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60000 typ host\n\ +a=candidate:2 1 UDP 2130706431 127.0.0.1 60001 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; // Set up callbacks - juice_mux_listen("127.0.0.1", port, &stun_unhandled_multiple_callback1, NULL); - juice_mux_listen("127.0.0.1", port + 1, &stun_unhandled_multiple_callback2, NULL); + if (juice_mux_listen("127.0.0.1", port, &stun_unhandled_multiple_callback1, NULL)) { + printf("Did not register first unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } + + if (juice_mux_listen("127.0.0.1", port + 1, &stun_unhandled_multiple_callback2, NULL)) { + printf("Did not register second unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } // Create remote agent juice_config_t remoteConfig; @@ -70,8 +79,17 @@ a=ice-options:ice2\n\ juice_destroy(remoteAgent); // Unhandle mux listeners - juice_mux_listen("127.0.0.1", port, NULL, NULL); - juice_mux_listen("127.0.0.1", port + 1, NULL, NULL); + if (juice_mux_listen("127.0.0.1", port, NULL, NULL)) { + printf("Did not unregister first unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } + + if (juice_mux_listen("127.0.0.1", port + 1, NULL, NULL)) { + printf("Did not unregister second unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } if (success1 && success2) { printf("Success\n"); diff --git a/test/stun-unhandled-no-host.c b/test/stun-unhandled-no-host.c index de723d6a..284af206 100644 --- a/test/stun-unhandled-no-host.c +++ b/test/stun-unhandled-no-host.c @@ -36,19 +36,23 @@ void stun_unhandled_no_host_callback2 (const juice_mux_binding_request_t *info, int test_stun_unhandled_no_host() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60001; + uint16_t port = 60010; // Generate local description char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60001 typ host\n\ -a=candidate:2 1 UDP 2130706431 192.168.1.45 60001 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60010 typ host\n\ +a=candidate:2 1 UDP 2130706431 192.168.1.45 60010 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; // Set up callbacks - juice_mux_listen(NULL, port, &stun_unhandled_no_host_callback1, NULL); + if (juice_mux_listen(NULL, port, &stun_unhandled_no_host_callback1, NULL)) { + printf("Did not register unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } if (juice_mux_listen("", port, &stun_unhandled_no_host_callback2, NULL) == 0) { printf("Accepted two listeners for the same host/port combination\n"); diff --git a/test/stun-unhandled-unhandle.c b/test/stun-unhandled-unhandle.c index d0be45c7..83036470 100644 --- a/test/stun-unhandled-unhandle.c +++ b/test/stun-unhandled-unhandle.c @@ -23,7 +23,7 @@ static void sleep(unsigned int secs) { Sleep(secs * 1000); } static juice_agent_t *remoteAgent1; static juice_agent_t *remoteAgent2; -static bool success; +static bool callbackInvoked; static bool unhandled; static bool invokedAfterUnhandle; @@ -31,25 +31,28 @@ void stun_unhandled_unhandle_callback (const juice_mux_binding_request_t *info, if (unhandled) { invokedAfterUnhandle = true; } else { - success = true; + callbackInvoked = true; } } int test_stun_unhandled_unhandle() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60004; + uint16_t port = 60020; // Generate local description char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60004 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60020 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; // Set up callback - juice_mux_listen("127.0.0.1", port, &stun_unhandled_unhandle_callback, NULL); + if (juice_mux_listen("127.0.0.1", port, &stun_unhandled_unhandle_callback, NULL)) { + printf("Failed to register callback\n"); + return -1; + } // Create remote agent juice_config_t remoteConfig; @@ -60,17 +63,34 @@ a=ice-options:ice2\n\ // Remote agent: Receive description from local agent juice_set_remote_description(remoteAgent1, localSdp); + printf("----> test gather candidates\n"); + // Remote agent: Gather candidates (and send them to local agent) juice_gather_candidates(remoteAgent1); - sleep(2); // -- Should have received unhandled STUN packet(s) -- + int attempts = 0; + while (true) { + if (callbackInvoked) { + break; + } + + sleep(1); + + if (attempts++ == 5) { + printf("Callback was not invoked after 5s\n"); + return -1; + } + } // Destroy remote agent juice_destroy(remoteAgent1); // Remove callback - juice_mux_listen("127.0.0.1", port, NULL, NULL); + if (juice_mux_listen("127.0.0.1", port, NULL, NULL)) { + printf("Failed to unregister callback\n"); + return -1; + } unhandled = true; // Create another remote agent @@ -91,10 +111,14 @@ a=ice-options:ice2\n\ // Destroy remote agent juice_destroy(remoteAgent2); - // Unhandle mux listener - juice_mux_listen("127.0.0.1", port, NULL, NULL); + // Remove callback + if (juice_mux_listen("127.0.0.1", port, NULL, NULL)) { + printf("Did not unregister unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } - if (success && !invokedAfterUnhandle) { + if (callbackInvoked && !invokedAfterUnhandle) { printf("Success\n"); return 0; } else { diff --git a/test/stun-unhandled.c b/test/stun-unhandled.c index 74d652ea..67751b5f 100644 --- a/test/stun-unhandled.c +++ b/test/stun-unhandled.c @@ -21,27 +21,31 @@ static void sleep(unsigned int secs) { Sleep(secs * 1000); } #endif static juice_agent_t *remoteAgent; -static bool success; +static bool callbackInvoked; void stun_unhandled_callback (const juice_mux_binding_request_t *info, void *user_ptr) { - success = true; + callbackInvoked = true; } int test_stun_unhandled() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60001; + uint16_t port = 60030; // Generate local description - char * localSdp = "a=ice-ufrag:G4DJ\n\ + const char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60001 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60030 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; // Set up callback - juice_mux_listen("127.0.0.1", 60001, &stun_unhandled_callback, NULL); + if (juice_mux_listen("127.0.0.1", port, &stun_unhandled_callback, NULL)) { + printf("Did not register unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } // Create remote agent juice_config_t remoteConfig; @@ -62,9 +66,13 @@ a=ice-options:ice2\n\ juice_destroy(remoteAgent); // Unhandle mux listener - juice_mux_listen("127.0.0.1", port, NULL, NULL); + if (juice_mux_listen("127.0.0.1", port, NULL, NULL)) { + printf("Did not unregister unhandled mux callback\n"); + printf("Failure\n"); + return -1; + } - if (success) { + if (callbackInvoked) { printf("Success\n"); return 0; } else { From 3c485bbf8e93025c058536db1d086ba2638b2646 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 23 Jan 2025 17:14:42 +0100 Subject: [PATCH 05/12] chore: remove unused hint --- src/conn_poll.c | 3 ++- src/conn_thread.c | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/conn_poll.c b/src/conn_poll.c index a2d80081..94a06736 100644 --- a/src/conn_poll.c +++ b/src/conn_poll.c @@ -56,7 +56,8 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } -conn_registry_t *conn_poll_get_registry([[maybe_unused]] udp_socket_config_t *config) { +conn_registry_t *conn_poll_get_registry(udp_socket_config_t *config) { + (void)config; conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_POLL); return entry->registry; diff --git a/src/conn_thread.c b/src/conn_thread.c index 81552a10..eaa553a1 100644 --- a/src/conn_thread.c +++ b/src/conn_thread.c @@ -41,7 +41,8 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } -conn_registry_t *conn_thread_get_registry([[maybe_unused]] udp_socket_config_t *config) { +conn_registry_t *conn_thread_get_registry(udp_socket_config_t *config) { + (void)config; conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_THREAD); return entry->registry; From 4b96019f19d6575bca56f49a65531b90a8cf864a Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 23 Jan 2025 17:14:59 +0100 Subject: [PATCH 06/12] chore: release registry --- src/conn.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/conn.c b/src/conn.c index 80d42b53..96e33eff 100644 --- a/src/conn.c +++ b/src/conn.c @@ -251,8 +251,7 @@ int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) { return get_agent_mode_entry(agent)->get_addrs_func(agent, records, size); } -int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) -{ +int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) { conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX]; if (!entry->mux_listen_func) { @@ -286,15 +285,13 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco } if (entry->mux_listen_func(registry, cb, user_ptr)) { - mutex_unlock(®istry->mutex); - mutex_unlock(&entry->mutex); JLOG_DEBUG("juice_mux_listen failed to call mux_listen_func for %s:%d", bind_address, local_port); + release_registry(entry, registry); + mutex_unlock(&entry->mutex); return -1; } - JLOG_DEBUG("try unlock registry\n"); - - mutex_unlock(®istry->mutex); + release_registry(entry, registry); mutex_unlock(&entry->mutex); return 0; } From 5d4965aa162e7ba1f006cd7a220e7698f9e81cc4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 23 Jan 2025 17:15:19 +0100 Subject: [PATCH 07/12] fix: return first registry if port if listening on wildcard --- src/conn_mux.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/conn_mux.c b/src/conn_mux.c index 3ad43697..67cdc13c 100644 --- a/src/conn_mux.c +++ b/src/conn_mux.c @@ -65,7 +65,7 @@ conn_registry_t *conn_mux_get_registry(udp_socket_config_t *config) { conn_registry_t *registry = conn_mux_registries[i]; registry_impl_t *impl = registry->impl; - if (impl->port == config->port_begin) { + if (impl->port == config->port_begin || config->port_begin == 0) { return registry; } } From 90e8665be566351ae9b4fa07ccdac9c858af9c92 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 24 Jan 2025 16:10:59 +0100 Subject: [PATCH 08/12] chore: use same port for tests --- test/stun-unhandled-no-host.c | 6 +++--- test/stun-unhandled-unhandle.c | 4 ++-- test/stun-unhandled.c | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/test/stun-unhandled-no-host.c b/test/stun-unhandled-no-host.c index 284af206..e52decf2 100644 --- a/test/stun-unhandled-no-host.c +++ b/test/stun-unhandled-no-host.c @@ -36,13 +36,13 @@ void stun_unhandled_no_host_callback2 (const juice_mux_binding_request_t *info, int test_stun_unhandled_no_host() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60010; + uint16_t port = 60000; // Generate local description char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60010 typ host\n\ -a=candidate:2 1 UDP 2130706431 192.168.1.45 60010 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60000 typ host\n\ +a=candidate:2 1 UDP 2130706431 192.168.1.45 60000 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; diff --git a/test/stun-unhandled-unhandle.c b/test/stun-unhandled-unhandle.c index 83036470..5bd435fd 100644 --- a/test/stun-unhandled-unhandle.c +++ b/test/stun-unhandled-unhandle.c @@ -38,12 +38,12 @@ void stun_unhandled_unhandle_callback (const juice_mux_binding_request_t *info, int test_stun_unhandled_unhandle() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60020; + uint16_t port = 60000; // Generate local description char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60020 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60000 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; diff --git a/test/stun-unhandled.c b/test/stun-unhandled.c index 67751b5f..5053491e 100644 --- a/test/stun-unhandled.c +++ b/test/stun-unhandled.c @@ -30,12 +30,12 @@ void stun_unhandled_callback (const juice_mux_binding_request_t *info, void *use int test_stun_unhandled() { juice_set_log_level(JUICE_LOG_LEVEL_DEBUG); - uint16_t port = 60030; + uint16_t port = 60000; // Generate local description const char * localSdp = "a=ice-ufrag:G4DJ\n\ a=ice-pwd:ok3ytD4tG2MCJ+9MrELhjO\n\ -a=candidate:1 1 UDP 2130706431 127.0.0.1 60030 typ host\n\ +a=candidate:1 1 UDP 2130706431 127.0.0.1 60000 typ host\n\ a=end-of-candidates\n\ a=ice-options:ice2\n\ "; From c4e297b5a6e37203f6a05abffee8ec7937b64675 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 11 Feb 2025 18:26:05 +0100 Subject: [PATCH 09/12] chore: apply suggestions from code review Co-authored-by: Paul-Louis Ageneau --- src/conn_mux.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/conn_mux.c b/src/conn_mux.c index 67cdc13c..08d952ed 100644 --- a/src/conn_mux.c +++ b/src/conn_mux.c @@ -65,7 +65,7 @@ conn_registry_t *conn_mux_get_registry(udp_socket_config_t *config) { conn_registry_t *registry = conn_mux_registries[i]; registry_impl_t *impl = registry->impl; - if (impl->port == config->port_begin || config->port_begin == 0) { + if (impl->port >= config->port_begin && (config->port_end == 0 || impl->port <= config->port_end)) { return registry; } } From 20d846651599df5525f9c834ba31f2ee5430f059 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 11 Feb 2025 18:32:29 +0100 Subject: [PATCH 10/12] fix: remove get_registry_func from poll/thread conns --- src/conn.c | 21 +++++++++++++++++---- src/conn_poll.c | 7 ------- src/conn_poll.h | 1 - src/conn_thread.c | 7 ------- src/conn_thread.h | 1 - 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/conn.c b/src/conn.c index 96e33eff..0f7c1f19 100644 --- a/src/conn.c +++ b/src/conn.c @@ -24,13 +24,13 @@ static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = { {conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup, conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs, - NULL, conn_poll_get_registry, NULL, MUTEX_INITIALIZER, NULL}, + NULL, NULL, NULL, MUTEX_INITIALIZER, NULL}, {conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup, conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs, conn_mux_listen, conn_mux_get_registry, conn_mux_can_release_registry, MUTEX_INITIALIZER, NULL}, {NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock, conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, - NULL, conn_thread_get_registry, NULL, MUTEX_INITIALIZER, NULL} + NULL, NULL, NULL, MUTEX_INITIALIZER, NULL} }; #define MODE_ENTRIES_SIZE 3 @@ -49,7 +49,13 @@ static conn_mode_entry_t *get_agent_mode_entry(juice_agent_t *agent) { static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) { // entry must be locked - conn_registry_t *registry = entry->get_registry_func(config); + conn_registry_t *registry; + + if (entry->get_registry_func) { + registry = entry->get_registry_func(config); + } else { + registry = entry->registry; + } if (!registry) { if (!entry->registry_init_func) @@ -129,7 +135,14 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { return -1; } - conn_registry_t *registry = entry->get_registry_func(config); + conn_registry_t *registry; + + if (entry->get_registry_func) { + registry = entry->get_registry_func(config); + } else { + registry = entry->registry; + } + agent->registry = registry; JLOG_DEBUG("Creating connection"); diff --git a/src/conn_poll.c b/src/conn_poll.c index 94a06736..a8462c00 100644 --- a/src/conn_poll.c +++ b/src/conn_poll.c @@ -56,13 +56,6 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } -conn_registry_t *conn_poll_get_registry(udp_socket_config_t *config) { - (void)config; - conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_POLL); - - return entry->registry; -} - int conn_poll_registry_init(conn_registry_t *registry, udp_socket_config_t *config) { (void)config; registry_impl_t *registry_impl = calloc(1, sizeof(registry_impl_t)); diff --git a/src/conn_poll.h b/src/conn_poll.h index 5f672a98..b3a162a8 100644 --- a/src/conn_poll.h +++ b/src/conn_poll.h @@ -28,6 +28,5 @@ int conn_poll_interrupt(juice_agent_t *agent); int conn_poll_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, int ds); int conn_poll_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); -conn_registry_t *conn_poll_get_registry(udp_socket_config_t *config); #endif diff --git a/src/conn_thread.c b/src/conn_thread.c index eaa553a1..c12eb15c 100644 --- a/src/conn_thread.c +++ b/src/conn_thread.c @@ -41,13 +41,6 @@ static thread_return_t THREAD_CALL conn_thread_entry(void *arg) { return (thread_return_t)0; } -conn_registry_t *conn_thread_get_registry(udp_socket_config_t *config) { - (void)config; - conn_mode_entry_t *entry = conn_get_mode_entry(JUICE_CONCURRENCY_MODE_THREAD); - - return entry->registry; -} - int conn_thread_prepare(juice_agent_t *agent, struct pollfd *pfd, timestamp_t *next_timestamp) { conn_impl_t *conn_impl = agent->conn_impl; mutex_lock(&conn_impl->mutex); diff --git a/src/conn_thread.h b/src/conn_thread.h index 35c7b04d..ceb23a4c 100644 --- a/src/conn_thread.h +++ b/src/conn_thread.h @@ -28,6 +28,5 @@ int conn_thread_interrupt(juice_agent_t *agent); int conn_thread_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size, int ds); int conn_thread_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size); -conn_registry_t *conn_thread_get_registry(udp_socket_config_t *config); #endif From 5440c0347af4ca679bb1c0a975659b057a1f9390 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 11 Feb 2025 18:33:25 +0100 Subject: [PATCH 11/12] fix: store port after opening --- src/conn_mux.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/conn_mux.c b/src/conn_mux.c index 08d952ed..1c406aed 100644 --- a/src/conn_mux.c +++ b/src/conn_mux.c @@ -242,7 +242,6 @@ int conn_mux_registry_init(conn_registry_t *registry, udp_socket_config_t *confi } registry_impl->map_size = INITIAL_MAP_SIZE; registry_impl->map_count = 0; - registry_impl->port = config->port_begin; registry_impl->sock = udp_create_socket(config); if (registry_impl->sock == INVALID_SOCKET) { @@ -252,6 +251,8 @@ int conn_mux_registry_init(conn_registry_t *registry, udp_socket_config_t *confi return -1; } + registry_impl->port = udp_get_port(registry_impl->sock); + mutex_init(®istry_impl->send_mutex, 0); registry->impl = registry_impl; From bf0d9e4a84780ec6f543667530f6dd49db6cc114 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 12 Feb 2025 12:08:51 +0100 Subject: [PATCH 12/12] chore: use pointer to pointer to avoid function call --- src/conn.c | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/conn.c b/src/conn.c index 0f7c1f19..e14fca7b 100644 --- a/src/conn.c +++ b/src/conn.c @@ -47,7 +47,7 @@ static conn_mode_entry_t *get_agent_mode_entry(juice_agent_t *agent) { return conn_get_mode_entry(mode); } -static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) { +static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config, conn_registry_t **acquired) { // entry must be locked conn_registry_t *registry; @@ -58,8 +58,10 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi } if (!registry) { - if (!entry->registry_init_func) + if (!entry->registry_init_func) { + *acquired = NULL; return 0; + } JLOG_DEBUG("Creating connections registry"); @@ -96,6 +98,8 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi mutex_lock(®istry->mutex); } + *acquired = registry; + // registry is locked return 0; } @@ -129,20 +133,13 @@ static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry int conn_create(juice_agent_t *agent, udp_socket_config_t *config) { conn_mode_entry_t *entry = get_agent_mode_entry(agent); + conn_registry_t *registry; mutex_lock(&entry->mutex); - if (acquire_registry(entry, config)) { // locks the registry if created + if (acquire_registry(entry, config, ®istry)) { // locks the registry if created mutex_unlock(&entry->mutex); return -1; } - conn_registry_t *registry; - - if (entry->get_registry_func) { - registry = entry->get_registry_func(config); - } else { - registry = entry->registry; - } - agent->registry = registry; JLOG_DEBUG("Creating connection"); @@ -283,14 +280,15 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco config.bind_address = bind_address; config.port_begin = config.port_end = local_port; + conn_registry_t *registry; + // locks the registry, creating it first if required - if(acquire_registry(entry, &config)) { + if(acquire_registry(entry, &config, ®istry)) { JLOG_DEBUG("juice_mux_listen acquiring registry failed"); mutex_unlock(&entry->mutex); return -1; } - conn_registry_t *registry = entry->get_registry_func(&config); if (!registry) { JLOG_DEBUG("juice_mux_listen registry not found after creating it"); mutex_unlock(&entry->mutex);