From 3669a3508afc683946375a5621d5ce54f909b06b Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 30 Sep 2022 21:51:08 +0800 Subject: [PATCH] Move libbeat output and instrumentation to beater Move initialisation of libbeat output (publisher/pipeline) and instrumentation (APM tracer) to package beater. Some of the complexity around reloading is reduced as now have full control over creating the libbeat output. Similarly, we can reduce complexity related to the "tracer server" by running it per reloaded server. --- internal/beatcmd/beat.go | 76 +--- internal/beatcmd/beat_test.go | 24 +- internal/beatcmd/maxprocs.go | 17 + internal/beatcmd/maxprocs_test.go | 17 + internal/beatcmd/testcmd.go | 6 +- internal/beater/beater.go | 387 ++++++++---------- internal/beater/beater_test.go | 82 ++-- internal/beater/integration_test.go | 2 +- internal/beater/server_test.go | 385 ++++++----------- internal/beater/tracing.go | 64 +-- internal/beater/tracing_test.go | 18 +- .../{beatcmd => idxmgmt}/supporter_factory.go | 6 +- .../supporter_factory_test.go | 6 +- 13 files changed, 419 insertions(+), 671 deletions(-) rename internal/{beatcmd => idxmgmt}/supporter_factory.go (96%) rename internal/{beatcmd => idxmgmt}/supporter_factory_test.go (96%) diff --git a/internal/beatcmd/beat.go b/internal/beatcmd/beat.go index 7acd1c368a9..2ec7fbfe322 100644 --- a/internal/beatcmd/beat.go +++ b/internal/beatcmd/beat.go @@ -37,14 +37,11 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/monitoring/report/log" - "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" - "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/logp" @@ -67,10 +64,6 @@ const ( defaultMonitoringUsername = "apm_system" ) -var ( - libbeatMetricsRegistry = monitoring.Default.GetRegistry("libbeat") -) - // Beat provides the runnable and configurable instance of a beat. type Beat struct { beat.Beat @@ -130,8 +123,7 @@ func NewBeat(args BeatParams) (*Beat, error) { return b, nil } -// init initializes logging, tracing ("instrumentation"), monitoring, config -// management, and GOMAXPROCS. +// init initializes logging, config management, GOMAXPROCS, and GC percent. func (b *Beat) init() error { if err := configure.Logging(b.Info.Beat, b.Config.Logging); err != nil { return fmt.Errorf("error initializing logging: %w", err) @@ -139,14 +131,6 @@ func (b *Beat) init() error { // log paths values to help with troubleshooting logp.Info(paths.Paths.String()) - // instrumentation.New expects a config object with "instrumentation" - // as a child, so create a new config with instrumentation added. - instrumentation, err := instrumentation.New(b.rawConfig, b.Info.Beat, b.Info.Version) - if err != nil { - return err - } - b.Instrumentation = instrumentation - // Load the unique ID and "first start" info from meta.json. metaPath := paths.Resolve(paths.Data, "meta.json") if err := b.loadMeta(metaPath); err != nil { @@ -155,10 +139,11 @@ func (b *Beat) init() error { logp.Info("Beat ID: %v", b.Info.ID) // Initialize central config manager. - b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) + manager, err := management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) if err != nil { return err } + b.Manager = manager if maxProcs := b.Config.MaxProcs; maxProcs > 0 { logp.Info("Set max procs limit: %v", maxProcs) @@ -283,22 +268,6 @@ func (b *Beat) createBeater(beatCreator beat.Creator) (beat.Beater, error) { logp.Info("output is configured through central management") } - monitors := pipeline.Monitors{ - Metrics: libbeatMetricsRegistry, - Telemetry: monitoring.GetNamespace("state").GetRegistry(), - Logger: logp.L().Named("publisher"), - Tracer: b.Instrumentation.Tracer(), - } - outputFactory := b.makeOutputFactory(b.Config.Output) - publisher, err := pipeline.Load(b.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory) - if err != nil { - return nil, fmt.Errorf("error initializing publisher: %w", err) - } - b.Publisher = publisher - - // TODO(axw) pass registry into BeatParams, for testing purposes. - reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader())) - return beatCreator(&b.Beat, b.Config.APMServer) } @@ -397,7 +366,6 @@ func (b *Beat) Run(ctx context.Context) error { go func() { <-ctx.Done() - b.Instrumentation.Tracer().Close() beater.Stop() }() svc.HandleSignals(cancel, cancel) @@ -488,34 +456,6 @@ func (b *Beat) registerElasticsearchVersionCheck() (func(), error) { return func() { elasticsearch.DeregisterGlobalCallback(uuid) }, nil } -func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { - return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error { - if b.OutputConfigReloader != nil { - if err := b.OutputConfigReloader.Reload(config); err != nil { - return err - } - } - return outReloader.Reload(config, b.createOutput) - }) -} - -func (b *Beat) makeOutputFactory( - cfg config.Namespace, -) func(outputs.Observer) (string, outputs.Group, error) { - return func(outStats outputs.Observer) (string, outputs.Group, error) { - out, err := b.createOutput(outStats, cfg) - return cfg.Name(), out, err - } -} - -func (b *Beat) createOutput(stats outputs.Observer, cfg config.Namespace) (outputs.Group, error) { - if !cfg.IsSet() { - return outputs.Group{}, nil - } - indexSupporter := newSupporter(nil, b.Info, b.rawConfig) - return outputs.Load(indexSupporter, b.Info, stats, cfg.Name(), cfg.Config()) -} - func (b *Beat) registerClusterUUIDFetching() (func(), error) { callback := b.clusterUUIDFetchingCallback() uuid, err := elasticsearch.RegisterConnectCallback(callback) @@ -658,13 +598,3 @@ func logSystemInfo(info beat.Info) { } } } - -type nopProcessingSupporter struct{} - -func (nopProcessingSupporter) Close() error { - return nil -} - -func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) { - return cfg.Processor, nil -} diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index 7010248d9f2..02733b8ddde 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( @@ -7,14 +24,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" ) // TestRunMaxProcs ensures Beat.Run calls the GOMAXPROCS adjustment code by looking for log messages. diff --git a/internal/beatcmd/maxprocs.go b/internal/beatcmd/maxprocs.go index 63da1a2baba..c7a6480c471 100644 --- a/internal/beatcmd/maxprocs.go +++ b/internal/beatcmd/maxprocs.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( diff --git a/internal/beatcmd/maxprocs_test.go b/internal/beatcmd/maxprocs_test.go index cedc475c257..9f711f947ae 100644 --- a/internal/beatcmd/maxprocs_test.go +++ b/internal/beatcmd/maxprocs_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package beatcmd import ( diff --git a/internal/beatcmd/testcmd.go b/internal/beatcmd/testcmd.go index bb3ddfd6027..ffef2e6686c 100644 --- a/internal/beatcmd/testcmd.go +++ b/internal/beatcmd/testcmd.go @@ -22,10 +22,12 @@ import ( "github.com/spf13/cobra" - beaterconfig "github.com/elastic/apm-server/internal/beater/config" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/testing" + + beaterconfig "github.com/elastic/apm-server/internal/beater/config" + "github.com/elastic/apm-server/internal/idxmgmt" ) func genTestCmd(beatParams BeatParams) *cobra.Command { @@ -70,7 +72,7 @@ func newTestOutputCommand(beatParams BeatParams) *cobra.Command { if err != nil { return err } - indexSupporter := newSupporter(nil, beat.Info, beat.rawConfig) + indexSupporter := idxmgmt.NewSupporter(nil, beat.Info, beat.rawConfig) output, err := outputs.Load( indexSupporter, beat.Info, nil, beat.Config.Output.Name(), beat.Config.Output.Config(), ) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index e186dcc303b..38afc6ce342 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -43,8 +43,11 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" + "github.com/elastic/beats/v7/libbeat/outputs" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -60,6 +63,7 @@ import ( javaattacher "github.com/elastic/apm-server/internal/beater/java_attacher" "github.com/elastic/apm-server/internal/beater/ratelimit" "github.com/elastic/apm-server/internal/elasticsearch" + "github.com/elastic/apm-server/internal/idxmgmt" "github.com/elastic/apm-server/internal/kibana" "github.com/elastic/apm-server/internal/logs" "github.com/elastic/apm-server/internal/model" @@ -96,29 +100,28 @@ type CreatorParams struct { // NewCreator returns a new beat.Creator which creates beaters // using the provided CreatorParams. func NewCreator(args CreatorParams) beat.Creator { - return func(b *beat.Beat, ucfg *agentconfig.C) (beat.Beater, error) { + return func(b *beat.Beat, apmServerConfig *agentconfig.C) (beat.Beater, error) { logger := args.Logger if logger != nil { logger = logger.Named(logs.Beater) } else { logger = logp.NewLogger(logs.Beater) } + bt := &beater{ - rawConfig: ucfg, - stopped: false, - logger: logger, - wrapServer: args.WrapServer, - waitPublished: publish.NewWaitPublishedAcker(), - outputConfigReloader: newChanReloader(), - libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"), + apmServerConfig: apmServerConfig, + rootConfig: (*agentconfig.C)(((*ucfg.Config)(apmServerConfig)).Parent()), + stopped: false, + logger: logger, + wrapServer: args.WrapServer, } var elasticsearchOutputConfig *agentconfig.C - if hasElasticsearchOutput(b) { + if b.Config != nil && b.Config.Output.Name() == "elasticsearch" { elasticsearchOutputConfig = b.Config.Output.Config() } var err error - bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig) + bt.config, err = config.NewConfig(bt.apmServerConfig, elasticsearchOutputConfig) if err != nil { return nil, err } @@ -132,26 +135,16 @@ func NewCreator(args CreatorParams) beat.Creator { } } - if b.Manager != nil && b.Manager.Enabled() { - // Subscribe to output changes for reconfiguring apm-server's Elasticsearch - // clients, which use the Elasticsearch output config by default. We install - // this during beat creation to ensure output config reloads are not missed; - // reloads will be blocked until the chanReloader is served by beater.run. - b.OutputConfigReloader = bt.outputConfigReloader - } - return bt, nil } } type beater struct { - rawConfig *agentconfig.C - config *config.Config - logger *logp.Logger - wrapServer WrapServerFunc - waitPublished *publish.WaitPublishedAcker - outputConfigReloader *chanReloader - libbeatMonitoringRegistry *monitoring.Registry + apmServerConfig *agentconfig.C + rootConfig *agentconfig.C + config *config.Config + logger *logp.Logger + wrapServer WrapServerFunc mutex sync.Mutex // guards stopServer and stopped stopServer func() @@ -163,24 +156,10 @@ type beater struct { func (bt *beater) Run(b *beat.Beat) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if err := bt.run(ctx, cancel, b); err != nil { - return err - } - return bt.waitPublished.Wait(ctx) + return bt.run(ctx, cancel, b) } func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error { - tracer, tracerServer, err := initTracing(b, bt.config, bt.logger) - if err != nil { - return err - } - if tracerServer != nil { - defer tracerServer.Close() - } - if tracer != nil { - defer tracer.Close() - } - // add deprecation warning if running on a 32-bit system if runtime.GOARCH == "386" { bt.logger.Warn("deprecation notice: support for 32-bit system target architecture will be removed in an upcoming version") @@ -189,13 +168,9 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * reloader := reloader{ runServerContext: ctx, args: sharedServerRunnerParams{ - Beat: b, - WrapServer: bt.wrapServer, - Logger: bt.logger, - Tracer: tracer, - TracerServer: tracerServer, - Acker: bt.waitPublished, - LibbeatMonitoringRegistry: bt.libbeatMonitoringRegistry, + Beat: b, + WrapServer: bt.wrapServer, + Logger: bt.logger, }, } @@ -213,19 +188,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * return nil } - g, ctx := errgroup.WithContext(context.Background()) - g.Go(func() error { - <-stopped - return nil - }) if b.Manager != nil && b.Manager.Enabled() { // Managed by Agent: register input and output reloaders to reconfigure the server. reload.Register.MustRegisterList("inputs", &reloader) - g.Go(func() error { - return bt.outputConfigReloader.serve( - ctx, reload.ReloadableFunc(reloader.reloadOutput), - ) - }) + reload.Register.MustRegister("output", reload.ReloadableFunc(reloader.reloadOutput)) // Start the manager after all the hooks are initialized // and defined this ensure reloading consistency.. @@ -236,7 +202,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * } else { // Management disabled, use statically defined config. - reloader.rawConfig = bt.rawConfig + reloader.apmServerConfig = bt.apmServerConfig + reloader.rootConfig = bt.rootConfig if b.Config != nil { reloader.outputConfig = b.Config.Output } @@ -247,7 +214,8 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * return err } } - return g.Wait() + <-stopped + return nil } // setStopServerFunc sets a function to call when the server is stopped. @@ -267,11 +235,12 @@ type reloader struct { runServerContext context.Context args sharedServerRunnerParams - mu sync.Mutex - rawConfig *agentconfig.C - outputConfig agentconfig.Namespace - fleetConfig *config.Fleet - runner *serverRunner + mu sync.Mutex + rootConfig *agentconfig.C + apmServerConfig *agentconfig.C + outputConfig agentconfig.Namespace + fleetConfig *config.Fleet + runner *serverRunner } func (r *reloader) stop() { @@ -298,17 +267,21 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { r.mu.Lock() defer r.mu.Unlock() - r.rawConfig = integrationConfig.APMServer + r.rootConfig = cfg.Config + r.apmServerConfig = integrationConfig.APMServer + // Merge in datastream namespace passed in from apm integration if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" { c := agentconfig.MustNewConfigFrom(map[string]interface{}{ "data_streams.namespace": integrationConfig.DataStream.Namespace, }) - if r.rawConfig, err = agentconfig.MergeConfigs(r.rawConfig, c); err != nil { + r.apmServerConfig, err = agentconfig.MergeConfigs(r.apmServerConfig, c) + if err != nil { return err } } r.fleetConfig = &integrationConfig.Fleet + return r.reload() } @@ -330,14 +303,15 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { // serverRunner (if any) to exit. Calls to reload must be sycnhronized explicitly // by acquiring reloader#mu by callers. func (r *reloader) reload() error { - if r.rawConfig == nil { + if r.apmServerConfig == nil || !r.outputConfig.IsSet() { // APM Server config not loaded yet. return nil } runner, err := newServerRunner(r.runServerContext, serverRunnerParams{ sharedServerRunnerParams: r.args, - RawConfig: r.rawConfig, + RootConfig: r.rootConfig, + APMServerConfig: r.apmServerConfig, FleetConfig: r.fleetConfig, OutputConfig: r.outputConfig, }) @@ -388,36 +362,31 @@ type serverRunner struct { started chan struct{} done chan struct{} - pipeline beat.PipelineConnector - acker *publish.WaitPublishedAcker namespace string config *config.Config - rawConfig *agentconfig.C + rootConfig *agentconfig.C + apmServerConfig *agentconfig.C + outputConfig agentconfig.Namespace elasticsearchOutputConfig *agentconfig.C fleetConfig *config.Fleet beat *beat.Beat logger *logp.Logger - tracer *apm.Tracer - tracerServer *tracerServer wrapServer WrapServerFunc - libbeatMonitoringRegistry *monitoring.Registry } type serverRunnerParams struct { sharedServerRunnerParams - RawConfig *agentconfig.C - FleetConfig *config.Fleet - OutputConfig agentconfig.Namespace + RootConfig *agentconfig.C + APMServerConfig *agentconfig.C + FleetConfig *config.Fleet + OutputConfig agentconfig.Namespace } type sharedServerRunnerParams struct { Beat *beat.Beat WrapServer WrapServerFunc Logger *logp.Logger - Tracer *apm.Tracer - TracerServer *tracerServer - Acker *publish.WaitPublishedAcker LibbeatMonitoringRegistry *monitoring.Registry } @@ -427,7 +396,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne esOutputConfig = args.OutputConfig.Config() } - cfg, err := config.NewConfig(args.RawConfig, esOutputConfig) + cfg, err := config.NewConfig(args.APMServerConfig, esOutputConfig) if err != nil { return nil, err } @@ -441,18 +410,15 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne started: make(chan struct{}), config: cfg, - rawConfig: args.RawConfig, + rootConfig: args.RootConfig, + apmServerConfig: args.APMServerConfig, + outputConfig: args.OutputConfig, elasticsearchOutputConfig: esOutputConfig, fleetConfig: args.FleetConfig, - acker: args.Acker, - pipeline: args.Beat.Publisher, namespace: cfg.DataStreams.Namespace, beat: args.Beat, logger: args.Logger, - tracer: args.Tracer, - tracerServer: args.TracerServer, wrapServer: args.WrapServer, - libbeatMonitoringRegistry: args.LibbeatMonitoringRegistry, }, nil } @@ -467,14 +433,16 @@ func (s *serverRunner) run(listener net.Listener) error { kibanaClient = kibana.NewConnectingClient(s.config.Kibana.ClientConfig) } - cfg := ucfg.Config(*s.rawConfig) - parentCfg := cfg.Parent() // Check for an environment variable set when running in a cloud environment eac := os.Getenv("ELASTIC_AGENT_CLOUD") if eac != "" && s.config.Kibana.Enabled { // Don't block server startup sending the config. go func() { - if err := kibana.SendConfig(s.runServerContext, kibanaClient, parentCfg); err != nil { + if err := kibana.SendConfig( + s.runServerContext, + kibanaClient, + (*ucfg.Config)(s.rootConfig), + ); err != nil { s.logger.Infof("failed to upload config to kibana: %v", err) } }() @@ -498,6 +466,17 @@ func (s *serverRunner) run(listener net.Listener) error { } } + instrumentation, err := instrumentation.New(s.rootConfig, s.beat.Info.Beat, s.beat.Info.Version) + if err != nil { + return err + } + tracer := instrumentation.Tracer() + tracerServerListener := instrumentation.Listener() + if tracerServerListener != nil { + defer tracerServerListener.Close() + } + defer tracer.Close() + g, ctx := errgroup.WithContext(s.runServerContext) // Ensure the libbeat output and go-elasticsearch clients do not index @@ -505,7 +484,7 @@ func (s *serverRunner) run(listener net.Listener) error { publishReady := make(chan struct{}) drain := make(chan struct{}) g.Go(func() error { - if err := s.waitReady(ctx, kibanaClient); err != nil { + if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { // One or more preconditions failed; drop events. close(drain) return errors.Wrap(err, "error waiting for server to be ready") @@ -560,13 +539,6 @@ func (s *serverRunner) run(listener net.Listener) error { sourcemapFetcher = cachingFetcher } - // Create the runServer function. We start with newBaseRunServer, and then - // wrap depending on the configuration in order to inject behaviour. - runServer := newBaseRunServer(listener) - if s.tracerServer != nil { - runServer = runServerWithTracerServer(runServer, s.tracerServer, s.tracer) - } - authenticator, err := auth.NewAuthenticator(s.config.AgentAuth) if err != nil { return err @@ -585,7 +557,7 @@ func (s *serverRunner) run(listener net.Listener) error { // even if TLS is enabled, as TLS is handled by the net/http server. gRPCLogger := s.logger.Named("grpc") grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor( - apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(s.tracer)), + apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)), interceptors.ClientMetadata(), interceptors.Logging(gRPCLogger), interceptors.Metrics(gRPCLogger), @@ -596,7 +568,10 @@ func (s *serverRunner) run(listener net.Listener) error { // Create the BatchProcessor chain that is used to process all events, // including the metrics aggregated by APM Server. - finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(newElasticsearchClient) + finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor( + tracer, + newElasticsearchClient, + ) if err != nil { return err } @@ -627,13 +602,15 @@ func (s *serverRunner) run(listener net.Listener) error { return agentConfigReporter.Run(ctx) }) + // Create the runServer function. We start with newBaseRunServer, and then + // wrap depending on the configuration in order to inject behaviour. serverParams := ServerParams{ UUID: s.beat.Info.ID, Config: s.config, Managed: s.beat.Manager != nil && s.beat.Manager.Enabled(), Namespace: s.namespace, Logger: s.logger, - Tracer: s.tracer, + Tracer: tracer, Authenticator: authenticator, RateLimitStore: ratelimitStore, BatchProcessor: batchProcessor, @@ -644,6 +621,7 @@ func (s *serverRunner) run(listener net.Listener) error { NewElasticsearchClient: newElasticsearchClient, GRPCServer: grpcServer, } + runServer := newBaseRunServer(listener) if s.wrapServer != nil { // Wrap the serverParams and runServer function, enabling // injection of behaviour into the processing chain. @@ -678,9 +656,26 @@ func (s *serverRunner) run(listener net.Listener) error { } serverParams.BatchProcessor = append(preBatchProcessors, serverParams.BatchProcessor) + // Start the main server and the optional server for self-instrumentation. g.Go(func() error { return runServer(ctx, serverParams) }) + if tracerServerListener != nil { + tracerServer, err := newTracerServer(tracerServerListener, s.logger, serverParams.BatchProcessor) + if err != nil { + return fmt.Errorf("failed to create self-instrumentation server: %w", err) + } + g.Go(func() error { + if err := tracerServer.Serve(tracerServerListener); err != http.ErrServerClosed { + return err + } + return nil + }) + go func() { + <-ctx.Done() + tracerServer.Shutdown(s.backgroundContext) + }() + } // Signal that the runner has started close(s.started) @@ -693,7 +688,11 @@ func (s *serverRunner) run(listener net.Listener) error { } // waitReady waits until the server is ready to index events. -func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client) error { +func (s *serverRunner) waitReady( + ctx context.Context, + kibanaClient kibana.Client, + tracer *apm.Tracer, +) error { var preconditions []func(context.Context) error var esOutputClient elasticsearch.Client if s.elasticsearchOutputConfig != nil { @@ -765,37 +764,32 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client } return nil } - return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check) + return waitReady(ctx, s.config.WaitReadyInterval, tracer, s.logger, check) } // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events, // and a cleanup function which should be called on server shutdown. If the output is // "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher. func (s *serverRunner) newFinalBatchProcessor( + tracer *apm.Tracer, newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error), ) (model.BatchProcessor, func(context.Context) error, error) { + + monitoring.Default.Remove("libbeat") + libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat") if s.elasticsearchOutputConfig == nil { - // When the publisher stops cleanly it will close its pipeline client, - // calling the acker's Close method. We need to call Open for each new - // publisher to ensure we wait for all clients and enqueued events to - // be closed at shutdown time. - s.acker.Open() - pipeline := pipetool.WithACKer(s.pipeline, s.acker) - publisher, err := publish.NewPublisher(pipeline, s.tracer) - if err != nil { - return nil, nil, err - } - // We only want to restore the previous libbeat registry if the output - // has a name, otherwise, keep the libbeat registry as is. This is to - // account for cases where the output config may be sent empty by the - // Elastic Agent. - if s.beat.Config != nil && s.beat.Config.Output.Name() != "" { - monitoring.Default.Remove("libbeat") - monitoring.Default.Add("libbeat", s.libbeatMonitoringRegistry, monitoring.Full) - } - return publisher, publisher.Stop, nil + return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry) } + stateRegistry := monitoring.GetNamespace("state").GetRegistry() + outputRegistry := stateRegistry.GetRegistry("output") + if outputRegistry != nil { + outputRegistry.Clear() + } else { + outputRegistry = stateRegistry.NewRegistry("output") + } + monitoring.NewString(outputRegistry, "name").Set("elasticsearch") + var esConfig struct { *elasticsearch.Config `config:",inline"` FlushBytes string `config:"flush_bytes"` @@ -824,7 +818,7 @@ func (s *serverRunner) newFinalBatchProcessor( CompressionLevel: esConfig.CompressionLevel, FlushBytes: flushBytes, FlushInterval: esConfig.FlushInterval, - Tracer: s.tracer, + Tracer: tracer, MaxRequests: esConfig.MaxRequests, }) if err != nil { @@ -834,16 +828,15 @@ func (s *serverRunner) newFinalBatchProcessor( // Install our own libbeat-compatible metrics callback which uses the modelindexer stats. // All the metrics below are required to be reported to be able to display all relevant // fields in the Stack Monitoring UI. - monitoring.Default.Remove("libbeat") - monitoring.NewFunc(monitoring.Default, "libbeat.output.write", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "output.write", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() v.OnKey("bytes") v.OnInt(indexer.Stats().BytesTotal) }) - outputType := monitoring.NewString(monitoring.Default.GetRegistry("libbeat.output"), "type") + outputType := monitoring.NewString(libbeatMonitoringRegistry.GetRegistry("output"), "type") outputType.Set("elasticsearch") - monitoring.NewFunc(monitoring.Default, "libbeat.output.events", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "output.events", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() stats := indexer.Stats() @@ -860,7 +853,7 @@ func (s *serverRunner) newFinalBatchProcessor( v.OnKey("total") v.OnInt(stats.Added) }) - monitoring.NewFunc(monitoring.Default, "libbeat.pipeline.events", func(_ monitoring.Mode, v monitoring.Visitor) { + monitoring.NewFunc(libbeatMonitoringRegistry, "pipeline.events", func(_ monitoring.Mode, v monitoring.Visitor) { v.OnRegistryStart() defer v.OnRegistryFinished() v.OnKey("total") @@ -879,23 +872,46 @@ func (s *serverRunner) newFinalBatchProcessor( return indexer, indexer.Close, nil } -func hasElasticsearchOutput(b *beat.Beat) bool { - return b.Config != nil && b.Config.Output.Name() == "elasticsearch" -} - -func initTracing(b *beat.Beat, cfg *config.Config, logger *logp.Logger) (*apm.Tracer, *tracerServer, error) { - tracer := b.Instrumentation.Tracer() - listener := b.Instrumentation.Listener() - - var tracerServer *tracerServer - if listener != nil { - var err error - tracerServer, err = newTracerServer(listener, logger) - if err != nil { - return nil, nil, err +func (s *serverRunner) newLibbeatFinalBatchProcessor( + tracer *apm.Tracer, + libbeatMonitoringRegistry *monitoring.Registry, +) (model.BatchProcessor, func(context.Context) error, error) { + // When the publisher stops cleanly it will close its pipeline client, + // calling the acker's Close method and unblock Wait. + acker := publish.NewWaitPublishedAcker() + acker.Open() + + monitors := pipeline.Monitors{ + Metrics: libbeatMonitoringRegistry, + Telemetry: monitoring.GetNamespace("state").GetRegistry(), + Logger: logp.L().Named("publisher"), + Tracer: tracer, + } + outputFactory := func(stats outputs.Observer) (string, outputs.Group, error) { + if !s.outputConfig.IsSet() { + return "", outputs.Group{}, nil + } + indexSupporter := idxmgmt.NewSupporter(nil, s.beat.Info, s.apmServerConfig) + outputName := s.outputConfig.Name() + output, err := outputs.Load(indexSupporter, s.beat.Info, stats, outputName, s.outputConfig.Config()) + return outputName, output, err + } + pipeline, err := pipeline.Load(s.beat.Info, monitors, pipeline.Config{}, nopProcessingSupporter{}, outputFactory) + if err != nil { + return nil, nil, fmt.Errorf("failed to create libbeat output pipeline: %w", err) + } + pipelineConnector := pipetool.WithACKer(pipeline, acker) + publisher, err := publish.NewPublisher(pipelineConnector, tracer) + if err != nil { + return nil, nil, err + } + stop := func(ctx context.Context) error { + if err := publisher.Stop(ctx); err != nil { + return err } + return acker.Wait(ctx) } - return tracer, tracerServer, nil + return publisher, stop, nil } // Stop stops the beater gracefully. @@ -909,26 +925,10 @@ func (bt *beater) Stop() { "stopping apm-server... waiting maximum of %v seconds for queues to drain", bt.config.ShutdownTimeout.Seconds(), ) - bt.outputConfigReloader.cancel() bt.stopServer() bt.stopped = true } -// runServerWithTracerServer wraps runServer such that it also runs -// tracerServer, stopping it and the tracer when the server shuts down. -func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServer, tracer *apm.Tracer) RunServerFunc { - return func(ctx context.Context, args ServerParams) error { - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - return tracerServer.serve(ctx, args.BatchProcessor) - }) - g.Go(func() error { - return runServer(ctx, args) - }) - return g.Wait() - } -} - func newSourcemapFetcher( beatInfo beat.Info, cfg config.SourceMapping, @@ -1004,65 +1004,6 @@ func newSourcemapFetcher( return chained, nil } -// chanReloader implements libbeat/common/reload.Reloadable, converting -// Reload calls into requests send to a channel consumed by serve. -type chanReloader struct { - ctx context.Context - cancel context.CancelFunc - ch chan reloadRequest -} - -func newChanReloader() *chanReloader { - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan reloadRequest) - return &chanReloader{ctx, cancel, ch} -} - -type reloadRequest struct { - cfg *reload.ConfigWithMeta - result chan<- error -} - -// Reload sends a reload request to r.ch, which is consumed by another -// goroutine running r.serve. Reload blocks until serve has handled the -// reload request, or until the reloader's context has been cancelled. -func (r *chanReloader) Reload(cfg *reload.ConfigWithMeta) error { - result := make(chan error, 1) - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case r.ch <- reloadRequest{cfg: cfg, result: result}: - } - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case err := <-result: - return err - } -} - -// serve handles reload requests enqueued by Reload, returning when either -// ctx or r.ctx are cancelled. -func (r *chanReloader) serve(ctx context.Context, reloader reload.Reloadable) error { - for { - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - case req := <-r.ch: - err := reloader.Reload(req.cfg) - select { - case <-r.ctx.Done(): - return r.ctx.Err() - case <-ctx.Done(): - return ctx.Err() - case req.result <- err: - } - } - } -} - // TODO: This is copying behavior from libbeat: // https://github.com/elastic/beats/blob/b9ced47dba8bb55faa3b2b834fd6529d3c4d0919/libbeat/cmd/instance/beat.go#L927-L950 // Remove this when cluster_uuid no longer needs to be queried from ES. @@ -1117,3 +1058,13 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error s.Set(response.ClusterUUID) return nil } + +type nopProcessingSupporter struct{} + +func (nopProcessingSupporter) Close() error { + return nil +} + +func (nopProcessingSupporter) Create(cfg beat.ProcessingConfig, _ bool) (beat.Processor, error) { + return cfg.Processor, nil +} diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index fd1dfb9b813..e997f12fc45 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -46,9 +46,6 @@ import ( "github.com/elastic/apm-server/internal/model" "github.com/elastic/apm-server/internal/model/modelindexer/modelindexertest" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/idxmgmt" - "github.com/elastic/beats/v7/libbeat/instrumentation" - "github.com/elastic/beats/v7/libbeat/outputs" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -65,15 +62,15 @@ type testBeater struct { client *http.Client } -func setupServer(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docsOut chan<- []byte) (*testBeater, error) { +func setupServer(t *testing.T, cfg *agentconfig.C, docsOut chan<- []byte) (*testBeater, error) { if testing.Short() { t.Skip("skipping server test") } - apmBeat, cfg := newBeat(t, cfg, beatConfig, docsOut) - return setupBeater(t, apmBeat, cfg, beatConfig) + apmBeat, cfg := newBeat(t, cfg, docsOut) + return setupBeater(t, apmBeat, cfg) } -func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docsOut chan<- []byte) (*beat.Beat, *agentconfig.C) { +func newBeat(t *testing.T, cfg *agentconfig.C, docsOut chan<- []byte) (*beat.Beat, *agentconfig.C) { info := beat.Info{ Beat: "test-apm-server", IndexPrefix: "test-apm-server", @@ -82,17 +79,44 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs } combinedConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "host": "localhost:0", + "apm-server": map[string]interface{}{ + "host": "localhost:0", - // Disable waiting for integration to be installed by default, - // to simplify tests. This feature is tested independently. - "data_streams.wait_for_integration": false, + // Disable waiting for integration to be installed by default, + // to simplify tests. This feature is tested independently. + "data_streams.wait_for_integration": false, + }, }) if cfg != nil { require.NoError(t, cfg.Unpack(combinedConfig)) } - var pub beat.Pipeline + // If docsOut is non-nil, we configure the Elasticsearch output. + // + // If no output is configured and docsOut is nil, then we use the + // Elasticsearch output and consume and drop the documents. + var unpacked struct { + APMServer *agentconfig.C `config:"apm-server"` + Output agentconfig.Namespace `config:"output"` + } + err := combinedConfig.Unpack(&unpacked) + require.NoError(t, err) + if !unpacked.Output.IsSet() && docsOut == nil { + docs := make(chan []byte) + docsOut = docs + stop := make(chan struct{}) + t.Cleanup(func() { close(stop) }) + go func() { + for { + select { + case <-stop: + return + case <-docs: + } + } + }() + } + if docsOut != nil { // Clear the state monitoring registry to avoid panicking due // to double registration. @@ -100,10 +124,6 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs stateRegistry.Clear() defer stateRegistry.Clear() - // beatConfig must be nil if an event channel is supplied. - require.Nil(t, beatConfig) - beatConfig = &beat.BeatConfig{} - mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -128,8 +148,8 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs srv := httptest.NewServer(mux) t.Cleanup(srv.Close) - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ + err := combinedConfig.Merge(agentconfig.MustNewConfigFrom(map[string]interface{}{ + "output.elasticsearch": map[string]interface{}{ "enabled": true, "hosts": []string{srv.URL}, "flush_bytes": "1", // no delay @@ -137,37 +157,24 @@ func newBeat(t *testing.T, cfg *agentconfig.C, beatConfig *beat.BeatConfig, docs }, })) require.NoError(t, err) - } - if beatConfig != nil && beatConfig.Output.Name() == "elasticsearch" { - // capture events using the configured elasticsearch output - supporter, err := idxmgmt.DefaultSupport(logp.NewLogger("beater_test"), info, nil) + // Update unpacked.Output + err = combinedConfig.Unpack(&unpacked) require.NoError(t, err) - outputGroup, err := outputs.Load(supporter, info, nil, "elasticsearch", beatConfig.Output.Config()) - require.NoError(t, err) - pub = dummyPipeline(cfg, info, outputGroup.Clients...) - } else { - // don't capture events - pub = dummyPipeline(cfg, info) } - instrumentation, err := instrumentation.New(combinedConfig, info.Beat, info.Version) - require.NoError(t, err) return &beat.Beat{ - Publisher: pub, - Info: info, - Config: beatConfig, - Instrumentation: instrumentation, - }, combinedConfig + Info: info, + Config: &beat.BeatConfig{Output: unpacked.Output}, + }, unpacked.APMServer } func setupBeater( t *testing.T, apmBeat *beat.Beat, ucfg *agentconfig.C, - beatConfig *beat.BeatConfig, ) (*testBeater, error) { - tb, err := newTestBeater(t, apmBeat, ucfg, beatConfig) + tb, err := newTestBeater(t, apmBeat, ucfg) if err != nil { return nil, err } @@ -190,7 +197,6 @@ func newTestBeater( t *testing.T, apmBeat *beat.Beat, ucfg *agentconfig.C, - beatConfig *beat.BeatConfig, ) (*testBeater, error) { core, observedLogs := observer.New(zapcore.DebugLevel) diff --git a/internal/beater/integration_test.go b/internal/beater/integration_test.go index 3f7dbab6b07..70e230e953c 100644 --- a/internal/beater/integration_test.go +++ b/internal/beater/integration_test.go @@ -135,7 +135,7 @@ func TestPublishIntegration(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // fresh APM Server for each run docsChan := make(chan []byte) - apm, err := setupServer(t, nil, nil, docsChan) + apm, err := setupServer(t, nil, docsChan) require.NoError(t, err) defer apm.Stop() diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go index ad307780016..066082d209d 100644 --- a/internal/beater/server_test.go +++ b/internal/beater/server_test.go @@ -46,28 +46,20 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/libbeat/outputs" - pubs "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/pipeline" - "github.com/elastic/beats/v7/libbeat/publisher/processing" - "github.com/elastic/beats/v7/libbeat/publisher/queue" - "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + _ "github.com/elastic/beats/v7/libbeat/outputs/console" + _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" agentconfig "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-server/internal/beater/api" "github.com/elastic/apm-server/internal/beater/config" - "github.com/elastic/apm-server/internal/elasticsearch" ) -type m map[string]interface{} - func TestServerOk(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -80,7 +72,7 @@ func TestServerOk(t *testing.T) { } func TestServerRoot(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -133,9 +125,8 @@ func TestServerRoot(t *testing.T) { func TestServerRootWithToken(t *testing.T) { token := "verysecret" badToken := "Verysecret" - ucfg, err := agentconfig.NewConfigFrom(m{"auth.secret_token": token}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.auth.secret_token": token}) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -174,11 +165,8 @@ func TestServerTcpNoPort(t *testing.T) { t.Error(err) } } - ucfg, err := agentconfig.NewConfigFrom(map[string]interface{}{ - "host": "localhost", - }) - assert.NoError(t, err) - btr, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.host": "localhost"}) + btr, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer btr.Stop() @@ -203,9 +191,8 @@ func TestServerOkUnix(t *testing.T) { } addr := tmpTestUnix(t) - ucfg, err := agentconfig.NewConfigFrom(map[string]interface{}{"host": "unix:" + addr}) - assert.NoError(t, err) - btr, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.host": "unix:" + addr}) + btr, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer btr.Stop() @@ -217,7 +204,7 @@ func TestServerOkUnix(t *testing.T) { } func TestServerHealth(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -230,9 +217,13 @@ func TestServerHealth(t *testing.T) { } func TestServerRumSwitch(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"rum": m{"enabled": true, "allow_origins": []string{"*"}}}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.rum": map[string]interface{}{ + "enabled": true, + "allow_origins": []string{"*"}, + }, + }) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -244,20 +235,6 @@ func TestServerRumSwitch(t *testing.T) { } } -func TestServerSourcemapBadConfig(t *testing.T) { - // TODO(axw) fix this, it shouldn't be possible - // to create config with an empty hosts list. - t.Skip("test is broken, config is no longer invalid") - - ucfg, err := agentconfig.NewConfigFrom( - m{"rum": m{"enabled": true, "source_mapping": m{"elasticsearch": m{"hosts": []string{}}}}}, - ) - require.NoError(t, err) - s, err := setupServer(t, ucfg, nil, nil) - require.Nil(t, s) - require.Error(t, err) -} - func TestServerCORS(t *testing.T) { tests := []struct { expectedStatus int @@ -298,9 +275,13 @@ func TestServerCORS(t *testing.T) { for idx, test := range tests { t.Run(fmt.Sprint(idx), func(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"rum": m{"enabled": true, "allow_origins": test.allowedOrigins}}) - assert.NoError(t, err) - apm, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.rum": map[string]interface{}{ + "enabled": true, + "allow_origins": test.allowedOrigins, + }, + }) + apm, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer apm.Stop() @@ -319,7 +300,7 @@ func TestServerCORS(t *testing.T) { } func TestServerNoContentType(t *testing.T) { - apm, err := setupServer(t, nil, nil, nil) + apm, err := setupServer(t, nil, nil) require.NoError(t, err) defer apm.Stop() @@ -330,76 +311,8 @@ func TestServerNoContentType(t *testing.T) { assert.Equal(t, http.StatusAccepted, rsp.StatusCode) } -func TestServerSourcemapElasticsearch(t *testing.T) { - for name, tc := range map[string]struct { - expected elasticsearch.Hosts - config m - outputConfig m - }{ - "nil": { - expected: nil, - config: m{}, - }, - "esConfigured": { - expected: elasticsearch.Hosts{"localhost:5200"}, - config: m{ - "rum": m{ - "enabled": "true", - "source_mapping.elasticsearch.hosts": []string{"localhost:5200"}, - }, - }, - }, - "esFromOutput": { - expected: elasticsearch.Hosts{"localhost:5201"}, - config: m{ - "rum": m{ - "enabled": "true", - }, - }, - outputConfig: m{ - "elasticsearch": m{ - "enabled": true, - "hosts": []string{"localhost:5201"}, - }, - }, - }, - "esOutputDisabled": { - expected: nil, - config: m{ - "rum": m{ - "enabled": "true", - }, - }, - outputConfig: m{ - "elasticsearch": m{ - "enabled": false, - "hosts": []string{"localhost:5202"}, - }, - }, - }, - } { - t.Run(name, func(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(tc.config) - require.NoError(t, err) - - var beatConfig beat.BeatConfig - ocfg, err := agentconfig.NewConfigFrom(tc.outputConfig) - require.NoError(t, err) - require.NoError(t, beatConfig.Output.Unpack(ocfg)) - - apm, err := setupServer(t, ucfg, &beatConfig, nil) - require.NoError(t, err) - defer apm.Stop() - - if tc.expected != nil { - assert.Equal(t, tc.expected, apm.config.RumConfig.SourceMapping.ESConfig.Hosts) - } - }) - } -} - func TestServerJaegerGRPC(t *testing.T) { - server, err := setupServer(t, nil, nil, nil) + server, err := setupServer(t, nil, nil) require.NoError(t, err) defer server.Stop() @@ -416,9 +329,10 @@ func TestServerJaegerGRPC(t *testing.T) { } func TestServerOTLPGRPC(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"auth.secret_token": "abc123"}) - assert.NoError(t, err) - server, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.auth.secret_token": "abc123", + }) + server, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer server.Stop() @@ -465,50 +379,42 @@ func TestServerConfigReload(t *testing.T) { cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ // Set an invalid host to illustrate that the static config // is not used for defining the listening address. - "host": "testing.invalid:123", + "apm-server.host": "testing.invalid:123", }) - apmBeat, cfg := newBeat(t, cfg, nil, nil) + apmBeat, cfg := newBeat(t, cfg, nil) apmBeat.Manager = &mockManager{enabled: true} - beater, err := newTestBeater(t, apmBeat, cfg, nil) + beater, err := newTestBeater(t, apmBeat, cfg) require.NoError(t, err) - require.NotNil(t, apmBeat.OutputConfigReloader) beater.start() - - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + reloadInputs, reloadOutput := waitReloaders() // The config must contain an "apm-server" section, and will be rejected otherwise. - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.NewConfig()}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.NewConfig()}}) assert.EqualError(t, err, "'apm-server' not found in integration config") + // Load initial output config, otherwise the server will not do anything with the config. + err = reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.hosts": []string{"http://testing.invalid"}, + }), + }) + assert.NoError(t, err) + // Creating the socket listener is performed synchronously in the Reload method // to ensure zero downtime when reloading an already running server. Illustrate // that the socket listener is created synhconously in Reload by attempting to // reload with an invalid host. - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "host": "testing.invalid:123", - }, + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "testing.invalid:123", })}}) require.Error(t, err) assert.Regexp(t, "listen tcp: lookup testing.invalid.*", err.Error()) - inputConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "host": "localhost:0", - }, - }) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "localhost:0", + }), + }}) require.NoError(t, err) healthcheck := func(addr string) string { @@ -526,8 +432,12 @@ func TestServerConfigReload(t *testing.T) { assert.NotEmpty(t, healthcheck(addr1)) // non-empty as there's no auth required // Reload config, causing the HTTP server to be restarted. - require.NoError(t, inputConfig.SetString("apm-server.auth.secret_token", -1, "secret")) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.host": "localhost:0", + "apm-server.auth.secret_token": "secret", + }), + }}) require.NoError(t, err) addr2, err := beater.waitListenAddr(1 * time.Second) @@ -539,7 +449,11 @@ func TestServerConfigReload(t *testing.T) { assert.Error(t, err) // Reload output config, should also cause HTTP server to be restarted. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{Config: agentconfig.NewConfig()}) + err = reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.hosts": []string{"http://testing.invalid"}, + }), + }) assert.NoError(t, err) addr3, err := beater.waitListenAddr(1 * time.Second) @@ -564,7 +478,7 @@ func TestServerOutputConfigReload(t *testing.T) { }() reload.Register = reload.NewRegistry() - apmBeat, cfg := newBeat(t, nil, nil, nil) + apmBeat, cfg := newBeat(t, nil, nil) apmBeat.Manager = &mockManager{enabled: true} runServerCalls := make(chan ServerParams, 1) @@ -581,21 +495,15 @@ func TestServerOutputConfigReload(t *testing.T) { require.NoError(t, err) t.Cleanup(beater.Stop) go beater.Run(apmBeat) + reloadInputs, reloadOutput := waitReloaders() - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + reloadOutput.Reload(&reload.ConfigWithMeta{ + Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch.enabled": true, + }), + }) - inputConfig := agentconfig.MustNewConfigFrom(map[string]interface{}{ + reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "data_stream.namespace": "custom", "apm-server": map[string]interface{}{ "host": "localhost:0", @@ -606,16 +514,14 @@ func TestServerOutputConfigReload(t *testing.T) { }}, }, }, - }) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: inputConfig}}) - require.NoError(t, err) + })}}) runServerArgs := <-runServerCalls assert.Equal(t, "", runServerArgs.Config.Sampling.Tail.ESConfig.Username) assert.Equal(t, "custom", runServerArgs.Namespace) // Reloaded output config should be passed into apm-server config. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{ + reloadOutput.Reload(&reload.ConfigWithMeta{ Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "elasticsearch.username": "updated", }), @@ -651,18 +557,23 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { defer srv.Close() cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ - "wait_ready_interval": "100ms", - "kibana.enabled": true, - "kibana.host": srv.URL, + "apm-server": map[string]interface{}{ + "wait_ready_interval": "100ms", + "kibana.enabled": true, + "kibana.host": srv.URL, + }, + // Configure the console output, to disable the Elasticsearch + // integration package installation check. + "output.console.enabled": true, }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, nil, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, nil) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) timeout := time.After(10 * time.Second) @@ -717,24 +628,22 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"wait_ready_interval": "100ms"}) - var beatConfig beat.BeatConfig - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.wait_ready_interval": "100ms", + "output.elasticsearch": map[string]interface{}{ "hosts": []string{srv.URL}, "backoff": map[string]interface{}{"init": "10ms", "max": "10ms"}, "max_retries": 1000, }, - })) - require.NoError(t, err) + }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, &beatConfig, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, &beatConfig) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) // Send some events to the server. They should be accepted and enqueued. @@ -809,22 +718,18 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) { srv := httptest.NewServer(mux) defer srv.Close() - cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"wait_ready_interval": "100ms"}) - var beatConfig beat.BeatConfig - err := beatConfig.Output.Unpack(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": []string{srv.URL}, - }, - })) - require.NoError(t, err) + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "apm-server.wait_ready_interval": "100ms", + "output.elasticsearch.hosts": []string{srv.URL}, + }) // newBeat sets `data_streams.wait_for_integration: false`, // remove it so we test the default behaviour. - apmBeat, cfg := newBeat(t, cfg, &beatConfig, nil) + apmBeat, cfg := newBeat(t, cfg, nil) removed, err := cfg.Remove("data_streams.wait_for_integration", -1) require.NoError(t, err) require.True(t, removed) - beater, err := setupBeater(t, apmBeat, cfg, &beatConfig) + beater, err := setupBeater(t, apmBeat, cfg) require.NoError(t, err) // Send some events to the server. They should be accepted and enqueued. @@ -888,40 +793,24 @@ func TestServerElasticsearchOutput(t *testing.T) { monitoring.Default.Remove("libbeat.whatever") monitoring.NewInt(monitoring.Default, "libbeat.whatever") - apmBeat, cfg := newBeat(t, nil, nil, nil) + apmBeat, cfg := newBeat(t, nil, nil) apmBeat.Manager = &mockManager{enabled: true} - beater, err := newTestBeater(t, apmBeat, cfg, nil) + beater, err := newTestBeater(t, apmBeat, cfg) require.NoError(t, err) beater.start() + reloadInputs, reloadOutput := waitReloaders() - // Reload output config to show that apm-server will switch to the - // output dynamically. - err = apmBeat.OutputConfigReloader.Reload(&reload.ConfigWithMeta{ - Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ - "elasticsearch": map[string]interface{}{ - "hosts": []string{srv.URL}, - "flush_bytes": "1kb", // test data is >1kb - "backoff": map[string]interface{}{"init": "1ms", "max": "1ms"}, - "max_retries": 0, - }, - }), - }) - assert.NoError(t, err) - - // Now that the beater is running, send config changes. The reloader - // is not registered until after the beater starts running, so we - // must loop until it is set. - var reloadable reload.ReloadableList - for { - // The Reloader is not registered until after the beat has started running. - reloadable = reload.Register.GetReloadableList("inputs") - if reloadable != nil { - break - } - time.Sleep(10 * time.Millisecond) - } + err = reloadOutput.Reload(&reload.ConfigWithMeta{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + "elasticsearch": map[string]interface{}{ + "hosts": []string{srv.URL}, + "flush_bytes": "1kb", // test data is >1kb + "backoff": map[string]interface{}{"init": "1ms", "max": "1ms"}, + "max_retries": 0, + }, + })}) + require.NoError(t, err) - err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ + err = reloadInputs.Reload([]*reload.ConfigWithMeta{{Config: agentconfig.MustNewConfigFrom(map[string]interface{}{ "apm-server": map[string]interface{}{ "host": "localhost:0", }, @@ -934,7 +823,7 @@ func TestServerElasticsearchOutput(t *testing.T) { req := makeTransactionRequest(t, "http://"+listenAddr) req.Header.Add("Content-Type", "application/x-ndjson") resp, err := http.DefaultClient.Do(req) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, http.StatusAccepted, resp.StatusCode) resp.Body.Close() @@ -983,9 +872,8 @@ func TestServerElasticsearchOutput(t *testing.T) { } func TestServerPProf(t *testing.T) { - ucfg, err := agentconfig.NewConfigFrom(m{"pprof.enabled": true}) - assert.NoError(t, err) - server, err := setupServer(t, ucfg, nil, nil) + ucfg := agentconfig.MustNewConfigFrom(map[string]interface{}{"apm-server.pprof.enabled": true}) + server, err := setupServer(t, ucfg, nil) require.NoError(t, err) defer server.Stop() @@ -1001,51 +889,16 @@ func TestServerPProf(t *testing.T) { } } -type dummyOutputClient struct { -} - -func (d *dummyOutputClient) Publish(_ context.Context, batch pubs.Batch) error { - batch.ACK() - return nil -} -func (d *dummyOutputClient) Close() error { return nil } -func (d *dummyOutputClient) String() string { return "" } - -func dummyPipeline(cfg *agentconfig.C, info beat.Info, clients ...outputs.Client) *pipeline.Pipeline { - if len(clients) == 0 { - clients = []outputs.Client{&dummyOutputClient{}} - } - if cfg == nil { - cfg = agentconfig.NewConfig() - } - processors, err := processing.MakeDefaultSupport(false)(info, logp.NewLogger("testbeat"), cfg) - if err != nil { - panic(err) - } - p, err := pipeline.New( - info, - pipeline.Monitors{}, - func(lis queue.ACKListener) (queue.Queue, error) { - return memqueue.NewQueue(nil, memqueue.Settings{ - ACKListener: lis, - Events: 20, - }), nil - }, - outputs.Group{ - Clients: clients, - BatchSize: 5, - Retry: 0, // no retry. on error drop events - }, - pipeline.Settings{ - WaitClose: 0, - WaitCloseMode: pipeline.NoWaitOnClose, - Processors: processors, - }, - ) - if err != nil { - panic(err) +func waitReloaders() (reloadInputs reload.ReloadableList, reloadOutput reload.Reloadable) { + for { + // The reloaders are not registered until after the beat has started running. + reloadInputs = reload.Register.GetReloadableList("inputs") + reloadOutput = reload.Register.GetReloadable("output") + if reloadInputs != nil && reloadOutput != nil { + return reloadInputs, reloadOutput + } + time.Sleep(10 * time.Millisecond) } - return p } var testData = func() []byte { diff --git a/internal/beater/tracing.go b/internal/beater/tracing.go index 8a6dffc015e..97c633e3267 100644 --- a/internal/beater/tracing.go +++ b/internal/beater/tracing.go @@ -18,7 +18,6 @@ package beater import ( - "context" "net" "net/http" @@ -31,29 +30,7 @@ import ( "github.com/elastic/apm-server/internal/model" ) -type tracerServer struct { - server *http.Server - logger *logp.Logger - requests <-chan tracerServerRequest -} - -func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, error) { - requests := make(chan tracerServerRequest) - processBatch := model.ProcessBatchFunc(func(ctx context.Context, batch *model.Batch) error { - result := make(chan error, 1) - request := tracerServerRequest{ctx: ctx, batch: batch, res: result} - select { - case <-ctx.Done(): - return ctx.Err() - case requests <- request: - } - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-result: - return err - } - }) +func newTracerServer(listener net.Listener, logger *logp.Logger, batchProcessor model.BatchProcessor) (*http.Server, error) { cfg := config.DefaultConfig() ratelimitStore, err := ratelimit.NewStore(1, 1, 1) // unused, arbitrary params if err != nil { @@ -65,7 +42,7 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, } mux, err := api.NewMux( cfg, - processBatch, + batchProcessor, authenticator, newAgentConfigFetcher(cfg, nil /* kibana client */), ratelimitStore, @@ -76,46 +53,11 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, if err != nil { return nil, err } - server := &http.Server{ + return &http.Server{ Handler: mux, IdleTimeout: cfg.IdleTimeout, ReadTimeout: cfg.ReadTimeout, WriteTimeout: cfg.WriteTimeout, MaxHeaderBytes: cfg.MaxHeaderSize, - } - go func() { - if err := server.Serve(listener); err != http.ErrServerClosed { - logger.Error(err.Error()) - } - }() - return &tracerServer{ - server: server, - logger: logger, - requests: requests, }, nil } - -// Close closes the tracerServer's listener. -func (s *tracerServer) Close() error { - return s.server.Shutdown(context.Background()) -} - -// serve serves batch processing requests for the tracer server. -// -// This may be called multiple times in series, but not concurrently. -func (s *tracerServer) serve(ctx context.Context, batchProcessor model.BatchProcessor) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case req := <-s.requests: - req.res <- batchProcessor.ProcessBatch(req.ctx, req.batch) - } - } -} - -type tracerServerRequest struct { - ctx context.Context - batch *model.Batch - res chan<- error -} diff --git a/internal/beater/tracing_test.go b/internal/beater/tracing_test.go index ae34d89459a..fd926ac1fda 100644 --- a/internal/beater/tracing_test.go +++ b/internal/beater/tracing_test.go @@ -34,13 +34,25 @@ func TestServerTracingEnabled(t *testing.T) { for _, enabled := range []bool{false, true} { t.Run(fmt.Sprint(enabled), func(t *testing.T) { - cfg := agentconfig.MustNewConfigFrom(m{ - "host": "localhost:0", + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ "instrumentation.enabled": enabled, + + // The output instrumentation may send transactions for + // bulk operations, e.g. there will be "flush" transactions + // send by modelindexer for _bulk requests. When the server + // sends traces to itself, it will enter a state where it + // continues to regularly send traces to itself from the + // traced output. + // + // TODO(axw) we should consider having a separate processor + // pipeline (including output) with no tracing. For now, we + // set a short shutdown timeout so that if an trace events + // are not consumed, they will not block shutdown. + "apm-server.shutdown_timeout": "1ns", }) docs := make(chan []byte, 10) - beater, err := setupServer(t, cfg, nil, docs) + beater, err := setupServer(t, cfg, docs) require.NoError(t, err) // Make an HTTP request to the server, which should be traced diff --git a/internal/beatcmd/supporter_factory.go b/internal/idxmgmt/supporter_factory.go similarity index 96% rename from internal/beatcmd/supporter_factory.go rename to internal/idxmgmt/supporter_factory.go index 9c62e1226ff..3551d450766 100644 --- a/internal/beatcmd/supporter_factory.go +++ b/internal/idxmgmt/supporter_factory.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beatcmd +package idxmgmt import ( "github.com/pkg/errors" @@ -32,10 +32,10 @@ import ( "github.com/elastic/apm-server/internal/logs" ) -// newSupporter creates a new idxmgmt.Supporter which directs all events +// NewSupporter creates a new idxmgmt.Supporter which directs all events // to data streams. The given root config will be checked for deprecated/removed // configuration, and if any are present warnings will be logged. -func newSupporter(log *logp.Logger, info beat.Info, configRoot *config.C) idxmgmt.Supporter { +func NewSupporter(log *logp.Logger, info beat.Info, configRoot *config.C) idxmgmt.Supporter { if log == nil { log = logp.NewLogger(logs.IndexManagement) } else { diff --git a/internal/beatcmd/supporter_factory_test.go b/internal/idxmgmt/supporter_factory_test.go similarity index 96% rename from internal/beatcmd/supporter_factory_test.go rename to internal/idxmgmt/supporter_factory_test.go index 2d4eb43e83d..6555995bcc9 100644 --- a/internal/beatcmd/supporter_factory_test.go +++ b/internal/idxmgmt/supporter_factory_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package beatcmd +package idxmgmt import ( "testing" @@ -37,7 +37,7 @@ import ( ) func TestNewSupporter(t *testing.T) { - supporter := newSupporter(nil, beat.Info{}, config.MustNewConfigFrom(map[string]interface{}{})) + supporter := NewSupporter(nil, beat.Info{}, config.MustNewConfigFrom(map[string]interface{}{})) // The data streams supporter does not set up templates or ILM. These // are expected to be set up externally, typically by Fleet. @@ -82,7 +82,7 @@ func TestNewSupporterWarnings(t *testing.T) { "setup.template.pattern": "custom", } - newSupporter(logger, beat.Info{}, config.MustNewConfigFrom(attrs)) + NewSupporter(logger, beat.Info{}, config.MustNewConfigFrom(attrs)) var warnings []string for _, record := range observed.All() {