forked from elastic/elastic-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler_action_application.go
181 lines (155 loc) · 5.69 KB
/
handler_action_application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package handlers
import (
"context"
"fmt"
"time"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
const (
defaultActionTimeout = time.Minute
maxActionTimeout = time.Hour
)
var errActionTimeoutInvalid = errors.New("action timeout is invalid")
// AppAction is a handler for application actions.
type AppAction struct {
log *logger.Logger
coord *coordinator.Coordinator
agentID string
}
// NewAppAction creates a new AppAction handler.
func NewAppAction(log *logger.Logger, coord *coordinator.Coordinator, agentID string) *AppAction {
return &AppAction{
log: log,
coord: coord,
agentID: agentID,
}
}
// Handle handles application action.
func (h *AppAction) Handle(ctx context.Context, a fleetapi.Action, acker acker.Acker) error {
h.log.Debugf("handlerAppAction: action '%+v' received", a)
action, ok := a.(*fleetapi.ActionApp)
if !ok {
return fmt.Errorf("invalid type, expected ActionApp and received %T", a)
}
// Validate action
// Disabled for 8.8.0 release in order to limit the surface
// https://github.com/elastic/security-team/issues/6501
//
// h.log.Debugf("handlerAppAction: validate action '%+v', for agentID %s", a, h.agentID)
// validated, err := protection.ValidateAction(*action, h.logLevelSetter.Protection().SignatureValidationKey, h.agentID)
// if err != nil {
// action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
// action.CompletedAt = action.StartedAt
// h.log.Errorf("handlerAppAction: action '%+v' failed validation: %v", action, err) // error details are logged
// action.Error = fmt.Sprintf("action failed validation: %s", action.InputType) // generic error message for the action response
// return acker.Ack(ctx, action)
// }
// action = &validated
state := h.coord.State()
comp, unit, ok := findUnitFromInputType(state, action.InputType)
if !ok {
// If the matching action is not found ack the action with the error for action result document
action.StartedAt = time.Now().UTC().Format(time.RFC3339Nano)
action.CompletedAt = action.StartedAt
action.Error = fmt.Sprintf("matching app is not found for action input: %s", action.InputType)
return acker.Ack(ctx, action)
}
// Deserialize the action into map[string]interface{} for dispatching over to the apps
params, err := action.MarshalMap()
if err != nil {
return err
}
start := time.Now().UTC()
timeout := defaultActionTimeout
if action.Timeout > 0 {
timeout = time.Duration(action.Timeout) * time.Second
if timeout > maxActionTimeout {
h.log.Debugf("handlerAppAction: action '%v' timeout exceeds maximum allowed %v", action.InputType, maxActionTimeout)
err = errActionTimeoutInvalid
}
}
var res map[string]interface{}
if err == nil {
h.log.Debugf("handlerAppAction: action '%v' started with timeout: %v", action.ActionType, timeout)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
res, err = h.coord.PerformAction(ctx, comp, unit, action.InputType, params)
}
end := time.Now().UTC()
startFormatted := start.Format(time.RFC3339Nano)
endFormatted := end.Format(time.RFC3339Nano)
h.log.Debugf("handlerAppAction: action '%v' finished, startFormatted: %v, endFormatted: %v, err: %v", action.InputType, startFormatted, endFormatted, err)
if err != nil {
action.StartedAt = startFormatted
action.CompletedAt = endFormatted
action.Error = err.Error()
} else {
action.StartedAt = readMapString(res, "started_at", startFormatted)
action.CompletedAt = readMapString(res, "completed_at", endFormatted)
action.Error = readMapString(res, "error", "")
appendActionResponse(action, action.InputType, res)
}
return acker.Ack(ctx, action)
}
var (
none = struct{}{}
// The set of action response fields are not included in the action_response property, because there are already set to top level fields
excludeActionResponseFields = map[string]struct{}{
"started_at": none,
"completed_at": none,
"error": none,
}
)
// appendActionResponse appends the action response property with all the action response values excluding the ones specified in excludeActionResponseFields
//
// "action_response": {
// "endpoint": {
// "acked": true
// }
// }
func appendActionResponse(action *fleetapi.ActionApp, inputType string, res map[string]interface{}) {
if len(res) == 0 {
return
}
m := make(map[string]interface{}, len(res))
for k, v := range res {
if _, ok := excludeActionResponseFields[k]; !ok {
m[k] = v
}
}
if len(m) > 0 {
mt := make(map[string]interface{}, 1)
mt[inputType] = m
action.Response = mt
}
}
func readMapString(m map[string]interface{}, key string, def string) string {
if m == nil {
return def
}
if v, ok := m[key]; ok {
if s, ok := v.(string); ok && s != "" {
return s
}
}
return def
}
func findUnitFromInputType(state coordinator.State, inputType string) (component.Component, component.Unit, bool) {
for _, comp := range state.Components {
for _, unit := range comp.Component.Units {
if unit.Type == client.UnitTypeInput && unit.Config != nil && unit.Config.Type == inputType {
return comp.Component, unit, true
}
}
}
return component.Component{}, component.Unit{}, false
}