diff --git a/src/app.rs b/src/app.rs index 6e2f934ca613e..91f32b223167c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -476,12 +476,15 @@ pub async fn load_configs( if watch_config { // Start listening for config changes immediately. - config::watcher::spawn_thread(config_paths.iter().map(Into::into), None).map_err( - |error| { - error!(message = "Unable to start config watcher.", %error); - exitcode::CONFIG - }, - )?; + config::watcher::spawn_thread( + signal_handler.clone_tx(), + config_paths.iter().map(Into::into), + None, + ) + .map_err(|error| { + error!(message = "Unable to start config watcher.", %error); + exitcode::CONFIG + })?; } info!( diff --git a/src/config/watcher.rs b/src/config/watcher.rs index bff6fbe7965fe..85ce2cf54ef1d 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -1,11 +1,9 @@ use std::{path::PathBuf, time::Duration}; -#[cfg(unix)] use std::{ sync::mpsc::{channel, Receiver}, thread, }; -#[cfg(unix)] use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use crate::Error; @@ -17,18 +15,16 @@ use crate::Error; /// - Invalid config, caused either by user or by data race. /// - Frequent changes, caused by user/editor modifying/saving file in small chunks. /// so we can use smaller, more responsive delay. -#[cfg(unix)] const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1); -#[cfg(unix)] const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); -/// Triggers SIGHUP when file on config_path changes. +/// Sends a ReloadFromDisk on config_path changes. /// Accumulates file changes until no change for given duration has occurred. /// Has best effort guarantee of detecting all file changes from the end of /// this function until the main thread stops. -#[cfg(unix)] pub fn spawn_thread<'a>( + signal_tx: crate::signal::SignalTx, config_paths: impl IntoIterator + 'a, delay: impl Into>, ) -> Result<(), Error> { @@ -65,7 +61,9 @@ pub fn spawn_thread<'a>( debug!(message = "Reloaded paths."); info!("Configuration file changed."); - raise_sighup(); + _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| { + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + }); } else { debug!(message = "Ignoring event.", event = ?event) } @@ -83,31 +81,15 @@ pub fn spawn_thread<'a>( // so for a good measure raise SIGHUP and let reload logic // determine if anything changed. info!("Speculating that configuration files have changed."); - raise_sighup(); + _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| { + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + }); } }); Ok(()) } -#[cfg(windows)] -/// Errors on Windows. -pub fn spawn_thread<'a>( - _config_paths: impl IntoIterator + 'a, - _delay: impl Into>, -) -> Result<(), Error> { - Err("Reloading config on Windows isn't currently supported. Related issue https://github.com/vectordotdev/vector/issues/938 .".into()) -} - -#[cfg(unix)] -fn raise_sighup() { - use nix::sys::signal; - _ = signal::raise(signal::Signal::SIGHUP).map_err(|error| { - error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) - }); -} - -#[cfg(unix)] fn create_watcher( config_paths: &[PathBuf], ) -> Result< @@ -124,7 +106,6 @@ fn create_watcher( Ok((watcher, receiver)) } -#[cfg(unix)] fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> { for path in config_paths { watcher.watch(path, RecursiveMode::NonRecursive)?; @@ -134,20 +115,22 @@ fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Resu #[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000 mod tests { - use std::{fs::File, io::Write, time::Duration}; - - use tokio::signal::unix::{signal, SignalKind}; - use super::*; - use crate::test_util::{temp_dir, temp_file, trace_init}; - - async fn test(file: &mut File, timeout: Duration) -> bool { - let mut signal = signal(SignalKind::hangup()).expect("Signal handlers should not panic."); + use crate::{ + signal::SignalRx, + test_util::{temp_dir, temp_file, trace_init}, + }; + use std::{fs::File, io::Write, time::Duration}; + use tokio::sync::broadcast; + async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool { file.write_all(&[0]).unwrap(); file.sync_all().unwrap(); - tokio::time::timeout(timeout, signal.recv()).await.is_ok() + match tokio::time::timeout(timeout, receiver.recv()).await { + Ok(Ok(crate::signal::SignalTo::ReloadFromDisk)) => true, + _ => false, + } } #[tokio::test] @@ -161,9 +144,10 @@ mod tests { std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); - spawn_thread(&[dir], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[dir], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } } @@ -176,14 +160,16 @@ mod tests { let file_path = temp_file(); let mut file = File::create(&file_path).unwrap(); - spawn_thread(&[file_path], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[file_path], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } } #[tokio::test] + #[cfg(unix)] async fn sym_file_update() { trace_init(); @@ -193,9 +179,10 @@ mod tests { let mut file = File::create(&file_path).unwrap(); std::os::unix::fs::symlink(&file_path, &sym_file).unwrap(); - spawn_thread(&[sym_file], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[sym_file], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } }