Skip to content

Commit

Permalink
Remove the watcher's dependency on SIGHUP / unix
Browse files Browse the repository at this point in the history
Right now the watcher is broken/unimplemented for Windows and a large part of the
reason for that might be how it works:

* the watcher subscribes to filesystem events (crate: notify)
* the watcher raises a unix signal (SIGHUP) on detecting changes
* the signal handler (again, unix only) reacts to SIGHUP with an internal
  reload signal
* (vector magically reloads/compares the configs)

With these changes that unix signal dependency is removed

* the watcher clones a SignalTx to emit its very own signals
* the watcher sends an internal reload signal on detecting changes
* (vector magically reloads/compares the configs)

All of this _should_ also work on Windows, has nothing to do with unix signals anymore.

I slightly modified the tests to not just wait for any Ok, but to state explicitly what
signal we expect to receive.

Cargo fmt
  • Loading branch information
darklajid committed Aug 2, 2024
1 parent 70e61fc commit 4aa4051
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 48 deletions.
15 changes: 9 additions & 6 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,15 @@ pub async fn load_configs(

if watch_config {
// Start listening for config changes immediately.
config::watcher::spawn_thread(config_paths.iter().map(Into::into), None).map_err(
|error| {
error!(message = "Unable to start config watcher.", %error);
exitcode::CONFIG
},
)?;
config::watcher::spawn_thread(
signal_handler.clone_tx(),
config_paths.iter().map(Into::into),
None,
)
.map_err(|error| {
error!(message = "Unable to start config watcher.", %error);
exitcode::CONFIG
})?;
}

info!(
Expand Down
71 changes: 29 additions & 42 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{path::PathBuf, time::Duration};
#[cfg(unix)]
use std::{
sync::mpsc::{channel, Receiver},
thread,
};

#[cfg(unix)]
use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher};

use crate::Error;
Expand All @@ -17,18 +15,16 @@ use crate::Error;
/// - Invalid config, caused either by user or by data race.
/// - Frequent changes, caused by user/editor modifying/saving file in small chunks.
/// so we can use smaller, more responsive delay.
#[cfg(unix)]
const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

#[cfg(unix)]
const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// Triggers SIGHUP when file on config_path changes.
/// Sends a ReloadFromDisk on config_path changes.
/// Accumulates file changes until no change for given duration has occurred.
/// Has best effort guarantee of detecting all file changes from the end of
/// this function until the main thread stops.
#[cfg(unix)]
pub fn spawn_thread<'a>(
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
Expand Down Expand Up @@ -65,7 +61,9 @@ pub fn spawn_thread<'a>(
debug!(message = "Reloaded paths.");

info!("Configuration file changed.");
raise_sighup();
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
} else {
debug!(message = "Ignoring event.", event = ?event)
}
Expand All @@ -83,31 +81,15 @@ pub fn spawn_thread<'a>(
// so for a good measure raise SIGHUP and let reload logic
// determine if anything changed.
info!("Speculating that configuration files have changed.");
raise_sighup();
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
}
});

Ok(())
}

#[cfg(windows)]
/// Errors on Windows.
pub fn spawn_thread<'a>(
_config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
_delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
Err("Reloading config on Windows isn't currently supported. Related issue https://github.com/vectordotdev/vector/issues/938 .".into())
}

#[cfg(unix)]
fn raise_sighup() {
use nix::sys::signal;
_ = signal::raise(signal::Signal::SIGHUP).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
}

#[cfg(unix)]
fn create_watcher(
config_paths: &[PathBuf],
) -> Result<
Expand All @@ -124,7 +106,6 @@ fn create_watcher(
Ok((watcher, receiver))
}

#[cfg(unix)]
fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
watcher.watch(path, RecursiveMode::NonRecursive)?;
Expand All @@ -134,20 +115,22 @@ fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Resu

#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
use std::{fs::File, io::Write, time::Duration};

use tokio::signal::unix::{signal, SignalKind};

use super::*;
use crate::test_util::{temp_dir, temp_file, trace_init};

async fn test(file: &mut File, timeout: Duration) -> bool {
let mut signal = signal(SignalKind::hangup()).expect("Signal handlers should not panic.");
use crate::{
signal::SignalRx,
test_util::{temp_dir, temp_file, trace_init},
};
use std::{fs::File, io::Write, time::Duration};
use tokio::sync::broadcast;

async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool {
file.write_all(&[0]).unwrap();
file.sync_all().unwrap();

tokio::time::timeout(timeout, signal.recv()).await.is_ok()
match tokio::time::timeout(timeout, receiver.recv()).await {

Check failure on line 130 in src/config/watcher.rs

View workflow job for this annotation

GitHub Actions / Checks

match expression looks like `matches!` macro
Ok(Ok(crate::signal::SignalTo::ReloadFromDisk)) => true,
_ => false,
}
}

#[tokio::test]
Expand All @@ -161,9 +144,10 @@ mod tests {
std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

spawn_thread(&[dir], delay).unwrap();
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[dir], delay).unwrap();

if !test(&mut file, delay * 5).await {
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
Expand All @@ -176,14 +160,16 @@ mod tests {
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();

spawn_thread(&[file_path], delay).unwrap();
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[file_path], delay).unwrap();

if !test(&mut file, delay * 5).await {
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}

#[tokio::test]
#[cfg(unix)]
async fn sym_file_update() {
trace_init();

Expand All @@ -193,9 +179,10 @@ mod tests {
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

spawn_thread(&[sym_file], delay).unwrap();
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sym_file], delay).unwrap();

if !test(&mut file, delay * 5).await {
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
Expand Down

0 comments on commit 4aa4051

Please sign in to comment.