Skip to content

Commit ec14389

Browse files
leonardo-albertovich authored and edsiper committed
log: added a new management signal to overcome a deadlock
Signed-off-by: Leonardo Alminana <leonardo.alminana@chronosphere.io>
1 parent df1bf13 commit ec14389

File tree

2 files changed

+82
-12
lines changed

2 files changed

+82
-12
lines changed

include/fluent-bit/flb_log.h

+5
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,11 @@ extern FLB_TLS_DEFINE(struct flb_log, flb_log_ctx)
5454
#define FLB_LOG_EVENT MK_EVENT_NOTIFICATION
5555
#define FLB_LOG_MNG 1024
5656

57+
58+
#define FLB_LOG_MNG_TERMINATION_SIGNAL 1
59+
#define FLB_LOG_MNG_REFRESH_SIGNAL 2
60+
61+
5762
#define FLB_LOG_CACHE_ENTRIES 10
5863
#define FLB_LOG_CACHE_TEXT_BUF_SIZE 1024
5964

src/flb_log.c

+77-12
Original file line number | Diff line number | Diff line change
@@ -47,19 +47,43 @@ struct log_message {
4747
char msg[4096 - sizeof(size_t)];
4848
};
4949

50-
static inline int consume_byte(flb_pipefd_t fd)
50+
static inline int64_t flb_log_consume_signal(struct flb_log *context)
5151
{
52-
int ret;
53-
uint64_t val;
52+
int64_t signal_value;
53+
int result;
54+
55+
result = flb_pipe_r(context->ch_mng[0],
56+
&signal_value,
57+
sizeof(signal_value));
5458

55-
/* We need to consume the byte */
56-
ret = flb_pipe_r(fd, &val, sizeof(val));
57-
if (ret <= 0) {
59+
if (result <= 0) {
5860
flb_errno();
61+
5962
return -1;
6063
}
6164

62-
return 0;
65+
return signal_value;
66+
}
67+
68+
static inline int flb_log_enqueue_signal(struct flb_log *context,
69+
int64_t signal_value)
70+
{
71+
int result;
72+
73+
result = flb_pipe_w(context->ch_mng[1],
74+
&signal_value,
75+
sizeof(signal_value));
76+
77+
if (result <= 0) {
78+
flb_errno();
79+
80+
result = 1;
81+
}
82+
else {
83+
result = 0;
84+
}
85+
86+
return result;
6387
}
6488

6589
static inline int log_push(struct log_message *msg, struct flb_log *log)
@@ -95,15 +119,19 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log)
95119
* we can trust we will always get a full message on each read(2).
96120
*/
97121
bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message));
122+
98123
if (bytes <= 0) {
99124
flb_errno();
125+
100126
return -1;
101127
}
102128
if (msg.size > sizeof(msg.msg)) {
103129
fprintf(stderr, "[log] message too long: %zi > %zi",
104130
msg.size, sizeof(msg.msg));
131+
105132
return -1;
106133
}
134+
107135
log_push(&msg, log);
108136

109137
return bytes;
@@ -115,6 +143,7 @@ static void log_worker_collector(void *data)
115143
int run = FLB_TRUE;
116144
struct mk_event *event = NULL;
117145
struct flb_log *log = data;
146+
int64_t signal_value;
118147

119148
FLB_TLS_INIT(flb_log_ctx);
120149
FLB_TLS_SET(flb_log_ctx, log);
@@ -129,13 +158,31 @@ static void log_worker_collector(void *data)
129158

130159
while (run) {
131160
mk_event_wait(log->evl);
161+
132162
mk_event_foreach(event, log->evl) {
133163
if (event->type == FLB_LOG_EVENT) {
134164
log_read(event->fd, log);
135165
}
136166
else if (event->type == FLB_LOG_MNG) {
137-
consume_byte(event->fd);
138-
run = FLB_FALSE;
167+
signal_value = flb_log_consume_signal(log);
168+
169+
if (signal_value == FLB_LOG_MNG_TERMINATION_SIGNAL) {
170+
run = FLB_FALSE;
171+
}
172+
else if (signal_value == FLB_LOG_MNG_REFRESH_SIGNAL) {
173+
/* This signal is only used to
174+
* break the loop when a new client is
175+
* added in order to prevent a deadlock
176+
* that happens if the newly added pipe's capacity
177+
* is exceeded during the initialization process
178+
* of a threaded input plugin which causes write
179+
* to block (until the logger thread consumes
180+
* the buffered data) which in turn keeps the
181+
* thread from triggering the status set
182+
* condition which causes the main thread to
183+
* lock indefinitely as described in issue 9667.
184+
*/
185+
}
139186
}
140187
}
141188
}
@@ -326,18 +373,35 @@ int flb_log_worker_init(struct flb_worker *worker)
326373
/* Register the read-end of the pipe (log[0]) into the event loop */
327374
ret = mk_event_add(log->evl, worker->log[0],
328375
FLB_LOG_EVENT, MK_EVENT_READ, &worker->event);
376+
329377
if (ret == -1) {
330378
flb_pipe_destroy(worker->log);
379+
380+
return -1;
381+
}
382+
383+
ret = flb_log_enqueue_signal(log, FLB_LOG_MNG_REFRESH_SIGNAL);
384+
385+
if (ret == -1) {
386+
mk_event_del(log->evl, &worker->event);
387+
388+
flb_pipe_destroy(worker->log);
389+
331390
return -1;
332391
}
333392

334393
/* Log cache to reduce noise */
335394
cache = flb_log_cache_create(10, FLB_LOG_CACHE_ENTRIES);
336395
if (!cache) {
396+
mk_event_del(log->evl, &worker->event);
397+
337398
flb_pipe_destroy(worker->log);
399+
338400
return -1;
339401
}
402+
340403
worker->log_cache = cache;
404+
341405
return 0;
342406
}
343407

@@ -431,6 +495,7 @@ struct flb_log *flb_log_create(struct flb_config *config, int type,
431495
/* Register channel manager into the event loop */
432496
ret = mk_event_add(log->evl, log->ch_mng[0],
433497
FLB_LOG_MNG, MK_EVENT_READ, &log->event);
498+
434499
if (ret == -1) {
435500
fprintf(stderr, "[log] could not register event\n");
436501
mk_event_loop_destroy(log->evl);
@@ -650,6 +715,7 @@ void flb_log_print(int type, const char *file, int line, const char *fmt, ...)
650715
w = flb_worker_get();
651716
if (w) {
652717
n = flb_pipe_write_all(w->log[1], &msg, sizeof(msg));
718+
653719
if (n == -1) {
654720
fprintf(stderr, "%s", (char *) msg.msg);
655721
perror("write");
@@ -681,10 +747,9 @@ int flb_errno_print(int errnum, const char *file, int line)
681747

682748
int flb_log_destroy(struct flb_log *log, struct flb_config *config)
683749
{
684-
uint64_t val = FLB_TRUE;
685-
686750
/* Signal the child worker, stop working */
687-
flb_pipe_w(log->ch_mng[1], &val, sizeof(val));
751+
flb_log_enqueue_signal(log, FLB_LOG_MNG_TERMINATION_SIGNAL);
752+
688753
pthread_join(log->tid, NULL);
689754

690755
/* Release resources */

0 commit comments

Comments
 (0)