Skip to content

Commit c232532

Browse files
report output health (#3127)
* report output health * updated error messages, added data_stream fields * updated message * added output name to logging * added tests to policy_output * added test to reportOutputHealth in self monitor * added changelog * remove break on healthy to keep self monitor running * Update internal/pkg/policy/policy_output.go Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> * moved OutputHealth to schema.json --------- Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com>
1 parent 808f9a1 commit c232532

9 files changed

+305
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Report output health state to logs-fleet_server.output_health-default data stream
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component:
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
pr: https://github.com/owner/repo/3127
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/owner/repo/3116

internal/pkg/dl/constants.go

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const (
1616
FleetPolicies = ".fleet-policies"
1717
FleetPoliciesLeader = ".fleet-policies-leader"
1818
FleetServers = ".fleet-servers"
19+
FleetOutputHealth = "logs-fleet_server.output_health-default"
1920
)
2021

2122
// Query fields

internal/pkg/dl/output_health.go

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package dl
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"time"
11+
12+
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
13+
"github.com/elastic/fleet-server/v7/internal/pkg/model"
14+
"github.com/gofrs/uuid"
15+
)
16+
17+
func CreateOutputHealth(ctx context.Context, bulker bulk.Bulk, doc model.OutputHealth) error {
18+
return createOutputHealth(ctx, bulker, FleetOutputHealth, doc)
19+
}
20+
21+
func createOutputHealth(ctx context.Context, bulker bulk.Bulk, index string, doc model.OutputHealth) error {
22+
if doc.Timestamp == "" {
23+
doc.Timestamp = time.Now().UTC().Format(time.RFC3339)
24+
}
25+
doc.DataStream = &model.DataStream{
26+
Dataset: "fleet_server.output_health",
27+
Type: "logs",
28+
Namespace: "default",
29+
}
30+
body, err := json.Marshal(doc)
31+
if err != nil {
32+
return err
33+
}
34+
35+
id, err := uuid.NewV4()
36+
if err != nil {
37+
return err
38+
}
39+
_, err = bulker.Create(ctx, index, id.String(), body, bulk.WithRefresh())
40+
return err
41+
}

internal/pkg/model/schema.go

+25
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pkg/policy/policy_output.go

+23-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/rs/zerolog"
1616
"go.elastic.co/apm/v2"
1717

18+
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1819
"github.com/elastic/fleet-server/v7/internal/pkg/apikey"
1920
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
2021
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
@@ -260,17 +261,36 @@ func (p *Output) prepareElasticsearch(
260261
ctx := zlog.WithContext(ctx)
261262
outputAPIKey, err :=
262263
generateOutputAPIKey(ctx, outputBulker, agent.Id, p.Name, p.Role.Raw)
263-
// reporting output error status to self monitor and not returning the error to keep fleet-server running
264+
265+
// reporting output health and not returning the error to keep fleet-server running
264266
if outputAPIKey == nil && p.Type == OutputTypeRemoteElasticsearch {
265267
if err != nil {
266-
zerolog.Ctx(ctx).Warn().Err(err).Msg("Could not create API key in remote ES")
268+
doc := model.OutputHealth{
269+
Output: p.Name,
270+
State: client.UnitStateDegraded.String(),
271+
Message: fmt.Sprintf("remote ES could not create API key due to error: %v", err),
272+
}
273+
zerolog.Ctx(ctx).Warn().Err(err).Str("outputName", p.Name).Msg(doc.Message)
274+
275+
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
276+
zlog.Error().Err(err).Str("outputName", p.Name).Msg("error writing output health")
277+
}
267278
}
268279

269280
// replace type remote_elasticsearch with elasticsearch as agent doesn't recognize remote_elasticsearch
270281
outputMap[p.Name][FieldOutputType] = OutputTypeElasticsearch
271282
// remove the service token from the agent policy sent to the agent
272283
delete(outputMap[p.Name], FieldOutputServiceToken)
273284
return nil
285+
} else if p.Type == OutputTypeRemoteElasticsearch {
286+
doc := model.OutputHealth{
287+
Output: p.Name,
288+
State: client.UnitStateHealthy.String(),
289+
Message: "",
290+
}
291+
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
292+
zlog.Error().Err(err).Msg("create output health")
293+
}
274294
}
275295
if err != nil {
276296
return fmt.Errorf("failed generate output API key: %w", err)
@@ -304,7 +324,7 @@ func (p *Output) prepareElasticsearch(
304324
// Using painless script to append the old keys to the history
305325
body, err := renderUpdatePainlessScript(p.Name, fields)
306326
if err != nil {
307-
return fmt.Errorf("could no tupdate painless script: %w", err)
327+
return fmt.Errorf("could not update painless script: %w", err)
308328
}
309329

310330
if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {

internal/pkg/policy/policy_output_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ package policy
88

99
import (
1010
"context"
11+
"encoding/json"
12+
"errors"
1113
"testing"
1214

1315
"github.com/stretchr/testify/assert"
1416
"github.com/stretchr/testify/mock"
1517
"github.com/stretchr/testify/require"
1618

19+
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1720
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
21+
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
1822
"github.com/elastic/fleet-server/v7/internal/pkg/model"
1923
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
2024
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
@@ -462,6 +466,14 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) {
462466
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
463467
Return(&apiKey, nil).Once()
464468
bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once()
469+
bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool {
470+
var doc model.OutputHealth
471+
err := json.Unmarshal(body, &doc)
472+
if err != nil {
473+
t.Fatal(err)
474+
}
475+
return doc.Message == "" && doc.State == client.UnitStateHealthy.String()
476+
}), mock.Anything).Return("", nil)
465477

466478
output := Output{
467479
Type: OutputTypeRemoteElasticsearch,
@@ -500,4 +512,48 @@ func TestPolicyRemoteESOutputPrepare(t *testing.T) {
500512

501513
bulker.AssertExpectations(t)
502514
})
515+
516+
t.Run("Report degraded output health on API key create failure", func(t *testing.T) {
517+
logger := testlog.SetLogger(t)
518+
bulker := ftesting.NewMockBulk()
519+
var apiKey *bulk.APIKey = nil
520+
var err error = errors.New("error connecting")
521+
522+
outputBulker := ftesting.NewMockBulk()
523+
outputBulker.On("APIKeyCreate",
524+
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
525+
Return(apiKey, err).Once()
526+
bulker.On("CreateAndGetBulker", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(outputBulker, false).Once()
527+
bulker.On("Create", mock.Anything, dl.FleetOutputHealth, mock.Anything, mock.MatchedBy(func(body []byte) bool {
528+
var doc model.OutputHealth
529+
err := json.Unmarshal(body, &doc)
530+
if err != nil {
531+
t.Fatal(err)
532+
}
533+
return doc.Message == "remote ES could not create API key due to error: error connecting" && doc.State == client.UnitStateDegraded.String()
534+
}), mock.Anything).Return("", nil)
535+
536+
output := Output{
537+
Type: OutputTypeRemoteElasticsearch,
538+
Name: "test output",
539+
Role: &RoleT{
540+
Sha2: "new-hash",
541+
Raw: TestPayload,
542+
},
543+
}
544+
545+
policyMap := map[string]map[string]interface{}{
546+
"test output": map[string]interface{}{
547+
"hosts": []interface{}{"http://localhost"},
548+
"service_token": "serviceToken1",
549+
"type": OutputTypeRemoteElasticsearch,
550+
},
551+
}
552+
testAgent := &model.Agent{Outputs: map[string]*model.PolicyOutput{}}
553+
554+
err = output.Prepare(context.Background(), logger, bulker, testAgent, policyMap)
555+
require.NoError(t, err, "expected prepare to pass")
556+
557+
bulker.AssertExpectations(t)
558+
})
503559
}

internal/pkg/policy/self.go

+30-8
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,7 @@ LOOP:
109109
return err
110110
}
111111
cT.Reset(m.checkTime)
112-
if state == client.UnitStateHealthy {
113-
// running; can stop
114-
break LOOP
115-
}
112+
m.log.Trace().Msg(state.String())
116113
case hits := <-s.Output():
117114
policies := make([]model.Policy, len(hits))
118115
for i, hit := range hits {
@@ -125,10 +122,7 @@ LOOP:
125122
if err != nil {
126123
return err
127124
}
128-
if state == client.UnitStateHealthy {
129-
// running; can stop
130-
break LOOP
131-
}
125+
m.log.Trace().Msg(state.String())
132126
}
133127
}
134128

@@ -218,6 +212,8 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
218212
return client.UnitStateStarting, nil
219213
}
220214

215+
reportOutputHealth(ctx, m.bulker, m.log)
216+
221217
state := client.UnitStateHealthy
222218
extendMsg := ""
223219
var payload map[string]interface{}
@@ -253,6 +249,32 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
253249
return state, nil
254250
}
255251

252+
func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, logger zerolog.Logger) {
253+
//pinging logic
254+
bulkerMap := bulker.GetBulkerMap()
255+
for outputName, outputBulker := range bulkerMap {
256+
doc := model.OutputHealth{
257+
Output: outputName,
258+
State: client.UnitStateHealthy.String(),
259+
Message: "",
260+
}
261+
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
262+
if err != nil {
263+
doc.State = client.UnitStateDegraded.String()
264+
doc.Message = fmt.Sprintf("remote ES is not reachable due to error: %s", err.Error())
265+
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)
266+
267+
} else if res.StatusCode != 200 {
268+
doc.State = client.UnitStateDegraded.String()
269+
doc.Message = fmt.Sprintf("remote ES is not reachable due to unexpected status code %d", res.StatusCode)
270+
logger.Error().Err(err).Str("outputName", outputName).Msg(doc.Message)
271+
}
272+
if err := dl.CreateOutputHealth(ctx, bulker, doc); err != nil {
273+
logger.Error().Err(err).Str("outputName", outputName).Msg("error writing output health")
274+
}
275+
}
276+
}
277+
256278
func HasFleetServerInput(inputs []map[string]interface{}) bool {
257279
for _, input := range inputs {
258280
attr, ok := input["type"].(string)

0 commit comments

Comments
 (0)