diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go index 7acd1c368a9..2ec7fbfe322 100644 --- a/internal/beatcmd/beat.go +++ b/internal/beatcmd/beat.go @@ -37,14 +37,11 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/monitoring/report/log" - "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" - "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/logp" @@ -67,10 +64,6 @@ const ( defaultMonitoringUsername = "apm_system" ) -var ( - libbeatMetricsRegistry = monitoring.Default.GetRegistry("libbeat") -) - // Beat provides the runnable and configurable instance of a beat. type Beat struct { beat.Beat @@ -130,8 +123,7 @@ func NewBeat(args BeatParams) (*Beat, error) { return b, nil } -// init initializes logging, tracing ("instrumentation"), monitoring, config -// management, and GOMAXPROCS. +// init initializes logging, config management, GOMAXPROCS, and GC percent. func (b *Beat) init() error { if err := configure.Logging(b.Info.Beat, b.Config.Logging); err != nil { return fmt.Errorf("error initializing logging: %w", err) @@ -139,14 +131,6 @@ func (b *Beat) init() error { // log paths values to help with troubleshooting logp.Info(paths.Paths.String()) - // instrumentation.New expects a config object with "instrumentation" - // as a child, so create a new config with instrumentation added. - instrumentation, err := instrumentation.New(b.rawConfig, b.Info.Beat, b.Info.Version) - if err != nil { - return err - } - b.Instrumentation = instrumentation - // Load the unique ID and "first start" info from meta.json. metaPath := paths.Resolve(paths.Data, "meta.json") if err := b.loadMeta(metaPath); err != nil { @@ -155,10 +139,11 @@ func (b *Beat) init() error { logp.Info("Beat ID: %v", b.Info.ID) // Initialize central config manager. - b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) + manager, err := management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) if err != nil { return err } + b.Manager = manager if maxProcs := b.Config.MaxProcs; maxProcs > 0 { logp.Info("Set max procs limit: %v", maxProcs) @@ -283,22 +268,6 @@ func (b *Beat) createBeater(beatCreator beat.Creator) (beat.Beater, error) { logp.Info("output is configured through central management") } - monitors := pipeline.Monitors{ - Metrics: libbeatMetricsRegistry, - Telemetry: monitoring.GetNamespace("state").GetRegistry(), - Logger: logp.L().Named("publisher"), - Tracer: b.Instrumentation.Tracer(), - } - outputFactory := b.makeOutputFactory(b.Config.Output) - publisher, err := pipeline.Load(b.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory) - if err != nil { - return nil, fmt.Errorf("error initializing publisher: %w", err) - } - b.Publisher = publisher - - // TODO(axw) pass registry into BeatParams, for testing purposes. - reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader())) - return beatCreator(&b.Beat, b.Config.APMServer) } @@ -397,7 +366,6 @@ func (b *Beat) Run(ctx context.Context) error { go func() { <-ctx.Done() - b.Instrumentation.Tracer().Close() beater.Stop() }() svc.HandleSignals(cancel, cancel) @@ -488,34 +456,6 @@ func (b *Beat) registerElasticsearchVersionCheck() (func(), error) { return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil } -func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { - return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error { - if b.OutputConfigReloader != nil { - if err := b.OutputConfigReloader.Reload(config); err != nil { - return err - } - } - return outReloader.Reload(config, b.createOutput) - }) -} - -func (b *Beat) makeOutputFactory( - cfg config.Namespace, -) func(outputs.Observer) (string, outputs.Group, error) { - return func(outStats outputs.Observer) (string, outputs.Group, error) { - out, err := b.createOutput(outStats, cfg) - return cfg.Name(), out, err - } -} - -func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) { - if !cfg.IsSet() { - return outputs.Group{}, nil - } - indexSupporter := newSupporter(nil, b.Info, b.rawConfig) - return outputs.Load(indexSupporter, b.Info, stats, cfg.Name(), cfg.Config()) -} - func (b *Beat) registerClusterUUIDFetching() (func(), error) { callback := b.clusterUUIDFetchingCallback() uuid, err := elasticsearch.RegisterConnectCallback(callback) @@ -658,13 +598,3 @@ func logSystemInfo(info beat.Info) { } } } - -type nopProcessingSupporter struct{} - -func (nopProcessingSupporter) Close() error { - return nil -} - -func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) { - return cfg.Processor, nil -} diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index 7010248d9f2..02733b8ddde 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( @@ -7,14 +24,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) // TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages. diff --git a/internal/beatcmd/maxprocs.go b/internal/beatcmd/maxprocs.go index 63da1a2baba..c7a6480c471 100644 --- a/internal/beatcmd/maxprocs.go +++ b/internal/beatcmd/maxprocs.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( diff --git a/internal/beatcmd/maxprocs_test.go b/internal/beatcmd/maxprocs_test.go index cedc475c257..9f711f947ae 100644 --- a/internal/beatcmd/maxprocs_test.go +++ b/internal/beatcmd/maxprocs_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( diff --git a/internal/beatcmd/testcmd.go b/internal/beatcmd/testcmd.go index bb3ddfd6027..ffef2e6686c 100644 --- a/internal/beatcmd/testcmd.go +++ b/internal/beatcmd/testcmd.go @@ -22,10 +22,12 @@ import ( "github.com/spf13/cobra" - beaterconfig "github.com/elastic/apm-server/internal/beater/config" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/testing" + + beaterconfig "github.com/elastic/apm-server/internal/beater/config" + "github.com/elastic/apm-server/internal/idxmgmt" ) func genTestCmd(beatParams BeatParams) *cobra.Command { @@ -70,7 +72,7 @@ func newTestOutputCommand(beatParams BeatParams) *cobra.Command { if err != nil { return err } - indexSupporter := newSupporter(nil, beat.Info, beat.rawConfig) + indexSupporter := idxmgmt.NewSupporter(nil, beat.Info, beat.rawConfig) output, err := outputs.Load( indexSupporter, beat.Info, nil, beat.Config.Output.Name(), beat.Config.Output.Config(), ) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index e186dcc303b..38afc6ce342 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -43,8 +43,11 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" + "github.com/elastic/beats/v7/libbeat/outputs" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -60,6 +63,7 @@ import ( javaattacher "github.com/elastic/apm-server/internal/beater/java_attacher" "github.com/elastic/apm-server/internal/beater/ratelimit" "github.com/elastic/apm-server/internal/elasticsearch" + "github.com/elastic/apm-server/internal/idxmgmt" "github.com/elastic/apm-server/internal/kibana" "github.com/elastic/apm-server/internal/logs" "github.com/elastic/apm-server/internal/model" @@ -96,29 +100,28 @@ type CreatorParams struct { // NewCreator returns a new beat.Creator which creates beaters // using the provided CreatorParams. func NewCreator(args CreatorParams) beat.Creator { - return func(b *beat.Beat, ucfg *agentconfig.C) (beat.Beater, error) { + return func(b *beat.Beat, apmServerConfig *agentconfig.C) (beat.Beater, error) { logger := args.Logger if logger != nil { logger = logger.Named(logs.Beater) } else { logger = logp.NewLogger(logs.Beater) } + bt := &beater{ - rawConfig: ucfg, - stopped: false, - logger: logger, - wrapServer: args.WrapServer, - waitPublished: publish.NewWaitPublishedAcker(), - outputConfigReloader: newChanReloader(), - libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"), + apmServerConfig: apmServerConfig, + rootConfig: (*agentconfig.C)(((*ucfg.Config)(apmServerConfig)).Parent()), + stopped: false, + logger: logger, + wrapServer: args.WrapServer, } var elasticsearchOutputConfig *agentconfig.C - if hasElasticsearchOutput(b) { + if b.Config != nil && b.Config.Output.Name() == "elasticsearch" { elasticsearchOutputConfig = b.Config.Output.Config() } var err error - bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig) + bt.config, err = config.NewConfig(bt.apmServerConfig, elasticsearchOutputConfig) if err != nil { return nil, err } @@ -132,26 +135,16 @@ func NewCreator(args CreatorParams) beat.Creator { } } - if b.Manager != nil && b.Manager.Enabled() { - // Subscribe to output changes for reconfiguring apm-server's Elasticsearch - // clients, which use the Elasticsearch output config by default. We install - // this during beat creation to ensure output config reloads are not missed; - // reloads will be blocked until the chanReloader is served by beater.run. - b.OutputConfigReloader = bt.outputConfigReloader - } - return bt, nil } } type beater struct { - rawConfig *agentconfig.C - config *config.Config - logger *logp.Logger - wrapServer WrapServerFunc - waitPublished *publish.WaitPublishedAcker - outputConfigReloader *chanReloader - libbeatMonitoringRegistry *monitoring.Registry + apmServerConfig *agentconfig.C + rootConfig *agentconfig.C + config *config.Config + logger *logp.Logger + wrapServer WrapServerFunc mutex sync.Mutex // guards stopServer and stopped stopServer func() @@ -163,24 +156,10 @@ type beater struct { func (bt *beater) Run(b *beat.Beat) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if err := bt.run(ctx, cancel, b); err != nil { - return err - } - return bt.waitPublished.Wait(ctx) + return bt.run(ctx, cancel, b) } func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error { - tracer, tracerServer, err := initTracing(b, bt.config, bt.logger) - if err != nil { - return err - } - if tracerServer != nil { - defer tracerServer.Close() - } - if tracer != nil { - defer tracer.Close() - } - // add deprecation warning if running on a 32-bit system if runtime.GOARCH == "386" { bt.logger.Warn("deprecation notice: support for 32-bit system target architecture will be removed in an upcoming version") @@ -189,13 +168,9 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * reloader := reloader{ runServerContext: ctx, args: sharedServerRunnerParams{ - Beat: b, - WrapServer: bt.wrapServer, - Logger: bt.logger, - Tracer: tracer, - TracerServer: tracerServer, - Acker: bt.waitPublished, - LibbeatMonitoringRegistry: bt.libbeatMonitoringRegistry, + Beat: b, + WrapServer: bt.wrapServer, + Logger: bt.logger, }, } @@ -213,19 +188,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * return nil } - g, ctx := errgroup.WithContext(context.Background()) - g.Go(func() error { - <-stopped - return nil - }) if b.Manager != nil && b.Manager.Enabled() { // Managed by Agent: register input and output reloaders to reconfigure the server. reload.Register.MustRegisterList("inputs", &reloader) - g.Go(func() error { - return bt.outputConfigReloader.serve( - ctx, reload.ReloadableFunc(reloader.reloadOutput), - ) - }) + reload.Register.MustRegister("output", reload.ReloadableFunc(reloader.reloadOutput)) // Start the manager after all the hooks are initialized // and defined this ensure reloading consistency.. @@ -236,7 +202,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * } else { // Management disabled, use statically defined config. - reloader.rawConfig = bt.rawConfig + reloader.apmServerConfig = bt.apmServerConfig + reloader.rootConfig = bt.rootConfig if b.Config != nil { reloader.outputConfig = b.Config.Output } @@ -247,7 +214,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * return err } } - return g.Wait() + <-stopped + return nil } // setStopServerFunc sets a function to call when the server is stopped. @@ -267,11 +235,12 @@ type reloader struct { runServerContext context.Context args sharedServerRunnerParams - mu sync.Mutex - rawConfig *agentconfig.C - outputConfig agentconfig.Namespace - fleetConfig *config.Fleet - runner *serverRunner + mu sync.Mutex + rootConfig *agentconfig.C + apmServerConfig *agentconfig.C + outputConfig agentconfig.Namespace + fleetConfig *config.Fleet + runner *serverRunner } func (r *reloader) stop() { @@ -298,17 +267,21 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { r.mu.Lock() defer r.mu.Unlock() - r.rawConfig = integrationConfig.APMServer + r.rootConfig = cfg.Config + r.apmServerConfig = integrationConfig.APMServer + // Merge in datastream namespace passed in from apm integration if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" { c := agentconfig.MustNewConfigFrom(map[string]interface{}{ "data_streams.namespace": integrationConfig.DataStream.Namespace, }) - if r.rawConfig, err = agentconfig.MergeConfigs(r.rawConfig, c); err != nil { + r.apmServerConfig, err = agentconfig.MergeConfigs(r.apmServerConfig, c) + if err != nil { return err } } r.fleetConfig = &integrationConfig.Fleet + return r.reload() } @@ -330,14 +303,15 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { // serverRunner (if any) to exit. Calls to reload must be sycnhronized explicitly // by acquiring reloader#mu by callers. func (r *reloader) reload() error { - if r.rawConfig == nil { + if r.apmServerConfig == nil || !r.outputConfig.IsSet() { // APM Server config not loaded yet. return nil } runner, err := newServerRunner(r.runServerContext, serverRunnerParams{ sharedServerRunnerParams: r.args, - RawConfig: r.rawConfig, + RootConfig: r.rootConfig, + APMServerConfig: r.apmServerConfig, FleetConfig: r.fleetConfig, OutputConfig: r.outputConfig, }) @@ -388,36 +362,31 @@ type serverRunner struct { started chan struct{} done chan struct{} - pipeline beat.PipelineConnector - acker *publish.WaitPublishedAcker namespace string config *config.Config - rawConfig *agentconfig.C + rootConfig *agentconfig.C + apmServerConfig *agentconfig.C + outputConfig agentconfig.Namespace elasticsearchOutputConfig *agentconfig.C fleetConfig *config.Fleet beat *beat.Beat logger *logp.Logger - tracer *apm.Tracer - tracerServer *tracerServer wrapServer WrapServerFunc - libbeatMonitoringRegistry *monitoring.Registry } type serverRunnerParams struct { sharedServerRunnerParams - RawConfig *agentconfig.C - FleetConfig *config.Fleet - OutputConfig agentconfig.Namespace + RootConfig *agentconfig.C + APMServerConfig *agentconfig.C + FleetConfig *config.Fleet + OutputConfig agentconfig.Namespace } type sharedServerRunnerParams struct { Beat *beat.Beat WrapServer WrapServerFunc Logger *logp.Logger - Tracer *apm.Tracer - TracerServer *tracerServer - Acker *publish.WaitPublishedAcker LibbeatMonitoringRegistry *monitoring.Registry } @@ -427,7 +396,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne esOutputConfig = args.OutputConfig.Config() } - cfg, err := config.NewConfig(args.RawConfig, esOutputConfig) + cfg, err := config.NewConfig(args.APMServerConfig, esOutputConfig) if err != nil { return nil, err } @@ -441,18 +410,15 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne started: make(chan struct{}), config: cfg, - rawConfig: args.RawConfig, + rootConfig: args.RootConfig, + apmServerConfig: args.APMServerConfig, + outputConfig: args.OutputConfig, elasticsearchOutputConfig: esOutputConfig, fleetConfig: args.FleetConfig, - acker: args.Acker, - pipeline: args.Beat.Publisher, namespace: cfg.DataStreams.Namespace, beat: args.Beat, logger: args.Logger, - tracer: args.Tracer, - tracerServer: args.TracerServer, wrapServer: args.WrapServer, - libbeatMonitoringRegistry: args.LibbeatMonitoringRegistry, }, nil } @@ -467,14 +433,16 @@ func (s *serverRunner) run(listener net.Listener) error { kibanaClient = kibana.NewConnectingClient(s.config.Kibana.ClientConfig) } - cfg := ucfg.Config(*s.rawConfig) - parentCfg := cfg.Parent() // Check for an environment variable set when running in a cloud environment eac := os.Getenv("ELASTIC_AGENT_CLOUD") if eac != "" && s.config.Kibana.Enabled { // Don't block server startup sending the config. go func() { - if err := kibana.SendConfig(s.runServerContext, kibanaClient, parentCfg); err != nil { + if err := kibana.SendConfig( + s.runServerContext, + kibanaClient, + (*ucfg.Config)(s.rootConfig), + ); err != nil { s.logger.Infof("failed to upload config to kibana: %v", err) } }() @@ -498,6 +466,17 @@ func (s *serverRunner) run(listener net.Listener) error { } } + instrumentation, err := instrumentation.New(s.rootConfig, s.beat.Info.Beat, s.beat.Info.Version) + if err != nil { + return err + } + tracer := instrumentation.Tracer() + tracerServerListener := instrumentation.Listener() + if tracerServerListener != nil { + defer tracerServerListener.Close() + } + defer tracer.Close() + g, ctx := errgroup.WithContext(s.runServerContext) // Ensure the libbeat output and go-elasticsearch clients do not index @@ -505,7 +484,7 @@ func (s *serverRunner) run(listener net.Listener) error { publishReady := make(chan struct{}) drain := make(chan struct{}) g.Go(func() error { - if err := s.waitReady(ctx, kibanaClient); err != nil { + if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { // One or more preconditions failed; drop events. close(drain) return errors.Wrap(err, "error waiting for server to be ready") @@ -560,13 +539,6 @@ func (s *serverRunner) run(listener net.Listener) error { sourcemapFetcher = cachingFetcher } - // Create the runServer function. We start with newBaseRunServer, and then - // wrap depending on the configuration in order to inject behaviour. - runServer := newBaseRunServer(listener) - if s.tracerServer != nil { - runServer = runServerWithTracerServer(runServer, s.tracerServer, s.tracer) - } - authenticator, err := auth.NewAuthenticator(s.config.AgentAuth) if err != nil { return err @@ -585,7 +557,7 @@ func (s *serverRunner) run(listener net.Listener) error { // even if TLS is enabled, as TLS is handled by the net/http server. gRPCLogger := s.logger.Named("grpc") grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor( - apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(s.tracer)), + apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)), interceptors.ClientMetadata(), interceptors.Logging(gRPCLogger), interceptors.Metrics(gRPCLogger), @@ -596,7 +568,10 @@ func (s *serverRunner) run(listener net.Listener) error { // Create the BatchProcessor chain that is used to process all events, // including the metrics aggregated by APM Server. - finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(newElasticsearchClient) + finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor( + tracer, + newElasticsearchClient, + ) if err != nil { return err } @@ -627,13 +602,15 @@ func (s *serverRunner) run(listener net.Listener) error { return agentConfigReporter.Run(ctx) }) + // Create the runServer function. We start with newBaseRunServer, and then + // wrap depending on the configuration in order to inject behaviour. serverParams := ServerParams{ UUID: s.beat.Info.ID, Config: s.config, Managed: s.beat.Manager != nil && s.beat.Manager.Enabled(), Namespace: s.namespace, Logger: s.logger, - Tracer: s.tracer, + Tracer: tracer, Authenticator: authenticator, RateLimitStore: ratelimitStore, BatchProcessor: batchProcessor, @@ -644,6 +621,7 @@ func (s *serverRunner) run(listener net.Listener) error { NewElasticsearchClient: newElasticsearchClient, GRPCServer: grpcServer, } + runServer := newBaseRunServer(listener) if s.wrapServer != nil { // Wrap the serverParams and runServer function, enabling // injection of behaviour into the processing chain. @@ -678,9 +656,26 @@ func (s *serverRunner) run(listener net.Listener) error { } serverParams.BatchProcessor = append(preBatchProcessors, serverParams.BatchProcessor) + // Start the main server and the optional server for self-instrumentation. g.Go(func() error { return runServer(ctx, serverParams) }) + if tracerServerListener != nil { + tracerServer, err := newTracerServer(tracerServerListener, s.logger, serverParams.BatchProcessor) + if err != nil { + return fmt.Errorf("failed to create self-instrumentation server: %w", err) + } + g.Go(func() error { + if err := tracerServer.Serve(tracerServerListener); err != http.ErrServerClosed { + return err + } + return nil + }) + go func() { + <-ctx.Done() + tracerServer.Shutdown(s.backgroundContext) + }() + } // Signal that the runner has started close(s.started) @@ -693,7 +688,11 @@ func (s *serverRunner) run(listener net.Listener) error { } // waitReady waits until the server is ready to index events. -func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client) error { +func (s *serverRunner) waitReady( + ctx context.Context, + kibanaClient kibana.Client, + tracer *apm.Tracer, +) error { var preconditions []func(context.Context) error var esOutputClient elasticsearch.Client if s.elasticsearchOutputConfig != nil { @@ -765,37 +764,32 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client } return nil } - return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check) + return waitReady(ctx, s.config.WaitReadyInterval, tracer, s.logger, check) } // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events, // and a cleanup function which should be called on server shutdown. If the output is // "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher. func (s *serverRunner) newFinalBatchProcessor( + tracer *apm.Tracer, newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error), ) (model.BatchProcessor, func(context.Context) error, error) { + + monitoring.Default.Remove("libbeat") + libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat") if s.elasticsearchOutputConfig == nil { - // When the publisher stops cleanly it will close its pipeline client, - // calling the acker's Close method. We need to call Open for each new - // publisher to ensure we wait for all clients and enqueued events to - // be closed at shutdown time. - s.acker.Open() - pipeline := pipetool.WithACKer(s.pipeline, s.acker) - publisher, err := publish.NewPublisher(pipeline, s.tracer) - if err != nil { - return nil, nil, err - } - // We only want to restore the previous libbeat registry if the output - // has a name, otherwise, keep the libbeat registry as is. This is to - // account for cases where the output config may be sent empty by the - // Elastic Agent. - if s.beat.Config != nil && s.beat.Config.Output.Name() != "" { - monitoring.Default.Remove("libbeat") - monitoring.Default.Add("libbeat", s.libbeatMonitoringRegistry, monitoring.Full) - } - return publisher, publisher.Stop, nil + return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry) } + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputRegistry := stateRegistry.GetRegistry("output") + if outputRegistry != nil { + outputRegistry.Clear() + } else { + outputRegistry = stateRegistry.NewRegistry("output") + } + monitoring.NewString(outputRegistry, "name").Set("elasticsearch") + var esConfig struct { *elasticsearch.Config `config:",inline"` FlushBytes string `config:"flush_bytes"` @@ -824,7 +818,7 @@ func (s *serverRunner) newFinalBatchProcessor( CompressionLevel: esConfig.CompressionLevel, FlushBytes: flushBytes, FlushInterval: esConfig.FlushInterval, - Tracer: s.tracer, + Tracer: tracer, MaxRequests: esConfig.MaxRequests, }) if err != nil { @@ -834,16 +828,15 @@ func (s *serverRunner) newFinalBatchProcessor( // Install our own libbeat-compatible metrics callback which uses the modelindexer stats. // All the metrics below are required to be reported to be able to display all relevant // fields in the Stack Monitoring UI. - monitoring.Default.Remove("libbeat") - monitoring.NewFunc(monitoring.Default, "libbeat.output.write", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "output.write", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() v.OnKey("bytes") v.OnInt(indexer.Stats().BytesTotal) }) - outputType := monitoring.NewString(monitoring.Default.GetRegistry("libbeat.output"), "type") + outputType := monitoring.NewString(libbeatMonitoringRegistry.GetRegistry("output"), "type") outputType.Set("elasticsearch") - monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "output.events", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() stats := indexer.Stats() @@ -860,7 +853,7 @@ func (s *serverRunner) newFinalBatchProcessor( v.OnKey("total") v.OnInt(stats.Added) }) - monitoring.NewFunc(monitoring.Default, "libbeat.pipeline.events", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "pipeline.events", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() v.OnKey("total") @@ -879,23 +872,46 @@ func (s *serverRunner) newFinalBatchProcessor( return indexer, indexer.Close, nil } -func hasElasticsearchOutput(b *beat.Beat) bool { - return b.Config != nil && b.Config.Output.Name() == "elasticsearch" -} - -func initTracing(b *beat.Beat, cfg *config.Config, logger *logp.Logger) (*apm.Tracer, *tracerServer, error) { - tracer := b.Instrumentation.Tracer() - listener := b.Instrumentation.Listener() - - var tracerServer *tracerServer - if listener != nil { - var err error - tracerServer, err = newTracerServer(listener, logger) - if err != nil { - return nil, nil, err +func (s *serverRunner) newLibbeatFinalBatchProcessor( + tracer *apm.Tracer, + libbeatMonitoringRegistry *monitoring.Registry, +) (model.BatchProcessor, func(context.Context) error, error) { + // When the publisher stops cleanly it will close its pipeline client, + // calling the acker's Close method and unblock Wait. + acker := publish.NewWaitPublishedAcker() + acker.Open() + + monitors := pipeline.Monitors{ + Metrics: libbeatMonitoringRegistry, + Telemetry: monitoring.GetNamespace("state").GetRegistry(), + Logger: logp.L().Named("publisher"), + Tracer: tracer, + } + outputFactory := func(stats outputs.Observer) (string, outputs.Group, error) { + if !s.outputConfig.IsSet() { + return "", outputs.Group{}, nil + } + indexSupporter := idxmgmt.NewSupporter(nil, s.beat.Info, s.apmServerConfig) + outputName := s.outputConfig.Name() + output, err := outputs.Load(indexSupporter, s.beat.Info, stats, outputName, s.outputConfig.Config()) + return outputName, output, err + } + pipeline, err := pipeline.Load(s.beat.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory) + if err != nil { + return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err) + } + pipelineConnector := pipetool.WithACKer(pipeline, acker) + publisher, err := publish.NewPublisher(pipelineConnector, tracer) + if err != nil { + return nil, nil, err + } + stop := func(ctx context.Context) error { + if err := publisher.Stop(ctx); err != nil { + return err } + return acker.Wait(ctx) } - return tracer, tracerServer, nil + return publisher, stop, nil } // Stop stops the beater gracefully. @@ -909,26 +925,10 @@ func (bt *beater) Stop() { "stopping apm-server... waiting maximum of %v seconds for queues to drain", bt.config.ShutdownTimeout.Seconds(), ) - bt.outputConfigReloader.cancel() bt.stopServer() bt.stopped = true } -// runServerWithTracerServer wraps runServer such that it also runs -// tracerServer, stopping it and the tracer when the server shuts down. -func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServer, tracer *apm.Tracer) RunServerFunc { - return func(ctx context.Context, args ServerParams) error { - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return tracerServer.serve(ctx, args.BatchProcessor) - }) - g.Go(func() error { - return runServer(ctx, args) - }) - return g.Wait() - } -} - func newSourcemapFetcher( beatInfo beat.Info, cfg config.SourceMapping, @@ -1004,65 +1004,6 @@ func newSourcemapFetcher( return chained, nil } -// chanReloader implements libbeat/common/reload.Reloadable, converting -// Reload calls into requests send to a channel consumed by serve. -type chanReloader struct { - ctx context.Context - cancel context.CancelFunc - ch chan reloadRequest -} - -func newChanReloader() *chanReloader { - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan reloadRequest) - return &chanReloader{ctx, cancel, ch} -} - -type reloadRequest struct { - cfg *reload.ConfigWithMeta - result chan<- error -} - -// Reload sends a reload request to r.ch, which is consumed by another -// goroutine running r.serve. Reload blocks until serve has handled the -// reload request, or until the reloader's context has been cancelled. -func (r *chanReloader) Reload(cfg *reload.ConfigWithMeta) error { - result := make(chan error, 1) - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case r.ch <- reloadRequest{cfg: cfg, result: result}: - } - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case err := <-result: - return err - } -} - -// serve handles reload requests enqueued by Reload, returning when either -// ctx or r.ctx are cancelled. -func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) error { - for { - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - case req := <-r.ch: - err := reloader.Reload(req.cfg) - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - case req.result <- err: - } - } - } -} - // TODO: This is copying behavior from libbeat: // https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950 // Remove this when cluster_uuid no longer needs to be queried from ES. @@ -1117,3 +1058,13 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error s.Set(response.ClusterUUID) return nil } + +type nopProcessingSupporter struct{} + +func (nopProcessingSupporter) Close() error { + return nil +} + +func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) { + return cfg.Processor, nil +} diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index fd1dfb9b813..e997f12fc45 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -46,9 +46,6 @@ import ( "github.com/elastic/apm-server/internal/model" "github.com/elastic/apm-server/internal/model/modelindexer/modelindexertest" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/idxmgmt" - "github.com/elastic/beats/v7/libbeat/instrumentation" - "github.com/elastic/beats/v7/libbeat/outputs" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -65,15 +62,15 @@ type testBeater struct { client *http.Client } -func setupServer(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docsOut chan<- []byte) (*testBeater, error) { +func setupServer(t *testing.T, cfg *agentconfig.C, docsOut chan<- []byte) (*testBeater, error) { if testing.Short() { t.Skip("skipping server test") } - apmBeat, cfg := newBeat(t, cfg, beatConfig, docsOut) - return setupBeater(t, apmBeat, cfg, beatConfig) + apmBeat, cfg := newBeat(t, cfg, docsOut) + return setupBeater(t, apmBeat, cfg) } -func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docsOut chan<- []byte) (*beat.Beat, *agentconfig.C) { +func newBeat(t *testing.T, cfg *agentconfig.C, docsOut chan<- []byte) (*beat.Beat, *agentconfig.C) { info := beat.Info{ Beat: "test-apm-server", IndexPrefix: "test-apm-server", @@ -82,17 +79,44 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs } combinedConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "host": "localhost:0", + "apm-server": map[string]interface{}{ + "host": "localhost:0", - // Disable waiting for integration to be installed by default, - // to simplify tests. This feature is tested independently. - "data_streams.wait_for_integration": false, + // Disable waiting for integration to be installed by default, + // to simplify tests. This feature is tested independently. + "data_streams.wait_for_integration": false, + }, }) if cfg != nil { require.NoError(t, cfg.Unpack(combinedConfig)) } - var pub beat.Pipeline + // If docsOut is non-nil, we configure the Elasticsearch output. + // + // If no output is configured and docsOut is nil, then we use the + // Elasticsearch output and consume and drop the documents. + var unpacked struct { + APMServer *agentconfig.C `config:"apm-server"` + Output agentconfig.Namespace `config:"output"` + } + err := combinedConfig.Unpack(&unpacked) + require.NoError(t, err) + if !unpacked.Output.IsSet() && docsOut == nil { + docs := make(chan []byte) + docsOut = docs + stop := make(chan struct{}) + t.Cleanup(func() { close(stop) }) + go func() { + for { + select { + case <-stop: + return + case <-docs: + } + } + }() + } + if docsOut != nil { // Clear the state monitoring registry to avoid panicking due // to double registration. @@ -100,10 +124,6 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs stateRegistry.Clear() defer stateRegistry.Clear() - // beatConfig must be nil if an event channel is supplied. - require.Nil(t, beatConfig) - beatConfig = &beat.BeatConfig{} - mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -128,8 +148,8 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs srv := httptest.NewServer(mux) t.Cleanup(srv.Close) - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ + err := combinedConfig.Merge(agentconfig.MustNewConfigFrom(map[string]interface{}{ + "output.elasticsearch": map[string]interface{}{ "enabled": true, "hosts": []string{srv.URL}, "flush_bytes": "1", // no delay @@ -137,37 +157,24 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs }, })) require.NoError(t, err) - } - if beatConfig != nil && beatConfig.Output.Name() == "elasticsearch" { - // capture events using the configured elasticsearch output - supporter, err := idxmgmt.DefaultSupport(logp.NewLogger("beater_test"), info, nil) + // Update unpacked.Output + err = combinedConfig.Unpack(&unpacked) require.NoError(t, err) - outputGroup, err := outputs.Load(supporter, info, nil, "elasticsearch", beatConfig.Output.Config()) - require.NoError(t, err) - pub = dummyPipeline(cfg, info, outputGroup.Clients...) - } else { - // don't capture events - pub = dummyPipeline(cfg, info) } - instrumentation, err := instrumentation.New(combinedConfig, info.Beat, info.Version) - require.NoError(t, err) return &beat.Beat{ - Publisher: pub, - Info: info, - Config: beatConfig, - Instrumentation: instrumentation, - }, combinedConfig + Info: info, + Config: &beat.BeatConfig{Output: unpacked.Output}, + }, unpacked.APMServer } func setupBeater( t *testing.T, apmBeat *beat.Beat, ucfg *agentconfig.C, - beatConfig *beat.BeatConfig, ) (*testBeater, error) { - tb, err := newTestBeater(t, apmBeat, ucfg, beatConfig) + tb, err := newTestBeater(t, apmBeat, ucfg) if err != nil { return nil, err } @@ -190,7 +197,6 @@ func newTestBeater( t *testing.T, apmBeat *beat.Beat, ucfg *agentconfig.C, - beatConfig *beat.BeatConfig, ) (*testBeater, error) { core, observedLogs := observer.New(zapcore.DebugLevel) diff --git a/internal/beater/integration_test.go b/internal/beater/integration_test.go index 3f7dbab6b07..70e230e953c 100644 --- a/internal/beater/integration_test.go +++ b/internal/beater/integration_test.go @@ -135,7 +135,7 @@ func TestPublishIntegration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // fresh APM Server for each run docsChan := make(chan []byte) - apm, err := setupServer(t, nil, nil, docsChan) + apm, err := setupServer(t, nil, docsChan) require.NoError(t, err) defer apm.Stop() diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go index ad307780016..066082d209d 100644 --- a/internal/beater/server_test.go +++ b/internal/beater/server_test.go @@ -46,28 +46,20 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/libbeat/outputs" - pubs "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/pipeline" - "github.com/elastic/beats/v7/libbeat/publisher/processing" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + _ "github.com/elastic/beats/v7/libbeat/outputs/console" + _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-server/internal/beater/api" "github.com/elastic/apm-server/internal/beater/config" - "github.com/elastic/apm-server/internal/elasticsearch" ) -type m map[string]interface{} - func TestServerOk(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -80,7 +72,7 @@ func TestServerOk(t *testing.T) { } func TestServerRoot(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -133,9 +125,8 @@ func TestServerRoot(t *testing.T) { func TestServerRootWithToken(t *testing.T) { token := "verysecret" badToken := "Verysecret" - ucfg, err := agentconfig.NewConfigFrom(m{"auth.secret_token": token}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.auth.secret_token": token}) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -174,11 +165,8 @@ func TestServerTcpNoPort(t *testing.T) { t.Error(err) } } - ucfg, err := agentconfig.NewConfigFrom(map[string]interface{}{ - "host": "localhost", - }) - assert.NoError(t, err) - btr, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.host": "localhost"}) + btr, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer btr.Stop() @@ -203,9 +191,8 @@ func TestServerOkUnix(t *testing.T) { } addr := tmpTestUnix(t) - ucfg, err := agentconfig.NewConfigFrom(map[string]interface{}{"host": "unix:" + addr}) - assert.NoError(t, err) - btr, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.host": "unix:" + addr}) + btr, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer btr.Stop() @@ -217,7 +204,7 @@ func TestServerOkUnix(t *testing.T) { } func TestServerHealth(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -230,9 +217,13 @@ func TestServerHealth(t *testing.T) { } func TestServerRumSwitch(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"rum": m{"enabled": true, "allow_origins": []string{"*"}}}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.rum": map[string]interface{}{ + "enabled": true, + "allow_origins": []string{"*"}, + }, + }) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -244,20 +235,6 @@ func TestServerRumSwitch(t *testing.T) { } } -func TestServerSourcemapBadConfig(t *testing.T) { - // TODO(axw) fix this, it shouldn't be possible - // to create config with an empty hosts list. - t.Skip("test is broken, config is no longer invalid") - - ucfg, err := agentconfig.NewConfigFrom( - m{"rum": m{"enabled": true, "source_mapping": m{"elasticsearch": m{"hosts": []string{}}}}}, - ) - require.NoError(t, err) - s, err := setupServer(t, ucfg, nil, nil) - require.Nil(t, s) - require.Error(t, err) -} - func TestServerCORS(t *testing.T) { tests := []struct { expectedStatus int @@ -298,9 +275,13 @@ func TestServerCORS(t *testing.T) { for idx, test := range tests { t.Run(fmt.Sprint(idx), func(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"rum": m{"enabled": true, "allow_origins": test.allowedOrigins}}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.rum": map[string]interface{}{ + "enabled": true, + "allow_origins": test.allowedOrigins, + }, + }) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -319,7 +300,7 @@ func TestServerCORS(t *testing.T) { } func TestServerNoContentType(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -330,76 +311,8 @@ func TestServerNoContentType(t *testing.T) { assert.Equal(t, http.StatusAccepted, rsp.StatusCode) } -func TestServerSourcemapElasticsearch(t *testing.T) { - for name, tc := range map[string]struct { - expected elasticsearch.Hosts - config m - outputConfig m - }{ - "nil": { - expected: nil, - config: m{}, - }, - "esConfigured": { - expected: elasticsearch.Hosts{"localhost:5200"}, - config: m{ - "rum": m{ - "enabled": "true", - "source_mapping.elasticsearch.hosts": []string{"localhost:5200"}, - }, - }, - }, - "esFromOutput": { - expected: elasticsearch.Hosts{"localhost:5201"}, - config: m{ - "rum": m{ - "enabled": "true", - }, - }, - outputConfig: m{ - "elasticsearch": m{ - "enabled": true, - "hosts": []string{"localhost:5201"}, - }, - }, - }, - "esOutputDisabled": { - expected: nil, - config: m{ - "rum": m{ - "enabled": "true", - }, - }, - outputConfig: m{ - "elasticsearch": m{ - "enabled": false, - "hosts": []string{"localhost:5202"}, - }, - }, - }, - } { - t.Run(name, func(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(tc.config) - require.NoError(t, err) - - var beatConfig beat.BeatConfig - ocfg, err := agentconfig.NewConfigFrom(tc.outputConfig) - require.NoError(t, err) - require.NoError(t, beatConfig.Output.Unpack(ocfg)) - - apm, err := setupServer(t, ucfg, &beatConfig, nil) - require.NoError(t, err) - defer apm.Stop() - - if tc.expected != nil { - assert.Equal(t, tc.expected, apm.config.RumConfig.SourceMapping.ESConfig.Hosts) - } - }) - } -} - func TestServerJaegerGRPC(t *testing.T) { - server, err := setupServer(t, nil, nil, nil) + server, err := setupServer(t, nil, nil) require.NoError(t, err) defer server.Stop() @@ -416,9 +329,10 @@ func TestServerJaegerGRPC(t *testing.T) { } func TestServerOTLPGRPC(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"auth.secret_token": "abc123"}) - assert.NoError(t, err) - server, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.auth.secret_token": "abc123", + }) + server, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer server.Stop() @@ -465,50 +379,42 @@ func TestServerConfigReload(t *testing.T) { cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ // Set an invalid host to illustrate that the static config // is not used for defining the listening address. - "host": "testing.invalid:123", + "apm-server.host": "testing.invalid:123", }) - apmBeat, cfg := newBeat(t, cfg, nil, nil) + apmBeat, cfg := newBeat(t, cfg, nil) apmBeat.Manager = &mockManager{enabled: true} - beater, err := newTestBeater(t, apmBeat, cfg, nil) + beater, err := newTestBeater(t, apmBeat, cfg) require.NoError(t, err) - require.NotNil(t, apmBeat.OutputConfigReloader) beater.start() - - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + reloadInputs, reloadOutput := waitReloaders() // The config must contain an "apm-server" section, and will be rejected otherwise. - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.NewConfig()}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.NewConfig()}}) assert.EqualError(t, err, "'apm-server' not found in integration config") + // Load initial output config, otherwise the server will not do anything with the config. + err = reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.hosts": []string{"http://testing.invalid"}, + }), + }) + assert.NoError(t, err) + // Creating the socket listener is performed synchronously in the Reload method // to ensure zero downtime when reloading an already running server. Illustrate // that the socket listener is created synhconously in Reload by attempting to // reload with an invalid host. - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "host": "testing.invalid:123", - }, + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "testing.invalid:123", })}}) require.Error(t, err) assert.Regexp(t, "listen tcp: lookup testing.invalid.*", err.Error()) - inputConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "host": "localhost:0", - }, - }) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "localhost:0", + }), + }}) require.NoError(t, err) healthcheck := func(addr string) string { @@ -526,8 +432,12 @@ func TestServerConfigReload(t *testing.T) { assert.NotEmpty(t, healthcheck(addr1)) // non-empty as there's no auth required // Reload config, causing the HTTP server to be restarted. - require.NoError(t, inputConfig.SetString("apm-server.auth.secret_token", -1, "secret")) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "localhost:0", + "apm-server.auth.secret_token": "secret", + }), + }}) require.NoError(t, err) addr2, err := beater.waitListenAddr(1 * time.Second) @@ -539,7 +449,11 @@ func TestServerConfigReload(t *testing.T) { assert.Error(t, err) // Reload output config, should also cause HTTP server to be restarted. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{Config: agentconfig.NewConfig()}) + err = reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.hosts": []string{"http://testing.invalid"}, + }), + }) assert.NoError(t, err) addr3, err := beater.waitListenAddr(1 * time.Second) @@ -564,7 +478,7 @@ func TestServerOutputConfigReload(t *testing.T) { }() reload.Register = reload.NewRegistry() - apmBeat, cfg := newBeat(t, nil, nil, nil) + apmBeat, cfg := newBeat(t, nil, nil) apmBeat.Manager = &mockManager{enabled: true} runServerCalls := make(chan ServerParams, 1) @@ -581,21 +495,15 @@ func TestServerOutputConfigReload(t *testing.T) { require.NoError(t, err) t.Cleanup(beater.Stop) go beater.Run(apmBeat) + reloadInputs, reloadOutput := waitReloaders() - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.enabled": true, + }), + }) - inputConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ + reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "data_stream.namespace": "custom", "apm-server": map[string]interface{}{ "host": "localhost:0", @@ -606,16 +514,14 @@ func TestServerOutputConfigReload(t *testing.T) { }}, }, }, - }) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) - require.NoError(t, err) + })}}) runServerArgs := <-runServerCalls assert.Equal(t, "", runServerArgs.Config.Sampling.Tail.ESConfig.Username) assert.Equal(t, "custom", runServerArgs.Namespace) // Reloaded output config should be passed into apm-server config. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{ + reloadOutput.Reload(&reload.ConfigWithMeta{ Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "elasticsearch.username": "updated", }), @@ -651,18 +557,23 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { defer srv.Close() cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "wait_ready_interval": "100ms", - "kibana.enabled": true, - "kibana.host": srv.URL, + "apm-server": map[string]interface{}{ + "wait_ready_interval": "100ms", + "kibana.enabled": true, + "kibana.host": srv.URL, + }, + // Configure the console output, to disable the Elasticsearch + // integration package installation check. + "output.console.enabled": true, }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, nil, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, nil) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) timeout := time.After(10 * time.Second) @@ -717,24 +628,22 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"wait_ready_interval": "100ms"}) - var beatConfig beat.BeatConfig - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.wait_ready_interval": "100ms", + "output.elasticsearch": map[string]interface{}{ "hosts": []string{srv.URL}, "backoff": map[string]interface{}{"init": "10ms", "max": "10ms"}, "max_retries": 1000, }, - })) - require.NoError(t, err) + }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, &beatConfig, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, &beatConfig) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) // Send some events to the server. They should be accepted and enqueued. @@ -809,22 +718,18 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"wait_ready_interval": "100ms"}) - var beatConfig beat.BeatConfig - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": []string{srv.URL}, - }, - })) - require.NoError(t, err) + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.wait_ready_interval": "100ms", + "output.elasticsearch.hosts": []string{srv.URL}, + }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, &beatConfig, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, &beatConfig) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) // Send some events to the server. They should be accepted and enqueued. @@ -888,40 +793,24 @@ func TestServerElasticsearchOutput(t *testing.T) { monitoring.Default.Remove("libbeat.whatever") monitoring.NewInt(monitoring.Default, "libbeat.whatever") - apmBeat, cfg := newBeat(t, nil, nil, nil) + apmBeat, cfg := newBeat(t, nil, nil) apmBeat.Manager = &mockManager{enabled: true} - beater, err := newTestBeater(t, apmBeat, cfg, nil) + beater, err := newTestBeater(t, apmBeat, cfg) require.NoError(t, err) beater.start() + reloadInputs, reloadOutput := waitReloaders() - // Reload output config to show that apm-server will switch to the - // output dynamically. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{ - Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": []string{srv.URL}, - "flush_bytes": "1kb", // test data is >1kb - "backoff": map[string]interface{}{"init": "1ms", "max": "1ms"}, - "max_retries": 0, - }, - }), - }) - assert.NoError(t, err) - - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + err = reloadOutput.Reload(&reload.ConfigWithMeta{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": []string{srv.URL}, + "flush_bytes": "1kb", // test data is >1kb + "backoff": map[string]interface{}{"init": "1ms", "max": "1ms"}, + "max_retries": 0, + }, + })}) + require.NoError(t, err) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "apm-server": map[string]interface{}{ "host": "localhost:0", }, @@ -934,7 +823,7 @@ func TestServerElasticsearchOutput(t *testing.T) { req := makeTransactionRequest(t, "http://"+listenAddr) req.Header.Add("Content-Type", "application/x-ndjson") resp, err := http.DefaultClient.Do(req) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, http.StatusAccepted, resp.StatusCode) resp.Body.Close() @@ -983,9 +872,8 @@ func TestServerElasticsearchOutput(t *testing.T) { } func TestServerPProf(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"pprof.enabled": true}) - assert.NoError(t, err) - server, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.pprof.enabled": true}) + server, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer server.Stop() @@ -1001,51 +889,16 @@ func TestServerPProf(t *testing.T) { } } -type dummyOutputClient struct { -} - -func (d *dummyOutputClient) Publish(_ context.Context, batch pubs.Batch) error { - batch.ACK() - return nil -} -func (d *dummyOutputClient) Close() error { return nil } -func (d *dummyOutputClient) String() string { return "" } - -func dummyPipeline(cfg *agentconfig.C, info beat.Info, clients ...outputs.Client) *pipeline.Pipeline { - if len(clients) == 0 { - clients = []outputs.Client{&dummyOutputClient{}} - } - if cfg == nil { - cfg = agentconfig.NewConfig() - } - processors, err := processing.MakeDefaultSupport(false)(info, logp.NewLogger("testbeat"), cfg) - if err != nil { - panic(err) - } - p, err := pipeline.New( - info, - pipeline.Monitors{}, - func(lis queue.ACKListener) (queue.Queue, error) { - return memqueue.NewQueue(nil, memqueue.Settings{ - ACKListener: lis, - Events: 20, - }), nil - }, - outputs.Group{ - Clients: clients, - BatchSize: 5, - Retry: 0, // no retry. on error drop events - }, - pipeline.Settings{ - WaitClose: 0, - WaitCloseMode: pipeline.NoWaitOnClose, - Processors: processors, - }, - ) - if err != nil { - panic(err) +func waitReloaders() (reloadInputs reload.ReloadableList, reloadOutput reload.Reloadable) { + for { + // The reloaders are not registered until after the beat has started running. + reloadInputs = reload.Register.GetReloadableList("inputs") + reloadOutput = reload.Register.GetReloadable("output") + if reloadInputs != nil && reloadOutput != nil { + return reloadInputs, reloadOutput + } + time.Sleep(10 * time.Millisecond) } - return p } var testData = func() []byte { diff --git a/internal/beater/tracing.go b/internal/beater/tracing.go index 8a6dffc015e..97c633e3267 100644 --- a/internal/beater/tracing.go +++ b/internal/beater/tracing.go @@ -18,7 +18,6 @@ package beater import ( - "context" "net" "net/http" @@ -31,29 +30,7 @@ import ( "github.com/elastic/apm-server/internal/model" ) -type tracerServer struct { - server *http.Server - logger *logp.Logger - requests <-chan tracerServerRequest -} - -func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, error) { - requests := make(chan tracerServerRequest) - processBatch := model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { - result := make(chan error, 1) - request := tracerServerRequest{ctx: ctx, batch: batch, res: result} - select { - case <-ctx.Done(): - return ctx.Err() - case requests <- request: - } - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-result: - return err - } - }) +func newTracerServer(listener net.Listener, logger *logp.Logger, batchProcessor model.BatchProcessor) (*http.Server, error) { cfg := config.DefaultConfig() ratelimitStore, err := ratelimit.NewStore(1, 1, 1) // unused, arbitrary params if err != nil { @@ -65,7 +42,7 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, } mux, err := api.NewMux( cfg, - processBatch, + batchProcessor, authenticator, newAgentConfigFetcher(cfg, nil /* kibana client */), ratelimitStore, @@ -76,46 +53,11 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, if err != nil { return nil, err } - server := &http.Server{ + return &http.Server{ Handler: mux, IdleTimeout: cfg.IdleTimeout, ReadTimeout: cfg.ReadTimeout, WriteTimeout: cfg.WriteTimeout, MaxHeaderBytes: cfg.MaxHeaderSize, - } - go func() { - if err := server.Serve(listener); err != http.ErrServerClosed { - logger.Error(err.Error()) - } - }() - return &tracerServer{ - server: server, - logger: logger, - requests: requests, }, nil } - -// Close closes the tracerServer's listener. -func (s *tracerServer) Close() error { - return s.server.Shutdown(context.Background()) -} - -// serve serves batch processing requests for the tracer server. -// -// This may be called multiple times in series, but not concurrently. -func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProcessor) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case req := <-s.requests: - req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch) - } - } -} - -type tracerServerRequest struct { - ctx context.Context - batch *model.Batch - res chan<- error -} diff --git a/internal/beater/tracing_test.go b/internal/beater/tracing_test.go index ae34d89459a..fd926ac1fda 100644 --- a/internal/beater/tracing_test.go +++ b/internal/beater/tracing_test.go @@ -34,13 +34,25 @@ func TestServerTracingEnabled(t *testing.T) { for _, enabled := range []bool{false, true} { t.Run(fmt.Sprint(enabled), func(t *testing.T) { - cfg := agentconfig.MustNewConfigFrom(m{ - "host": "localhost:0", + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ "instrumentation.enabled": enabled, + + // The output instrumentation may send transactions for + // bulk operations, e.g. there will be "flush" transactions + // send by modelindexer for _bulk requests. When the server + // sends traces to itself, it will enter a state where it + // continues to regularly send traces to itself from the + // traced output. + // + // TODO(axw) we should consider having a separate processor + // pipeline (including output) with no tracing. For now, we + // set a short shutdown timeout so that if an trace events + // are not consumed, they will not block shutdown. + "apm-server.shutdown_timeout": "1ns", }) docs := make(chan []byte, 10) - beater, err := setupServer(t, cfg, nil, docs) + beater, err := setupServer(t, cfg, docs) require.NoError(t, err) // Make an HTTP request to the server, which should be traced diff --git a/internal/beatcmd/supporter_factory.go b/internal/idxmgmt/supporter_factory.go similarity index 96% rename from internal/beatcmd/supporter_factory.go rename to internal/idxmgmt/supporter_factory.go index 9c62e1226ff..3551d450766 100644 --- a/internal/beatcmd/supporter_factory.go +++ b/internal/idxmgmt/supporter_factory.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beatcmd +package idxmgmt import ( "github.com/pkg/errors" @@ -32,10 +32,10 @@ import ( "github.com/elastic/apm-server/internal/logs" ) -// newSupporter creates a new idxmgmt.Supporter which directs all events +// NewSupporter creates a new idxmgmt.Supporter which directs all events // to data streams. The given root config will be checked for deprecated/removed // configuration, and if any are present warnings will be logged. -func newSupporter(log *logp.Logger, info beat.Info, configRoot *config.C) idxmgmt.Supporter { +func NewSupporter(log *logp.Logger, info beat.Info, configRoot *config.C) idxmgmt.Supporter { if log == nil { log = logp.NewLogger(logs.IndexManagement) } else { diff --git a/internal/beatcmd/supporter_factory_test.go b/internal/idxmgmt/supporter_factory_test.go similarity index 96% rename from internal/beatcmd/supporter_factory_test.go rename to internal/idxmgmt/supporter_factory_test.go index 2d4eb43e83d..6555995bcc9 100644 --- a/internal/beatcmd/supporter_factory_test.go +++ b/internal/idxmgmt/supporter_factory_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beatcmd +package idxmgmt import ( "testing" @@ -37,7 +37,7 @@ import ( ) func TestNewSupporter(t *testing.T) { - supporter := newSupporter(nil, beat.Info{}, config.MustNewConfigFrom(map[string]interface{}{})) + supporter := NewSupporter(nil, beat.Info{}, config.MustNewConfigFrom(map[string]interface{}{})) // The data streams supporter does not set up templates or ILM. These // are expected to be set up externally, typically by Fleet. @@ -82,7 +82,7 @@ func TestNewSupporterWarnings(t *testing.T) { "setup.template.pattern": "custom", } - newSupporter(logger, beat.Info{}, config.MustNewConfigFrom(attrs)) + NewSupporter(logger, beat.Info{}, config.MustNewConfigFrom(attrs)) var warnings []string for _, record := range observed.All() {