-
Notifications
You must be signed in to change notification settings - Fork 158
/
Copy pathapm_config_modifier.go
151 lines (123 loc) · 4.72 KB
/
apm_config_modifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package application
import (
"fmt"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/config"
monitoringcfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/utils"
)
// InjectAPMConfig is a component modifier handed to the coordinator. When
// traces are enabled under agent.monitoring.traces and an APM section exists
// under agent.monitoring.apm, it copies that global APM configuration onto
// every Component derived from the input/output configuration.
//
// The comps slice is modified in place and is always returned, including on
// error. When traces are disabled or no APM section is present, the components
// are returned untouched with a nil error.
func InjectAPMConfig(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) {
	enabled, err := getAPMTracesEnabled(cfg)
	switch {
	case err != nil:
		return comps, fmt.Errorf("error retrieving APM traces flag: %w", err)
	case !enabled:
		// Traces are off: there is nothing to inject.
		return comps, nil
	}

	apmCfg, err := getAPMConfigFromMap(cfg)
	switch {
	case err != nil:
		return comps, fmt.Errorf("error retrieving apm config: %w", err)
	case apmCfg == nil:
		// No APM section configured: there is nothing to inject.
		return comps, nil
	}

	for idx := range comps {
		// Work through a pointer so the slice element itself is updated.
		target := &comps[idx]

		// Ideally the config datamodel would not be mapped straight onto the
		// protobuf datamodel; a core datamodel would hide the protocol details.
		if target.Component == nil {
			target.Component = new(proto.Component)
		}
		target.Component.ApmConfig = runtime.MapAPMConfig(apmCfg)
	}

	return comps, nil
}
// getAPMTracesEnabled reads the boolean stored at agent.monitoring.traces in
// cfg.
//
// A missing key is not an error and yields (false, nil). Any other lookup
// failure, or a value that is not a bool, yields false together with an error.
func getAPMTracesEnabled(cfg map[string]any) (bool, error) {
	value, err := utils.GetNestedMap(cfg, "agent", "monitoring", "traces")
	switch {
	case errors.Is(err, utils.ErrKeyNotFound):
		// The key is absent: report "disabled" without raising an error.
		return false, nil
	case err != nil:
		return false, fmt.Errorf("error accessing trace flag: %w", err)
	}

	if flag, isBool := value.(bool); isBool {
		return flag, nil
	}
	return false, fmt.Errorf("trace flag has unexpected type %T", value)
}
// getAPMConfigFromMap extracts the section stored at agent.monitoring.apm in
// cfg and unpacks it into a monitoringcfg.APMConfig.
//
// A missing key is not an error and yields (nil, nil). An error is returned
// when the lookup fails for another reason, when the section is not a
// map[string]any, or when it cannot be parsed or unpacked.
func getAPMConfigFromMap(cfg map[string]any) (*monitoringcfg.APMConfig, error) {
	found, err := utils.GetNestedMap(cfg, "agent", "monitoring", "apm")
	switch {
	case errors.Is(err, utils.ErrKeyNotFound):
		// No APM section present: nothing to return.
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("error traversing config: %w", err)
	}

	apmMap, isMap := found.(map[string]any)
	if !isMap {
		return nil, fmt.Errorf("the retrieved apm configs is not a map: %T", found)
	}

	parsed, err := config.NewConfigFrom(apmMap)
	if err != nil {
		return nil, fmt.Errorf("error parsing apm config: %w", err)
	}

	var apmCfg monitoringcfg.APMConfig
	if err := parsed.UnpackTo(&apmCfg); err != nil {
		return nil, fmt.Errorf("error unpacking apm config: %w", err)
	}

	return &apmCfg, nil
}
// noop is the identity config patcher: it hands back the ConfigChange it
// receives without touching it. PatchAPMConfig returns it whenever patching is
// disabled or unnecessary.
func noop(change coordinator.ConfigChange) coordinator.ConfigChange {
	return change
}
// PatchAPMConfig builds a temporary configuration patcher (see
// ConfigPatchManager and ConfigPatch for reference). The returned function
// merges the APM parameters found in the elastic agent configuration file into
// the configuration coming from Fleet, until Fleet supports this config
// directly.
//
// The traces flag and the APM section are read from rawConfig once, when
// PatchAPMConfig is called. If rawConfig cannot be decoded, if either value
// cannot be retrieved, or if traces are disabled and there is no APM section,
// the returned patcher is noop. Failures are logged through log and never
// returned to the caller.
func PatchAPMConfig(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange {
	cfgMap, err := rawConfig.ToMapStr()
	if err != nil {
		log.Errorf("error decoding raw config, patching disabled: %v", err)
		return noop
	}

	enabled, err := getAPMTracesEnabled(cfgMap)
	if err != nil {
		log.Errorf("error retrieving trace flag, patching disabled: %v", err)
		return noop
	}

	apmCfg, err := getAPMConfigFromMap(cfgMap)
	if err != nil {
		log.Errorf("error retrieving apm config, patching disabled: %v", err)
		return noop
	}

	hasAPM := apmCfg != nil
	if !(enabled || hasAPM) {
		// Traces disabled and no apm config: there is nothing to patch in.
		log.Debugf("traces disabled and no apm config: no patching necessary")
		return noop
	}

	// The traces flag is always part of the patch; the apm section is added
	// only when one was configured.
	patch := map[string]any{"traces": enabled}
	if hasAPM {
		patch["apm"] = apmCfg
	}

	return func(change coordinator.ConfigChange) coordinator.ConfigChange {
		overlay := map[string]any{
			"agent": map[string]any{
				"monitoring": patch,
			},
		}
		// A failed merge is only logged: the change is still handed back.
		if mergeErr := change.Config().Merge(overlay); mergeErr != nil {
			log.Errorf("error patching apm config into configchange: %v", mergeErr)
		}
		return change
	}
}