Skip to content

Commit

Permalink
Move libbeat output and instrumentation to beater
Browse files Browse the repository at this point in the history
Move initialisation of libbeat output (publisher/pipeline)
and instrumentation (APM tracer) to package beater. Some of
the complexity around reloading is reduced as now have full
control over creating the libbeat output. Similarly, we can
reduce complexity related to the "tracer server" by running
it per reloaded server.
  • Loading branch information
axw committed Oct 1, 2022
1 parent e44b61f commit 03f4125
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 671 deletions.
76 changes: 3 additions & 73 deletions internal/beatcmd/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
"github.com/elastic/beats/v7/libbeat/monitoring/report/log"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/file"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -67,10 +64,6 @@ const (
defaultMonitoringUsername = "apm_system"
)

var (
libbeatMetricsRegistry = monitoring.Default.GetRegistry("libbeat")
)

// Beat provides the runnable and configurable instance of a beat.
type Beat struct {
beat.Beat
Expand Down Expand Up @@ -130,23 +123,14 @@ func NewBeat(args BeatParams) (*Beat, error) {
return b, nil
}

// init initializes logging, tracing ("instrumentation"), monitoring, config
// management, and GOMAXPROCS.
// init initializes logging, config management, GOMAXPROCS, and GC percent.
func (b *Beat) init() error {
if err := configure.Logging(b.Info.Beat, b.Config.Logging); err != nil {
return fmt.Errorf("error initializing logging: %w", err)
}
// log paths values to help with troubleshooting
logp.Info(paths.Paths.String())

// instrumentation.New expects a config object with "instrumentation"
// as a child, so create a new config with instrumentation added.
instrumentation, err := instrumentation.New(b.rawConfig, b.Info.Beat, b.Info.Version)
if err != nil {
return err
}
b.Instrumentation = instrumentation

// Load the unique ID and "first start" info from meta.json.
metaPath := paths.Resolve(paths.Data, "meta.json")
if err := b.loadMeta(metaPath); err != nil {
Expand All @@ -155,10 +139,11 @@ func (b *Beat) init() error {
logp.Info("Beat ID: %v", b.Info.ID)

// Initialize central config manager.
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
manager, err := management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}
b.Manager = manager

if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
logp.Info("Set max procs limit: %v", maxProcs)
Expand Down Expand Up @@ -283,22 +268,6 @@ func (b *Beat) createBeater(beatCreator beat.Creator) (beat.Beater, error) {
logp.Info("output is configured through central management")
}

monitors := pipeline.Monitors{
Metrics: libbeatMetricsRegistry,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.makeOutputFactory(b.Config.Output)
publisher, err := pipeline.Load(b.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Publisher = publisher

// TODO(axw) pass registry into BeatParams, for testing purposes.
reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader()))

return beatCreator(&b.Beat, b.Config.APMServer)
}

Expand Down Expand Up @@ -397,7 +366,6 @@ func (b *Beat) Run(ctx context.Context) error {

go func() {
<-ctx.Done()
b.Instrumentation.Tracer().Close()
beater.Stop()
}()
svc.HandleSignals(cancel, cancel)
Expand Down Expand Up @@ -488,34 +456,6 @@ func (b *Beat) registerElasticsearchVersionCheck() (func(), error) {
return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error {
if b.OutputConfigReloader != nil {
if err := b.OutputConfigReloader.Reload(config); err != nil {
return err
}
}
return outReloader.Reload(config, b.createOutput)
})
}

func (b *Beat) makeOutputFactory(
cfg config.Namespace,
) func(outputs.Observer) (string, outputs.Group, error) {
return func(outStats outputs.Observer) (string, outputs.Group, error) {
out, err := b.createOutput(outStats, cfg)
return cfg.Name(), out, err
}
}

func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) {
if !cfg.IsSet() {
return outputs.Group{}, nil
}
indexSupporter := newSupporter(nil, b.Info, b.rawConfig)
return outputs.Load(indexSupporter, b.Info, stats, cfg.Name(), cfg.Config())
}

func (b *Beat) registerClusterUUIDFetching() (func(), error) {
callback := b.clusterUUIDFetchingCallback()
uuid, err := elasticsearch.RegisterConnectCallback(callback)
Expand Down Expand Up @@ -658,13 +598,3 @@ func logSystemInfo(info beat.Info) {
}
}
}

type nopProcessingSupporter struct{}

func (nopProcessingSupporter) Close() error {
return nil
}

func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) {
return cfg.Processor, nil
}
24 changes: 21 additions & 3 deletions internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beatcmd

import (
Expand All @@ -7,14 +24,15 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages.
Expand Down
17 changes: 17 additions & 0 deletions internal/beatcmd/maxprocs.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beatcmd

import (
Expand Down
17 changes: 17 additions & 0 deletions internal/beatcmd/maxprocs_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beatcmd

import (
Expand Down
6 changes: 4 additions & 2 deletions internal/beatcmd/testcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (

"github.com/spf13/cobra"

beaterconfig "github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/testing"

beaterconfig "github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/idxmgmt"
)

func genTestCmd(beatParams BeatParams) *cobra.Command {
Expand Down Expand Up @@ -70,7 +72,7 @@ func newTestOutputCommand(beatParams BeatParams) *cobra.Command {
if err != nil {
return err
}
indexSupporter := newSupporter(nil, beat.Info, beat.rawConfig)
indexSupporter := idxmgmt.NewSupporter(nil, beat.Info, beat.rawConfig)
output, err := outputs.Load(
indexSupporter, beat.Info, nil, beat.Config.Output.Name(), beat.Config.Output.Config(),
)
Expand Down
Loading

0 comments on commit 03f4125

Please sign in to comment.