diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go index ae19bd08cc48..edcde4f82f9b 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -11,8 +11,9 @@ import ( // Config for central management type Config struct { - Enabled bool `config:"enabled" yaml:"enabled"` - Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` + Enabled bool `config:"enabled" yaml:"enabled"` + Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` + OutputRestart bool `config:"restart_on_output_change" yaml:"restart_on_output_change"` } // ConfigBlock stores a piece of config from central management @@ -29,6 +30,7 @@ type ConfigBlocksWithType struct { // ConfigBlocks holds a list of type + configs objects type ConfigBlocks []ConfigBlocksWithType +// DefaultConfig returns the default config for the V2 manager func DefaultConfig() *Config { return &Config{ Blacklist: ConfigBlacklistSettings{ diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 9605bbed4874..660770388b1e 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -41,15 +41,19 @@ type BeatV2Manager struct { payload map[string]interface{} // stop callback must be registered by libbeat, as with the V1 callback - stopFunc func() - stopMut sync.Mutex - beatStop sync.Once + stopFunc func() + stopOnOutputReload bool + stopMut sync.Mutex + beatStop sync.Once // sync channel for shutting down the manager after we get a stop from // either the agent or the beat stopChan chan struct{} isRunning bool + // is set on first instance of a config reload, + // allowing us to restart the beat if stopOnOutputReload is set + outputIsConfigured bool // used for the debug callback to report as-running config lastOutputCfg *reload.ConfigWithMeta @@ -87,12 +91,16 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry, _ uuid.UUID) ( // NewV2AgentManagerWithClient actually creates the manager instance used by the rest of the beats. func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agentClient client.V2) (lbmanagement.Manager, error) { log := logp.NewLogger(lbmanagement.DebugK) + if config.OutputRestart { + log.Info("Output reload is enabled, the beat will restart as needed on change of output config") + } m := &BeatV2Manager{ - config: config, - logger: log.Named("V2-manager"), - registry: registry, - units: make(map[string]*client.Unit), - stopChan: make(chan struct{}, 1), + stopOnOutputReload: config.OutputRestart, + config: config, + logger: log.Named("V2-manager"), + registry: registry, + units: make(map[string]*client.Unit), + stopChan: make(chan struct{}, 1), } if config.Enabled { @@ -338,6 +346,16 @@ func (cm *BeatV2Manager) handleOutputReload(unit *client.Unit) { } cm.logger.Debugf("Got Output unit config '%s'", rawConfig.GetId()) + // if needed, stop the beat, let agent restart + if cm.stopOnOutputReload && cm.outputIsConfigured { + cm.logger.Info("beat will now stop for output reload") + // in the future we'll want some set "reloading" state, + // but for now set the state this way. + _ = unit.UpdateState(client.UnitStateStopping, "got output unit, beat will restart", nil) + cm.Stop() + return + } + reloadConfig, err := groupByOutputs(rawConfig) if err != nil { errString := fmt.Errorf("Failed to generate config for output: %w", err) @@ -359,11 +377,14 @@ func (cm *BeatV2Manager) handleOutputReload(unit *client.Unit) { _ = unit.UpdateState(client.UnitStateFailed, errString.Error(), nil) return } + // set to true, we'll reload the output if we need to re-configure + cm.outputIsConfigured = true _ = unit.UpdateState(client.UnitStateHealthy, "reloaded output component", nil) } // handle the updated config for an input unit func (cm *BeatV2Manager) handleInputReload(unit *client.Unit) { + cm.addUnit(unit) _, _, rawConfig := unit.Expected() if rawConfig == nil { cm.logger.Warnf("got input update with no config, ignoring")