diff --git a/include/fluent-bit/flb_log_event.h b/include/fluent-bit/flb_log_event.h index 4e394858580..2c1d99acc31 100644 --- a/include/fluent-bit/flb_log_event.h +++ b/include/fluent-bit/flb_log_event.h @@ -38,6 +38,8 @@ #define FLB_LOG_EVENT_GROUP_END (int32_t) -2 struct flb_log_event { + msgpack_object *group_attributes; + msgpack_object *group_metadata; msgpack_object *raw_timestamp; struct flb_time timestamp; msgpack_object *metadata; diff --git a/include/fluent-bit/flb_log_event_decoder.h b/include/fluent-bit/flb_log_event_decoder.h index 503b43cfe12..7fdbaa619b9 100644 --- a/include/fluent-bit/flb_log_event_decoder.h +++ b/include/fluent-bit/flb_log_event_decoder.h @@ -46,7 +46,10 @@ #define FLB_LOG_EVENT_EXPECTED_HEADER_ELEMENT_COUNT 2 struct flb_log_event_decoder { + msgpack_object *current_group_attributes; + msgpack_unpacked unpacked_group_record; int dynamically_allocated; + msgpack_object *current_group_metadata; msgpack_unpacked unpacked_empty_map; size_t previous_offset; msgpack_unpacked unpacked_event; diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c index 413c1e04036..5e9bd37ab4e 100644 --- a/plugins/out_splunk/splunk.c +++ b/plugins/out_splunk/splunk.c @@ -53,6 +53,215 @@ static int cb_splunk_init(struct flb_output_instance *ins, return 0; } +static msgpack_object *local_msgpack_map_lookup( + msgpack_object *map_object, + char *key) +{ + size_t key_length; + size_t index; + msgpack_object_map *map; + + if (key == NULL) { + return NULL; + } + + if (map_object == NULL) { + return NULL; + } + + if (map_object->type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + map = &map_object->via.map; + + key_length = strlen(key); + + for (index = 0; index < map->size ; index++) { + if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) { + if (map->ptr[index].key.via.str.size == key_length) { + if (strncmp(map->ptr[index].key.via.str.ptr, + key, + key_length) == 0) { + return &map->ptr[index].val; + } + } + } + } + + return NULL; +} + +static int local_msgpack_map_string_lookup( + msgpack_object *map_object, + char *key, + char **value, + size_t *value_size) +{ + msgpack_object *value_object; + + value_object = local_msgpack_map_lookup(map_object, key); + + if (value_object == NULL) { + return -1; + } + + if (value_object->type != MSGPACK_OBJECT_STR) { + return -2; + } + + *value = (char *) value_object->via.str.ptr; + *value_size = value_object->via.str.size; + + return 0; +} + +static int local_msgpack_map_string_extract( + msgpack_object *map_object, + char *key, + char *output_buffer, + size_t output_buffer_size) +{ + size_t value_size; + int result; + char *value; + + result = local_msgpack_map_string_lookup(map_object, + key, + &value, + &value_size); + + if (result != 0) { + return -1; + } + + if (value_size >= output_buffer_size) { + return -2; + } + + strncpy(output_buffer, + value, + value_size); + + output_buffer[value_size] = '\0'; + + return 0; +} + +static inline void local_msgpack_pack_cstr(msgpack_packer *packer, char *value) +{ + msgpack_pack_str(packer, strlen(value)); + msgpack_pack_str_body(packer, value, strlen(value)); +} + +static int pack_otel_data(struct flb_splunk *ctx, + msgpack_packer *mp_pck, + struct flb_mp_map_header *mh_pck, + msgpack_object *group_metadata, + msgpack_object *group_attributes, + msgpack_object *record_attributes) +{ + msgpack_object *source_map; + char schema[8]; + int result; + struct flb_mp_map_header mh_tmp; + msgpack_object *value; + size_t index; + + result = local_msgpack_map_string_extract(group_metadata, + "schema", + schema, + sizeof(schema)); + + if (result != 0) { + return 0; + } + + if (strcmp(schema, "otlp") != 0) { + return 0; + } + + source_map = local_msgpack_map_lookup(group_attributes, "resource"); + + if (source_map != NULL) { + source_map = local_msgpack_map_lookup(source_map, "attributes"); + + if (source_map != NULL) { + value = local_msgpack_map_lookup(source_map, + "host.name"); + + if (value != NULL) { + flb_mp_map_header_append(mh_pck); + + local_msgpack_pack_cstr(mp_pck, "host"); + + msgpack_pack_object(mp_pck, *value); + } + else { + return -2; + } + } + } + + flb_mp_map_header_append(mh_pck); + + local_msgpack_pack_cstr(mp_pck, "fields"); + + flb_mp_map_header_init(&mh_tmp, mp_pck); + + source_map = local_msgpack_map_lookup(record_attributes, "otlp"); + + if (source_map != NULL) { + value = local_msgpack_map_lookup(source_map, + "severity_number"); + + if (value != NULL && + (value->type == MSGPACK_OBJECT_POSITIVE_INTEGER || + value->type == MSGPACK_OBJECT_NEGATIVE_INTEGER)) { + flb_mp_map_header_append(&mh_tmp); + + local_msgpack_pack_cstr(mp_pck, "otel.log.severity.number"); + + msgpack_pack_object(mp_pck, *value); + } + else { + return -2; + } + + value = local_msgpack_map_lookup(source_map, + "severity_text"); + + if (value != NULL && + value->type == MSGPACK_OBJECT_STR) { + flb_mp_map_header_append(&mh_tmp); + + local_msgpack_pack_cstr(mp_pck, "otel.log.severity.text"); + + msgpack_pack_object(mp_pck, *value); + } + else { + return -3; + } + + source_map = local_msgpack_map_lookup(source_map, "attributes"); + + if (source_map != NULL && + source_map->type == MSGPACK_OBJECT_MAP) { + + for (index = 0; index < source_map->via.map.size ; index++) { + flb_mp_map_header_append(&mh_tmp); + + msgpack_pack_object(mp_pck, source_map->via.map.ptr[index].key); + msgpack_pack_object(mp_pck, source_map->via.map.ptr[index].val); + } + } + } + + flb_mp_map_header_end(&mh_tmp); + + return 0; +} + static int pack_map_meta(struct flb_splunk *ctx, struct flb_mp_map_header *mh, msgpack_packer *mp_pck, @@ -202,9 +411,15 @@ static int pack_map_meta(struct flb_splunk *ctx, } static int pack_map(struct flb_splunk *ctx, msgpack_packer *mp_pck, - struct flb_time *tm, msgpack_object map, - char *tag, int tag_len) + struct flb_time *tm, + msgpack_object *group_metadata, + msgpack_object *group_attributes, + msgpack_object *record_attributes, + msgpack_object map, + char *tag, + int tag_len) { + int result; int i; double t; int map_size; @@ -232,6 +447,20 @@ static int pack_map(struct flb_splunk *ctx, msgpack_packer *mp_pck, /* Pack Splunk metadata */ pack_map_meta(ctx, &mh, mp_pck, map, tag, tag_len); + /* Pack Otel specific metadata */ + + result = pack_otel_data(ctx, + mp_pck, + &mh, + group_metadata, + group_attributes, + record_attributes); + + if (result != 0) { + printf("ERROR %d\n", result); + return -1; + } + /* Add k/v pairs under the key 'event' instead of to the top level object */ flb_mp_map_header_append(&mh); msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT) -1); @@ -475,12 +704,28 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes, * record, we just warn the user and try to pack it * as a normal map. */ - ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len); + ret = pack_map(ctx, + &mp_pck, + &log_event.timestamp, + log_event.group_metadata, + log_event.group_attributes, + log_event.metadata, + map, + tag, + tag_len); } } else { /* Pack as a map */ - ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len); + ret = pack_map(ctx, + &mp_pck, + &log_event.timestamp, + log_event.group_metadata, + log_event.group_attributes, + log_event.metadata, + map, + tag, + tag_len); } /* Validate packaging */ diff --git a/plugins/out_stdout/stdout.c b/plugins/out_stdout/stdout.c index 625d3d3ec89..bfe181d51bc 100644 --- a/plugins/out_stdout/stdout.c +++ b/plugins/out_stdout/stdout.c @@ -234,6 +234,17 @@ static void cb_stdout_flush(struct flb_event_chunk *event_chunk, while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) { + + if (log_event.group_attributes != NULL) { + printf("GROUP METADATA : \n\n"); + msgpack_object_print(stdout, *log_event.group_metadata); + printf("\n\n"); + + printf("GROUP ATTRIBUTES : \n\n"); + msgpack_object_print(stdout, *log_event.group_attributes); + printf("\n\n"); + } + printf("[%zd] %s: [[", cnt++, event_chunk->tag); printf("%"PRId32".%09lu, ", (int32_t) log_event.timestamp.tm.tv_sec, diff --git a/src/flb_log_event_decoder.c b/src/flb_log_event_decoder.c index a63d674a12a..e229b42f251 100644 --- a/src/flb_log_event_decoder.c +++ b/src/flb_log_event_decoder.c @@ -72,6 +72,11 @@ void flb_log_event_decoder_reset(struct flb_log_event_decoder *context, context->buffer = input_buffer; context->length = input_length; context->last_result = FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA; + context->current_group_metadata = NULL; + context->current_group_attributes = NULL; + + msgpack_unpacked_destroy(&context->unpacked_group_record); + msgpack_unpacked_init(&context->unpacked_group_record); msgpack_unpacked_destroy(&context->unpacked_event); msgpack_unpacked_init(&context->unpacked_event); @@ -103,7 +108,7 @@ int flb_log_event_decoder_init(struct flb_log_event_decoder *context, context->dynamically_allocated = FLB_FALSE; context->initialized = FLB_TRUE; - context->read_groups = FLB_TRUE; + context->read_groups = FLB_FALSE; flb_log_event_decoder_reset(context, input_buffer, input_length); @@ -141,6 +146,7 @@ void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context) if (context != NULL) { if (context->initialized) { + msgpack_unpacked_destroy(&context->unpacked_group_record); msgpack_unpacked_destroy(&context->unpacked_empty_map); msgpack_unpacked_destroy(&context->unpacked_event); } @@ -180,12 +186,12 @@ int flb_log_event_decoder_decode_timestamp(msgpack_object *input, return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE; } - output->tm.tv_sec = + output->tm.tv_sec = (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER( FLB_ALIGNED_DWORD_READ( (unsigned char *) &input->via.ext.ptr[0])); - output->tm.tv_nsec = + output->tm.tv_nsec = (int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER( FLB_ALIGNED_DWORD_READ( (unsigned char *) &input->via.ext.ptr[4])); @@ -340,16 +346,49 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context, /* get log event type */ ret = flb_log_event_decoder_get_record_type(event, &record_type); if (ret != 0) { + context->current_group_metadata = NULL; + context->current_group_attributes = NULL; + context->last_result = FLB_EVENT_DECODER_ERROR_DESERIALIZATION_FAILURE; return context->last_result; } - /* - * if we hava a group type record and the caller don't want groups, just - * skip this record and move to the next one. + /* Meta records such as the group opener and closer are identified by negative + * timestamp values. In these cases we track the current group metadata and + * attributes in order to transparently provide them through the log_event + * structure but we also want to allow the client code raw access to such + * records which is why the read_groups decoder context property is used + * to determine the behavior. */ - if (record_type != FLB_LOG_EVENT_NORMAL && !context->read_groups) { - return flb_log_event_decoder_next(context, event); + if (record_type != FLB_LOG_EVENT_NORMAL) { + if (context->read_groups != FLB_TRUE) { + msgpack_unpacked_destroy(&context->unpacked_group_record); + + if (record_type == FLB_LOG_EVENT_GROUP_START) { + memcpy(&context->unpacked_group_record, + &context->unpacked_event, + sizeof(msgpack_unpacked)); + + context->current_group_metadata = event->metadata; + context->current_group_attributes = event->body; + } + else { + msgpack_unpacked_destroy(&context->unpacked_event); + + context->current_group_metadata = NULL; + context->current_group_attributes = NULL; + } + + msgpack_unpacked_init(&context->unpacked_event); + + memset(event, 0, sizeof(struct flb_log_event)); + + return flb_log_event_decoder_next(context, event); + } + } + else { + event->group_metadata = context->current_group_metadata; + event->group_attributes = context->current_group_attributes; } } diff --git a/tests/runtime/in_opentelemetry.c b/tests/runtime/in_opentelemetry.c index 7dc0a2c2435..f281bd3bd23 100644 --- a/tests/runtime/in_opentelemetry.c +++ b/tests/runtime/in_opentelemetry.c @@ -91,6 +91,8 @@ static char *get_group_metadata(void *chunk, size_t size) ret = flb_log_event_decoder_init(&log_decoder, chunk, size); TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS); + flb_log_event_decoder_read_groups(&log_decoder, FLB_TRUE); + ret = flb_log_event_decoder_next(&log_decoder, &log_event); if (ret != FLB_EVENT_DECODER_SUCCESS) { return NULL; @@ -112,6 +114,8 @@ static char *get_group_body(void *chunk, size_t size) ret = flb_log_event_decoder_init(&log_decoder, chunk, size); TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS); + flb_log_event_decoder_read_groups(&log_decoder, FLB_TRUE); + ret = flb_log_event_decoder_next(&log_decoder, &log_event); if (ret != FLB_EVENT_DECODER_SUCCESS) { return NULL; @@ -133,6 +137,8 @@ static char *get_log_body(void *chunk, size_t size) ret = flb_log_event_decoder_init(&log_decoder, chunk, size); TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS); + flb_log_event_decoder_read_groups(&log_decoder, FLB_TRUE); + /* 0: group header */ flb_log_event_decoder_next(&log_decoder, &log_event);