Skip to content

Commit 23f93a4

Browse files
committed
feat: support multiple juice_mux_listen callbacks
Supports multiple callbacks for unhandled STUN requests. Stores multiple registries, one per local listening port in a `conn_registry_t **registries;` field on `conn_mode_entry_t`. The list of registries grows in the same way as the list of agents in `conn_mode_entry_t`. Adds a `port` property to `conn_registry_t` which is the port the registry is bound to, and an `index` property which is the index of the registry in the containing `registries` list in `conn_mode_entry_t`.
1 parent 2fb91a3 commit 23f93a4

9 files changed

+490
-19
lines changed

CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ set(TESTS_SOURCES
6868
${CMAKE_CURRENT_SOURCE_DIR}/test/main.c
6969
${CMAKE_CURRENT_SOURCE_DIR}/test/crc32.c
7070
${CMAKE_CURRENT_SOURCE_DIR}/test/base64.c
71+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled.c
72+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-multiple.c
73+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-no-host.c
74+
${CMAKE_CURRENT_SOURCE_DIR}/test/stun-unhandled-unhandle.c
7175
${CMAKE_CURRENT_SOURCE_DIR}/test/stun.c
7276
${CMAKE_CURRENT_SOURCE_DIR}/test/gathering.c
7377
${CMAKE_CURRENT_SOURCE_DIR}/test/connectivity.c

src/agent.h

+2
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ struct juice_agent {
149149

150150
thread_t resolver_thread;
151151
bool resolver_thread_started;
152+
153+
conn_registry_t *registry;
152154
};
153155

154156
juice_agent_t *agent_create(const juice_config_t *config);

src/conn.c

+124-19
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <assert.h>
1717
#include <string.h>
18+
#include <stdio.h>
1819

1920
#define INITIAL_REGISTRY_SIZE 16
2021

@@ -33,30 +34,114 @@ typedef struct conn_mode_entry {
3334
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);
3435

3536
mutex_t mutex;
36-
conn_registry_t *registry;
37+
conn_registry_t **registries;
38+
int registries_size;
39+
int registries_count;
3740
} conn_mode_entry_t;
3841

3942
#define MODE_ENTRIES_SIZE 3
4043

4144
static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
4245
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
4346
conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs,
44-
MUTEX_INITIALIZER, NULL},
47+
MUTEX_INITIALIZER, NULL, 0, 0},
4548
{conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup,
4649
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
47-
MUTEX_INITIALIZER, NULL},
50+
MUTEX_INITIALIZER, NULL, 0, 0},
4851
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
49-
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
52+
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL, 0, 0}};
5053

5154
static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
5255
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
5356
assert(mode >= 0 && mode < MODE_ENTRIES_SIZE);
5457
return mode_entries + (int)mode;
5558
}
5659

60+
// accepts a host name and a port and returns a token that can be used to
61+
// distinguish one mux request handler from another. E.g.
62+
//
63+
// '127.0.0.1', 8080 -> '127.0.0.1:8080'
64+
// '::', 8080 -> '[::]:8080'
65+
// NULL, 8080 -> 'any:8080'
66+
// '', 8080 -> 'any:8080'
67+
static char *get_address (const char *bind_address, uint16_t port) {
68+
if (!bind_address || strcmp(bind_address, "") == 0) {
69+
bind_address = "any";
70+
}
71+
72+
// search for '.' in bind_address, treat as IPv4 if found
73+
char *result = strchr(bind_address, '.');
74+
int index = (int)(result - bind_address);
75+
int maxAddrSize = 48; // ip6 is 39 chars + [ + ] + : + 5 for the port + \0
76+
char *address = (char*) calloc(1, maxAddrSize * sizeof(char));
77+
78+
// ip6
79+
char *format = "[%s]:%d";
80+
81+
if (index > -1) {
82+
// ip4
83+
format = "%s:%d";
84+
}
85+
86+
sprintf(address, format, bind_address, port);
87+
88+
return address;
89+
}
90+
91+
static conn_registry_t *get_port_registry(conn_mode_entry_t *entry, const char *bind_address, uint16_t port) {
92+
char *address = get_address(bind_address, port);
93+
94+
for (int i = 0; i < entry->registries_size; i++) {
95+
if (!entry->registries[i]) {
96+
continue;
97+
}
98+
99+
if (strcmp(entry->registries[i]->address, address) == 0) {
100+
return entry->registries[i];
101+
}
102+
}
103+
104+
return NULL;
105+
}
106+
107+
static int add_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
108+
int i = 0;
109+
while (i < entry->registries_size && entry->registries[i])
110+
++i;
111+
112+
if (i == entry->registries_size) {
113+
int new_size = entry->registries_size * 2;
114+
115+
if (new_size == 0) {
116+
new_size = 1;
117+
}
118+
119+
JLOG_DEBUG("Reallocating registries array, new_size=%d", new_size);
120+
assert(new_size > 0);
121+
122+
conn_registry_t **new_registries =
123+
realloc(entry->registries, new_size * sizeof(conn_registry_t *));
124+
if (!new_registries) {
125+
JLOG_FATAL("Memory reallocation failed for registries array");
126+
return -1;
127+
}
128+
129+
entry->registries = new_registries;
130+
entry->registries_size = new_size;
131+
memset(entry->registries + i, 0, (new_size - i) * sizeof(conn_registry_t *));
132+
}
133+
134+
entry->registries[i] = registry;
135+
registry->registry_index = i;
136+
++entry->registries_count;
137+
138+
return 0;
139+
}
140+
57141
static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) {
58142
// entry must be locked
59-
conn_registry_t *registry = entry->registry;
143+
conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin);
144+
60145
if (!registry) {
61146
if (!entry->registry_init_func)
62147
return 0;
@@ -83,16 +168,25 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
83168
mutex_init(&registry->mutex, MUTEX_RECURSIVE);
84169
mutex_lock(&registry->mutex);
85170

171+
registry->address = get_address(config->bind_address, config->port_begin);
172+
86173
if (entry->registry_init_func(registry, config)) {
87174
JLOG_FATAL("Registry initialization failed");
88175
mutex_unlock(&registry->mutex);
89176
free(registry->agents);
177+
free(registry->address);
90178
free(registry);
91179
return -1;
92180
}
93181

94-
entry->registry = registry;
95-
182+
if (add_registry(entry, registry)) {
183+
JLOG_FATAL("Adding registry to entry failed");
184+
mutex_unlock(&registry->mutex);
185+
free(registry->agents);
186+
free(registry->address);
187+
free(registry);
188+
return -1;
189+
}
96190
} else {
97191
mutex_lock(&registry->mutex);
98192
}
@@ -101,9 +195,8 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
101195
return 0;
102196
}
103197

104-
static void release_registry(conn_mode_entry_t *entry) {
198+
static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
105199
// entry must be locked
106-
conn_registry_t *registry = entry->registry;
107200
if (!registry)
108201
return;
109202

@@ -116,9 +209,19 @@ static void release_registry(conn_mode_entry_t *entry) {
116209
if (entry->registry_cleanup_func)
117210
entry->registry_cleanup_func(registry);
118211

212+
if (registry->registry_index > -1) {
213+
int i = registry->registry_index;
214+
assert(entry->registries[i] == registry);
215+
entry->registries[i] = NULL;
216+
registry->registry_index = -1;
217+
}
218+
219+
assert(entry->registries_count > 0);
220+
--entry->registries_count;
221+
119222
free(registry->agents);
223+
free(registry->address);
120224
free(registry);
121-
entry->registry = NULL;
122225
return;
123226
}
124227

@@ -136,7 +239,8 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
136239
return -1;
137240
}
138241

139-
conn_registry_t *registry = entry->registry;
242+
conn_registry_t *registry = get_port_registry(entry, config->bind_address, config->port_begin);
243+
agent->registry = registry;
140244

141245
JLOG_DEBUG("Creating connection");
142246
if (registry) {
@@ -164,7 +268,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
164268
}
165269

166270
if (get_mode_entry(agent)->init_func(agent, registry, config)) {
167-
release_registry(entry); // unlocks the registry
271+
release_registry(entry, registry); // unlocks the registry
168272
mutex_unlock(&entry->mutex);
169273
return -1;
170274
}
@@ -194,7 +298,7 @@ void conn_destroy(juice_agent_t *agent) {
194298
mutex_lock(&entry->mutex);
195299

196300
JLOG_DEBUG("Destroying connection");
197-
conn_registry_t *registry = entry->registry;
301+
conn_registry_t *registry = agent->registry;
198302
if (registry) {
199303
mutex_lock(&registry->mutex);
200304

@@ -205,12 +309,13 @@ void conn_destroy(juice_agent_t *agent) {
205309
assert(registry->agents[i] == agent);
206310
registry->agents[i] = NULL;
207311
agent->conn_index = -1;
312+
agent->registry = NULL;
208313
}
209314

210315
assert(registry->agents_count > 0);
211316
--registry->agents_count;
212317

213-
release_registry(entry); // unlocks the registry
318+
release_registry(entry, registry); // unlocks the registry
214319

215320
} else {
216321
entry->cleanup_func(agent);
@@ -264,7 +369,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {
264369

265370
mutex_lock(&entry->mutex);
266371

267-
conn_registry_t *registry = entry->registry;
372+
conn_registry_t *registry = get_port_registry(entry, bind_address, local_port);
268373
if (!registry) {
269374
mutex_unlock(&entry->mutex);
270375
return -1;
@@ -276,7 +381,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {
276381
registry->mux_incoming_user_ptr = NULL;
277382
conn_mux_interrupt_registry(registry);
278383

279-
release_registry(entry);
384+
release_registry(entry, registry);
280385

281386
mutex_unlock(&entry->mutex);
282387

@@ -296,9 +401,9 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco
296401

297402
mutex_lock(&entry->mutex);
298403

299-
if (entry->registry) {
404+
if (get_port_registry(entry, bind_address, local_port)) {
300405
mutex_unlock(&entry->mutex);
301-
JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection.");
406+
JLOG_DEBUG("juice_mux_listen there is already a listener for this host/port combination.");
302407
return -1;
303408
}
304409

@@ -307,7 +412,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco
307412
return -1;
308413
}
309414

310-
conn_registry_t *registry = entry->registry;
415+
conn_registry_t *registry = get_port_registry(entry, bind_address, local_port);
311416
if(!registry) {
312417
mutex_unlock(&entry->mutex);
313418
return -1;

src/conn.h

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ typedef struct juice_agent juice_agent_t;
2525
// See include/juice/juice.h for implemented concurrency modes
2626

2727
typedef struct conn_registry {
28+
int registry_index;
29+
char *address;
2830
void *impl;
2931
mutex_t mutex;
3032
juice_agent_t **agents;

test/main.c

+29
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ int test_turn(void);
2222
int test_conflict(void);
2323
int test_bind(void);
2424
int test_ufrag(void);
25+
int test_stun_unhandled(void);
26+
int test_stun_unhandled_multiple(void);
27+
int test_stun_unhandled_no_host(void);
28+
int test_stun_unhandled_unhandle(void);
2529

2630
#ifndef NO_SERVER
2731
int test_server(void);
@@ -104,6 +108,31 @@ int main(int argc, char **argv) {
104108
return -1;
105109
}
106110

111+
#ifndef _WIN32
112+
// windows fails to read STUN message from listen socket:
113+
// udp.c:196: Ignoring ECONNRESET returned by recvfrom
114+
printf("\nRunning unhandled STUN message test...\n");
115+
if (test_stun_unhandled()) {
116+
fprintf(stderr, "Unhandled STUN message test failed\n");
117+
return -1;
118+
}
119+
printf("\nRunning mutiple handler unhandled STUN message test...\n");
120+
if (test_stun_unhandled_multiple()) {
121+
fprintf(stderr, "Mutiple handler unhandled STUN message test failed\n");
122+
return -1;
123+
}
124+
printf("\nRunning unhandled, unhandled STUN message test...\n");
125+
if (test_stun_unhandled_unhandle()) {
126+
fprintf(stderr, "Unhandled, unhandled STUN message test failed\n");
127+
return -1;
128+
}
129+
printf("\nRunning no host unhandled STUN message test...\n");
130+
if (test_stun_unhandled_no_host()) {
131+
fprintf(stderr, "Mutiple no host unhandled STUN message test failed\n");
132+
return -1;
133+
}
134+
#endif
135+
107136
#ifndef NO_SERVER
108137
printf("\nRunning server test...\n");
109138
if (test_server()) {

0 commit comments

Comments
 (0)