Skip to content

Commit

Permalink
Relocate libbeat output & instrumentation (#9247)
Browse files Browse the repository at this point in the history
* Move GOMAXPROCS adjustment to beatcmd

This is something that should run just once for
the lifetime of the process. I'm intending to
move the reload logic out of beater and into
beatcmd (or a subpackage).

* Move libbeat output and instrumentation to beater

Move initialisation of libbeat output (publisher/pipeline)
and instrumentation (APM tracer) to package beater. Some of
the complexity around reloading is reduced as now have full
control over creating the libbeat output. Similarly, we can
reduce complexity related to the "tracer server" by running
it per reloaded server.
  • Loading branch information
axw authored Oct 12, 2022
1 parent 1749bd7 commit 681525b
Show file tree
Hide file tree
Showing 14 changed files with 662 additions and 830 deletions.
84 changes: 11 additions & 73 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/monitoring/report/log"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -67,10 +64,6 @@ const (
defaultMonitoringUsername = "apm_system"
)

var (
libbeatMetricsRegistry = monitoring.Default.GetRegistry("libbeat")
)

// Beat provides the runnable and configurable instance of a beat.
type Beat struct {
beat.Beat
Expand Down Expand Up @@ -130,23 +123,14 @@ func NewBeat(args BeatParams) (*Beat, error) {
return b, nil
}

// init initializes logging, tracing ("instrumentation"), monitoring, config
// management, and GOMAXPROCS.
// init initializes logging, config management, GOMAXPROCS, and GC percent.
func (b *Beat) init() error {
if err := configure.Logging(b.Info.Beat, b.Config.Logging); err != nil {
return fmt.Errorf("error initializing logging: %w", err)
}
// log paths values to help with troubleshooting
logp.Info(paths.Paths.String())

// instrumentation.New expects a config object with "instrumentation"
// as a child, so create a new config with instrumentation added.
instrumentation, err := instrumentation.New(b.rawConfig, b.Info.Beat, b.Info.Version)
if err != nil {
return err
}
b.Instrumentation = instrumentation

// Load the unique ID and "first start" info from meta.json.
metaPath := paths.Resolve(paths.Data, "meta.json")
if err := b.loadMeta(metaPath); err != nil {
Expand All @@ -155,10 +139,11 @@ func (b *Beat) init() error {
logp.Info("Beat ID: %v", b.Info.ID)

// Initialize central config manager.
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
manager, err := management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}
b.Manager = manager

if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
logp.Info("Set max procs limit: %v", maxProcs)
Expand Down Expand Up @@ -283,22 +268,6 @@ func (b *Beat) createBeater(beatCreator beat.Creator) (beat.Beater, error) {
logp.Info("output is configured through central management")
}

monitors := pipeline.Monitors{
Metrics: libbeatMetricsRegistry,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)
publisher, err := pipeline.Load(b.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Publisher = publisher

// TODO(axw) pass registry into BeatParams, for testing purposes.
reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader()))

return beatCreator(&b.Beat, b.Config.APMServer)
}

Expand Down Expand Up @@ -382,14 +351,21 @@ func (b *Beat) Run(ctx context.Context) error {
}
}

logger := logp.NewLogger("")
done := make(chan struct{})
go func() {
defer close(done)
adjustMaxProcs(ctx, 30*time.Second, diffInfof(logger), logger.Errorf)
}()
defer func() { <-done }()

beater, err := b.createBeater(b.create)
if err != nil {
return err
}

go func() {
<-ctx.Done()
b.Instrumentation.Tracer().Close()
beater.Stop()
}()
svc.HandleSignals(cancel, cancel)
Expand Down Expand Up @@ -480,34 +456,6 @@ func (b *Beat) registerElasticsearchVersionCheck() (func(), error) {
return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error {
if b.OutputConfigReloader != nil {
if err := b.OutputConfigReloader.Reload(config); err != nil {
return err
}
}
return outReloader.Reload(config, b.createOutput)
})
}

func (b *Beat) makeOutputFactory(
cfg config.Namespace,
) func(outputs.Observer) (string, outputs.Group, error) {
return func(outStats outputs.Observer) (string, outputs.Group, error) {
out, err := b.createOutput(outStats, cfg)
return cfg.Name(), out, err
}
}

func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) {
if !cfg.IsSet() {
return outputs.Group{}, nil
}
indexSupporter := newSupporter(nil, b.Info, b.rawConfig)
return outputs.Load(indexSupporter, b.Info, stats, cfg.Name(), cfg.Config())
}

func (b *Beat) registerClusterUUIDFetching() (func(), error) {
callback := b.clusterUUIDFetchingCallback()
uuid, err := elasticsearch.RegisterConnectCallback(callback)
Expand Down Expand Up @@ -650,13 +598,3 @@ func logSystemInfo(info beat.Info) {
}
}
}

type nopProcessingSupporter struct{}

func (nopProcessingSupporter) Close() error {
return nil
}

func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) {
return cfg.Processor, nil
}
130 changes: 130 additions & 0 deletions internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beatcmd

import (
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

// TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages.
func TestRunMaxProcs(t *testing.T) {
for _, n := range []int{1, 2, 4} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
t.Setenv("GOMAXPROCS", strconv.Itoa(n))
beat, _ := newNopBeat(t, "output.console.enabled: true")

// Capture logs for testing.
logp.DevelopmentSetup(logp.ToObserverOutput())
logs := logp.ObserverLogs()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)
defer g.Wait()
g.Go(func() error { return beat.Run(ctx) })

timeout := time.NewTimer(10 * time.Second)
defer timeout.Stop()
for {
select {
case <-timeout.C:
t.Error("timed out waiting for log message, total logs observed:", logs.Len())
for _, log := range logs.All() {
t.Log(log.LoggerName, log.Message)
}
return
case <-time.After(10 * time.Millisecond):
}

logs := logs.FilterMessageSnippet(fmt.Sprintf(
`maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
))
if logs.Len() > 0 {
break
}
}

cancel()
assert.NoError(t, g.Wait())
})
}
}

func newNopBeat(t testing.TB, configYAML string) (*Beat, *nopBeater) {
resetGlobals()
initCfgfile(t, configYAML)
nopBeater := newNopBeater()
beat, err := NewBeat(BeatParams{
Create: func(b *beat.Beat, cfg *config.C) (beat.Beater, error) {
return nopBeater, nil
},
})
require.NoError(t, err)
return beat, nopBeater
}

func resetGlobals() {
// Clear monitoring registries to allow the new Beat to populate them.
monitoring.GetNamespace("info").SetRegistry(nil)
monitoring.GetNamespace("state").SetRegistry(nil)
for _, name := range []string{"system", "beat", "libbeat"} {
registry := monitoring.Default.GetRegistry(name)
if registry != nil {
registry.Clear()
}
}

// Create a new reload registry, as the Beat.Run method will register with it.
reload.Register = reload.NewRegistry()
}

type nopBeater struct {
running chan struct{}
done chan struct{}
}

func newNopBeater() *nopBeater {
return &nopBeater{
running: make(chan struct{}),
done: make(chan struct{}),
}
}

func (b *nopBeater) Run(*beat.Beat) error {
close(b.running)
<-b.done
return nil
}

func (b *nopBeater) Stop() {
close(b.done)
}
30 changes: 3 additions & 27 deletions internal/beatcmd/locker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,7 @@ import (
)

func TestLocker(t *testing.T) {
initCfgfile(t, `output.console.enabled: true`)

nopBeater := &nopBeater{
running: make(chan struct{}),
done: make(chan struct{}),
}
beat1, err := NewBeat(BeatParams{
Create: func(*beat.Beat, *config.C) (beat.Beater, error) {
return nopBeater, nil
},
})
require.NoError(t, err)
beat1, nopBeater := newNopBeat(t, `output.console.enabled: true`)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -55,6 +44,8 @@ func TestLocker(t *testing.T) {
// the lock should be held.
<-nopBeater.running

// Create another Beat using the same configuration and data directory;
// its Run method should fail to acquire the lock while beat1 is running.
beat2, err := NewBeat(BeatParams{
Create: func(*beat.Beat, *config.C) (beat.Beater, error) {
panic("should not be called")
Expand All @@ -67,18 +58,3 @@ func TestLocker(t *testing.T) {
cancel()
assert.NoError(t, g.Wait())
}

type nopBeater struct {
running chan struct{}
done chan struct{}
}

func (b *nopBeater) Run(*beat.Beat) error {
close(b.running)
<-b.done
return nil
}

func (b *nopBeater) Stop() {
close(b.done)
}
Loading

0 comments on commit 681525b

Please sign in to comment.