Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple juice_mux_listen callbacks for different ports #292

4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/main.c
${CMAKE_CURRENT_SOURCE_DIR}/test/crc32.c
${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c
${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c
${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.c
Expand Down
2 changes: 2 additions & 0 deletions src/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ struct juice_agent {

thread_t resolver_thread;
bool resolver_thread_started;

conn_registry_t *registry;
};

juice_agent_t *agent_create(const juice_config_t *config);
Expand Down
143 changes: 124 additions & 19 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <assert.h>
#include <string.h>
#include <stdio.h>

#define INITIAL_REGISTRY_SIZE 16

Expand All @@ -33,30 +34,114 @@ typedef struct conn_mode_entry {
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);

mutex_t mutex;
conn_registry_t *registry;
conn_registry_t **registries;
int registries_size;
int registries_count;
} conn_mode_entry_t;

#define MODE_ENTRIES_SIZE 3

static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs,
MUTEX_INITIALIZER, NULL},
MUTEX_INITIALIZER, NULL, 0, 0},
{conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup,
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
MUTEX_INITIALIZER, NULL},
MUTEX_INITIALIZER, NULL, 0, 0},
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL, 0, 0}};

static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
assert(mode >= 0 && mode < MODE_ENTRIES_SIZE);
return mode_entries + (int)mode;
}

// accepts a host name and a port and returns a token that can be used to
// distinguish one mux request handler from another. E.g.
//
// '127.0.0.1', 8080 -> '127.0.0.1:8080'
// '::', 8080 -> '[::]:8080'
// NULL, 8080 -> 'any:8080'
// '', 8080 -> 'any:8080'
static char *get_address (const char *bind_address, uint16_t port) {
if (!bind_address || strcmp(bind_address, "") == 0) {
bind_address = "any";
}

// search for '.' in bind_address, treat as IPv4 if found
char *result = strchr(bind_address, '.');
int index = (int)(result - bind_address);
int maxAddrSize = 48; // ip6 is 39 chars + [ + ] + : + 5 for the port + \0
char *address = (char*) calloc(1, maxAddrSize * sizeof(char));

// ip6
char *format = "[%s]:%d";

if (index > -1) {
// ip4
format = "%s:%d";
}

sprintf(address, format, bind_address, port);

return address;
}

static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, const char *bind_address, uint16_t port) {
char *address = get_address(bind_address, port);

for (int i = 0; i < entry->registries_size; i++) {
if (!entry->registries[i]) {
continue;
}

if (strcmp(entry->registries[i]->address, address) == 0) {
return entry->registries[i];
}
}

return NULL;
}

static int add_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
int i = 0;
while (i < entry->registries_size && entry->registries[i])
++i;

if (i == entry->registries_size) {
int new_size = entry->registries_size * 2;

if (new_size == 0) {
new_size = 1;
}

JLOG_DEBUG("Reallocating registries array, new_size=%d", new_size);
assert(new_size > 0);

conn_registry_t **new_registries =
realloc(entry->registries, new_size * sizeof(conn_registry_t *));
if (!new_registries) {
JLOG_FATAL("Memory reallocation failed for registries array");
return -1;
}

entry->registries = new_registries;
entry->registries_size = new_size;
memset(entry->registries + i, 0, (new_size - i) * sizeof(conn_registry_t *));
}

entry->registries[i] = registry;
registry->registry_index = i;
++entry->registries_count;

return 0;
}

static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) {
// entry must be locked
conn_registry_t *registry = entry->registry;
conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin);

if (!registry) {
if (!entry->registry_init_func)
return 0;
Expand All @@ -83,16 +168,25 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
mutex_init(&registry->mutex, MUTEX_RECURSIVE);
mutex_lock(&registry->mutex);

registry->address = get_address(config->bind_address, config->port_begin);

if (entry->registry_init_func(registry, config)) {
JLOG_FATAL("Registry initialization failed");
mutex_unlock(&registry->mutex);
free(registry->agents);
free(registry->address);
free(registry);
return -1;
}

entry->registry = registry;

if (add_registry(entry, registry)) {
JLOG_FATAL("Adding registry to entry failed");
mutex_unlock(&registry->mutex);
free(registry->agents);
free(registry->address);
free(registry);
return -1;
}
} else {
mutex_lock(&registry->mutex);
}
Expand All @@ -101,9 +195,8 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
return 0;
}

static void release_registry(conn_mode_entry_t *entry) {
static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
// entry must be locked
conn_registry_t *registry = entry->registry;
if (!registry)
return;

Expand All @@ -116,9 +209,19 @@ static void release_registry(conn_mode_entry_t *entry) {
if (entry->registry_cleanup_func)
entry->registry_cleanup_func(registry);

if (registry->registry_index > -1) {
int i = registry->registry_index;
assert(entry->registries[i] == registry);
entry->registries[i] = NULL;
registry->registry_index = -1;
}

assert(entry->registries_count > 0);
--entry->registries_count;

free(registry->agents);
free(registry->address);
free(registry);
entry->registry = NULL;
return;
}

Expand All @@ -136,7 +239,8 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
return -1;
}

conn_registry_t *registry = entry->registry;
conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin);
agent->registry = registry;

JLOG_DEBUG("Creating connection");
if (registry) {
Expand Down Expand Up @@ -164,7 +268,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
}

if (get_mode_entry(agent)->init_func(agent, registry, config)) {
release_registry(entry); // unlocks the registry
release_registry(entry, registry); // unlocks the registry
mutex_unlock(&entry->mutex);
return -1;
}
Expand Down Expand Up @@ -194,7 +298,7 @@ void conn_destroy(juice_agent_t *agent) {
mutex_lock(&entry->mutex);

JLOG_DEBUG("Destroying connection");
conn_registry_t *registry = entry->registry;
conn_registry_t *registry = agent->registry;
if (registry) {
mutex_lock(&registry->mutex);

Expand All @@ -205,12 +309,13 @@ void conn_destroy(juice_agent_t *agent) {
assert(registry->agents[i] == agent);
registry->agents[i] = NULL;
agent->conn_index = -1;
agent->registry = NULL;
}

assert(registry->agents_count > 0);
--registry->agents_count;

release_registry(entry); // unlocks the registry
release_registry(entry, registry); // unlocks the registry

} else {
entry->cleanup_func(agent);
Expand Down Expand Up @@ -264,7 +369,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {

mutex_lock(&entry->mutex);

conn_registry_t *registry = entry->registry;
conn_registry_t *registry = get_port_registry(entry, bind_address, local_port);
if (!registry) {
mutex_unlock(&entry->mutex);
return -1;
Expand All @@ -276,7 +381,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {
registry->mux_incoming_user_ptr = NULL;
conn_mux_interrupt_registry(registry);

release_registry(entry);
release_registry(entry, registry);

mutex_unlock(&entry->mutex);

Expand All @@ -296,9 +401,9 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco

mutex_lock(&entry->mutex);

if (entry->registry) {
if (get_port_registry(entry, bind_address, local_port)) {
mutex_unlock(&entry->mutex);
JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection.");
JLOG_DEBUG("juice_mux_listen there is already a listener for this host/port combination.");
return -1;
}

Expand All @@ -307,7 +412,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco
return -1;
}

conn_registry_t *registry = entry->registry;
conn_registry_t *registry = get_port_registry(entry, bind_address, local_port);
if(!registry) {
mutex_unlock(&entry->mutex);
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ typedef struct juice_agent juice_agent_t;
// See include/juice/juice.h for implemented concurrency modes

typedef struct conn_registry {
int registry_index;
char *address;
void *impl;
mutex_t mutex;
juice_agent_t **agents;
Expand Down
29 changes: 29 additions & 0 deletions test/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ int test_turn(void);
int test_conflict(void);
int test_bind(void);
int test_ufrag(void);
int test_stun_unhandled(void);
int test_stun_unhandled_multiple(void);
int test_stun_unhandled_no_host(void);
int test_stun_unhandled_unhandle(void);

#ifndef NO_SERVER
int test_server(void);
Expand Down Expand Up @@ -104,6 +108,31 @@ int main(int argc, char **argv) {
return -1;
}

#ifndef _WIN32
// windows fails to read STUN message from listen socket:
// udp.c:196: Ignoring ECONNRESET returned by recvfrom
printf("\nRunning unhandled STUN message test...\n");
if (test_stun_unhandled()) {
fprintf(stderr, "Unhandled STUN message test failed\n");
return -1;
}
printf("\nRunning mutiple handler unhandled STUN message test...\n");
if (test_stun_unhandled_multiple()) {
fprintf(stderr, "Mutiple handler unhandled STUN message test failed\n");
return -1;
}
printf("\nRunning unhandled, unhandled STUN message test...\n");
if (test_stun_unhandled_unhandle()) {
fprintf(stderr, "Unhandled, unhandled STUN message test failed\n");
return -1;
}
printf("\nRunning no host unhandled STUN message test...\n");
if (test_stun_unhandled_no_host()) {
fprintf(stderr, "Mutiple no host unhandled STUN message test failed\n");
return -1;
}
#endif

#ifndef NO_SERVER
printf("\nRunning server test...\n");
if (test_server()) {
Expand Down
Loading
Loading