Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple juice_mux_listen callbacks for different ports #292

4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ set(TESTS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/test/main.c
${CMAKE_CURRENT_SOURCE_DIR}/test/crc32.c
${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c
${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c
${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c
${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.c
Expand Down
2 changes: 2 additions & 0 deletions src/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ struct juice_agent {

thread_t resolver_thread;
bool resolver_thread_started;

conn_registry_t *registry;
};

juice_agent_t *agent_create(const juice_config_t *config);
Expand Down
152 changes: 69 additions & 83 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,53 @@

#include <assert.h>
#include <string.h>
#include <stdio.h>

#define INITIAL_REGISTRY_SIZE 16

typedef struct conn_mode_entry {
int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config);
void (*registry_cleanup_func)(conn_registry_t *registry);

int (*init_func)(juice_agent_t *agent, struct conn_registry *registry,
udp_socket_config_t *config);
void (*cleanup_func)(juice_agent_t *agent);
void (*lock_func)(juice_agent_t *agent);
void (*unlock_func)(juice_agent_t *agent);
int (*interrupt_func)(juice_agent_t *agent);
int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds);
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);

mutex_t mutex;
conn_registry_t *registry;
} conn_mode_entry_t;

#define MODE_ENTRIES_SIZE 3

static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs,
MUTEX_INITIALIZER, NULL},
NULL, NULL, NULL, MUTEX_INITIALIZER, NULL},
{conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup,
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
conn_mux_listen, conn_mux_get_registry, conn_mux_can_release_registry, MUTEX_INITIALIZER, NULL},
{NULL, NULL, conn_thread_init, conn_thread_cleanup,
conn_thread_lock, conn_thread_unlock, conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs,
NULL, NULL, NULL, MUTEX_INITIALIZER, NULL}
};

static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
#define MODE_ENTRIES_SIZE 3

static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE];

conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode) {
assert(mode >= 0 && mode < MODE_ENTRIES_SIZE);
return mode_entries + (int)mode;
}

static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) {
static conn_mode_entry_t *get_agent_mode_entry(juice_agent_t *agent) {
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
return conn_get_mode_entry(mode);
}

static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config, conn_registry_t **acquired) {
// entry must be locked
conn_registry_t *registry = entry->registry;
conn_registry_t *registry;

if (entry->get_registry_func) {
registry = entry->get_registry_func(config);
} else {
registry = entry->registry;
}

if (!registry) {
if (!entry->registry_init_func)
if (!entry->registry_init_func) {
*acquired = NULL;
return 0;
}

JLOG_DEBUG("Creating connections registry");

Expand Down Expand Up @@ -92,33 +94,34 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
}

entry->registry = registry;

} else {
mutex_lock(&registry->mutex);
}

*acquired = registry;

// registry is locked
return 0;
}

static void release_registry(conn_mode_entry_t *entry) {
static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
// entry must be locked
conn_registry_t *registry = entry->registry;
if (!registry)
return;

// registry must be locked
bool canRelease = entry->can_release_registry_func ? entry->can_release_registry_func(registry) : true;

if (registry->agents_count == 0 && registry->cb_mux_incoming == NULL) {
if (registry->agents_count == 0 && canRelease) {
JLOG_DEBUG("No connection left, destroying connections registry");
mutex_unlock(&registry->mutex);

if (entry->registry_cleanup_func)
entry->registry_cleanup_func(registry);

entry->registry = NULL;
free(registry->agents);
free(registry);
entry->registry = NULL;
return;
}

Expand All @@ -129,14 +132,15 @@ static void release_registry(conn_mode_entry_t *entry) {
}

int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
conn_mode_entry_t *entry = get_mode_entry(agent);
conn_mode_entry_t *entry = get_agent_mode_entry(agent);
conn_registry_t *registry;
mutex_lock(&entry->mutex);
if(acquire_registry(entry, config)) { // locks the registry if created
if (acquire_registry(entry, config, &registry)) { // locks the registry if created
mutex_unlock(&entry->mutex);
return -1;
}

conn_registry_t *registry = entry->registry;
agent->registry = registry;

JLOG_DEBUG("Creating connection");
if (registry) {
Expand All @@ -163,8 +167,8 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
memset(registry->agents + i, 0, (new_size - i) * sizeof(juice_agent_t *));
}

if (get_mode_entry(agent)->init_func(agent, registry, config)) {
release_registry(entry); // unlocks the registry
if (get_agent_mode_entry(agent)->init_func(agent, registry, config)) {
release_registry(entry, registry); // unlocks the registry
mutex_unlock(&entry->mutex);
return -1;
}
Expand All @@ -176,7 +180,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
mutex_unlock(&registry->mutex);

} else {
if (get_mode_entry(agent)->init_func(agent, NULL, config)) {
if (get_agent_mode_entry(agent)->init_func(agent, NULL, config)) {
mutex_unlock(&entry->mutex);
return -1;
}
Expand All @@ -190,11 +194,11 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
}

void conn_destroy(juice_agent_t *agent) {
conn_mode_entry_t *entry = get_mode_entry(agent);
conn_mode_entry_t *entry = get_agent_mode_entry(agent);
mutex_lock(&entry->mutex);

JLOG_DEBUG("Destroying connection");
conn_registry_t *registry = entry->registry;
conn_registry_t *registry = agent->registry;
if (registry) {
mutex_lock(&registry->mutex);

Expand All @@ -210,7 +214,8 @@ void conn_destroy(juice_agent_t *agent) {
assert(registry->agents_count > 0);
--registry->agents_count;

release_registry(entry); // unlocks the registry
agent->registry = NULL;
release_registry(entry, registry); // unlocks the registry

} else {
entry->cleanup_func(agent);
Expand All @@ -224,99 +229,80 @@ void conn_lock(juice_agent_t *agent) {
if (!agent->conn_impl)
return;

get_mode_entry(agent)->lock_func(agent);
get_agent_mode_entry(agent)->lock_func(agent);
}

void conn_unlock(juice_agent_t *agent) {
if (!agent->conn_impl)
return;

get_mode_entry(agent)->unlock_func(agent);
get_agent_mode_entry(agent)->unlock_func(agent);
}

int conn_interrupt(juice_agent_t *agent) {
if (!agent->conn_impl)
return -1;

return get_mode_entry(agent)->interrupt_func(agent);
return get_agent_mode_entry(agent)->interrupt_func(agent);
}

int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds) {
if (!agent->conn_impl)
return -1;

return get_mode_entry(agent)->send_func(agent, dst, data, size, ds);
return get_agent_mode_entry(agent)->send_func(agent, dst, data, size, ds);
}

int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) {
if (!agent->conn_impl)
return -1;

return get_mode_entry(agent)->get_addrs_func(agent, records, size);
return get_agent_mode_entry(agent)->get_addrs_func(agent, records, size);
}

static int juice_mux_stop_listen(const char *bind_address, int local_port) {
(void)bind_address;
(void)local_port;

int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) {
conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX];

mutex_lock(&entry->mutex);

conn_registry_t *registry = entry->registry;
if (!registry) {
mutex_unlock(&entry->mutex);
if (!entry->mux_listen_func) {
JLOG_DEBUG("juice_mux_listen mux_listen_func is not implemented");
return -1;
}

mutex_lock(&registry->mutex);

registry->cb_mux_incoming = NULL;
registry->mux_incoming_user_ptr = NULL;
conn_mux_interrupt_registry(registry);

release_registry(entry);

mutex_unlock(&entry->mutex);

return 0;
}

int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr)
{
if (!cb)
return juice_mux_stop_listen(bind_address, local_port);
if (!entry->get_registry_func) {
JLOG_DEBUG("juice_mux_listen get_registry_func is not implemented");
return -1;
}

conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX];
mutex_lock(&entry->mutex);

udp_socket_config_t config;
config.bind_address = bind_address;
config.port_begin = config.port_end = local_port;

mutex_lock(&entry->mutex);
conn_registry_t *registry;

if (entry->registry) {
// locks the registry, creating it first if required
if(acquire_registry(entry, &config, &registry)) {
JLOG_DEBUG("juice_mux_listen acquiring registry failed");
mutex_unlock(&entry->mutex);
JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection.");
return -1;
}

if(acquire_registry(entry, &config)) { // locks the registry if created
if (!registry) {
JLOG_DEBUG("juice_mux_listen registry not found after creating it");
mutex_unlock(&entry->mutex);
return -1;
}

conn_registry_t *registry = entry->registry;
if(!registry) {
if (entry->mux_listen_func(registry, cb, user_ptr)) {
JLOG_DEBUG("juice_mux_listen failed to call mux_listen_func for %s:%d", bind_address, local_port);
release_registry(entry, registry);
mutex_unlock(&entry->mutex);
return -1;
}

registry->cb_mux_incoming = cb;
registry->mux_incoming_user_ptr = user_ptr;

mutex_unlock(&registry->mutex);
release_registry(entry, registry);
mutex_unlock(&entry->mutex);
return 0;
}
24 changes: 22 additions & 2 deletions src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,30 @@ typedef struct conn_registry {
juice_agent_t **agents;
int agents_size;
int agents_count;
juice_cb_mux_incoming_t cb_mux_incoming;
void *mux_incoming_user_ptr;
} conn_registry_t;

typedef struct conn_mode_entry {
int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config);
void (*registry_cleanup_func)(conn_registry_t *registry);

int (*init_func)(juice_agent_t *agent, struct conn_registry *registry,
udp_socket_config_t *config);
void (*cleanup_func)(juice_agent_t *agent);
void (*lock_func)(juice_agent_t *agent);
void (*unlock_func)(juice_agent_t *agent);
int (*interrupt_func)(juice_agent_t *agent);
int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
int ds);
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);
int (*mux_listen_func)(conn_registry_t *registry, juice_cb_mux_incoming_t cb, void *user_ptr);
conn_registry_t *(*get_registry_func)(udp_socket_config_t *config);
bool (*can_release_registry_func)(conn_registry_t *registry);

mutex_t mutex;
conn_registry_t *registry;
} conn_mode_entry_t;

conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode);
int conn_create(juice_agent_t *agent, udp_socket_config_t *config);
void conn_destroy(juice_agent_t *agent);
void conn_lock(juice_agent_t *agent);
Expand Down
Loading