Skip to content

Commit c588a73

Browse files
Status subcommand reporting agent status for otel mode - Phase 2 (#4047)
* Status subcommand reporting agent status for otel mode * changed error handling * changed error handling * check status works and returns healthy in otel e2e test * lint
1 parent 91c0a94 commit c588a73

File tree

8 files changed

+120
-15
lines changed

8 files changed

+120
-15
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,4 @@ fleet.yml.lock
5858
fleet.yml.old
5959
pkg/component/fake/component/component
6060
pkg/component/fake/shipper/shipper
61+
internal/pkg/agent/install/testblocking/testblocking

internal/pkg/agent/application/application.go

+5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
2929
"github.com/elastic/elastic-agent/internal/pkg/composable"
3030
"github.com/elastic/elastic-agent/internal/pkg/config"
31+
"github.com/elastic/elastic-agent/internal/pkg/otel"
3132
"github.com/elastic/elastic-agent/internal/pkg/release"
3233
"github.com/elastic/elastic-agent/pkg/component"
3334
"github.com/elastic/elastic-agent/pkg/component/runtime"
@@ -46,6 +47,7 @@ func New(
4647
testingMode bool,
4748
fleetInitTimeout time.Duration,
4849
disableMonitoring bool,
50+
runAsOtel bool,
4951
modifiers ...component.PlatformModifier,
5052
) (*coordinator.Coordinator, coordinator.ConfigManager, composable.Controller, error) {
5153

@@ -144,6 +146,9 @@ func New(
144146
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
145147
configMgr = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader)
146148
}
149+
} else if runAsOtel {
150+
// ignoring configuration in elastic-agent.yml
151+
configMgr = otel.NewOtelModeConfigManager()
147152
} else {
148153
isManaged = true
149154
var store storage.Store

internal/pkg/agent/application/application_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func TestLimitsLog(t *testing.T) {
6363
true, // testingMode
6464
time.Millisecond, // fleetInitTimeout
6565
true, // disable monitoring
66+
false, // not otel mode
6667
)
6768
require.NoError(t, err)
6869

internal/pkg/agent/cmd/run.go

+40-9
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import (
1212
"os/signal"
1313
"path/filepath"
1414
"strings"
15+
"sync"
1516
"syscall"
1617
"time"
1718

1819
"go.elastic.co/apm"
1920
apmtransport "go.elastic.co/apm/transport"
2021
"gopkg.in/yaml.v2"
2122

23+
"github.com/hashicorp/go-multierror"
2224
"github.com/spf13/cobra"
2325

2426
"github.com/elastic/elastic-agent-libs/api"
@@ -61,6 +63,7 @@ const (
6163
)
6264

6365
// cfgOverrider is a callback that receives a pointer to the agent configuration.
// NOTE(review): presumably used to adjust the loaded configuration before it is used — confirm in loadConfig.
type cfgOverrider func(cfg *configuration.Configuration)
66+
// awaiters is a list of receive-only signal channels that the agent run loop
// receives from, one after another, during shutdown. Each channel is expected
// to be closed by its owner once the associated background work has finished;
// a channel that is never closed or sent on blocks shutdown.
type awaiters []<-chan struct{}
6467

6568
func newRunCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
6669
cmd := &cobra.Command{
@@ -136,17 +139,38 @@ func run(override cfgOverrider, testingMode bool, fleetInitTimeout time.Duration
136139
go service.ProcessWindowsControlEvents(stopBeat)
137140

138141
// detect otel
139-
if runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile()); runAsOtel {
140-
return otel.Run(ctx, cancel, stop, testingMode)
142+
runAsOtel := otel.IsOtelConfig(ctx, paths.ConfigFile())
143+
var awaiters awaiters
144+
var resErr error
145+
if runAsOtel {
146+
var otelStartWg sync.WaitGroup
147+
otelAwaiter := make(chan struct{})
148+
awaiters = append(awaiters, otelAwaiter)
149+
150+
otelStartWg.Add(1)
151+
go func() {
152+
otelStartWg.Done()
153+
if err := otel.Run(ctx, stop); err != nil {
154+
resErr = multierror.Append(resErr, err)
155+
}
156+
157+
// signal completion; the agent run loop waits on this awaiter during shutdown
158+
close(otelAwaiter)
159+
}()
160+
161+
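// wait for the otel goroutine to begin running; Done is called before otel.Run, so this does not wait for otel.Run to finish starting up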
162+
otelStartWg.Wait()
141163
}
142164

143-
// not otel continue as usual
144-
return runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, modifiers...)
165+
if err := runElasticAgent(ctx, cancel, override, stop, testingMode, fleetInitTimeout, runAsOtel, awaiters, modifiers...); err != nil {
166+
resErr = multierror.Append(resErr, err)
167+
}
145168

169+
return resErr
146170
}
147171

148-
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, modifiers ...component.PlatformModifier) error {
149-
cfg, err := loadConfig(ctx, override)
172+
func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cfgOverrider, stop chan bool, testingMode bool, fleetInitTimeout time.Duration, runAsOtel bool, awaiters awaiters, modifiers ...component.PlatformModifier) error {
173+
cfg, err := loadConfig(ctx, override, runAsOtel)
150174
if err != nil {
151175
return err
152176
}
@@ -259,7 +283,7 @@ func runElasticAgent(ctx context.Context, cancel context.CancelFunc, override cf
259283
l.Info("APM instrumentation disabled")
260284
}
261285

262-
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
286+
coord, configMgr, composable, err := application.New(ctx, l, baseLogger, logLvl, agentInfo, rex, tracer, testingMode, fleetInitTimeout, configuration.IsFleetServerBootstrap(cfg.Fleet), runAsOtel, modifiers...)
263287
if err != nil {
264288
return err
265289
}
@@ -366,6 +390,9 @@ LOOP:
366390
}
367391
cancel()
368392
err = <-appErr
393+
for _, a := range awaiters {
394+
<-a // wait for awaiter to be done
395+
}
369396

370397
if logShutdown {
371398
l.Info("Shutting down completed.")
@@ -376,7 +403,11 @@ LOOP:
376403
return err
377404
}
378405

379-
func loadConfig(ctx context.Context, override cfgOverrider) (*configuration.Configuration, error) {
406+
func loadConfig(ctx context.Context, override cfgOverrider, runAsOtel bool) (*configuration.Configuration, error) {
407+
if runAsOtel {
408+
return configuration.DefaultConfiguration(), nil
409+
}
410+
380411
pathConfigFile := paths.ConfigFile()
381412
rawConfig, err := config.LoadFile(pathConfigFile)
382413
if err != nil {
@@ -527,7 +558,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati
527558
errors.M("path", enrollPath)))
528559
}
529560
logger.Info("Successfully performed delayed enrollment of this Elastic Agent.")
530-
return loadConfig(ctx, override)
561+
return loadConfig(ctx, override, false)
531562
}
532563

533564
func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig) (*apm.Tracer, error) {

internal/pkg/otel/config_manager.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package otel
6+
7+
import (
8+
"context"
9+
10+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
11+
)
12+
13+
// OtelModeConfigManager serves as a config manager for OTel use cases
14+
// In this case agent should ignore all configuration coming from elastic-agent.yml file
15+
// or other sources.
16+
type OtelModeConfigManager struct {
17+
ch chan coordinator.ConfigChange
18+
errCh chan error
19+
}
20+
21+
// NewOtelModeConfigManager creates new OtelModeConfigManager ignoring
22+
// configuration coming from other sources.
23+
func NewOtelModeConfigManager() *OtelModeConfigManager {
24+
return &OtelModeConfigManager{
25+
ch: make(chan coordinator.ConfigChange),
26+
errCh: make(chan error),
27+
}
28+
}
29+
30+
func (t *OtelModeConfigManager) Run(ctx context.Context) error {
31+
<-ctx.Done()
32+
return ctx.Err()
33+
}
34+
35+
func (t *OtelModeConfigManager) Errors() <-chan error {
36+
return t.errCh
37+
}
38+
39+
// ActionErrors returns the error channel for actions.
40+
// Returns nil channel.
41+
func (t *OtelModeConfigManager) ActionErrors() <-chan error {
42+
return nil
43+
}
44+
45+
func (t *OtelModeConfigManager) Watch() <-chan coordinator.ConfigChange {
46+
return t.ch
47+
}

internal/pkg/otel/run.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func IsOtelConfig(ctx context.Context, pathConfigFile string) bool {
4343
return false
4444
}
4545

46-
func Run(ctx context.Context, cancel context.CancelFunc, stop chan bool, testingMode bool) error {
46+
func Run(ctx context.Context, stop chan bool) error {
4747
fmt.Fprintln(os.Stdout, "Starting in otel mode")
4848
settings, err := newSettings(paths.ConfigFile(), release.Version())
4949
if err != nil {

pkg/testing/fixture.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func RunProcess(t *testing.T,
445445
// when `Run` is called.
446446
//
447447
// if shouldWatchState is set to false, communicating state does not happen.
448-
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, states ...State) error {
448+
func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, enableTestingMode bool, states ...State) error {
449449
if _, deadlineSet := ctx.Deadline(); !deadlineSet {
450450
f.t.Fatal("Context passed to Fixture.Run() has no deadline set.")
451451
}
@@ -491,7 +491,10 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
491491
stdOut := newLogWatcher(logProxy)
492492
stdErr := newLogWatcher(logProxy)
493493

494-
args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}
494+
args := []string{"run", "-e", "--disable-encrypted-store"}
495+
if enableTestingMode {
496+
args = append(args, "--testing-mode")
497+
}
495498

496499
args = append(args, f.additionalArgs...)
497500

@@ -601,7 +604,7 @@ func (f *Fixture) RunWithClient(ctx context.Context, shouldWatchState bool, stat
601604
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
602605
// when `Run` is called.
603606
func (f *Fixture) Run(ctx context.Context, states ...State) error {
604-
return f.RunWithClient(ctx, true, states...)
607+
return f.RunWithClient(ctx, true, true, states...)
605608
}
606609

607610
// Exec provides a way of performing subcommand on the prepared Elastic Agent binary.

testing/integration/otel_test.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,31 @@ func TestOtelFileProcessing(t *testing.T) {
124124
fixtureWg.Add(1)
125125
go func() {
126126
defer fixtureWg.Done()
127-
err = fixture.RunWithClient(ctx, false)
127+
err = fixture.RunWithClient(ctx, false, false)
128128
}()
129129

130130
var content []byte
131131
watchLines := linesTrackMap([]string{
132132
`"stringValue":"syslog"`, // syslog is being processed
133133
`"stringValue":"system.log"`, // system.log is being processed
134134
})
135+
136+
// check `elastic-agent status` returns successfully
137+
require.Eventuallyf(t, func() bool {
138+
// This will return errors until it connects to the agent,
139+
// they're mostly noise because until the agent starts running
140+
// we will get connection errors. If the test fails
141+
// the agent logs will be present in the error message
142+
// which should help to explain why the agent was not
143+
// healthy.
144+
err := fixture.IsHealthy(ctx)
145+
return err == nil
146+
},
147+
2*time.Minute, time.Second,
148+
"Elastic-Agent did not report healthy. Agent status error: \"%v\"",
149+
err,
150+
)
151+
135152
require.Eventually(t,
136153
func() bool {
137154
// verify file exists
@@ -240,7 +257,7 @@ func TestOtelAPMIngestion(t *testing.T) {
240257
var fixtureWg sync.WaitGroup
241258
fixtureWg.Add(1)
242259
go func() {
243-
fixture.RunWithClient(ctx, false)
260+
fixture.RunWithClient(ctx, false, false)
244261
fixtureWg.Done()
245262
}()
246263

0 commit comments

Comments
 (0)