Skip to content

Commit 2e09734

Browse files
[Agent] Onboard to new ACK (#17148)
[Agent] Onboard to new ACK (#17148)
1 parent a2385bd commit 2e09734

File tree

7 files changed

+64
-25
lines changed

7 files changed

+64
-25
lines changed

x-pack/agent/pkg/agent/application/config.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ func localConfigDefault() *localConfig {
113113
type FleetAgentConfig struct {
114114
API *APIAccess `config:"api" yaml:"api"`
115115
Reporting *LogReporting `config:"reporting" yaml:"reporting"`
116+
Info *AgentInfo `config:"agent_info" yaml:"agent_info"`
117+
}
118+
119+
// AgentInfo is a set of agent information.
120+
type AgentInfo struct {
121+
ID string `json:"ID" yaml:"ID" config:"ID"`
116122
}
117123

118124
// APIAccess contains the required details to connect to the Kibana endpoint.
@@ -146,15 +152,17 @@ func defaultFleetAgentConfig() *FleetAgentConfig {
146152
Log: logreporter.DefaultLogConfig(),
147153
Fleet: fleetreporter.DefaultFleetManagementConfig(),
148154
},
155+
Info: &AgentInfo{},
149156
}
150157
}
151158

152-
func createFleetConfigFromEnroll(access *APIAccess) (*FleetAgentConfig, error) {
159+
func createFleetConfigFromEnroll(agentID string, access *APIAccess) (*FleetAgentConfig, error) {
153160
if err := access.Validate(); err != nil {
154161
return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig)
155162
}
156163

157164
cfg := defaultFleetAgentConfig()
158165
cfg.API = access
166+
cfg.Info.ID = agentID
159167
return cfg, nil
160168
}

x-pack/agent/pkg/agent/application/enroll_cmd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (c *EnrollCmd) Execute() error {
161161
errors.TypeNetwork)
162162
}
163163

164-
fleetConfig, err := createFleetConfigFromEnroll(&APIAccess{
164+
fleetConfig, err := createFleetConfigFromEnroll(resp.Item.ID, &APIAccess{
165165
AccessAPIKey: resp.Item.AccessAPIKey,
166166
Kibana: c.kibanaConfig,
167167
})

x-pack/agent/pkg/agent/application/fleet_acker.go

+23-7
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ package application
77
import (
88
"context"
99
"fmt"
10+
"time"
1011

1112
"github.com/elastic/beats/v7/x-pack/agent/pkg/agent/errors"
1213
"github.com/elastic/beats/v7/x-pack/agent/pkg/core/logger"
1314
"github.com/elastic/beats/v7/x-pack/agent/pkg/fleetapi"
1415
"github.com/elastic/beats/v7/x-pack/agent/pkg/scheduler"
1516
)
1617

18+
const fleetTimeFormat = "2006-01-02T15:04:05.99999-07:00"
19+
1720
type actionAcker struct {
1821
log *logger.Logger
1922
dispatcher dispatcher
@@ -38,36 +41,38 @@ func newActionAcker(
3841

3942
func (f *actionAcker) Ack(ctx context.Context, action fleetapi.Action) error {
4043
// checkin
44+
agentID := f.agentInfo.AgentID()
4145
cmd := fleetapi.NewAckCmd(f.agentInfo, f.client)
4246
req := &fleetapi.AckRequest{
43-
Actions: []string{
44-
action.ID(),
47+
Events: []fleetapi.AckEvent{
48+
constructEvent(action, agentID),
4549
},
4650
}
4751

4852
_, err := cmd.Execute(ctx, req)
4953
if err != nil {
50-
return errors.New(err, fmt.Sprintf("acknowledge action '%s' failed", action.ID()), errors.TypeNetwork)
54+
return errors.New(err, fmt.Sprintf("acknowledge action '%s' for agent '%s' failed", action.ID(), agentID), errors.TypeNetwork)
5155
}
5256

5357
return nil
5458
}
5559

5660
func (f *actionAcker) AckBatch(ctx context.Context, actions []fleetapi.Action) error {
5761
// checkin
58-
ids := make([]string, 0, len(actions))
62+
agentID := f.agentInfo.AgentID()
63+
events := make([]fleetapi.AckEvent, 0, len(actions))
5964
for _, action := range actions {
60-
ids = append(ids, action.ID())
65+
events = append(events, constructEvent(action, agentID))
6166
}
6267

6368
cmd := fleetapi.NewAckCmd(f.agentInfo, f.client)
6469
req := &fleetapi.AckRequest{
65-
Actions: ids,
70+
Events: events,
6671
}
6772

6873
_, err := cmd.Execute(ctx, req)
6974
if err != nil {
70-
return errors.New(err, fmt.Sprintf("acknowledge %d actions '%v' failed", len(actions), actions), errors.TypeNetwork)
75+
return errors.New(err, fmt.Sprintf("acknowledge %d actions '%v' for agent '%s' failed", len(actions), actions, agentID), errors.TypeNetwork)
7176
}
7277

7378
return nil
@@ -77,6 +82,17 @@ func (f *actionAcker) Commit(ctx context.Context) error {
7782
return nil
7883
}
7984

85+
func constructEvent(action fleetapi.Action, agentID string) fleetapi.AckEvent {
86+
return fleetapi.AckEvent{
87+
EventType: "ACTION_RESULT",
88+
SubType: "ACKNOWLEDGED",
89+
Timestamp: time.Now().Format(fleetTimeFormat),
90+
ActionID: action.ID(),
91+
AgentID: agentID,
92+
Message: fmt.Sprintf("Action '%s' ot type '%s' acknowledged.", action.ID(), action.Type()),
93+
}
94+
}
95+
8096
type noopAcker struct{}
8197

8298
func newNoopAcker() *noopAcker {

x-pack/agent/pkg/agent/application/fleet_acker_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
func TestAcker(t *testing.T) {
2222
type ackRequest struct {
23-
Actions []string `json:"action_ids"`
23+
Events []fleetapi.AckEvent `json:"events"`
2424
}
2525

2626
log, _ := logger.New()
@@ -45,8 +45,8 @@ func TestAcker(t *testing.T) {
4545
err = json.Unmarshal(content, &cr)
4646
assert.NoError(t, err)
4747

48-
assert.EqualValues(t, 1, len(cr.Actions))
49-
assert.EqualValues(t, testID, cr.Actions[0])
48+
assert.EqualValues(t, 1, len(cr.Events))
49+
assert.EqualValues(t, testID, cr.Events[0].ActionID)
5050

5151
resp := wrapStrToResp(http.StatusOK, `{ "actions": [], "success": true }`)
5252
return resp, nil

x-pack/agent/pkg/agent/application/lazy_acker_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
func TestLazyAcker(t *testing.T) {
2323
type ackRequest struct {
24-
Actions []string `json:"action_ids"`
24+
Events []fleetapi.AckEvent `json:"events"`
2525
}
2626

2727
log, _ := logger.New()
@@ -52,16 +52,16 @@ func TestLazyAcker(t *testing.T) {
5252
err = json.Unmarshal(content, &cr)
5353
assert.NoError(t, err)
5454

55-
if len(cr.Actions) == 0 {
55+
if len(cr.Events) == 0 {
5656
t.Fatal("expected events but got none")
5757
}
58-
if cr.Actions[0] == testID1 {
59-
assert.EqualValues(t, 2, len(cr.Actions))
60-
assert.EqualValues(t, testID1, cr.Actions[0])
61-
assert.EqualValues(t, testID2, cr.Actions[1])
58+
if cr.Events[0].ActionID == testID1 {
59+
assert.EqualValues(t, 2, len(cr.Events))
60+
assert.EqualValues(t, testID1, cr.Events[0].ActionID)
61+
assert.EqualValues(t, testID2, cr.Events[1].ActionID)
6262

6363
} else {
64-
assert.EqualValues(t, 1, len(cr.Actions))
64+
assert.EqualValues(t, 1, len(cr.Events))
6565
}
6666

6767
resp := wrapStrToResp(http.StatusOK, `{ "actions": [], "success": true }`)

x-pack/agent/pkg/fleetapi/ack_cmd.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,25 @@ import (
1616

1717
const ackPath = "/api/ingest_manager/fleet/agents/%s/acks"
1818

19+
// AckEvent is an event sent in an ACK request.
20+
type AckEvent struct {
21+
EventType string `json:"type"` // 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'
22+
SubType string `json:"subtype"` // 'RUNNING','STARTING','IN_PROGRESS','CONFIG','FAILED','STOPPING','STOPPED','DATA_DUMP','ACKNOWLEDGED','UNKNOWN';
23+
Timestamp string `json:"timestamp"` // : '2019-01-05T14:32:03.36764-05:00',
24+
ActionID string `json:"action_id"` // : '48cebde1-c906-4893-b89f-595d943b72a2',
25+
AgentID string `json:"agent_id"` // : 'agent1',
26+
Message string `json:"message,omitempty"` // : 'hello2',
27+
Payload string `json:"payload,omitempty"` // : 'payload2',
28+
}
29+
1930
// AckRequest consists of multiple actions acked to fleet ui.
2031
// POST /agents/{agentId}/acks
2132
// Authorization: ApiKey {AgentAccessApiKey}
2233
// {
2334
// "action_ids": ["id1"]
2435
// }
2536
type AckRequest struct {
26-
Actions []string `json:"action_ids"`
37+
Events []AckEvent `json:"events"`
2738
}
2839

2940
// Validate validates the enrollment request before sending it to the API.

x-pack/agent/pkg/fleetapi/ack_cmd_test.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestAck(t *testing.T) {
3232
w.WriteHeader(http.StatusOK)
3333

3434
responses := struct {
35-
ActionIDs []string `json:"action_ids"`
35+
Events []AckEvent `json:"events"`
3636
}{}
3737

3838
decoder := json.NewDecoder(r.Body)
@@ -41,9 +41,9 @@ func TestAck(t *testing.T) {
4141
err := decoder.Decode(&responses)
4242
require.NoError(t, err)
4343

44-
require.Equal(t, 1, len(responses.ActionIDs))
44+
require.Equal(t, 1, len(responses.Events))
4545

46-
id := responses.ActionIDs[0]
46+
id := responses.Events[0].ActionID
4747
require.Equal(t, "my-id", id)
4848

4949
fmt.Fprintf(w, raw)
@@ -62,8 +62,12 @@ func TestAck(t *testing.T) {
6262
cmd := NewAckCmd(&agentinfo{}, client)
6363

6464
request := AckRequest{
65-
Actions: []string{
66-
action.ID(),
65+
Events: []AckEvent{
66+
AckEvent{
67+
EventType: "ACTION_RESULT",
68+
SubType: "ACKNOWLEDGED",
69+
ActionID: action.ID(),
70+
},
6771
},
6872
}
6973

0 commit comments

Comments
 (0)