Skip to content

Commit d3885c4

Browse files
leonardo-albertovichedsiper
authored andcommitted
log_event_decoder: improved how group metadata is exposed to client code
Signed-off-by: Leonardo Alminana <leonardo.alminana@chronosphere.io>
1 parent 7ea7ca0 commit d3885c4

File tree

2 files changed

+50
-8
lines changed

2 files changed

+50
-8
lines changed

include/fluent-bit/flb_log_event_decoder.h

+3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@
4646
#define FLB_LOG_EVENT_EXPECTED_HEADER_ELEMENT_COUNT 2
4747

4848
struct flb_log_event_decoder {
49+
msgpack_object *current_group_attributes;
50+
msgpack_unpacked unpacked_group_record;
4951
int dynamically_allocated;
52+
msgpack_object *current_group_metadata;
5053
msgpack_unpacked unpacked_empty_map;
5154
size_t previous_offset;
5255
msgpack_unpacked unpacked_event;

src/flb_log_event_decoder.c

+47-8
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ void flb_log_event_decoder_reset(struct flb_log_event_decoder *context,
7272
context->buffer = input_buffer;
7373
context->length = input_length;
7474
context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA;
75+
context->current_group_metadata = NULL;
76+
context->current_group_attributes = NULL;
77+
78+
msgpack_unpacked_destroy(&context->unpacked_group_record);
79+
msgpack_unpacked_init(&context->unpacked_group_record);
7580

7681
msgpack_unpacked_destroy(&context->unpacked_event);
7782
msgpack_unpacked_init(&context->unpacked_event);
@@ -103,7 +108,7 @@ int flb_log_event_decoder_init(struct flb_log_event_decoder *context,
103108

104109
context->dynamically_allocated = FLB_FALSE;
105110
context->initialized = FLB_TRUE;
106-
context->read_groups = FLB_TRUE;
111+
context->read_groups = FLB_FALSE;
107112

108113
flb_log_event_decoder_reset(context, input_buffer, input_length);
109114

@@ -141,6 +146,7 @@ void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context)
141146

142147
if (context != NULL) {
143148
if (context->initialized) {
149+
msgpack_unpacked_destroy(&context->unpacked_group_record);
144150
msgpack_unpacked_destroy(&context->unpacked_empty_map);
145151
msgpack_unpacked_destroy(&context->unpacked_event);
146152
}
@@ -180,12 +186,12 @@ int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
180186
return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
181187
}
182188

183-
output->tm.tv_sec =
189+
output->tm.tv_sec =
184190
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
185191
FLB_ALIGNED_DWORD_READ(
186192
(unsigned char *) &input->via.ext.ptr[0]));
187193

188-
output->tm.tv_nsec =
194+
output->tm.tv_nsec =
189195
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
190196
FLB_ALIGNED_DWORD_READ(
191197
(unsigned char *) &input->via.ext.ptr[4]));
@@ -340,16 +346,49 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
340346
/* get log event type */
341347
ret = flb_log_event_decoder_get_record_type(event, &record_type);
342348
if (ret != 0) {
349+
context->current_group_metadata = NULL;
350+
context->current_group_attributes = NULL;
351+
343352
context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE;
344353
return context->last_result;
345354
}
346355

347-
/*
348-
* if we hava a group type record and the caller don't want groups, just
349-
* skip this record and move to the next one.
356+
/* Meta records such as the group opener and closer are identified by negative
357+
* timestamp values. In these cases we track the current group metadata and
358+
* attributes in order to transparently provide them through the log_event
359+
* structure but we also want to allow the client code raw access to such
360+
* records which is why the read_groups decoder context property is used
361+
* to determine the behavior.
350362
*/
351-
if (record_type != FLB_LOG_EVENT_NORMAL && !context->read_groups) {
352-
return flb_log_event_decoder_next(context, event);
363+
if (record_type != FLB_LOG_EVENT_NORMAL) {
364+
if (context->read_groups != FLB_TRUE) {
365+
msgpack_unpacked_destroy(&context->unpacked_group_record);
366+
367+
if (record_type == FLB_LOG_EVENT_GROUP_START) {
368+
memcpy(&context->unpacked_group_record,
369+
&context->unpacked_event,
370+
sizeof(msgpack_unpacked));
371+
372+
context->current_group_metadata = event->metadata;
373+
context->current_group_attributes = event->body;
374+
}
375+
else {
376+
msgpack_unpacked_destroy(&context->unpacked_event);
377+
378+
context->current_group_metadata = NULL;
379+
context->current_group_attributes = NULL;
380+
}
381+
382+
msgpack_unpacked_init(&context->unpacked_event);
383+
384+
memset(event, 0, sizeof(struct flb_log_event));
385+
386+
return flb_log_event_decoder_next(context, event);
387+
}
388+
}
389+
else {
390+
event->group_metadata = context->current_group_metadata;
391+
event->group_attributes = context->current_group_attributes;
353392
}
354393
}
355394

0 commit comments

Comments
 (0)