Skip to content

Commit 7541561

Browse files
Add CPU profile collection to diagnostics handler (#4394)
Add optional CPU profile collection to the diagnostics action handler. CPU profiles are collected when the REQUEST_DIAGNOSTICS action's optional additional_metrics parameter list contains "CPU".
1 parent 1d6d1c3 commit 7541561

File tree

7 files changed

+177
-15
lines changed

7 files changed

+177
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: enhancement
12+
13+
# Change summary; an 80ish-character-long description of the change.
14+
summary: Add CPU profile collection to the Fleet diagnostics action handler
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodates a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add the optional CPU profile collection to the diagnostic action handler.
21+
CPU profiles will be collected if the REQUEST_DIAGNOSTICS action's
22+
optional additional_metrics parameter list contains "CPU".
23+
24+
# Affected component; a word indicating the component this changeset affects.
25+
component:
26+
27+
# PR URL; optional; the PR number that added the changeset.
28+
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
29+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
30+
# Please provide it if you are adding a fragment for a different PR.
31+
pr: 4394
32+
33+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
34+
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number.
35+
issue: 3491

internal/pkg/agent/application/actions/handlers/handler_action_diagnostics.go

+48-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ import (
3131
// In either case the 1st action will succeed and the others will ack with an error.
3232
var ErrRateLimit = fmt.Errorf("rate limit exceeded")
3333

34+
// getCPUDiag is a wrapper around diagnostics.CreateCPUProfile so it can be replaced in unit-tests.
35+
var getCPUDiag = func(ctx context.Context, d time.Duration) ([]byte, error) {
36+
return diagnostics.CreateCPUProfile(ctx, d)
37+
}
38+
3439
// Uploader is the interface used to upload a diagnostics bundle to fleet-server.
3540
type Uploader interface {
3641
UploadDiagnostics(context.Context, string, string, int64, io.Reader) (string, error)
@@ -122,7 +127,7 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
122127
}
123128

124129
h.log.Debug("Gathering agent diagnostics.")
125-
aDiag, err := h.runHooks(ctx)
130+
aDiag, err := h.runHooks(ctx, action)
126131
if err != nil {
127132
action.Err = err
128133
h.log.Errorw("diagnostics action handler failed to run diagnostics hooks",
@@ -134,7 +139,7 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
134139
uDiag := h.diagUnits(ctx)
135140

136141
h.log.Debug("Gathering component diagnostics.")
137-
cDiag := h.diagComponents(ctx)
142+
cDiag := h.diagComponents(ctx, action)
138143

139144
var r io.Reader
140145
// attempt to create a temporary diagnostics file on disk in order to avoid loading a
@@ -185,9 +190,26 @@ func (h *Diagnostics) collectDiag(ctx context.Context, action *fleetapi.ActionDi
185190
}
186191

187192
// runHooks runs the agent diagnostics hooks.
188-
func (h *Diagnostics) runHooks(ctx context.Context) ([]client.DiagnosticFileResult, error) {
193+
func (h *Diagnostics) runHooks(ctx context.Context, action *fleetapi.ActionDiagnostics) ([]client.DiagnosticFileResult, error) {
189194
hooks := append(h.diagProvider.DiagnosticHooks(), diagnostics.GlobalHooks()...)
190-
diags := make([]client.DiagnosticFileResult, 0, len(hooks))
195+
196+
// Currently CPU is the only additional metric we can collect.
197+
// If this changes we would need to change how we scan AdditionalMetrics.
198+
collectCPU := false
199+
for _, metric := range action.AdditionalMetrics {
200+
if metric == "CPU" {
201+
h.log.Debug("Diagnostics will collect CPU profile.")
202+
collectCPU = true
203+
break
204+
}
205+
}
206+
207+
resultLen := len(hooks)
208+
if collectCPU {
209+
resultLen++
210+
}
211+
diags := make([]client.DiagnosticFileResult, 0, resultLen)
212+
191213
for _, hook := range hooks {
192214
if ctx.Err() != nil {
193215
return diags, ctx.Err()
@@ -205,6 +227,20 @@ func (h *Diagnostics) runHooks(ctx context.Context) ([]client.DiagnosticFileResu
205227
elapsed := time.Since(startTime)
206228
h.log.Debugw(fmt.Sprintf("Hook %s execution complete, took %s", hook.Name, elapsed.String()), "hook", hook.Name, "filename", hook.Filename, "elapsed", elapsed.String())
207229
}
230+
if collectCPU {
231+
p, err := getCPUDiag(ctx, diagnostics.DiagCPUDuration)
232+
if err != nil {
233+
return diags, fmt.Errorf("unable to gather CPU profile: %w", err)
234+
}
235+
diags = append(diags, client.DiagnosticFileResult{
236+
Name: diagnostics.DiagCPUName,
237+
Filename: diagnostics.DiagCPUFilename,
238+
Description: diagnostics.DiagCPUDescription,
239+
ContentType: diagnostics.DiagCPUContentType,
240+
Content: p,
241+
Generated: time.Now().UTC(),
242+
})
243+
}
208244
return diags, nil
209245
}
210246

@@ -246,14 +282,20 @@ func (h *Diagnostics) diagUnits(ctx context.Context) []client.DiagnosticUnitResu
246282
}
247283

248284
// diagComponents gathers diagnostics from components.
249-
func (h *Diagnostics) diagComponents(ctx context.Context) []client.DiagnosticComponentResult {
285+
func (h *Diagnostics) diagComponents(ctx context.Context, action *fleetapi.ActionDiagnostics) []client.DiagnosticComponentResult {
250286
cDiag := make([]client.DiagnosticComponentResult, 0)
251287
h.log.Debug("Performing component diagnostics")
252288
startTime := time.Now()
253289
defer func() {
254290
h.log.Debugf("Component diagnostics complete. Took: %s", time.Since(startTime))
255291
}()
256-
rr, err := h.diagProvider.PerformComponentDiagnostics(ctx, []cproto.AdditionalDiagnosticRequest{})
292+
additionalMetrics := []cproto.AdditionalDiagnosticRequest{}
293+
for _, metric := range action.AdditionalMetrics {
294+
if metric == "CPU" {
295+
additionalMetrics = append(additionalMetrics, cproto.AdditionalDiagnosticRequest_CPU)
296+
}
297+
}
298+
rr, err := h.diagProvider.PerformComponentDiagnostics(ctx, additionalMetrics)
257299
if err != nil {
258300
h.log.Errorf("Error fetching component-level diagnostics: %w", err)
259301
}

internal/pkg/agent/application/actions/handlers/handler_action_diagnostics_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/elastic/elastic-agent-client/v7/pkg/client"
2020
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
21+
"github.com/elastic/elastic-agent/pkg/control/v2/cproto"
2122

2223
"github.com/elastic/elastic-agent/internal/pkg/agent/application/actions/handlers/mocks"
2324
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
@@ -350,3 +351,54 @@ func TestDiagnosticHandlerContexteExpiredErrorWithLogs(t *testing.T) {
350351
1)
351352
// we could assert the logs for the hooks, but those will be the same as the happy path, so for brevity we won't
352353
}
354+
355+
func TestDiagnosticHandlerWithCPUProfile(t *testing.T) {
356+
tempAgentRoot := t.TempDir()
357+
paths.SetTop(tempAgentRoot)
358+
err := os.MkdirAll(path.Join(tempAgentRoot, "data"), 0755)
359+
require.NoError(t, err)
360+
361+
// make a flag to check if a CPU profile is collected.
362+
cpuCalled := false
363+
getCPUDiag = func(_ context.Context, _ time.Duration) ([]byte, error) {
364+
cpuCalled = true
365+
return []byte(`hello, world!`), nil
366+
}
367+
368+
mockDiagProvider := mocks.NewDiagnosticsProvider(t)
369+
mockUploader := mocks.NewUploader(t)
370+
testLogger, _ := logger.NewTesting("diagnostic-handler-test")
371+
handler := NewDiagnostics(testLogger, mockDiagProvider, defaultRateLimit, mockUploader)
372+
373+
mockDiagProvider.EXPECT().DiagnosticHooks().Return([]diagnostics.Hook{hook1})
374+
mockDiagProvider.EXPECT().PerformDiagnostics(mock.Anything, mock.Anything).Return([]runtime.ComponentUnitDiagnostic{mockUnitDiagnostic})
375+
376+
// only match if CPU metrics are requested.
377+
mockDiagProvider.EXPECT().PerformComponentDiagnostics(mock.Anything, mock.MatchedBy(func(additionalMetrics []cproto.AdditionalDiagnosticRequest) bool {
378+
for _, metric := range additionalMetrics {
379+
if metric == cproto.AdditionalDiagnosticRequest_CPU {
380+
return true
381+
}
382+
}
383+
return false
384+
})).Return([]runtime.ComponentDiagnostic{}, nil)
385+
386+
mockAcker := mocks.NewAcker(t)
387+
mockAcker.EXPECT().Ack(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, a fleetapi.Action) error {
388+
require.IsType(t, new(fleetapi.ActionDiagnostics), a)
389+
assert.NoError(t, a.(*fleetapi.ActionDiagnostics).Err)
390+
return nil
391+
})
392+
mockAcker.EXPECT().Commit(mock.Anything).Return(nil)
393+
394+
mockUploader.EXPECT().UploadDiagnostics(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("upload-id", nil)
395+
396+
diagAction := &fleetapi.ActionDiagnostics{
397+
AdditionalMetrics: []string{"CPU"},
398+
}
399+
handler.collectDiag(context.Background(), diagAction, mockAcker)
400+
401+
// Check that the CPU profile was collected and that the CPU metric was requested from PerformComponentDiagnostics
402+
assert.True(t, cpuCalled, "CPU profile collector was not called.")
403+
mockDiagProvider.AssertExpectations(t)
404+
}

internal/pkg/diagnostics/diagnostics.go

+9
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ const (
3737
agentName = "elastic-agent"
3838
)
3939

40+
// DiagCPU* are constants that describe the CPU profile that is collected when the --cpu-profile flag is used with the diagnostics command, or when the diagnostics action contains "CPU" in the additional_metrics list.
41+
const (
42+
DiagCPUName = "cpuprofile"
43+
DiagCPUFilename = "cpu.pprof"
44+
DiagCPUDescription = "CPU profile"
45+
DiagCPUContentType = "application/octet-stream"
46+
DiagCPUDuration = 30 * time.Second
47+
)
48+
4049
// Hook is a hook that gets used when diagnostic information is requested from the Elastic Agent.
4150
type Hook struct {
4251
Name string

internal/pkg/fleetapi/action.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -434,10 +434,11 @@ func (a *ActionCancel) AckEvent() AckEvent {
434434

435435
// ActionDiagnostics is a request to gather and upload a diagnostics bundle.
436436
type ActionDiagnostics struct {
437-
ActionID string `json:"action_id"`
438-
ActionType string `json:"type"`
439-
UploadID string `json:"-"`
440-
Err error `json:"-"`
437+
ActionID string `json:"action_id"`
438+
ActionType string `json:"type"`
439+
AdditionalMetrics []string `json:"additional_metrics"`
440+
UploadID string `json:"-"`
441+
Err error `json:"-"`
441442
}
442443

443444
// ID returns the ID of the action.

internal/pkg/fleetapi/action_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,29 @@ func TestActionsUnmarshalJSON(t *testing.T) {
152152
assert.Equal(t, "http://example.com", action.SourceURI)
153153
assert.Equal(t, 1, action.Retry)
154154
})
155+
t.Run("ActionDiagnostics with no additional metrics", func(t *testing.T) {
156+
p := []byte(`[{"id":"testid","type":"REQUEST_DIAGNOSTICS","data":{}}]`)
157+
a := &Actions{}
158+
err := a.UnmarshalJSON(p)
159+
require.Nil(t, err)
160+
action, ok := (*a)[0].(*ActionDiagnostics)
161+
require.True(t, ok, "unable to cast action to specific type")
162+
assert.Equal(t, "testid", action.ActionID)
163+
assert.Equal(t, ActionTypeDiagnostics, action.ActionType)
164+
assert.Empty(t, action.AdditionalMetrics)
165+
})
166+
t.Run("ActionDiagnostics with additional CPU metrics", func(t *testing.T) {
167+
p := []byte(`[{"id":"testid","type":"REQUEST_DIAGNOSTICS","data":{"additional_metrics":["CPU"]}}]`)
168+
a := &Actions{}
169+
err := a.UnmarshalJSON(p)
170+
require.Nil(t, err)
171+
action, ok := (*a)[0].(*ActionDiagnostics)
172+
require.True(t, ok, "unable to cast action to specific type")
173+
assert.Equal(t, "testid", action.ActionID)
174+
assert.Equal(t, ActionTypeDiagnostics, action.ActionType)
175+
require.Len(t, action.AdditionalMetrics, 1)
176+
assert.Equal(t, "CPU", action.AdditionalMetrics[0])
177+
})
155178
}
156179

157180
func TestActionUnenrollMarshalMap(t *testing.T) {

pkg/control/v2/server/server.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -205,17 +205,17 @@ func (s *Server) DiagnosticAgent(ctx context.Context, req *cproto.DiagnosticAgen
205205
for _, metric := range req.AdditionalMetrics {
206206
switch metric {
207207
case cproto.AdditionalDiagnosticRequest_CPU:
208-
duration := time.Second * 30
208+
duration := diagnostics.DiagCPUDuration
209209
s.logger.Infof("Collecting CPU metrics, waiting for %s", duration)
210210
cpuResults, err := diagnostics.CreateCPUProfile(ctx, duration)
211211
if err != nil {
212212
return nil, fmt.Errorf("error gathering CPU profile: %w", err)
213213
}
214214
res = append(res, &cproto.DiagnosticFileResult{
215-
Name: "cpuprofile",
216-
Filename: "cpu.pprof",
217-
Description: "CPU profile",
218-
ContentType: "application/octet-stream",
215+
Name: diagnostics.DiagCPUName,
216+
Filename: diagnostics.DiagCPUFilename,
217+
Description: diagnostics.DiagCPUDescription,
218+
ContentType: diagnostics.DiagCPUContentType,
219219
Content: cpuResults,
220220
Generated: timestamppb.New(time.Now().UTC()),
221221
})

0 commit comments

Comments
 (0)