Skip to content

Commit 47a9051

Browse files
0x006EA1E5edsiper
authored andcommitted
out_loki: add stuctured_metadata_map_keys
* Adds stuctured_metadata_map_keys config to dynamically populate stuctured_metadata from a map * Add docker-compose to test loki backend Signed-off-by: Greg Eales <0x006EA1E5@gmail.com>
1 parent 166c936 commit 47a9051

File tree

8 files changed

+532
-21
lines changed

8 files changed

+532
-21
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
### Description
2+
3+
This directory has a docker-compose file and its
4+
configuration required to run:
5+
6+
1) A fluentbit installation with a dummy input, and Loki output configured for `structured_metadata_map_keys`
7+
3) A Loki installation
8+
4) A grafana installation with a default Loki datasource
9+
10+
To run this, execute:
11+
12+
$ docker-compose up --force-recreate -d
13+
14+
n.b., the [docker compose file](./docker-compose.yml) contains an `image` and a commented out `build` section. Change
15+
these to build from local source.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
service:
2+
log_level: debug
3+
4+
pipeline:
5+
inputs:
6+
- name: dummy
7+
tag: logs
8+
dummy: |
9+
{
10+
"message": "simple log generated",
11+
"logger": "my.logger",
12+
"level": "INFO",
13+
"hostname": "localhost",
14+
"my_map_of_attributes_1": {
15+
"key_1": "hello, world!",
16+
"key_2": "goodbye, world!"
17+
},
18+
"my_map_of_maps_1": {
19+
"root_key": {
20+
"sub_key_1": "hello, world!",
21+
"sub_key_2": "goodbye, world!"
22+
}
23+
}
24+
}
25+
26+
outputs:
27+
- name: loki
28+
match: logs
29+
host: loki
30+
remove_keys: hostname,my_map_of_attributes_1,my_map_of_maps_1
31+
label_keys: $level,$logger
32+
labels: service_name=test
33+
structured_metadata: $hostname
34+
structured_metadata_map_keys: $my_map_of_attributes_1,$my_map_of_maps_1['root_key']
35+
line_format: key_value
36+
drop_single_key: on
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# config file version
2+
apiVersion: 1
3+
4+
# list of datasources that should be deleted from the database
5+
deleteDatasources:
6+
- name: Loki
7+
orgId: 1
8+
9+
# list of datasources to insert/update depending
10+
# whats available in the database
11+
datasources:
12+
- name: Loki
13+
type: loki
14+
access: proxy
15+
orgId: 1
16+
url: http://loki:3100
17+
basicAuth: false
18+
isDefault: true
19+
version: 1
20+
editable: false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
auth_enabled: false
2+
3+
server:
4+
http_listen_port: 3100
5+
grpc_listen_port: 9096
6+
7+
common:
8+
instance_addr: 127.0.0.1
9+
path_prefix: /tmp/loki
10+
storage:
11+
filesystem:
12+
chunks_directory: /tmp/loki/chunks
13+
rules_directory: /tmp/loki/rules
14+
replication_factor: 1
15+
ring:
16+
kvstore:
17+
store: inmemory
18+
19+
query_range:
20+
results_cache:
21+
cache:
22+
embedded_cache:
23+
enabled: true
24+
max_size_mb: 100
25+
26+
schema_config:
27+
configs:
28+
- from: 2020-10-24
29+
store: tsdb
30+
object_store: filesystem
31+
schema: v13
32+
index:
33+
prefix: index_
34+
period: 24h
35+
36+
ruler:
37+
alertmanager_url: http://localhost:9093
38+
39+
# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
40+
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
41+
#
42+
# Statistics help us better understand how Loki is used, and they show us performance
43+
# levels for most users. This helps us prioritize features and documentation.
44+
# For more information on what's sent, look at
45+
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
46+
# Refer to the buildReport method to see what goes into a report.
47+
#
48+
# If you would like to disable reporting, uncomment the following lines:
49+
#analytics:
50+
# reporting_enabled: false
51+
limits_config:
52+
allow_structured_metadata: true
53+
volume_enabled: true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
services:
2+
fluentbit:
3+
# Comment out `image` and uncomment `build` to build the fluent-bit image from local source
4+
image: fluent/fluent-bit:latest
5+
# build:
6+
# context: ../../
7+
# dockerfile: dockerfiles/Dockerfile
8+
depends_on:
9+
- loki
10+
container_name: fluentbit
11+
command: /fluent-bit/bin/fluent-bit -c /etc/fluent-bit_loki_out-structured_metadata_map.yaml
12+
ports:
13+
- 2021:2021
14+
networks:
15+
- loki-network
16+
volumes:
17+
- ./config/fluent-bit_loki_out-structured_metadata_map.yaml:/etc/fluent-bit_loki_out-structured_metadata_map.yaml
18+
19+
grafana:
20+
image: grafana/grafana:11.4.0
21+
depends_on:
22+
- loki
23+
- fluentbit
24+
ports:
25+
- 3000:3000
26+
volumes:
27+
- ./config/grafana/provisioning:/etc/grafana/provisioning
28+
networks:
29+
- loki-network
30+
environment:
31+
- GF_SECURITY_ADMIN_PASSWORD=admin
32+
33+
loki:
34+
image: grafana/loki:2.9.2
35+
command: -config.file=/etc/loki/loki-config.yaml
36+
networks:
37+
- loki-network
38+
ports:
39+
- 3100:3100
40+
volumes:
41+
- ./config/loki-config.yaml:/etc/loki/loki-config.yaml
42+
43+
networks:
44+
loki-network:
45+
driver: bridge

plugins/out_loki/loki.c

+142-5
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
298298
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
299299
kv = mk_list_entry(head, struct flb_loki_kv, _head);
300300

301+
/* unlink and destroy */
302+
mk_list_del(&kv->_head);
303+
flb_loki_kv_destroy(kv);
304+
}
305+
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) {
306+
kv = mk_list_entry(head, struct flb_loki_kv, _head);
307+
301308
/* unlink and destroy */
302309
mk_list_del(&kv->_head);
303310
flb_loki_kv_destroy(kv);
@@ -416,6 +423,93 @@ static void pack_kv(struct flb_loki *ctx,
416423
}
417424
}
418425

426+
/*
427+
* Similar to pack_kv above, except will only use msgpack_objects of type
428+
* MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a
429+
* separate item. Non-string map values are serialised to JSON, as Loki requires
430+
* all values to be strings.
431+
*/
432+
static void pack_maps(struct flb_loki *ctx,
433+
msgpack_packer *mp_pck,
434+
char *tag, int tag_len,
435+
msgpack_object *map,
436+
struct flb_mp_map_header *mh,
437+
struct mk_list *list)
438+
{
439+
struct mk_list *head;
440+
struct flb_loki_kv *kv;
441+
442+
msgpack_object *start_key;
443+
msgpack_object *out_key;
444+
msgpack_object *out_val;
445+
446+
msgpack_object_map accessed_map;
447+
uint32_t accessed_map_index;
448+
msgpack_object_kv accessed_map_kv;
449+
450+
char *accessed_map_val_json;
451+
452+
mk_list_foreach(head, list) {
453+
/* get the flb_loki_kv for this iteration of the loop */
454+
kv = mk_list_entry(head, struct flb_loki_kv, _head);
455+
456+
/* record accessor key/value pair */
457+
if (kv->ra_key != NULL && kv->ra_val == NULL) {
458+
459+
/* try to get the value for the record accessor */
460+
if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val)
461+
!= -1) {
462+
463+
/*
464+
* we require the value to be a map, or it doesn't make sense as
465+
* this is adding a map's key / values
466+
*/
467+
if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) {
468+
flb_plg_debug(ctx->ins, "No valid map data found for key %s",
469+
kv->ra_key->pattern);
470+
}
471+
else {
472+
accessed_map = out_val->via.map;
473+
474+
/* for each entry in the accessed map... */
475+
for (accessed_map_index = 0; accessed_map_index < accessed_map.size;
476+
accessed_map_index++) {
477+
478+
/* get the entry */
479+
accessed_map_kv = accessed_map.ptr[accessed_map_index];
480+
481+
/* Pack the key and value */
482+
flb_mp_map_header_append(mh);
483+
484+
pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr,
485+
accessed_map_kv.key.via.str.size);
486+
487+
/* If the value is a string, just pack it... */
488+
if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) {
489+
msgpack_pack_str_with_body(mp_pck,
490+
accessed_map_kv.val.via.str.ptr,
491+
accessed_map_kv.val.via.str.size);
492+
}
493+
/*
494+
* ...otherwise convert value to JSON string, as Loki always
495+
* requires a string value
496+
*/
497+
else {
498+
accessed_map_val_json = flb_msgpack_to_json_str(1024,
499+
&accessed_map_kv.val);
500+
if (accessed_map_val_json) {
501+
msgpack_pack_str_with_body(mp_pck, accessed_map_val_json,
502+
strlen(accessed_map_val_json));
503+
flb_free(accessed_map_val_json);
504+
}
505+
}
506+
}
507+
}
508+
}
509+
}
510+
}
511+
}
512+
419513
static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
420514
msgpack_packer *mp_pck,
421515
char *tag, int tag_len,
@@ -424,7 +518,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
424518
struct flb_mp_map_header mh;
425519
/* Initialize dynamic map header */
426520
flb_mp_map_header_init(&mh, mp_pck);
427-
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
521+
if (ctx->structured_metadata_map_keys) {
522+
pack_maps(ctx, mp_pck, tag, tag_len, map, &mh,
523+
&ctx->structured_metadata_map_keys_list);
524+
}
525+
/*
526+
* explicit structured_metadata entries override
527+
* structured_metadata_map_keys entries
528+
* */
529+
if (ctx->structured_metadata) {
530+
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
531+
}
428532
flb_mp_map_header_end(&mh);
429533
return 0;
430534
}
@@ -788,6 +892,7 @@ static int parse_labels(struct flb_loki *ctx)
788892

789893
flb_loki_kv_init(&ctx->labels_list);
790894
flb_loki_kv_init(&ctx->structured_metadata_list);
895+
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);
791896

792897
if (ctx->structured_metadata) {
793898
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
@@ -796,6 +901,28 @@ static int parse_labels(struct flb_loki *ctx)
796901
}
797902
}
798903

904+
/* Append structured metadata map keys set in the configuration */
905+
if (ctx->structured_metadata_map_keys) {
906+
mk_list_foreach(head, ctx->structured_metadata_map_keys) {
907+
entry = mk_list_entry(head, struct flb_slist_entry, _head);
908+
if (entry->str[0] != '$') {
909+
flb_plg_error(ctx->ins,
910+
"invalid structured metadata map key, the name must start "
911+
"with '$'");
912+
return -1;
913+
}
914+
915+
ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list,
916+
entry->str, NULL);
917+
if (ret == -1) {
918+
return -1;
919+
}
920+
else if (ret > 0) {
921+
ra_used++;
922+
}
923+
}
924+
}
925+
799926
if (ctx->labels) {
800927
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
801928
if (ret == -1) {
@@ -971,6 +1098,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
9711098
ctx->ins = ins;
9721099
flb_loki_kv_init(&ctx->labels_list);
9731100
flb_loki_kv_init(&ctx->structured_metadata_list);
1101+
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);
9741102

9751103
/* Register context with plugin instance */
9761104
flb_output_set_context(ins, ctx);
@@ -1539,12 +1667,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
15391667
while ((ret = flb_log_event_decoder_next(
15401668
&log_decoder,
15411669
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
1542-
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
1670+
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
1671+
ctx->structured_metadata_map_keys ? 3 : 2);
15431672

15441673
/* Append the timestamp */
15451674
pack_timestamp(&mp_pck, &log_event.timestamp);
15461675
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
1547-
if (ctx->structured_metadata) {
1676+
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
15481677
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
15491678
}
15501679
}
@@ -1575,12 +1704,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
15751704
msgpack_pack_str_body(&mp_pck, "values", 6);
15761705
msgpack_pack_array(&mp_pck, 1);
15771706

1578-
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
1707+
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
1708+
ctx->structured_metadata_map_keys ? 3 : 2);
15791709

15801710
/* Append the timestamp */
15811711
pack_timestamp(&mp_pck, &log_event.timestamp);
15821712
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
1583-
if (ctx->structured_metadata) {
1713+
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
15841714
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
15851715
}
15861716
}
@@ -1905,6 +2035,13 @@ static struct flb_config_map config_map[] = {
19052035
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
19062036
"optional structured metadata fields for API requests."
19072037
},
2038+
2039+
{
2040+
FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL,
2041+
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys),
2042+
"optional structured metadata fields, as derived dynamically from configured maps "
2043+
"keys, for API requests."
2044+
},
19082045

19092046
{
19102047
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",

0 commit comments

Comments
 (0)