Skip to content

Commit 424d207

Browse files
Merge pull request #292 from achingbrain/feat/support-multiple-juice_mux_listen-callbacks
feat: support multiple juice_mux_listen callbacks for different ports
2 parents 7215280 + bf0d9e4 commit 424d207

12 files changed

+665
-88
lines changed

CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ set(TESTS_SOURCES
6868
${CMAKE_CURRENT_SOURCE_DIR}/test/main.c
6969
${CMAKE_CURRENT_SOURCE_DIR}/test/crc32.c
7070
${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c
71+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c
72+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c
73+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c
74+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c
7175
${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c
7276
${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c
7377
${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.c

src/agent.h

+2
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ struct juice_agent {
149149

150150
thread_t resolver_thread;
151151
bool resolver_thread_started;
152+
153+
conn_registry_t *registry;
152154
};
153155

154156
juice_agent_t *agent_create(const juice_config_t *config);

src/conn.c

+69-83
Original file line numberDiff line numberDiff line change
@@ -15,51 +15,53 @@
1515

1616
#include <assert.h>
1717
#include <string.h>
18+
#include <stdio.h>
1819

1920
#define INITIAL_REGISTRY_SIZE 16
2021

21-
typedef struct conn_mode_entry {
22-
int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config);
23-
void (*registry_cleanup_func)(conn_registry_t *registry);
24-
25-
int (*init_func)(juice_agent_t *agent, struct conn_registry *registry,
26-
udp_socket_config_t *config);
27-
void (*cleanup_func)(juice_agent_t *agent);
28-
void (*lock_func)(juice_agent_t *agent);
29-
void (*unlock_func)(juice_agent_t *agent);
30-
int (*interrupt_func)(juice_agent_t *agent);
31-
int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
32-
int ds);
33-
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);
34-
35-
mutex_t mutex;
36-
conn_registry_t *registry;
37-
} conn_mode_entry_t;
38-
3922
#define MODE_ENTRIES_SIZE 3
4023

4124
static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
4225
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
4326
conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs,
44-
MUTEX_INITIALIZER, NULL},
27+
NULL, NULL, NULL, MUTEX_INITIALIZER, NULL},
4528
{conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup,
4629
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
47-
MUTEX_INITIALIZER, NULL},
48-
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
49-
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
30+
conn_mux_listen, conn_mux_get_registry, conn_mux_can_release_registry, MUTEX_INITIALIZER, NULL},
31+
{NULL, NULL, conn_thread_init, conn_thread_cleanup,
32+
conn_thread_lock, conn_thread_unlock, conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs,
33+
NULL, NULL, NULL, MUTEX_INITIALIZER, NULL}
34+
};
5035

51-
static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
52-
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
36+
#define MODE_ENTRIES_SIZE 3
37+
38+
static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE];
39+
40+
conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode) {
5341
assert(mode >= 0 && mode < MODE_ENTRIES_SIZE);
5442
return mode_entries + (int)mode;
5543
}
5644

57-
static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) {
45+
static conn_mode_entry_t *get_agent_mode_entry(juice_agent_t *agent) {
46+
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
47+
return conn_get_mode_entry(mode);
48+
}
49+
50+
static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config, conn_registry_t **acquired) {
5851
// entry must be locked
59-
conn_registry_t *registry = entry->registry;
52+
conn_registry_t *registry;
53+
54+
if (entry->get_registry_func) {
55+
registry = entry->get_registry_func(config);
56+
} else {
57+
registry = entry->registry;
58+
}
59+
6060
if (!registry) {
61-
if (!entry->registry_init_func)
61+
if (!entry->registry_init_func) {
62+
*acquired = NULL;
6263
return 0;
64+
}
6365

6466
JLOG_DEBUG("Creating connections registry");
6567

@@ -92,33 +94,34 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
9294
}
9395

9496
entry->registry = registry;
95-
9697
} else {
9798
mutex_lock(&registry->mutex);
9899
}
99100

101+
*acquired = registry;
102+
100103
// registry is locked
101104
return 0;
102105
}
103106

104-
static void release_registry(conn_mode_entry_t *entry) {
107+
static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
105108
// entry must be locked
106-
conn_registry_t *registry = entry->registry;
107109
if (!registry)
108110
return;
109111

110112
// registry must be locked
113+
bool canRelease = entry->can_release_registry_func ? entry->can_release_registry_func(registry) : true;
111114

112-
if (registry->agents_count == 0 && registry->cb_mux_incoming == NULL) {
115+
if (registry->agents_count == 0 && canRelease) {
113116
JLOG_DEBUG("No connection left, destroying connections registry");
114117
mutex_unlock(&registry->mutex);
115118

116119
if (entry->registry_cleanup_func)
117120
entry->registry_cleanup_func(registry);
118121

122+
entry->registry = NULL;
119123
free(registry->agents);
120124
free(registry);
121-
entry->registry = NULL;
122125
return;
123126
}
124127

@@ -129,14 +132,15 @@ static void release_registry(conn_mode_entry_t *entry) {
129132
}
130133

131134
int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
132-
conn_mode_entry_t *entry = get_mode_entry(agent);
135+
conn_mode_entry_t *entry = get_agent_mode_entry(agent);
136+
conn_registry_t *registry;
133137
mutex_lock(&entry->mutex);
134-
if(acquire_registry(entry, config)) { // locks the registry if created
138+
if (acquire_registry(entry, config, &registry)) { // locks the registry if created
135139
mutex_unlock(&entry->mutex);
136140
return -1;
137141
}
138142

139-
conn_registry_t *registry = entry->registry;
143+
agent->registry = registry;
140144

141145
JLOG_DEBUG("Creating connection");
142146
if (registry) {
@@ -163,8 +167,8 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
163167
memset(registry->agents + i, 0, (new_size - i) * sizeof(juice_agent_t *));
164168
}
165169

166-
if (get_mode_entry(agent)->init_func(agent, registry, config)) {
167-
release_registry(entry); // unlocks the registry
170+
if (get_agent_mode_entry(agent)->init_func(agent, registry, config)) {
171+
release_registry(entry, registry); // unlocks the registry
168172
mutex_unlock(&entry->mutex);
169173
return -1;
170174
}
@@ -176,7 +180,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
176180
mutex_unlock(&registry->mutex);
177181

178182
} else {
179-
if (get_mode_entry(agent)->init_func(agent, NULL, config)) {
183+
if (get_agent_mode_entry(agent)->init_func(agent, NULL, config)) {
180184
mutex_unlock(&entry->mutex);
181185
return -1;
182186
}
@@ -190,11 +194,11 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
190194
}
191195

192196
void conn_destroy(juice_agent_t *agent) {
193-
conn_mode_entry_t *entry = get_mode_entry(agent);
197+
conn_mode_entry_t *entry = get_agent_mode_entry(agent);
194198
mutex_lock(&entry->mutex);
195199

196200
JLOG_DEBUG("Destroying connection");
197-
conn_registry_t *registry = entry->registry;
201+
conn_registry_t *registry = agent->registry;
198202
if (registry) {
199203
mutex_lock(&registry->mutex);
200204

@@ -210,7 +214,8 @@ void conn_destroy(juice_agent_t *agent) {
210214
assert(registry->agents_count > 0);
211215
--registry->agents_count;
212216

213-
release_registry(entry); // unlocks the registry
217+
agent->registry = NULL;
218+
release_registry(entry, registry); // unlocks the registry
214219

215220
} else {
216221
entry->cleanup_func(agent);
@@ -224,99 +229,80 @@ void conn_lock(juice_agent_t *agent) {
224229
if (!agent->conn_impl)
225230
return;
226231

227-
get_mode_entry(agent)->lock_func(agent);
232+
get_agent_mode_entry(agent)->lock_func(agent);
228233
}
229234

230235
void conn_unlock(juice_agent_t *agent) {
231236
if (!agent->conn_impl)
232237
return;
233238

234-
get_mode_entry(agent)->unlock_func(agent);
239+
get_agent_mode_entry(agent)->unlock_func(agent);
235240
}
236241

237242
int conn_interrupt(juice_agent_t *agent) {
238243
if (!agent->conn_impl)
239244
return -1;
240245

241-
return get_mode_entry(agent)->interrupt_func(agent);
246+
return get_agent_mode_entry(agent)->interrupt_func(agent);
242247
}
243248

244249
int conn_send(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
245250
int ds) {
246251
if (!agent->conn_impl)
247252
return -1;
248253

249-
return get_mode_entry(agent)->send_func(agent, dst, data, size, ds);
254+
return get_agent_mode_entry(agent)->send_func(agent, dst, data, size, ds);
250255
}
251256

252257
int conn_get_addrs(juice_agent_t *agent, addr_record_t *records, size_t size) {
253258
if (!agent->conn_impl)
254259
return -1;
255260

256-
return get_mode_entry(agent)->get_addrs_func(agent, records, size);
261+
return get_agent_mode_entry(agent)->get_addrs_func(agent, records, size);
257262
}
258263

259-
static int juice_mux_stop_listen(const char *bind_address, int local_port) {
260-
(void)bind_address;
261-
(void)local_port;
262-
264+
int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr) {
263265
conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX];
264266

265-
mutex_lock(&entry->mutex);
266-
267-
conn_registry_t *registry = entry->registry;
268-
if (!registry) {
269-
mutex_unlock(&entry->mutex);
267+
if (!entry->mux_listen_func) {
268+
JLOG_DEBUG("juice_mux_listen mux_listen_func is not implemented");
270269
return -1;
271270
}
272271

273-
mutex_lock(&registry->mutex);
274-
275-
registry->cb_mux_incoming = NULL;
276-
registry->mux_incoming_user_ptr = NULL;
277-
conn_mux_interrupt_registry(registry);
278-
279-
release_registry(entry);
280-
281-
mutex_unlock(&entry->mutex);
282-
283-
return 0;
284-
}
285-
286-
int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_incoming_t cb, void *user_ptr)
287-
{
288-
if (!cb)
289-
return juice_mux_stop_listen(bind_address, local_port);
272+
if (!entry->get_registry_func) {
273+
JLOG_DEBUG("juice_mux_listen get_registry_func is not implemented");
274+
return -1;
275+
}
290276

291-
conn_mode_entry_t *entry = &mode_entries[JUICE_CONCURRENCY_MODE_MUX];
277+
mutex_lock(&entry->mutex);
292278

293279
udp_socket_config_t config;
294280
config.bind_address = bind_address;
295281
config.port_begin = config.port_end = local_port;
296282

297-
mutex_lock(&entry->mutex);
283+
conn_registry_t *registry;
298284

299-
if (entry->registry) {
285+
// locks the registry, creating it first if required
286+
if(acquire_registry(entry, &config, &registry)) {
287+
JLOG_DEBUG("juice_mux_listen acquiring registry failed");
300288
mutex_unlock(&entry->mutex);
301-
JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection.");
302289
return -1;
303290
}
304291

305-
if(acquire_registry(entry, &config)) { // locks the registry if created
292+
if (!registry) {
293+
JLOG_DEBUG("juice_mux_listen registry not found after creating it");
306294
mutex_unlock(&entry->mutex);
307295
return -1;
308296
}
309297

310-
conn_registry_t *registry = entry->registry;
311-
if(!registry) {
298+
if (entry->mux_listen_func(registry, cb, user_ptr)) {
299+
JLOG_DEBUG("juice_mux_listen failed to call mux_listen_func for %s:%d", bind_address, local_port);
300+
release_registry(entry, registry);
312301
mutex_unlock(&entry->mutex);
313302
return -1;
314303
}
315304

316-
registry->cb_mux_incoming = cb;
317-
registry->mux_incoming_user_ptr = user_ptr;
318-
319-
mutex_unlock(&registry->mutex);
305+
release_registry(entry, registry);
320306
mutex_unlock(&entry->mutex);
321307
return 0;
322308
}

src/conn.h

+22-2
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,30 @@ typedef struct conn_registry {
3030
juice_agent_t **agents;
3131
int agents_size;
3232
int agents_count;
33-
juice_cb_mux_incoming_t cb_mux_incoming;
34-
void *mux_incoming_user_ptr;
3533
} conn_registry_t;
3634

35+
typedef struct conn_mode_entry {
36+
int (*registry_init_func)(conn_registry_t *registry, udp_socket_config_t *config);
37+
void (*registry_cleanup_func)(conn_registry_t *registry);
38+
39+
int (*init_func)(juice_agent_t *agent, struct conn_registry *registry,
40+
udp_socket_config_t *config);
41+
void (*cleanup_func)(juice_agent_t *agent);
42+
void (*lock_func)(juice_agent_t *agent);
43+
void (*unlock_func)(juice_agent_t *agent);
44+
int (*interrupt_func)(juice_agent_t *agent);
45+
int (*send_func)(juice_agent_t *agent, const addr_record_t *dst, const char *data, size_t size,
46+
int ds);
47+
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);
48+
int (*mux_listen_func)(conn_registry_t *registry, juice_cb_mux_incoming_t cb, void *user_ptr);
49+
conn_registry_t *(*get_registry_func)(udp_socket_config_t *config);
50+
bool (*can_release_registry_func)(conn_registry_t *registry);
51+
52+
mutex_t mutex;
53+
conn_registry_t *registry;
54+
} conn_mode_entry_t;
55+
56+
conn_mode_entry_t *conn_get_mode_entry(juice_concurrency_mode_t mode);
3757
int conn_create(juice_agent_t *agent, udp_socket_config_t *config);
3858
void conn_destroy(juice_agent_t *agent);
3959
void conn_lock(juice_agent_t *agent);

0 commit comments

Comments
 (0)