Skip to content

Commit 67b26e5

Browse files
On week improvements (#4156)
Add some comments/simplify the fleet-server startup process
1 parent 9fa4c04 commit 67b26e5

File tree

3 files changed

+50
-43
lines changed

3 files changed

+50
-43
lines changed

internal/pkg/checkin/bulk.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import (
1818
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
1919
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
2020

21+
"github.com/rs/zerolog"
22+
2123
estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types"
2224
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage"
23-
"github.com/rs/zerolog"
2425
)
2526

2627
const defaultFlushInterval = 10 * time.Second
@@ -142,26 +143,20 @@ func (bc *Bulk) CheckIn(id string, status string, message string, meta []byte, c
142143

143144
// Run starts the flush timer and exit only when the context is cancelled.
144145
func (bc *Bulk) Run(ctx context.Context) error {
145-
146146
tick := time.NewTicker(bc.opts.flushInterval)
147147
defer tick.Stop()
148148

149-
var err error
150-
LOOP:
151149
for {
152150
select {
153151
case <-tick.C:
154-
if err = bc.flush(ctx); err != nil {
152+
if err := bc.flush(ctx); err != nil {
155153
zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'")
156154
}
157155

158156
case <-ctx.Done():
159-
err = ctx.Err()
160-
break LOOP
157+
return ctx.Err()
161158
}
162159
}
163-
164-
return err
165160
}
166161

167162
// flush sends the minium data needed to update records in elasticsearch.

internal/pkg/server/agent.go

+23-21
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,23 @@ import (
1111
"io"
1212
"strconv"
1313
"strings"
14-
"sync"
1514
"time"
1615

16+
"github.com/rs/zerolog"
17+
1718
"github.com/elastic/fleet-server/v7/internal/pkg/build"
1819
"github.com/elastic/fleet-server/v7/internal/pkg/config"
1920
"github.com/elastic/fleet-server/v7/internal/pkg/es"
2021
"github.com/elastic/fleet-server/v7/internal/pkg/reload"
2122
"github.com/elastic/fleet-server/v7/internal/pkg/sleep"
2223
"github.com/elastic/fleet-server/v7/internal/pkg/state"
2324
"github.com/elastic/fleet-server/v7/internal/pkg/ver"
24-
"github.com/rs/zerolog"
25+
26+
"gopkg.in/yaml.v3"
2527

2628
"github.com/elastic/elastic-agent-client/v7/pkg/client"
2729
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
2830
"github.com/elastic/go-ucfg"
29-
"gopkg.in/yaml.v3"
3031
)
3132

3233
const (
@@ -65,7 +66,7 @@ type Agent struct {
6566
srv *Fleet
6667
srvCtx context.Context
6768
srvCanceller context.CancelFunc
68-
srvDone chan bool
69+
srvDone chan struct{}
6970

7071
outputCheckCanceller context.CancelFunc
7172
chReconfigure chan struct{}
@@ -97,6 +98,7 @@ func NewAgent(cliCfg *ucfg.Config, reader io.Reader, bi build.Info, reloadables
9798

9899
// Run starts a Server instance using config from the configured client.
99100
func (a *Agent) Run(ctx context.Context) error {
101+
// ctx is cancelled when a SIGTERM or SIGINT is received.
100102
log := zerolog.Ctx(ctx)
101103
a.agent.RegisterDiagnosticHook("fleet-server config", "fleet-server's current configuration", "fleet-server.yml", "application/yml", func() []byte {
102104
if a.srv == nil {
@@ -150,24 +152,21 @@ func (a *Agent) Run(ctx context.Context) error {
150152
log.Warn().Msg("Diagnostics hook failure config is nil.")
151153
return []byte(`Diagnostics hook failure config is nil`)
152154
}
153-
ctx, cancel := context.WithTimeout(ctx, time.Second*30) // TODO(michel-laterman): duration/timeout should be part of the diagnostics action from fleet-server (https://github.com/elastic/fleet-server/issues/3648) and the control protocol (https://github.com/elastic/elastic-agent-client/issues/113)
155+
ctx, cancel := context.WithTimeout(ctx, time.Second*30) // diag specific context, has a timeout // TODO(michel-laterman): duration/timeout should be part of the diagnostics action from fleet-server (https://github.com/elastic/fleet-server/issues/3648) and the control protocol (https://github.com/elastic/elastic-agent-client/issues/113)
154156
defer cancel()
155157
return cfg.Output.Elasticsearch.DiagRequests(ctx)
156158
})
157159

158-
subCtx, subCanceller := context.WithCancel(ctx)
159-
defer subCanceller()
160-
161-
var wg sync.WaitGroup
162-
wg.Add(1)
160+
// doneCh is used to track when agent wrapper run loop returns
161+
doneCh := make(chan struct{})
163162
go func() {
164-
defer wg.Done()
163+
defer close(doneCh)
165164

166165
t := time.NewTicker(1 * time.Second)
167166
defer t.Stop()
168167
for {
169168
select {
170-
case <-subCtx.Done():
169+
case <-ctx.Done():
171170
return
172171
case err := <-a.agent.Errors():
173172
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
@@ -176,13 +175,13 @@ func (a *Agent) Run(ctx context.Context) error {
176175
case change := <-a.agent.UnitChanges():
177176
switch change.Type {
178177
case client.UnitChangedAdded:
179-
err := a.unitAdded(subCtx, change.Unit)
178+
err := a.unitAdded(ctx, change.Unit)
180179
if err != nil {
181180
log.Error().Str("unit", change.Unit.ID()).Err(err)
182181
_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
183182
}
184183
case client.UnitChangedModified:
185-
err := a.unitModified(subCtx, change.Unit)
184+
err := a.unitModified(ctx, change.Unit)
186185
if err != nil {
187186
log.Error().Str("unit", change.Unit.ID()).Err(err)
188187
_ = change.Unit.UpdateState(client.UnitStateFailed, err.Error(), nil)
@@ -202,7 +201,7 @@ func (a *Agent) Run(ctx context.Context) error {
202201
if agentInfo != nil && agentInfo.ID != "" {
203202
// Agent ID is not set for the component.
204203
t.Stop()
205-
err := a.reconfigure(subCtx)
204+
err := a.reconfigure(ctx)
206205
if err != nil && !errors.Is(err, context.Canceled) {
207206
log.Error().Err(err).Msg("Bootstrap error when reconfiguring")
208207
}
@@ -212,13 +211,13 @@ func (a *Agent) Run(ctx context.Context) error {
212211
}()
213212

214213
log.Info().Msg("starting communication connection back to Elastic Agent")
215-
err := a.agent.Start(subCtx)
216-
if err != nil {
214+
err := a.agent.Start(ctx)
215+
if err != nil && !errors.Is(err, context.Canceled) {
217216
return err
218217
}
219218

220-
<-subCtx.Done()
221-
wg.Wait()
219+
<-ctx.Done() // wait for a termination signal
220+
<-doneCh // wait for agent wrapper goroutine to terminate
222221

223222
return nil
224223
}
@@ -355,7 +354,7 @@ func (a *Agent) start(ctx context.Context) error {
355354
}
356355
}
357356

358-
srvDone := make(chan bool)
357+
srvDone := make(chan struct{})
359358
srvCtx, srvCanceller := context.WithCancel(ctx)
360359
srv, err := NewFleet(a.bi, state.NewChained(state.NewLog(), a), false)
361360
if err != nil {
@@ -394,6 +393,7 @@ func (a *Agent) reconfigure(ctx context.Context) error {
394393
}
395394

396395
// reload the generic reloadables
396+
// Currently logger is the only reloadable
397397
for _, r := range a.reloadables {
398398
err = r.Reload(ctx, cfg)
399399
if err != nil {
@@ -405,6 +405,7 @@ func (a *Agent) reconfigure(ctx context.Context) error {
405405
}
406406

407407
func (a *Agent) stop() {
408+
// stop is called when expected config state indicates an input or output should stop
408409
if a.srvCanceller == nil {
409410
return
410411
}
@@ -418,7 +419,7 @@ func (a *Agent) stop() {
418419
a.srvCtx = nil
419420
a.srv = nil
420421
canceller()
421-
<-a.srvDone
422+
<-a.srvDone // wait for srv.Run loop to terminate either because root-context received a signal, or stop has been called
422423
a.srvDone = nil
423424

424425
if a.inputUnit != nil {
@@ -460,6 +461,7 @@ func (a *Agent) configFromUnits(ctx context.Context) (*config.Config, error) {
460461

461462
// elastic-agent should be setting bootstrap with config provided through enrollment flags
462463
if bootstrapCfg, ok := outMap["bootstrap"]; ok {
464+
// Check if an output check loop is running, cancel if it is.
463465
if a.outputCheckCanceller != nil {
464466
a.outputCheckCanceller()
465467
a.outputCheckCanceller = nil

internal/pkg/server/fleet.go

+23-13
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ func (f *Fleet) GetConfig() *config.Config {
8686

8787
// Run runs the fleet server
8888
func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
89+
// ctx is cancelled when a SIGTERM or SIGINT is received or when the agent wrappers calls stop because a unit is being stopped/removed.
90+
91+
// Replace context with cancellable ctx
92+
// in order to automatically cancel all the go routines
93+
// that were started in the scope of this function on function exit
94+
ctx, cn := context.WithCancel(ctx)
95+
defer cn()
96+
8997
log := zerolog.Ctx(ctx)
9098
err := initCfg.LoadServerLimits()
9199
if err != nil {
@@ -102,12 +110,6 @@ func (f *Fleet) Run(ctx context.Context, initCfg *config.Config) error {
102110
var curCfg *config.Config
103111
newCfg := initCfg
104112

105-
// Replace context with cancellable ctx
106-
// in order to automatically cancel all the go routines
107-
// that were started in the scope of this function on function exit
108-
ctx, cn := context.WithCancel(ctx)
109-
defer cn()
110-
111113
stop := func(cn context.CancelFunc, g *errgroup.Group) {
112114
if cn != nil {
113115
cn()
@@ -320,7 +322,6 @@ func initRuntime(cfg *config.Config) {
320322
gcPercent := cfg.Inputs[0].Server.Runtime.GCPercent
321323
if gcPercent != 0 {
322324
old := debug.SetGCPercent(gcPercent)
323-
324325
zerolog.Ctx(context.TODO()).Info().
325326
Int("old", old).
326327
Int("new", gcPercent).
@@ -329,7 +330,6 @@ func initRuntime(cfg *config.Config) {
329330
memoryLimit := cfg.Inputs[0].Server.Runtime.MemoryLimit
330331
if memoryLimit != 0 {
331332
old := debug.SetMemoryLimit(memoryLimit)
332-
333333
zerolog.Ctx(context.TODO()).Info().
334334
Int64("old", old).
335335
Int64("new", memoryLimit).
@@ -433,6 +433,19 @@ func (f *Fleet) runServer(ctx context.Context, cfg *config.Config) (err error) {
433433
return g.Wait()
434434
}
435435

436+
// runSubsystems starts all other subsystems for fleet-server
437+
// we assume bulker.Run is called in another goroutine, it's ctx is not the same ctx passed into runSubsystems and used with the passed errgroup.
438+
// however if the bulker returns an error, the passed errgroup is canceled.
439+
// runSubsystems will also do an ES version check and run migrations if started in agent-mode
440+
// The started subsystems are:
441+
// - Elasticsearch GC - cleanup expired fleet actions
442+
// - Policy Index Monitor - track new documents in the .fleet-policies index
443+
// - Policy Monitor - parse .fleet-policies docuuments into usable policies
444+
// - Policy Self Monitor - report fleet-server health status based on .fleet-policies index
445+
// - Action Monitor - track new documents in the .fleet-actions index
446+
// - Action Dispatcher - send actions from the .fleet-actions index to agents that check in
447+
// - Bulk Checkin handler - batches agent checkin messages to _bulk endpoint, minimizes changed attributes
448+
// - HTTP APIs - start http server on 8220 (default) for external agents, and on 8221 (default) for managing agent in agent-mode or local communications.
436449
func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgroup.Group, bulker bulk.Bulk, tracer *apm.Tracer) (err error) {
437450
esCli := bulker.Client()
438451

@@ -448,12 +461,9 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro
448461
}
449462
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
450463
}
451-
}
452464

453-
// Migrations are not executed in standalone mode. When needed, they will be executed
454-
// by some external process.
455-
if !f.standAlone {
456-
// Run migrations
465+
// Migrations are not executed in standalone mode. When needed, they will be executed
466+
// by some external process.
457467
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
458468
return dl.Migrate(ctx, bulker)
459469
})

0 commit comments

Comments
 (0)